跳转至

cryptoservice.services.market_service

cryptoservice.services.market_service

市场数据服务模块。

提供加密货币市场数据获取、处理和存储功能。

Classes

RateLimitManager(base_delay: float = 0.5)

API频率限制管理器

Source code in src/cryptoservice/services/market_service.py
def __init__(self, base_delay: float = 0.5):
    """Initialize the rate-limit manager with a configurable base delay."""
    self.base_delay = base_delay
    self.current_delay = base_delay
    # Per-minute request accounting window.
    self.window_start_time = time.time()
    self.request_count = 0
    self.last_request_time = 0.0
    self.consecutive_errors = 0
    # Conservative ceiling, kept below the exchange's documented limit.
    self.max_requests_per_minute = 1800
    self.lock = threading.Lock()
Functions
wait_before_request()

在请求前等待适当的时间

Source code in src/cryptoservice/services/market_service.py
def wait_before_request(self):
    """Sleep as needed so the next request respects the adaptive rate limit."""
    with self.lock:
        current_time = time.time()

        # Roll the one-minute accounting window forward.
        if current_time - self.window_start_time >= 60:
            self.request_count = 0
            self.window_start_time = current_time
            # After an error-free window, ease the delay back toward the base value.
            if self.consecutive_errors == 0:
                self.current_delay = max(self.base_delay, self.current_delay * 0.9)

        # Start throttling once we reach 80% of the per-minute budget.
        additional_delay = 0
        if self.request_count >= self.max_requests_per_minute * 0.8:
            additional_delay = 2.0
            logger.warning(f"⚠️ 接近频率限制,增加延迟: {additional_delay}秒")

        # Enforce a minimum gap since the previous request.
        time_since_last = current_time - self.last_request_time
        total_delay = self.current_delay + additional_delay

        if time_since_last < total_delay:
            wait_time = total_delay - time_since_last
            # Only log noteworthy pauses to keep the log quiet.
            if wait_time > 0.1:
                logger.debug(f"等待 {wait_time:.2f}秒 (当前延迟: {self.current_delay:.2f}秒)")
            time.sleep(wait_time)

        self.last_request_time = time.time()
        self.request_count += 1
handle_rate_limit_error()

处理频率限制错误

Source code in src/cryptoservice/services/market_service.py
def handle_rate_limit_error(self):
    """Escalate backoff after a rate-limit error; return the cooldown in seconds."""
    with self.lock:
        self.consecutive_errors += 1
        errors = self.consecutive_errors

        # Escalate both the per-request delay and the cooldown as errors pile up.
        if errors <= 3:
            self.current_delay = min(10.0, self.current_delay * 2)
            wait_time = 60  # one minute
        elif errors <= 6:
            self.current_delay = min(15.0, self.current_delay * 1.5)
            wait_time = 120  # two minutes
        else:
            self.current_delay = 20.0
            wait_time = 300  # five minutes

        logger.warning(
            f"🚫 频率限制错误 #{self.consecutive_errors},等待 {wait_time}秒,调整延迟至 {self.current_delay:.2f}秒"
        )

        # Restart the per-minute accounting window from scratch.
        self.request_count = 0
        self.window_start_time = time.time()

        return wait_time
handle_success()

处理成功请求

Source code in src/cryptoservice/services/market_service.py
def handle_success(self):
    """Record a successful request, gradually decaying the error counter."""
    with self.lock:
        # Nothing to do while the error counter is already clear.
        if not self.consecutive_errors:
            return
        self.consecutive_errors = max(0, self.consecutive_errors - 1)
        if self.consecutive_errors == 0:
            logger.info(f"✅ 恢复正常,当前延迟: {self.current_delay:.2f}秒")

ExponentialBackoff(config: RetryConfig)

指数退避实现

Source code in src/cryptoservice/services/market_service.py
def __init__(self, config: RetryConfig):
    """Store the retry configuration and zero the attempt counter."""
    self.attempt = 0
    self.config = config
Functions
reset()

重置重试计数

Source code in src/cryptoservice/services/market_service.py
def reset(self):
    """Reset the retry counter so backoff starts over from attempt zero."""
    self.attempt = 0
wait() -> float

计算并执行等待时间

Source code in src/cryptoservice/services/market_service.py
def wait(self) -> float:
    """Sleep according to exponential backoff and return the delay used.

    Returns:
        float: The delay (seconds) that was actually slept.

    Raises:
        RuntimeError: If the configured maximum number of retries is exceeded.
    """
    if self.attempt >= self.config.max_retries:
        # RuntimeError instead of bare Exception: it is a subclass of
        # Exception, so existing `except Exception` callers still work.
        raise RuntimeError(f"超过最大重试次数: {self.config.max_retries}")

    # Base delay grows geometrically with each attempt, capped at max_delay.
    delay = min(
        self.config.base_delay * (self.config.backoff_multiplier**self.attempt),
        self.config.max_delay,
    )

    # Jitter (50%-100% of the computed delay) avoids thundering-herd retries.
    if self.config.jitter:
        delay *= 0.5 + random.random() * 0.5

    self.attempt += 1

    logger.debug(f"指数退避: 第{self.attempt}次重试, 等待{delay:.2f}秒")
    time.sleep(delay)

    return delay

