""" Beast Trader API — 市场结构、信号诊断、交易监测数据 """ import os import time import hmac import hashlib import json import sqlite3 import ccxt import pandas as pd import numpy as np from datetime import datetime, timezone, timedelta from typing import Optional from fastapi import FastAPI, HTTPException, Request, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from pydantic import BaseModel import uvicorn from indicators import compute_all_indicators # ============================================= # 配置 # ============================================= API_KEY = os.environ.get("BEAST_API_KEY", "beast2025") BINANCE_RATE_LIMIT_DELAY = 1.0 # 秒 DATA_CACHE_SECONDS = 30 # 缓存时间 # Freqtrade 数据库路径(只读) FREQTRADE_DB = os.environ.get( "FREQTRADE_DB", os.path.expanduser("~/freqtrade/user_data/tradesv3.sqlite") ) INITIAL_WALLET = float(os.environ.get("INITIAL_WALLET", 10000)) app = FastAPI(title="Beast Trader API", version="1.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============================================= # 缓存 # ============================================= _cache = {"data": None, "timestamp": 0} # ============================================= # 认证中间件 # ============================================= @app.middleware("http") async def auth_middleware(request: Request, call_next): if request.method == "OPTIONS" or request.url.path in ["/api/health", "/", "/index.html"]: return await call_next(request) auth = request.headers.get("X-API-Key") if not auth or auth != API_KEY: raise HTTPException(status_code=401, detail="Invalid API Key") return await call_next(request) # ============================================= # 数据获取 # ============================================= def fetch_binance_data(): """获取 BTC 和 ETH 的 D1/4H/1H K线数据 + 实时ticker""" exchange = ccxt.binance({ "enableRateLimit": True, }) now = exchange.milliseconds() since_1d = now - 180 * 24 * 60 * 60 * 1000 # 180天 D1 since_4h = now - 60 * 24 * 60 * 60 * 1000 # 60天 4H since_1h = now - 14 * 24 * 60 * 60 * 1000 # 14天 1H result = {} for symbol in ["ETH/USDT", "BTC/USDT"]: try: ohlcv_1d = exchange.fetch_ohlcv(symbol, "1d", since_1d, 200) ohlcv_4h = exchange.fetch_ohlcv(symbol, "4h", since_4h, 400) ohlcv_1h = exchange.fetch_ohlcv(symbol, "1h", since_1h, 400) ticker = exchange.fetch_ticker(symbol) df_1d = pd.DataFrame(ohlcv_1d, columns=["timestamp", "open", "high", "low", "close", "volume"]) df_1d["timestamp"] = pd.to_datetime(df_1d["timestamp"], unit="ms") df_1d.set_index("timestamp", inplace=True) df_4h = pd.DataFrame(ohlcv_4h, columns=["timestamp", "open", "high", "low", "close", "volume"]) df_4h["timestamp"] = pd.to_datetime(df_4h["timestamp"], unit="ms") df_4h.set_index("timestamp", inplace=True) df_1h = pd.DataFrame(ohlcv_1h, columns=["timestamp", "open", "high", "low", "close", "volume"]) df_1h["timestamp"] = pd.to_datetime(df_1h["timestamp"], unit="ms") df_1h.set_index("timestamp", inplace=True) result[symbol] = {"1d": df_1d, "4h": df_4h, "1h": df_1h, "ticker": ticker} time.sleep(0.5) # rate limit except Exception as e: print(f"Error fetching {symbol}: {e}") return result def get_cached_data(): """获取缓存数据或刷新""" now = time.time() if now - _cache["timestamp"] < DATA_CACHE_SECONDS and _cache["data"] is not None: return _cache["data"] raw = fetch_binance_data() result = {} for symbol, frames in raw.items(): try: computed = compute_all_indicators(frames["1h"], frames["4h"], frames["1d"]) # 用实时ticker覆盖current_price(指标里用的是1H open,不实时) price = frames.get("ticker", {}).get("last") if price is not None: computed["current_price"] = round(float(price), 2) result[symbol] = computed # 附加最近K线数据用于图表 result[symbol]["price_history"] = { "1h": { "timestamps": frames["1h"].index[-48:].strftime("%m-%d %H:%M").tolist(), "close": [round(float(x), 2) for x in frames["1h"]["close"].iloc[-48:].tolist()], } } except Exception as e: print(f"Error computing {symbol}: {e}") result[symbol] = {"error": str(e)} _cache["data"] = result _cache["timestamp"] = now return result # ============================================= # API 路由 # ============================================= @app.get("/api/health") def health(): return {"status": "ok", "time": datetime.now(timezone.utc).isoformat()} @app.get("/api/market-structure") def market_structure(): """ 市场结构分析:S/R 位、Swing Point、趋势方向、供需区 """ data = get_cached_data() eth = data.get("ETH/USDT", {}) btc = data.get("BTC/USDT", {}) if "error" in eth: raise HTTPException(status_code=500, detail=eth["error"]) return { "eth": { "current_price": eth.get("current_price"), "support": eth.get("diagnosis", {}).get("support"), "resistance": eth.get("diagnosis", {}).get("resistance"), "zone_width_pct": eth.get("diagnosis", {}).get("zone_width_pct"), "price_position_pct": eth.get("diagnosis", {}).get("price_position_in_zone"), "trend_1d": eth.get("trend_1d"), "trend_strength_4h": eth.get("trend_strength_4h"), "swing_points": eth.get("swing_points"), "price_history_1h": eth.get("price_history", {}).get("1h"), }, "btc": { "current_price": btc.get("current_price") if "error" not in btc else None, "support_4h": btc.get("diagnosis", {}).get("support_4h"), "resistance_4h": btc.get("diagnosis", {}).get("resistance_4h"), "trend_1d": btc.get("trend_1d"), }, "cached_at": datetime.now(timezone.utc).isoformat(), } @app.get("/api/signal-diagnosis") def signal_diagnosis(): """ 信号诊断:8 层过滤条件逐一检查 """ data = get_cached_data() eth = data.get("ETH/USDT", {}) btc = data.get("BTC/USDT", {}) if "error" in eth: raise HTTPException(status_code=500, detail=eth["error"]) return { "eth": { "can_enter_long": eth.get("can_enter_long"), "can_enter_short": eth.get("can_enter_short"), "diagnosis": eth.get("diagnosis"), "filters": eth.get("filters"), "candle_1h": eth.get("candle_1h"), "trend_strength_4h": eth.get("trend_strength_4h"), }, "btc": { "can_enter_long": btc.get("can_enter_long"), "can_enter_short": btc.get("can_enter_short"), "diagnosis": btc.get("diagnosis"), "filters": btc.get("filters"), } if "error" not in btc else {"error": btc.get("error")}, "cached_at": datetime.now(timezone.utc).isoformat(), } # ============================================= # 静态文件服务(前端) # ============================================= FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "..", "frontend") @app.get("/") def serve_index(): return FileResponse(os.path.join(FRONTEND_DIR, "index.html")) # ============================================= # 交易监测 API(读取 freqtrade SQLite 数据库) # ============================================= def _get_db(): """获取只读数据库连接""" conn = sqlite3.connect(FREQTRADE_DB) conn.row_factory = sqlite3.Row return conn def _row_to_dict(row): """将 sqlite3.Row 转为 dict,处理 datetime 和 boolean""" if row is None: return None d = {} for k in row.keys(): v = row[k] if isinstance(v, bytes): d[k] = v.decode("utf-8", errors="replace") elif hasattr(v, "isoformat"): d[k] = v.isoformat() if v else None else: d[k] = v return d @app.get("/api/account") def account_overview(): """ 账户概览:当前余额、盈亏统计、策略信息 数据来源:freqtrade SQLite 数据库 (wallet_history + trades) """ try: conn = _get_db() # 最新钱包余额 wallet = conn.execute( "SELECT * FROM wallet_history ORDER BY timestamp DESC LIMIT 1" ).fetchone() # 交易统计 stats = conn.execute(""" SELECT COUNT(*) as total_trades, SUM(CASE WHEN is_open = 1 THEN 1 ELSE 0 END) as open_trades, SUM(CASE WHEN is_open = 0 THEN 1 ELSE 0 END) as closed_trades, SUM(CASE WHEN is_open = 0 AND close_profit > 0 THEN 1 ELSE 0 END) as winning_trades, SUM(CASE WHEN is_open = 0 AND close_profit <= 0 THEN 1 ELSE 0 END) as losing_trades, ROUND(SUM(close_profit_abs), 8) as total_profit_abs, ROUND(AVG(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as avg_profit_pct, ROUND(MAX(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as best_trade_pct, ROUND(MIN(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as worst_trade_pct FROM trades """).fetchone() # 当前持仓详情 open_positions = conn.execute(""" SELECT id, pair, is_short, open_rate, stake_amount, amount, open_date, stop_loss, max_rate, min_rate, ROUND((open_rate - ?) / open_rate * 100, 4) as unrealized_pnl_pct FROM trades WHERE is_open = 1 ORDER BY open_date DESC """, [0]).fetchall() # placeholder, real current price handled in response # 策略名称 strategy_row = conn.execute( "SELECT DISTINCT strategy FROM trades WHERE strategy IS NOT NULL ORDER BY id DESC LIMIT 1" ).fetchone() strategy_name = strategy_row["strategy"] if strategy_row else "v2.2c" # 近期每日盈亏(近30天) daily_pnl = conn.execute(""" SELECT DATE(close_date) as day, ROUND(SUM(close_profit_abs), 8) as pnl_abs, COUNT(*) as trade_count, SUM(CASE WHEN close_profit > 0 THEN 1 ELSE 0 END) as wins FROM trades WHERE is_open = 0 AND close_date IS NOT NULL GROUP BY DATE(close_date) ORDER BY day DESC LIMIT 30 """).fetchall() conn.close() # 计算当前余额 if wallet: current_balance = wallet["total_quote"] or wallet["balance"] last_update = _row_to_dict(wallet)["timestamp"] else: current_balance = INITIAL_WALLET last_update = None # 计算总收益 total_pnl_pct = round((current_balance - INITIAL_WALLET) / INITIAL_WALLET * 100, 2) if current_balance else 0 # 胜率 closed = stats["closed_trades"] or 0 wins = stats["winning_trades"] or 0 win_rate = round(wins / closed * 100, 1) if closed > 0 else 0 return { "account": { "initial_wallet": INITIAL_WALLET, "current_balance": round(current_balance, 2), "total_pnl_pct": total_pnl_pct, "total_pnl_abs": round(current_balance - INITIAL_WALLET, 2), "last_update": last_update, "strategy": strategy_name, }, "stats": { "total_trades": stats["total_trades"] or 0, "open_trades": stats["open_trades"] or 0, "closed_trades": closed, "win_rate": win_rate, "wins": wins, "losses": stats["losing_trades"] or 0, "avg_profit_pct": stats["avg_profit_pct"], "best_trade_pct": stats["best_trade_pct"], "worst_trade_pct": stats["worst_trade_pct"], "total_profit_abs": stats["total_profit_abs"] or 0, }, "open_positions": [_row_to_dict(r) for r in open_positions], "daily_pnl": [_row_to_dict(r) for r in daily_pnl], "cached_at": datetime.now(timezone.utc).isoformat(), } except Exception as e: raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") @app.get("/api/trades") def trade_list( limit: int = Query(default=30, ge=1, le=200), offset: int = Query(default=0, ge=0), status: str = Query(default="all", pattern="^(all|open|closed)$"), sort: str = Query(default="desc", pattern="^(asc|desc)$"), ): """ 交易列表:支持分页和状态筛选 """ try: conn = _get_db() where_clause = "" if status == "open": where_clause = "WHERE is_open = 1" elif status == "closed": where_clause = "WHERE is_open = 0" order = "DESC" if sort == "desc" else "ASC" rows = conn.execute(f""" SELECT id, pair, is_open, is_short, open_rate, close_rate, close_profit, close_profit_abs, stake_amount, amount, open_date, close_date, exit_reason, strategy, max_rate, min_rate, stop_loss, initial_stop_loss, enter_tag, leverage FROM trades {where_clause} ORDER BY open_date {order} LIMIT ? OFFSET ? """, [limit, offset]).fetchall() total = conn.execute(f"SELECT COUNT(*) as cnt FROM trades {where_clause}").fetchone() conn.close() return { "trades": [_row_to_dict(r) for r in rows], "pagination": { "total": total["cnt"], "limit": limit, "offset": offset, "has_more": offset + limit < total["cnt"], }, "cached_at": datetime.now(timezone.utc).isoformat(), } except Exception as e: raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") @app.get("/api/trades/stats") def trade_stats(): """ 交易统计:按方向/时间/策略维度的聚合数据 """ try: conn = _get_db() # 按方向 by_direction = conn.execute(""" SELECT CASE WHEN is_short = 1 THEN 'SHORT' ELSE 'LONG' END as direction, COUNT(*) as total, SUM(CASE WHEN is_open = 0 AND close_profit > 0 THEN 1 ELSE 0 END) as wins, SUM(CASE WHEN is_open = 0 AND close_profit <= 0 THEN 1 ELSE 0 END) as losses, ROUND(AVG(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as avg_profit_pct, ROUND(SUM(CASE WHEN is_open = 0 THEN close_profit_abs ELSE 0 END), 8) as total_pnl_abs FROM trades WHERE is_open = 0 GROUP BY is_short """).fetchall() # 按退出原因 by_exit = conn.execute(""" SELECT exit_reason, COUNT(*) as count, ROUND(AVG(close_profit), 6) as avg_profit_pct, ROUND(SUM(close_profit_abs), 8) as total_pnl_abs FROM trades WHERE is_open = 0 AND exit_reason IS NOT NULL GROUP BY exit_reason ORDER BY count DESC LIMIT 10 """).fetchall() # 月度表现 monthly = conn.execute(""" SELECT strftime('%Y-%m', open_date) as month, COUNT(*) as total, SUM(CASE WHEN close_profit > 0 THEN 1 ELSE 0 END) as wins, ROUND(AVG(close_profit), 6) as avg_profit_pct, ROUND(SUM(close_profit_abs), 8) as total_pnl_abs FROM trades WHERE is_open = 0 GROUP BY strftime('%Y-%m', open_date) ORDER BY month DESC LIMIT 12 """).fetchall() # 盈亏分布(区间统计) distribution = conn.execute(""" SELECT CASE WHEN close_profit <= -0.10 THEN '≤ -10%' WHEN close_profit <= -0.05 THEN '-10% ~ -5%' WHEN close_profit <= -0.02 THEN '-5% ~ -2%' WHEN close_profit < 0 THEN '-2% ~ 0%' WHEN close_profit = 0 THEN '0%' WHEN close_profit <= 0.02 THEN '0% ~ 2%' WHEN close_profit <= 0.05 THEN '2% ~ 5%' WHEN close_profit <= 0.10 THEN '5% ~ 10%' WHEN close_profit <= 0.20 THEN '10% ~ 20%' ELSE '> 20%' END as profit_range, COUNT(*) as count FROM trades WHERE is_open = 0 GROUP BY 1 ORDER BY MIN(close_profit) """).fetchall() conn.close() return { "by_direction": [_row_to_dict(r) for r in by_direction], "by_exit_reason": [_row_to_dict(r) for r in by_exit], "monthly": [_row_to_dict(r) for r in monthly], "profit_distribution": [_row_to_dict(r) for r in distribution], "cached_at": datetime.now(timezone.utc).isoformat(), } except Exception as e: raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") # ============================================= # 启动 # ============================================= if __name__ == "__main__": port = int(os.environ.get("PORT", 8000)) uvicorn.run("main:app", host="0.0.0.0", port=port, reload=False)