跳转至

数据库管理

cryptoservice.storage.Database(db_path: str | Path, **options)

数据库主入口类.

组合各个专门的存储器和查询器,提供统一的数据库操作接口.

初始化数据库.

参数 描述
db_path

数据库文件路径

类型: str | Path

**options

连接池选项

默认: {}

Source code in src/cryptoservice/storage/database.py
def __init__(self, db_path: str | Path, **options):
    """初始化数据库.

    Args:
        db_path: 数据库文件路径
        **options: 连接池选项
    """
    self.db_path = Path(db_path)

    # 基础设施
    self.pool = ConnectionPool(db_path, **options)
    self.schema = DatabaseSchema()

    # 存储层
    self.kline_store = KlineStore(self.pool)
    self.funding_store = FundingStore(self.pool)
    self.interest_store = InterestStore(self.pool)
    self.ratio_store = RatioStore(self.pool)

    # 查询层
    self.kline_query = KlineQuery(self.pool)
    self.metrics_query = MetricsQuery(self.pool)

    # 功能组件
    self.incremental = IncrementalManager(self.kline_query, self.metrics_query)
    self.resampler = DataResampler()

    # 导出器
    self.numpy_exporter = NumpyExporter(self.kline_query, self.resampler, self.metrics_query)
    self.csv_exporter = CsvExporter(self.kline_query)
    self.parquet_exporter = ParquetExporter(self.kline_query)

    self._initialized = False

Functions

insert_klines(klines: list[PerpetualMarketTicker], freq: Freq, batch_size: int = 1000) -> int async

插入K线数据.

参数 描述
klines

K线数据列表

类型: list[PerpetualMarketTicker]

freq

数据频率

类型: Freq

batch_size

批量大小

类型: int 默认: 1000

返回 描述
int

插入的记录数

源代码位于: src/cryptoservice/storage/database.py
async def insert_klines(self, klines: list[PerpetualMarketTicker], freq: Freq, batch_size: int = 1000) -> int:
    """插入K线数据.

    Args:
        klines: K线数据列表
        freq: 数据频率
        batch_size: 批量大小

    Returns:
        插入的记录数
    """
    if not self._initialized:
        await self.initialize()
    return await self.kline_store.insert(klines, freq, batch_size)

select_klines(symbols: list[str], start_time: str, end_time: str, freq: Freq, columns: list[str] | None = None) -> pd.DataFrame async

查询K线数据.

参数 描述
symbols

交易对列表

类型: list[str]

start_time

开始时间

类型: str

end_time

结束时间

类型: str

freq

数据频率

类型: Freq

columns

需要查询的列

类型: list[str] | None 默认: None

返回 描述
DataFrame

K线数据DataFrame

源代码位于: src/cryptoservice/storage/database.py
async def select_klines(self, symbols: list[str], start_time: str, end_time: str, freq: Freq, columns: list[str] | None = None) -> pd.DataFrame:
    """查询K线数据.

    Args:
        symbols: 交易对列表
        start_time: 开始时间
        end_time: 结束时间
        freq: 数据频率
        columns: 需要查询的列

    Returns:
        K线数据DataFrame
    """
    if not self._initialized:
        await self.initialize()
    return await self.kline_query.select_by_time_range(symbols, start_time, end_time, freq, columns)

export_to_numpy(symbols: list[str], start_time: str, end_time: str, freq: Freq, output_path: Path, target_freq: Freq, chunk_days: int = 30) -> None async

导出数据为NumPy格式.

参数 描述
symbols

交易对列表

类型: list[str]

start_time

开始时间

类型: str

end_time

结束时间

类型: str

freq

数据频率

类型: Freq

output_path

输出路径

类型: Path

target_freq

目标频率

类型: Freq

chunk_days

分块天数

类型: int 默认: 30