EnhancedErrorHandler

增强错误处理器

Functions
classify_error(error: Exception) -> ErrorSeverity staticmethod

错误分类

Source code in src/cryptoservice/services/market_service.py
@staticmethod
def classify_error(error: Exception) -> ErrorSeverity:
    """Map an exception to an ErrorSeverity bucket via keyword matching."""
    message = str(error).lower()

    def _has(*keywords: str) -> bool:
        return any(keyword in message for keyword in keywords)

    # API rate limiting: recoverable after a pause.
    if _has("too many requests", "rate limit", "429", "request limit", "-1003"):
        return ErrorSeverity.MEDIUM

    # Transient network failures.
    if _has("connection", "timeout", "network", "dns", "socket"):
        return ErrorSeverity.MEDIUM

    # Invalid trading pairs.
    if _has("invalid symbol", "symbol not found", "unknown symbol"):
        return ErrorSeverity.LOW

    # Upstream server errors.
    if _has("500", "502", "503", "504", "server error", "internal error"):
        return ErrorSeverity.HIGH

    # Authentication/authorization problems: retrying cannot help.
    if _has("unauthorized", "forbidden", "api key", "signature"):
        return ErrorSeverity.CRITICAL

    # Anything unrecognized defaults to medium severity.
    return ErrorSeverity.MEDIUM
should_retry(error: Exception, attempt: int, max_retries: int) -> bool staticmethod

判断是否应该重试

Source code in src/cryptoservice/services/market_service.py
@staticmethod
def should_retry(error: Exception, attempt: int, max_retries: int) -> bool:
    """Decide whether another retry attempt is worthwhile for this error."""
    severity = EnhancedErrorHandler.classify_error(error)

    # Credential/permission errors can never succeed on retry.
    if severity == ErrorSeverity.CRITICAL:
        return False

    # Low-severity errors (e.g. bad symbols) get a single retry at most.
    if severity == ErrorSeverity.LOW:
        return attempt <= 1 and attempt < max_retries

    return attempt < max_retries

get_recommended_action(error: Exception) -> str staticmethod

获取推荐处理动作

Source code in src/cryptoservice/services/market_service.py
@staticmethod
def get_recommended_action(error: Exception) -> str:
    """Suggest a remediation action for the given error."""
    severity = EnhancedErrorHandler.classify_error(error)
    message = str(error).lower()

    # Early returns replace the original if/elif chain; order preserved.
    if severity == ErrorSeverity.CRITICAL:
        return "检查API密钥和权限设置"
    if "rate limit" in message or "-1003" in message:
        return "频率限制,自动调整请求间隔"
    if "connection" in message:
        return "检查网络连接,考虑使用代理"
    if "invalid symbol" in message:
        return "验证交易对是否存在和可交易"
    return "检查API文档和错误详情"
is_rate_limit_error(error: Exception) -> bool staticmethod

判断是否为频率限制错误

Source code in src/cryptoservice/services/market_service.py
@staticmethod
def is_rate_limit_error(error: Exception) -> bool:
    """Return True if the error message looks like an API rate-limit response."""
    markers = ("too many requests", "rate limit", "429", "-1003")
    message = str(error).lower()
    return any(marker in message for marker in markers)

MarketDataService(api_key: str, api_secret: str)

Bases: IMarketDataService

市场数据服务实现类。

初始化市场数据服务。

PARAMETER DESCRIPTION
api_key

用户API密钥

TYPE: str

api_secret

用户API Secret(与API密钥配对的私密密钥)

TYPE: str

Source code in src/cryptoservice/services/market_service.py
def __init__(self, api_key: str, api_secret: str) -> None:
    """Initialize the market data service.

    Args:
        api_key: User API key.
        api_secret: User API secret.
    """
    self.client = BinanceClientFactory.create_client(api_key, api_secret)
    self.converter = DataConverter()
    self.rate_limit_manager = RateLimitManager()
    # Database handle is created lazily on first download.
    self.db: MarketDB | None = None
Functions
get_symbol_ticker(symbol: str | None = None) -> SymbolTicker | list[SymbolTicker]

获取单个或所有交易对的行情数据。

PARAMETER DESCRIPTION
symbol

交易对名称

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
SymbolTicker | list[SymbolTicker]

SymbolTicker | list[SymbolTicker]: 单个交易对的行情数据或所有交易对的行情数据

Source code in src/cryptoservice/services/market_service.py
def get_symbol_ticker(self, symbol: str | None = None) -> SymbolTicker | list[SymbolTicker]:
    """Fetch ticker data for one symbol, or for all symbols when none is given.

    Args:
        symbol: Trading pair name; None fetches every symbol.

    Returns:
        SymbolTicker | list[SymbolTicker]: Ticker(s) for the requested symbol(s).

    Raises:
        MarketDataFetchError: If the underlying API call fails.
    """
    try:
        raw = self.client.get_symbol_ticker(symbol=symbol)
        if not raw:
            raise InvalidSymbolError(f"Invalid symbol: {symbol}")

        # A list response means "all symbols"; a dict means a single one.
        if isinstance(raw, list):
            return [SymbolTicker.from_binance_ticker(item) for item in raw]
        return SymbolTicker.from_binance_ticker(raw)

    except Exception as e:
        logger.error(f"[red]Error fetching ticker for {symbol}: {e}[/red]")
        raise MarketDataFetchError(f"Failed to fetch ticker: {e}") from e
