异步迭代器使用指南
本文档介绍 Database 类提供的异步迭代器功能,这些功能让您可以高效地处理大量数据。
概述
Database 类提供了三个主要的异步迭代器方法:
iter_symbols()- 迭代所有交易对符号iter_klines_by_symbol()- 按交易对逐个返回K线数据iter_klines_chunked()- 分块迭代K线数据,适合大数据集
为什么使用异步迭代器?
异步迭代器的主要优势:
- 内存效率: 逐个处理数据,避免一次性加载所有数据到内存
- 流式处理: 可以边获取数据边处理,提高响应速度
- 代码简洁: 使用
async for语法,代码更加简洁易读 - 灵活性: 可以随时中断迭代,根据需要只处理部分数据
使用示例
1. 迭代交易对符号
from cryptoservice.models import Freq
from cryptoservice.storage import Database
async def list_symbols():
db = Database("data/database/market.db")
# 迭代所有 1小时 频率的交易对
async for symbol in db.iter_symbols(freq=Freq.h1):
print(symbol)
await db.close()
使用场景: - 遍历所有交易对进行批量操作 - 动态筛选符合条件的交易对 - 实时监控数据库中的交易对
2. 按交易对迭代K线数据
from cryptoservice.models import Freq
from cryptoservice.storage import Database
async def process_by_symbol():
db = Database("data/database/market.db")
symbols = await db.get_symbols(freq=Freq.h1)
# 逐个处理每个交易对的数据
async for symbol, df in db.iter_klines_by_symbol(
symbols,
start_time="2024-01-01",
end_time="2024-12-31",
freq=Freq.h1
):
# 对每个交易对的数据进行独立处理
print(f"处理 {symbol}: {len(df)} 条记录")
# 计算统计信息
mean_volume = df["volume"].mean()
max_price = df["high_price"].max()
print(f" 平均成交量: {mean_volume}")
print(f" 最高价: {max_price}")
await db.close()
使用场景: - 对每个交易对进行独立的分析 - 生成每个交易对的报告 - 按交易对分别导出数据
3. 分块迭代K线数据
from cryptoservice.models import Freq
from cryptoservice.storage import Database
async def process_in_chunks():
db = Database("data/database/market.db")
symbols = await db.get_symbols(freq=Freq.m5)
# 分块处理数据,每次1000行
chunk_num = 0
async for chunk_df in db.iter_klines_chunked(
symbols,
start_time="2024-01-01",
end_time="2024-12-31",
freq=Freq.m5,
chunk_size=1000
):
chunk_num += 1
# 处理当前块
print(f"处理块 {chunk_num}: {len(chunk_df)} 行")
# 示例:保存到文件
# chunk_df.to_csv(f"chunk_{chunk_num}.csv")
# 示例:计算统计信息
# stats = chunk_df.describe()
print(f"总共处理了 {chunk_num} 个块")
await db.close()
使用场景: - 处理超大数据集,避免内存溢出 - 流式数据处理和转换 - 增量式数据导出
性能对比
传统方式(一次性加载)
# ❌ 可能导致内存溢出
df = await db.select_klines(symbols, start_time, end_time, freq)
# 内存占用:全部数据
for i in range(len(df)):
process_row(df.iloc[i])
使用迭代器(流式处理)
# ✅ 内存高效
async for chunk_df in db.iter_klines_chunked(symbols, start_time, end_time, freq, chunk_size=1000):
# 内存占用:仅当前块
for i in range(len(chunk_df)):
process_row(chunk_df.iloc[i])
高级用法
条件中断迭代
async def find_first_high_volume_symbol():
db = Database("data/database/market.db")
async for symbol, df in db.iter_klines_by_symbol(
symbols,
start_time="2024-01-01",
end_time="2024-12-31",
freq=Freq.h1
):
avg_volume = df["volume"].mean()
# 找到第一个符合条件的交易对就停止
if avg_volume > 1000000:
print(f"找到高成交量交易对: {symbol}")
break
await db.close()
并行处理
import asyncio
async def process_symbol(symbol: str, df):
"""处理单个交易对的数据"""
# 你的处理逻辑
pass
async def parallel_process():
db = Database("data/database/market.db")
tasks = []
# 收集任务
async for symbol, df in db.iter_klines_by_symbol(
symbols,
start_time="2024-01-01",
end_time="2024-12-31",
freq=Freq.h1
):
# 创建并行任务
task = asyncio.create_task(process_symbol(symbol, df))
tasks.append(task)
# 等待所有任务完成
await asyncio.gather(*tasks)
await db.close()
结合上下文管理器
from cryptoservice.models import Freq
from cryptoservice.storage import Database
async def process_with_context():
# 使用 async with 自动管理连接
async with Database("data/database/market.db") as db:
async for symbol in db.iter_symbols(freq=Freq.h1):
print(symbol)
# 数据库会在退出 with 块时自动关闭
完整示例脚本
查看 demo/iterator_examples.py 获取完整的工作示例。
运行示例:
API 参考
iter_symbols(freq: Freq | None = None)
迭代所有交易对符号。
参数:
- freq: 数据频率过滤,None 表示所有频率
返回:异步生成器,每次返回一个交易对符号(字符串)
iter_klines_by_symbol(symbols, start_time, end_time, freq, columns=None)
按交易对迭代K线数据。
参数:
- symbols: 交易对列表
- start_time: 开始时间
- end_time: 结束时间
- freq: 数据频率
- columns: 需要查询的列(可选)
返回:异步生成器,每次返回 (symbol, dataframe) 元组
iter_klines_chunked(symbols, start_time, end_time, freq, chunk_size=10000, columns=None)
分块迭代K线数据。
参数:
- symbols: 交易对列表
- start_time: 开始时间
- end_time: 结束时间
- freq: 数据频率
- chunk_size: 每块的行数(默认10000)
- columns: 需要查询的列(可选)
返回:异步生成器,每次返回一个 DataFrame 块
注意事项
- 异步上下文: 所有迭代器方法必须在异步函数中使用
- 连接管理: 使用
async with或记得调用await db.close() - 内存管理: 对于大数据集,优先使用
iter_klines_chunked() - 性能优化: 合理设置
chunk_size以平衡内存和速度