源代码位于: src/cryptoservice/storage/database.py
async def export_to_numpy(
    self,
    symbols: list[str],
    start_time: str,
    end_time: str,
    freq: Freq,
    output_path: Path,
    target_freq: Freq,
    chunk_days: int = 30,
) -> None:
    """导出数据为NumPy格式.

    Args:
        symbols: 交易对列表
        start_time: 开始时间
        end_time: 结束时间
        freq: 数据频率
        output_path: 输出路径
        target_freq: 目标频率
        chunk_days: 分块天数
    """
    if not self._initialized:
        await self.initialize()
    await self.numpy_exporter.export_klines(symbols, start_time, end_time, freq, output_path, target_freq, chunk_days)

export_to_csv(symbols: list[str], start_time: str, end_time: str, freq: Freq, output_path: Path, chunk_size: int = 100000) -> None async

导出数据为CSV格式.

参数 描述
symbols

交易对列表

类型: list[str]

start_time

开始时间

类型: str

end_time

结束时间

类型: str

freq

数据频率

类型: Freq

output_path

输出路径

类型: Path

chunk_size

分块大小

类型: int 默认: 100000

源代码位于: src/cryptoservice/storage/database.py
async def export_to_csv(
    self,
    symbols: list[str],
    start_time: str,
    end_time: str,
    freq: Freq,
    output_path: Path,
    chunk_size: int = 100000,
) -> None:
    """导出数据为CSV格式.

    Args:
        symbols: 交易对列表
        start_time: 开始时间
        end_time: 结束时间
        freq: 数据频率
        output_path: 输出路径
        chunk_size: 分块大小
    """
    if not self._initialized:
        await self.initialize()
    await self.csv_exporter.export_klines(symbols, start_time, end_time, freq, output_path, chunk_size)

export_to_parquet(symbols: list[str], start_time: str, end_time: str, freq: Freq, output_path: Path, compression: Literal['snappy', 'gzip', 'brotli', 'lz4', 'zstd'] = 'snappy') -> None async

导出数据为Parquet格式.

参数 描述
symbols

交易对列表

类型: list[str]

start_time

开始时间

类型: str

end_time

结束时间

类型: str

freq

数据频率

类型: Freq

output_path

输出路径

类型: Path

compression

压缩方式

类型: Literal['snappy', 'gzip', 'brotli', 'lz4', 'zstd'] 默认: 'snappy'

源代码位于: src/cryptoservice/storage/database.py
async def export_to_parquet(
    self,
    symbols: list[str],
    start_time: str,
    end_time: str,
    freq: Freq,
    output_path: Path,
    compression: Literal["snappy", "gzip", "brotli", "lz4", "zstd"] = "snappy",
) -> None:
    """导出数据为Parquet格式.

    Args:
        symbols: 交易对列表
        start_time: 开始时间
        end_time: 结束时间
        freq: 数据频率
        output_path: 输出路径
        compression: 压缩方式
    """
    if not self._initialized:
        await self.initialize()
    await self.parquet_exporter.export_klines(symbols, start_time, end_time, freq, output_path, compression)

plan_kline_download(symbols: list[str], start_date: str, end_date: str, freq: Freq) -> dict[str, dict[str, int | str]] async

制定K线数据增量下载计划.

参数 描述
symbols

交易对列表

类型: list[str]

start_date

开始日期

类型: str

end_date

结束日期

类型: str

freq

数据频率

类型: Freq

返回 描述
dict[str, dict[str, int | str]]

增量下载计划

源代码位于: src/cryptoservice/storage/database.py
async def plan_kline_download(self, symbols: list[str], start_date: str, end_date: str, freq: Freq) -> dict[str, dict[str, int | str]]:
    """制定K线数据增量下载计划.

    Args:
        symbols: 交易对列表
        start_date: 开始日期
        end_date: 结束日期
        freq: 数据频率

    Returns:
        增量下载计划
    """
    if not self._initialized:
        await self.initialize()
    return await self.incremental.plan_kline_download(symbols, start_date, end_date, freq)

close() async

关闭数据库.

源代码位于: src/cryptoservice/storage/database.py
async def close(self):
    """关闭数据库."""
    await self.pool.close()
    self._initialized = False
    logger.debug("database_closed")