get_perpetual_symbols(only_trading: bool = True, quote_asset: str = 'USDT') -> list[str]

获取当前市场上所有永续合约交易对。

PARAMETER DESCRIPTION
only_trading

是否只返回当前可交易的交易对

TYPE: bool DEFAULT: True

quote_asset

基准资产,默认为USDT,只返回以该资产结尾的交易对

TYPE: str DEFAULT: 'USDT'

RETURNS DESCRIPTION
list[str]

list[str]: 永续合约交易对列表

Source code in src/cryptoservice/services/market_service.py
def get_perpetual_symbols(self, only_trading: bool = True, quote_asset: str = "USDT") -> list[str]:
    """List perpetual-contract trading pairs currently on the exchange.

    Args:
        only_trading: When True, keep only symbols whose status is TRADING.
        quote_asset: Quote-asset suffix filter (default USDT).

    Returns:
        list[str]: Matching perpetual-contract symbols.

    Raises:
        MarketDataFetchError: If the exchange-info request fails.
    """
    try:
        logger.info(f"获取当前永续合约交易对列表(筛选条件:{quote_asset}结尾)")
        exchange_info = self.client.futures_exchange_info()

        def _keep(info: dict) -> bool:
            # Same three conditions as before, expressed as guard clauses.
            if info["contractType"] != "PERPETUAL":
                return False
            if only_trading and info["status"] != "TRADING":
                return False
            return info["symbol"].endswith(quote_asset)

        perpetual_symbols = [info["symbol"] for info in exchange_info["symbols"] if _keep(info)]

        logger.info(f"找到 {len(perpetual_symbols)}{quote_asset}永续合约交易对")
        return perpetual_symbols

    except Exception as e:
        logger.error(f"[red]获取永续合约交易对失败: {e}[/red]")
        raise MarketDataFetchError(f"获取永续合约交易对失败: {e}") from e
check_symbol_exists_on_date(symbol: str, date: str) -> bool

检查指定日期是否存在该交易对。

PARAMETER DESCRIPTION
symbol

交易对名称

TYPE: str

date

日期,格式为 'YYYY-MM-DD'

TYPE: str

RETURNS DESCRIPTION
bool

是否存在该交易对

TYPE: bool

Source code in src/cryptoservice/services/market_service.py
def check_symbol_exists_on_date(self, symbol: str, date: str) -> bool:
    """Check whether a symbol has kline data on the given date.

    Args:
        symbol: Trading pair name.
        date: Date string in 'YYYY-MM-DD' format.

    Returns:
        bool: True if at least one daily kline exists for that date.
    """
    try:
        start_time, end_time = self._date_to_timestamp_range(date)

        # A single daily candle is enough to prove the symbol existed.
        klines = self.client.futures_klines(
            symbol=symbol,
            interval="1d",
            startTime=start_time,
            endTime=end_time,
            limit=1,
        )
        return bool(klines and len(klines) > 0)

    except Exception as e:
        # Treat any lookup failure as "does not exist" rather than propagating.
        logger.debug(f"检查交易对 {symbol}{date} 是否存在时出错: {e}")
        return False
get_top_coins(limit: int = settings.DEFAULT_LIMIT, sort_by: SortBy = SortBy.QUOTE_VOLUME, quote_asset: str | None = None) -> list[DailyMarketTicker]

获取前N个交易对。

PARAMETER DESCRIPTION
limit

数量

TYPE: int DEFAULT: DEFAULT_LIMIT

sort_by

排序方式

TYPE: SortBy DEFAULT: QUOTE_VOLUME

quote_asset

基准资产

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
list[DailyMarketTicker]

list[DailyMarketTicker]: 前N个交易对

Source code in src/cryptoservice/services/market_service.py
def get_top_coins(
    self,
    limit: int = settings.DEFAULT_LIMIT,
    sort_by: SortBy = SortBy.QUOTE_VOLUME,
    quote_asset: str | None = None,
) -> list[DailyMarketTicker]:
    """Return the top-N trading pairs ranked by the chosen metric.

    Args:
        limit: Number of pairs to return.
        sort_by: Ranking metric.
        quote_asset: Optional quote-asset suffix filter.

    Returns:
        list[DailyMarketTicker]: Top pairs, in descending order of the metric.

    Raises:
        MarketDataFetchError: If ticker retrieval fails.
    """
    try:
        raw_tickers = self.client.get_ticker()
        candidates = [DailyMarketTicker.from_binance_ticker(t) for t in raw_tickers]

        if quote_asset:
            candidates = [c for c in candidates if c.symbol.endswith(quote_asset)]

        # In-place sort plus slice is equivalent to the original sorted()[:limit].
        candidates.sort(key=lambda c: getattr(c, sort_by.value), reverse=True)
        return candidates[:limit]

    except Exception as e:
        logger.error(f"[red]Error getting top coins: {e}[/red]")
        raise MarketDataFetchError(f"Failed to get top coins: {e}") from e
get_market_summary(interval: Freq = Freq.d1) -> dict[str, Any]

获取市场概览。

PARAMETER DESCRIPTION
interval

时间间隔

TYPE: Freq DEFAULT: d1

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: 市场概览

Source code in src/cryptoservice/services/market_service.py
def get_market_summary(self, interval: Freq = Freq.d1) -> dict[str, Any]:
    """Build a snapshot of current market tickers.

    Args:
        interval: Time interval (accepted for interface compatibility; not
            used in the snapshot itself).

    Returns:
        dict[str, Any]: Snapshot time plus a list of per-symbol ticker dicts.

    Raises:
        MarketDataFetchError: If ticker retrieval fails.
    """
    try:
        # Capture the timestamp before fetching, as the original did.
        snapshot_time = datetime.now()

        tickers_result = self.get_symbol_ticker()
        if isinstance(tickers_result, list):
            data = [ticker.to_dict() for ticker in tickers_result]
        else:
            data = [tickers_result.to_dict()]

        return {"snapshot_time": snapshot_time, "data": data}

    except Exception as e:
        logger.error(f"[red]Error getting market summary: {e}[/red]")
        raise MarketDataFetchError(f"Failed to get market summary: {e}") from e
get_historical_klines(symbol: str, start_time: str | datetime, end_time: str | datetime | None = None, interval: Freq = Freq.h1, klines_type: HistoricalKlinesType = HistoricalKlinesType.SPOT) -> list[KlineMarketTicker]

获取历史行情数据。

PARAMETER DESCRIPTION
symbol

交易对名称

TYPE: str

start_time

开始时间

TYPE: str | datetime

end_time

结束时间,如果为None则为当前时间

TYPE: str | datetime | None DEFAULT: None

interval

时间间隔

TYPE: Freq DEFAULT: h1

klines_type

K线类型(现货或期货)

TYPE: HistoricalKlinesType DEFAULT: SPOT

RETURNS DESCRIPTION
list[KlineMarketTicker]

list[KlineMarketTicker]: 历史行情数据

Source code in src/cryptoservice/services/market_service.py
def get_historical_klines(
    self,
    symbol: str,
    start_time: str | datetime,
    end_time: str | datetime | None = None,
    interval: Freq = Freq.h1,
    klines_type: HistoricalKlinesType = HistoricalKlinesType.SPOT,
) -> list[KlineMarketTicker]:
    """Fetch historical klines for a symbol.

    Args:
        symbol: Trading pair name.
        start_time: Range start (ISO string or datetime).
        end_time: Range end; defaults to now when None.
        interval: Kline interval.
        klines_type: Market type (spot or futures).

    Returns:
        list[KlineMarketTicker]: Klines in the requested range. NOTE(review):
        a single request capped at 1500 rows — long ranges may be truncated.

    Raises:
        MarketDataFetchError: If the API call fails.
    """
    try:
        # Normalize both endpoints to datetime objects.
        if isinstance(start_time, str):
            start_time = datetime.fromisoformat(start_time)
        if end_time is None:
            end_time = datetime.now()
        elif isinstance(end_time, str):
            end_time = datetime.fromisoformat(end_time)

        # Timestamps are aligned to whole days via the shared helpers.
        start_ts = self._date_to_timestamp_start(start_time.strftime("%Y-%m-%d"))
        end_ts = self._date_to_timestamp_end(end_time.strftime("%Y-%m-%d"))

        logger.info(f"获取 {symbol} 的历史数据 ({interval.value})")

        # Pick the endpoint by market type; both take identical arguments.
        fetch = self.client.futures_klines if klines_type == HistoricalKlinesType.FUTURES else self.client.get_klines
        klines = fetch(
            symbol=symbol,
            interval=interval.value,
            startTime=start_ts,
            endTime=end_ts,
            limit=1500,
        )

        rows = list(klines)
        if not rows:
            logger.warning(f"未找到交易对 {symbol} 在指定时间段内的数据")
            return []

        # Map raw kline arrays into typed tickers.
        return [
            KlineMarketTicker(
                symbol=symbol,
                last_price=Decimal(str(row[4])),  # close price doubles as last price
                open_price=Decimal(str(row[1])),
                high_price=Decimal(str(row[2])),
                low_price=Decimal(str(row[3])),
                volume=Decimal(str(row[5])),
                close_time=row[6],
            )
            for row in rows
        ]

    except Exception as e:
        logger.error(f"[red]Error getting historical data for {symbol}: {e}[/red]")
        raise MarketDataFetchError(f"Failed to get historical data: {e}") from e
get_perpetual_data(symbols: list[str], start_time: str, db_path: Path | str, end_time: str | None = None, interval: Freq = Freq.h1, max_workers: int = 5, max_retries: int = 3, progress: Progress | None = None, request_delay: float = 0.5, retry_config: Optional[RetryConfig] = None, enable_integrity_check: bool = True) -> IntegrityReport

获取永续合约数据并存储 (增强版).

PARAMETER DESCRIPTION
symbols

交易对列表

TYPE: list[str]

start_time

开始时间 (YYYY-MM-DD)

TYPE: str

db_path

数据库文件路径 (必须指定,如: /path/to/market.db)

TYPE: Path | str

end_time

结束时间 (YYYY-MM-DD)

TYPE: str | None DEFAULT: None

interval

时间间隔

TYPE: Freq DEFAULT: h1

max_workers

最大线程数

TYPE: int DEFAULT: 5

max_retries

最大重试次数

TYPE: int DEFAULT: 3

retry_config

重试配置

TYPE: Optional[RetryConfig] DEFAULT: None

progress

进度显示器

TYPE: Progress | None DEFAULT: None

enable_integrity_check

是否启用完整性检查

TYPE: bool DEFAULT: True

request_delay

每次请求间隔(秒),默认0.5秒

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
IntegrityReport

数据完整性报告

TYPE: IntegrityReport

Source code in src/cryptoservice/services/market_service.py
def get_perpetual_data(
    self,
    symbols: list[str],
    start_time: str,
    db_path: Path | str,
    end_time: str | None = None,
    interval: Freq = Freq.h1,
    max_workers: int = 5,
    max_retries: int = 3,
    progress: Progress | None = None,
    request_delay: float = 0.5,
    # Extra parameters, kept for backward compatibility.
    retry_config: Optional[RetryConfig] = None,
    enable_integrity_check: bool = True,
) -> IntegrityReport:
    """Download and store perpetual-contract data (enhanced version).

    Args:
        symbols: Trading pairs to download.
        start_time: Start date (YYYY-MM-DD).
        db_path: Database file path (required, e.g. /path/to/market.db).
        end_time: End date (YYYY-MM-DD); defaults to today when None.
        interval: Kline interval.
        max_workers: Maximum number of worker threads.
        max_retries: Maximum retry count (used to build retry_config when
            retry_config is None).
        retry_config: Retry configuration; overrides max_retries when given.
        progress: Progress display; one is created internally when None.
        enable_integrity_check: Whether to build the full integrity report.
        request_delay: Base delay between requests in seconds (default 0.5).

    Returns:
        IntegrityReport: Data integrity report (returned even on failure).
    """
    if retry_config is None:
        retry_config = RetryConfig(max_retries=max_retries)

    # Result accumulators shared with the worker threads below; list.append
    # is atomic in CPython, so no extra locking is used here.
    successful_symbols = []
    failed_symbols = []
    missing_periods = []

    try:
        if not symbols:
            raise ValueError("Symbols list cannot be empty")

        # Validate and prepare the database file path.
        db_file_path = self._validate_and_prepare_path(db_path, is_file=True)
        end_time = end_time or datetime.now().strftime("%Y-%m-%d")

        # Convert the date strings into timestamps.
        start_ts = self._date_to_timestamp_start(start_time)
        end_ts = self._date_to_timestamp_end(end_time)

        # Lazily initialize the database connection.
        if self.db is None:
            self.db = MarketDB(str(db_file_path))

        # Re-initialize the rate limiter with the caller-supplied base delay.
        self.rate_limit_manager = RateLimitManager(base_delay=request_delay)

        logger.info(f"🚀 开始下载 {len(symbols)} 个交易对的数据")
        logger.info(f"📅 时间范围: {start_time}{end_time}")
        logger.info(f"⚙️ 重试配置: 最大{retry_config.max_retries}次, 基础延迟{retry_config.base_delay}秒")
        logger.info(f"⏱️ 智能频率控制: 基础延迟{request_delay}秒,动态调整")

        # Build a progress display when the caller did not supply one.
        if progress is None:
            progress = Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TimeElapsedColumn(),
            )

        def process_symbol(symbol: str) -> Dict[str, Any]:
            """Fetch and store data for a single trading pair (enhanced version)."""
            result = {
                "symbol": symbol,
                "success": False,
                "records": 0,
                "error": None,
            }

            try:
                data = self._fetch_symbol_data(
                    symbol=symbol,
                    start_ts=start_ts,
                    end_ts=end_ts,
                    interval=interval,
                    retry_config=retry_config,
                )

                if data:
                    if self.db is None:
                        raise MarketDataFetchError("Database is not initialized")

                    self.db.store_data(data, interval)
                    result.update(
                        {
                            "success": True,
                            "records": len(data),
                            "time_range": f"{data[0].open_time} - {data[-1].open_time}",
                        }
                    )
                    logger.debug(f"✅ {symbol}: {len(data)} 条记录")
                    successful_symbols.append(symbol)
                else:
                    # No data in range: recorded as a missing period, not a failure.
                    result["error"] = "无数据"
                    logger.debug(f"⚠️ {symbol}: 无数据")
                    missing_periods.append(
                        {
                            "symbol": symbol,
                            "period": f"{start_time} - {end_time}",
                            "reason": "no_data",
                        }
                    )

            except InvalidSymbolError as e:
                # Invalid symbols are skipped without a missing-period entry.
                result["error"] = f"无效交易对: {e}"
                logger.warning(f"⚠️ 跳过无效交易对 {symbol}")
                failed_symbols.append(symbol)

            except Exception as e:
                result["error"] = str(e)
                logger.error(f"❌ {symbol} 失败: {e}")
                failed_symbols.append(symbol)
                missing_periods.append(
                    {
                        "symbol": symbol,
                        "period": f"{start_time} - {end_time}",
                        "reason": str(e),
                    }
                )

            return result

        # Run the per-symbol downloads in parallel.
        results = []
        with progress if progress is not None else nullcontext():
            overall_task = progress.add_task("[cyan]下载交易对数据", total=len(symbols)) if progress else None

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(process_symbol, symbol) for symbol in symbols]

                for future in as_completed(futures):
                    try:
                        result = future.result()
                        results.append(result)

                        if progress and overall_task is not None:
                            progress.update(overall_task, advance=1)

                    except Exception as e:
                        logger.error(f"❌ 处理异常: {e}")

        # Produce the summary statistics.
        total_records = sum(r.get("records", 0) for r in results)
        success_rate = len(successful_symbols) / len(symbols) if symbols else 0

        logger.info("📊 下载完成统计:")
        logger.info(f"   ✅ 成功: {len(successful_symbols)}/{len(symbols)} ({success_rate:.1%})")
        logger.info(f"   ❌ 失败: {len(failed_symbols)} 个")
        logger.info(f"   📈 总记录数: {total_records:,} 条")
        logger.info(f"   💾 数据库: {db_file_path}")

        # Run the integrity check when enabled.
        if enable_integrity_check and self.db:
            integrity_report = self._create_integrity_report(
                symbols=symbols,
                successful_symbols=successful_symbols,
                failed_symbols=failed_symbols,
                missing_periods=missing_periods,
                start_time=start_time,
                end_time=end_time,
                interval=interval,
                db_file_path=db_file_path,
            )
        else:
            # Otherwise build a basic report from the counters above.
            data_quality_score = len(successful_symbols) / len(symbols) if symbols else 0
            recommendations = []
            if data_quality_score < 0.8:
                recommendations.append("数据成功率较低,建议检查网络和API配置")
            if failed_symbols:
                recommendations.append(f"有{len(failed_symbols)}个交易对下载失败,建议单独重试")

            integrity_report = IntegrityReport(
                total_symbols=len(symbols),
                successful_symbols=len(successful_symbols),
                failed_symbols=failed_symbols,
                missing_periods=missing_periods,
                data_quality_score=data_quality_score,
                recommendations=recommendations,
            )

        return integrity_report

    except Exception as e:
        logger.error(f"❌ 数据下载失败: {e}")
        # Return a report even on failure so callers always get one.
        return IntegrityReport(
            total_symbols=len(symbols),
            successful_symbols=len(successful_symbols),
            failed_symbols=failed_symbols,
            missing_periods=missing_periods,
            data_quality_score=0.0,
            recommendations=[f"下载失败: {e}", "检查网络连接和API配置"],
        )
define_universe(start_date: str, end_date: str, t1_months: int, t2_months: int, t3_months: int, output_path: Path | str, top_k: int | None = None, top_ratio: float | None = None, description: str | None = None, delay_days: int = 7, api_delay_seconds: float = 1.0, batch_delay_seconds: float = 3.0, batch_size: int = 5, quote_asset: str = 'USDT') -> UniverseDefinition

定义universe并保存到文件.

PARAMETER DESCRIPTION
start_date

开始日期 (YYYY-MM-DD 或 YYYYMMDD)

TYPE: str

end_date

结束日期 (YYYY-MM-DD 或 YYYYMMDD)

TYPE: str

t1_months

T1时间窗口(月),用于计算mean daily amount

TYPE: int

t2_months

T2滚动频率(月),universe重新选择的频率

TYPE: int

t3_months

T3合约最小创建时间(月),用于筛除新合约

TYPE: int

output_path

universe输出文件路径 (必须指定)

TYPE: Path | str

top_k

选取的top合约数量 (与 top_ratio 二选一)

TYPE: int | None DEFAULT: None

top_ratio

选取的top合约比率 (与 top_k 二选一)

TYPE: float | None DEFAULT: None

description

描述信息

TYPE: str | None DEFAULT: None

delay_days

在重新平衡日期前额外往前推的天数,默认7天

TYPE: int DEFAULT: 7

api_delay_seconds

每个API请求之间的延迟秒数,默认1.0秒

TYPE: float DEFAULT: 1.0

batch_delay_seconds

每批次请求之间的延迟秒数,默认3.0秒

TYPE: float DEFAULT: 3.0

batch_size

每批次的请求数量,默认5个

TYPE: int DEFAULT: 5

quote_asset

基准资产,默认为USDT,只筛选以该资产结尾的交易对

TYPE: str DEFAULT: 'USDT'

RETURNS DESCRIPTION
UniverseDefinition

定义的universe

TYPE: UniverseDefinition

Source code in src/cryptoservice/services/market_service.py
def define_universe(
    self,
    start_date: str,
    end_date: str,
    t1_months: int,
    t2_months: int,
    t3_months: int,
    output_path: Path | str,
    top_k: int | None = None,
    top_ratio: float | None = None,
    description: str | None = None,
    delay_days: int = 7,
    api_delay_seconds: float = 1.0,
    batch_delay_seconds: float = 3.0,
    batch_size: int = 5,
    quote_asset: str = "USDT",
) -> UniverseDefinition:
    """Define a trading universe and save it to a file.

    Args:
        start_date: Start date (YYYY-MM-DD or YYYYMMDD).
        end_date: End date (YYYY-MM-DD or YYYYMMDD).
        t1_months: T1 lookback window in months, used for mean daily amount.
        t2_months: T2 rebalance frequency in months (how often the universe
            is re-selected).
        t3_months: T3 minimum contract age in months, filters out new listings.
        output_path: Output file path for the universe (required).
        top_k: Number of top contracts to select (mutually exclusive with top_ratio).
        top_ratio: Ratio of top contracts to select (mutually exclusive with top_k).
        description: Optional free-form description.
        delay_days: Extra days to shift back before each rebalance date (default 7).
        api_delay_seconds: Delay between individual API requests (default 1.0s).
        batch_delay_seconds: Delay between request batches (default 3.0s).
        batch_size: Number of requests per batch (default 5).
        quote_asset: Quote asset; only symbols ending with it are considered.

    Returns:
        UniverseDefinition: The defined universe.

    Raises:
        MarketDataFetchError: If any step of the definition fails.
    """
    try:
        # Validate and prepare the output path.
        output_path_obj = self._validate_and_prepare_path(
            output_path,
            is_file=True,
            file_name=(
                f"universe_{start_date}_{end_date}_{t1_months}_{t2_months}_{t3_months}_{top_k or top_ratio}.json"
            ),
        )

        # Normalize the date formats.
        start_date = self._standardize_date_format(start_date)
        end_date = self._standardize_date_format(end_date)

        # Build the universe configuration.
        config = UniverseConfig(
            start_date=start_date,
            end_date=end_date,
            t1_months=t1_months,
            t2_months=t2_months,
            t3_months=t3_months,
            delay_days=delay_days,
            quote_asset=quote_asset,
            top_k=top_k,
            top_ratio=top_ratio,
        )

        logger.info(f"开始定义universe: {start_date}{end_date}")
        log_selection_criteria = f"Top-K={top_k}" if top_k else f"Top-Ratio={top_ratio}"
        logger.info(f"参数: T1={t1_months}月, T2={t2_months}月, T3={t3_months}月, {log_selection_criteria}")

        # Generate the re-selection date sequence (every T2 months); starting
        # from start_date, each date marks a point where the universe is rebuilt.
        rebalance_dates = self._generate_rebalance_dates(start_date, end_date, t2_months)

        logger.info("重平衡计划:")
        logger.info(f"  - 时间范围: {start_date}{end_date}")
        logger.info(f"  - 重平衡间隔: 每{t2_months}个月")
        logger.info(f"  - 数据延迟: {delay_days}天")
        logger.info(f"  - T1数据窗口: {t1_months}个月")
        logger.info(f"  - 重平衡日期: {rebalance_dates}")

        if not rebalance_dates:
            raise ValueError("无法生成重平衡日期,请检查时间范围和T2参数")

        # Collect a snapshot for every rebalance period.
        all_snapshots = []

        # Compute the universe at each re-selection date.
        for i, rebalance_date in enumerate(rebalance_dates):
            logger.info(f"处理日期 {i + 1}/{len(rebalance_dates)}: {rebalance_date}")

            # Base date = rebalance date shifted back by delay_days.
            base_date = pd.to_datetime(rebalance_date) - timedelta(days=delay_days)
            calculated_t1_end = base_date.strftime("%Y-%m-%d")

            # Start of the T1 lookback window (base date minus T1 months).
            calculated_t1_start = self._subtract_months(calculated_t1_end, t1_months)

            logger.info(
                f"周期 {i + 1}: 基准日期={calculated_t1_end} (重新平衡日期前{delay_days}天), "
                f"T1数据期间={calculated_t1_start}{calculated_t1_end}"
            )

            # Select eligible symbols and their mean daily amounts.
            universe_symbols, mean_amounts = self._calculate_universe_for_date(
                calculated_t1_start,
                calculated_t1_end,
                t3_months=t3_months,
                top_k=top_k,
                top_ratio=top_ratio,
                api_delay_seconds=api_delay_seconds,
                batch_delay_seconds=batch_delay_seconds,
                batch_size=batch_size,
                quote_asset=quote_asset,
            )

            # Build this period's snapshot.
            snapshot = UniverseSnapshot.create_with_dates_and_timestamps(
                usage_t1_start=rebalance_date,  # start of the actual usage window
                usage_t1_end=min(
                    end_date,
                    (pd.to_datetime(rebalance_date) + pd.DateOffset(months=t1_months)).strftime("%Y-%m-%d"),
                ),  # end of the actual usage window, capped at end_date
                calculated_t1_start=calculated_t1_start,  # start of the calculation window
                calculated_t1_end=calculated_t1_end,  # end of the calculation window (base date)
                symbols=universe_symbols,
                mean_daily_amounts=mean_amounts,
                metadata={
                    "calculated_t1_start": calculated_t1_start,
                    "calculated_t1_end": calculated_t1_end,
                    "delay_days": delay_days,
                    "quote_asset": quote_asset,
                    "selected_symbols_count": len(universe_symbols),
                },
            )

            all_snapshots.append(snapshot)

            logger.info(f"✅ 日期 {rebalance_date}: 选择了 {len(universe_symbols)} 个交易对")

        # Assemble the complete universe definition.
        universe_def = UniverseDefinition(
            config=config,
            snapshots=all_snapshots,
            creation_time=datetime.now(),
            description=description,
        )

        # Persist the aggregated definition.
        universe_def.save_to_file(output_path_obj)

        logger.info("🎉 Universe定义完成!")
        logger.info(f"📁 包含 {len(all_snapshots)} 个重新平衡周期")
        logger.info(f"📋 汇总文件: {output_path_obj}")

        return universe_def

    except Exception as e:
        logger.error(f"[red]定义universe失败: {e}[/red]")
        raise MarketDataFetchError(f"定义universe失败: {e}") from e
download_universe_data(universe_file: Path | str, db_path: Path | str, data_path: Path | str | None = None, interval: Freq = Freq.m1, max_workers: int = 4, max_retries: int = 3, include_buffer_days: int = 7, retry_config: RetryConfig | None = None, request_delay: float = 0.5) -> None

按周期分别下载universe数据(更精确的下载方式)。

这种方式为每个重平衡周期单独下载数据,可以避免下载不必要的数据。

PARAMETER DESCRIPTION
universe_file

universe定义文件路径 (必须指定)

TYPE: Path | str

db_path

数据库文件路径 (必须指定,如: /path/to/market.db)

TYPE: Path | str

data_path

数据文件存储路径 (可选,用于存储其他数据文件)

TYPE: Path | str | None DEFAULT: None

interval

数据频率

TYPE: Freq DEFAULT: m1

max_workers

并发线程数

TYPE: int DEFAULT: 4

max_retries

最大重试次数

TYPE: int DEFAULT: 3

include_buffer_days

缓冲天数

TYPE: int DEFAULT: 7

request_delay

每次请求间隔(秒),默认0.5秒

TYPE: float DEFAULT: 0.5

Source code in src/cryptoservice/services/market_service.py
def download_universe_data(
    self,
    universe_file: Path | str,
    db_path: Path | str,
    data_path: Path | str | None = None,
    interval: Freq = Freq.m1,
    max_workers: int = 4,
    max_retries: int = 3,
    include_buffer_days: int = 7,
    retry_config: RetryConfig | None = None,
    request_delay: float = 0.5,  # delay between requests (seconds)
) -> None:
    """Download universe data period by period (the precise download path).

    Each rebalance period is downloaded separately, which avoids fetching
    data outside the periods actually used.

    Args:
        universe_file: Universe definition file path (required).
        db_path: Database file path (required, e.g. /path/to/market.db).
        data_path: Optional directory for auxiliary data files.
        interval: Data frequency.
        max_workers: Number of concurrent threads.
        max_retries: Maximum retry count.
        include_buffer_days: Buffer days; in this method it only appears in
            the log output, not in the date arithmetic.
        retry_config: Optional retry configuration forwarded to get_perpetual_data.
        request_delay: Delay between requests in seconds (default 0.5).

    Raises:
        MarketDataFetchError: If loading the universe or any download fails.
    """
    try:
        # Validate the input paths.
        universe_file_obj = self._validate_and_prepare_path(universe_file, is_file=True)
        db_file_path = self._validate_and_prepare_path(db_path, is_file=True)

        # data_path is optional; validate only when provided.
        data_path_obj = None
        if data_path:
            data_path_obj = self._validate_and_prepare_path(data_path, is_file=False)

        # The universe file must already exist.
        if not universe_file_obj.exists():
            raise FileNotFoundError(f"Universe文件不存在: {universe_file_obj}")

        # Load the universe definition.
        universe_def = UniverseDefinition.load_from_file(universe_file_obj)

        logger.info("📊 按周期下载数据:")
        logger.info(f"   - 总快照数: {len(universe_def.snapshots)}")
        logger.info(f"   - 数据频率: {interval.value}")
        logger.info(f"   - 并发线程: {max_workers}")
        logger.info(f"   - 请求间隔: {request_delay}秒")
        logger.info(f"   - 数据库路径: {db_file_path}")
        if data_path_obj:
            logger.info(f"   - 数据文件路径: {data_path_obj}")

        # Download each period's data separately.
        for i, snapshot in enumerate(universe_def.snapshots):
            logger.info(f"📅 处理快照 {i + 1}/{len(universe_def.snapshots)}: {snapshot.effective_date}")

            logger.info(f"   - 交易对数量: {len(snapshot.symbols)}")
            logger.info(
                f"   - 计算期间: {snapshot.calculated_t1_start}{snapshot.calculated_t1_end} (定义universe)"
            )
            logger.info(f"   - 使用期间: {snapshot.start_date}{snapshot.end_date} (实际使用)")
            logger.info(
                f"   - 下载范围: {snapshot.start_date}{snapshot.end_date} (含{include_buffer_days}天缓冲)"
            )

            # Download the usage-period data for this snapshot.
            self.get_perpetual_data(
                symbols=snapshot.symbols,
                start_time=snapshot.start_date,
                end_time=snapshot.end_date,
                db_path=db_file_path,
                interval=interval,
                max_workers=max_workers,
                max_retries=max_retries,
                retry_config=retry_config,
                enable_integrity_check=True,
                request_delay=request_delay,
            )

            logger.info(f"   ✅ 快照 {snapshot.effective_date} 下载完成")

        logger.info("🎉 所有universe数据下载完成!")
        logger.info(f"📁 数据已保存到: {db_file_path}")

    except Exception as e:
        logger.error(f"[red]按周期下载universe数据失败: {e}[/red]")
        raise MarketDataFetchError(f"按周期下载universe数据失败: {e}") from e