Files
beast-trader/dashboard/main.py

500 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Beast Trader API — 市场结构、信号诊断、交易监测数据
"""
import os
import time
import hmac
import hashlib
import json
import sqlite3
import ccxt
import pandas as pd
import numpy as np
from datetime import datetime, timezone, timedelta
from typing import Optional
from fastapi import FastAPI, HTTPException, Request, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
import uvicorn
from indicators import compute_all_indicators
# =============================================
# Configuration
# =============================================
# Shared secret expected in the X-API-Key request header.
# NOTE(review): a hard-coded fallback key is used when BEAST_API_KEY is unset — confirm this is acceptable for deployment.
API_KEY = os.environ.get("BEAST_API_KEY", "beast2025")
# NOTE(review): not referenced in the visible part of this file (the fetch loop sleeps a literal 0.5 s) — confirm whether it is still needed.
BINANCE_RATE_LIMIT_DELAY = 1.0 # seconds
DATA_CACHE_SECONDS = 30 # how long computed market data is reused before refetching, in seconds
# Path to the Freqtrade database (intended to be used read-only)
FREQTRADE_DB = os.environ.get(
"FREQTRADE_DB",
os.path.expanduser("~/freqtrade/user_data/tradesv3.sqlite")
)
# Starting wallet size used as the baseline for PnL figures and as the balance fallback.
INITIAL_WALLET = float(os.environ.get("INITIAL_WALLET", 10000))
app = FastAPI(title="Beast Trader API", version="1.0")
# NOTE(review): wildcard origins combined with allow_credentials=True is very permissive — confirm it is intended.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# =============================================
# Cache
# =============================================
# Single-slot in-process cache: "data" holds the last computed result, "timestamp" the time.time() it was stored.
_cache = {"data": None, "timestamp": 0}
# =============================================
# 认证中间件
# =============================================
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Reject requests that do not carry the expected ``X-API-Key`` header.

    CORS preflight (``OPTIONS``) requests and the public paths
    ``/api/health``, ``/`` and ``/index.html`` pass through unauthenticated.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a 401 JSON response with
        ``{"detail": "Invalid API Key"}`` when the key is missing or wrong.
    """
    # Imported locally so this block is self-contained with respect to the
    # file's import section.
    from fastapi.responses import JSONResponse

    public_paths = ("/api/health", "/", "/index.html")
    if request.method == "OPTIONS" or request.url.path in public_paths:
        return await call_next(request)

    supplied = request.headers.get("X-API-Key") or ""
    # Compare as bytes: hmac.compare_digest rejects non-ASCII str operands,
    # and a constant-time comparison avoids leaking the key via timing.
    key_matches = bool(supplied) and hmac.compare_digest(
        supplied.encode("utf-8"), API_KEY.encode("utf-8")
    )
    if not key_matches:
        # An HTTPException raised from an HTTP middleware is not routed
        # through FastAPI's exception handlers and would reach the client as
        # a 500, so the 401 response is returned explicitly instead.
        return JSONResponse(status_code=401, content={"detail": "Invalid API Key"})
    return await call_next(request)
# =============================================
# 数据获取
# =============================================
def fetch_binance_data():
    """Fetch D1/4H/1H candles plus the live ticker for ETH/USDT and BTC/USDT.

    Returns:
        A dict keyed by symbol. Each value maps ``"1d"``, ``"4h"`` and
        ``"1h"`` to a DataFrame indexed by candle timestamp (columns open,
        high, low, close, volume) and ``"ticker"`` to the exchange ticker.
        A symbol whose download or conversion fails is logged with ``print``
        and left out of the result.
    """
    client = ccxt.binance({"enableRateLimit": True})
    day_ms = 24 * 60 * 60 * 1000
    now_ms = client.milliseconds()
    # (timeframe, earliest timestamp in ms, max candles), in request order.
    plan = (
        ("1d", now_ms - 180 * day_ms, 200),  # 180 days of daily candles
        ("4h", now_ms - 60 * day_ms, 400),   # 60 days of 4-hour candles
        ("1h", now_ms - 14 * day_ms, 400),   # 14 days of hourly candles
    )
    columns = ["timestamp", "open", "high", "low", "close", "volume"]

    fetched = {}
    for pair in ("ETH/USDT", "BTC/USDT"):
        try:
            # All network calls happen first, then the conversions, so one
            # failure anywhere drops the whole symbol.
            candles = {
                timeframe: client.fetch_ohlcv(pair, timeframe, since, max_rows)
                for timeframe, since, max_rows in plan
            }
            live = client.fetch_ticker(pair)

            entry = {}
            for timeframe, rows in candles.items():
                frame = pd.DataFrame(rows, columns=columns)
                frame["timestamp"] = pd.to_datetime(frame["timestamp"], unit="ms")
                frame.set_index("timestamp", inplace=True)
                entry[timeframe] = frame
            entry["ticker"] = live
            fetched[pair] = entry
            time.sleep(0.5)  # pause between symbols to stay under the rate limit
        except Exception as exc:
            print(f"Error fetching {pair}: {exc}")
    return fetched
def get_cached_data():
    """Return computed indicators per symbol, refreshing the cache when stale.

    A cached result younger than ``DATA_CACHE_SECONDS`` is returned as-is.
    Otherwise market data is refetched, indicators are recomputed and the
    cache is overwritten.

    Returns:
        A dict keyed by symbol. Each value is the indicator dict with
        ``current_price`` overridden by the ticker's last price (when
        present) and a ``price_history`` entry for charting, or
        ``{"error": <message>}`` when computation for that symbol failed.
    """
    started = time.time()
    cached = _cache["data"]
    if cached is not None and started - _cache["timestamp"] < DATA_CACHE_SECONDS:
        return cached

    by_symbol = {}
    for symbol, frames in fetch_binance_data().items():
        try:
            indicators = compute_all_indicators(frames["1h"], frames["4h"], frames["1d"])
            # Per the original author's note, the indicator's own
            # current_price comes from the 1H open and is not real-time, so
            # the live ticker price replaces it.
            last_price = frames.get("ticker", {}).get("last")
            if last_price is not None:
                indicators["current_price"] = round(float(last_price), 2)
            by_symbol[symbol] = indicators

            # Attach the most recent 48 hourly candles for charting.
            hourly = frames["1h"]
            labels = hourly.index[-48:].strftime("%m-%d %H:%M").tolist()
            closes = [round(float(value), 2) for value in hourly["close"].iloc[-48:].tolist()]
            by_symbol[symbol]["price_history"] = {
                "1h": {"timestamps": labels, "close": closes}
            }
        except Exception as exc:
            print(f"Error computing {symbol}: {exc}")
            by_symbol[symbol] = {"error": str(exc)}

    _cache["data"] = by_symbol
    _cache["timestamp"] = started
    return by_symbol
# =============================================
# API 路由
# =============================================
@app.get("/api/health")
def health():
    """Liveness probe: report an ``ok`` status and the current UTC time."""
    utc_now = datetime.now(timezone.utc)
    return {"status": "ok", "time": utc_now.isoformat()}
@app.get("/api/market-structure")
def market_structure():
    """Market-structure view: S/R levels, swing points, trend direction.

    Reads the cached indicator data and reshapes it for the dashboard.

    Returns:
        A dict with ``eth`` and ``btc`` sections plus a ``cached_at`` UTC
        timestamp. Fields missing from the cached data come back as ``None``.

    Raises:
        HTTPException: 500 when the ETH entry carries an ``error`` message.
    """
    snapshot = get_cached_data()
    eth = snapshot.get("ETH/USDT", {})
    btc = snapshot.get("BTC/USDT", {})

    if "error" in eth:
        raise HTTPException(status_code=500, detail=eth["error"])

    eth_diag = eth.get("diagnosis", {})
    eth_section = {
        "current_price": eth.get("current_price"),
        "support": eth_diag.get("support"),
        "resistance": eth_diag.get("resistance"),
        "zone_width_pct": eth_diag.get("zone_width_pct"),
        "price_position_pct": eth_diag.get("price_position_in_zone"),
        "trend_1d": eth.get("trend_1d"),
        "trend_strength_4h": eth.get("trend_strength_4h"),
        "swing_points": eth.get("swing_points"),
        "price_history_1h": eth.get("price_history", {}).get("1h"),
    }

    # A failed BTC computation only blanks the BTC price; it does not fail
    # the whole request.
    btc_price = None if "error" in btc else btc.get("current_price")
    btc_diag = btc.get("diagnosis", {})
    btc_section = {
        "current_price": btc_price,
        "support_4h": btc_diag.get("support_4h"),
        "resistance_4h": btc_diag.get("resistance_4h"),
        "trend_1d": btc.get("trend_1d"),
    }

    return {
        "eth": eth_section,
        "btc": btc_section,
        "cached_at": datetime.now(timezone.utc).isoformat(),
    }
@app.get("/api/signal-diagnosis")
def signal_diagnosis():
    """Signal diagnosis: expose the entry flags and per-filter check results.

    Returns:
        A dict with ``eth`` and ``btc`` sections plus a ``cached_at`` UTC
        timestamp. When the BTC entry carries an error, the ``btc`` section
        is ``{"error": <message>}`` instead.

    Raises:
        HTTPException: 500 when the ETH entry carries an ``error`` message.
    """
    snapshot = get_cached_data()
    eth = snapshot.get("ETH/USDT", {})
    btc = snapshot.get("BTC/USDT", {})

    if "error" in eth:
        raise HTTPException(status_code=500, detail=eth["error"])

    if "error" in btc:
        btc_section = {"error": btc.get("error")}
    else:
        btc_section = {
            "can_enter_long": btc.get("can_enter_long"),
            "can_enter_short": btc.get("can_enter_short"),
            "diagnosis": btc.get("diagnosis"),
            "filters": btc.get("filters"),
        }

    eth_section = {
        "can_enter_long": eth.get("can_enter_long"),
        "can_enter_short": eth.get("can_enter_short"),
        "diagnosis": eth.get("diagnosis"),
        "filters": eth.get("filters"),
        "candle_1h": eth.get("candle_1h"),
        "trend_strength_4h": eth.get("trend_strength_4h"),
    }

    return {
        "eth": eth_section,
        "btc": btc_section,
        "cached_at": datetime.now(timezone.utc).isoformat(),
    }
# =============================================
# 静态文件服务(前端)
# =============================================
# Frontend assets live in a "frontend" directory next to this file's parent.
FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "..", "frontend")


@app.get("/")
def serve_index():
    """Serve the dashboard's ``index.html`` from ``FRONTEND_DIR``."""
    index_path = os.path.join(FRONTEND_DIR, "index.html")
    return FileResponse(index_path)
# =============================================
# Trade monitoring API (reads the freqtrade SQLite database)
# =============================================
def _get_db():
    """Open a read-only connection to the freqtrade SQLite database.

    The database at ``FREQTRADE_DB`` is opened through a ``mode=ro`` URI, so
    this process cannot modify it and a missing file raises an error instead
    of silently creating an empty database.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects
        (addressable by column name). The caller must close it.

    Raises:
        sqlite3.OperationalError: If the database file cannot be opened.
    """
    # Imported locally so this block is self-contained with respect to the
    # file's import section.
    from pathlib import Path

    # as_uri() needs an absolute path and percent-encodes characters that
    # would otherwise be misread as URI syntax ("?", "#", spaces).
    db_uri = Path(FREQTRADE_DB).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(db_uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn
def _row_to_dict(row):
"""将 sqlite3.Row 转为 dict处理 datetime 和 boolean"""
if row is None:
return None
d = {}
for k in row.keys():
v = row[k]
if isinstance(v, bytes):
d[k] = v.decode("utf-8", errors="replace")
elif hasattr(v, "isoformat"):
d[k] = v.isoformat() if v else None
else:
d[k] = v
return d
@app.get("/api/account")
def account_overview():
    """Account overview: balance, PnL statistics, open positions, strategy.

    Data comes from the freqtrade SQLite database (``wallet_history`` and
    ``trades`` tables). Unrealized PnL of open positions is derived from the
    cached market prices; it is ``None`` for a pair without a known price.

    Returns:
        A dict with ``account``, ``stats``, ``open_positions``,
        ``daily_pnl`` (most recent 30 days with closed trades) and a
        ``cached_at`` UTC timestamp.

    Raises:
        HTTPException: 500 with a ``Database error: ...`` detail when
            reading the database or building the response fails.
    """
    try:
        conn = _get_db()
        try:
            # Latest wallet balance
            wallet = conn.execute(
                "SELECT * FROM wallet_history ORDER BY timestamp DESC LIMIT 1"
            ).fetchone()
            # Aggregate trade statistics
            stats = conn.execute("""
                SELECT
                    COUNT(*) as total_trades,
                    SUM(CASE WHEN is_open = 1 THEN 1 ELSE 0 END) as open_trades,
                    SUM(CASE WHEN is_open = 0 THEN 1 ELSE 0 END) as closed_trades,
                    SUM(CASE WHEN is_open = 0 AND close_profit > 0 THEN 1 ELSE 0 END) as winning_trades,
                    SUM(CASE WHEN is_open = 0 AND close_profit <= 0 THEN 1 ELSE 0 END) as losing_trades,
                    ROUND(SUM(close_profit_abs), 8) as total_profit_abs,
                    ROUND(AVG(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as avg_profit_pct,
                    ROUND(MAX(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as best_trade_pct,
                    ROUND(MIN(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as worst_trade_pct
                FROM trades
            """).fetchone()
            # Open positions. Unrealized PnL is added afterwards in Python,
            # because the current price is not stored in the database.
            open_rows = conn.execute("""
                SELECT id, pair, is_short, open_rate, stake_amount, amount,
                       open_date, stop_loss, max_rate, min_rate
                FROM trades
                WHERE is_open = 1
                ORDER BY open_date DESC
            """).fetchall()
            # Strategy name of the most recent trade that has one
            strategy_row = conn.execute(
                "SELECT DISTINCT strategy FROM trades WHERE strategy IS NOT NULL ORDER BY id DESC LIMIT 1"
            ).fetchone()
            # Daily realized PnL, most recent 30 days with closed trades
            daily_pnl = conn.execute("""
                SELECT DATE(close_date) as day,
                       ROUND(SUM(close_profit_abs), 8) as pnl_abs,
                       COUNT(*) as trade_count,
                       SUM(CASE WHEN close_profit > 0 THEN 1 ELSE 0 END) as wins
                FROM trades
                WHERE is_open = 0 AND close_date IS NOT NULL
                GROUP BY DATE(close_date)
                ORDER BY day DESC
                LIMIT 30
            """).fetchall()
        finally:
            # Release the connection even when one of the queries fails.
            conn.close()

        strategy_name = strategy_row["strategy"] if strategy_row else "v2.2c"

        # Current balance: latest wallet row, falling back to the initial
        # wallet when there is no row or the row carries no usable value.
        current_balance = None
        last_update = None
        if wallet:
            current_balance = wallet["total_quote"] or wallet["balance"]
            last_update = _row_to_dict(wallet)["timestamp"]
        if current_balance is None:
            current_balance = INITIAL_WALLET

        # Total return relative to the initial wallet. A zero balance is a
        # real -100 %, so only a zero baseline is special-cased.
        total_pnl_abs = current_balance - INITIAL_WALLET
        total_pnl_pct = round(total_pnl_abs / INITIAL_WALLET * 100, 2) if INITIAL_WALLET else 0

        # Win rate over closed trades
        closed = stats["closed_trades"] or 0
        wins = stats["winning_trades"] or 0
        win_rate = round(wins / closed * 100, 1) if closed > 0 else 0

        open_positions = _with_unrealized_pnl([_row_to_dict(r) for r in open_rows])

        return {
            "account": {
                "initial_wallet": INITIAL_WALLET,
                "current_balance": round(current_balance, 2),
                "total_pnl_pct": total_pnl_pct,
                "total_pnl_abs": round(total_pnl_abs, 2),
                "last_update": last_update,
                "strategy": strategy_name,
            },
            "stats": {
                "total_trades": stats["total_trades"] or 0,
                "open_trades": stats["open_trades"] or 0,
                "closed_trades": closed,
                "win_rate": win_rate,
                "wins": wins,
                "losses": stats["losing_trades"] or 0,
                "avg_profit_pct": stats["avg_profit_pct"],
                "best_trade_pct": stats["best_trade_pct"],
                "worst_trade_pct": stats["worst_trade_pct"],
                "total_profit_abs": stats["total_profit_abs"] or 0,
            },
            "open_positions": open_positions,
            "daily_pnl": [_row_to_dict(r) for r in daily_pnl],
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e


def _with_unrealized_pnl(positions):
    """Add ``unrealized_pnl_pct`` to each open-position dict, in place.

    The value is the percentage price move since entry (sign flipped for
    shorts, leverage not applied), rounded to 4 decimals. It is ``None``
    when no current price or no open rate is available for the position.
    """
    try:
        market = get_cached_data()
    except Exception as exc:
        # Best effort: the account view must still work without prices.
        print(f"Error loading prices for open positions: {exc}")
        market = {}

    for position in positions:
        # Presumably futures pairs carry a settle suffix ("ETH/USDT:USDT")
        # while market data is keyed by "ETH/USDT"; strip the suffix if any.
        symbol = (position.get("pair") or "").split(":")[0]
        price = (market.get(symbol) or {}).get("current_price")
        open_rate = position.get("open_rate")
        if price is None or not open_rate:
            position["unrealized_pnl_pct"] = None
            continue
        move_pct = (price - open_rate) / open_rate * 100
        position["unrealized_pnl_pct"] = round(
            -move_pct if position.get("is_short") else move_pct, 4
        )
    return positions
@app.get("/api/trades")
def trade_list(
    limit: int = Query(default=30, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
    status: str = Query(default="all", pattern="^(all|open|closed)$"),
    sort: str = Query(default="desc", pattern="^(asc|desc)$"),
):
    """Trade list with pagination and open/closed filtering.

    Args:
        limit: Page size (1-200).
        offset: Number of rows to skip.
        status: ``all``, ``open`` or ``closed``.
        sort: ``asc`` or ``desc`` by open date.

    Returns:
        A dict with ``trades`` (list of row dicts), ``pagination`` (total,
        limit, offset, has_more) and a ``cached_at`` UTC timestamp.

    Raises:
        HTTPException: 500 with a ``Database error: ...`` detail when the
            database cannot be read.
    """
    try:
        # Both SQL fragments are chosen from fixed literals, never taken
        # from the request text, so interpolating them below is safe.
        where_clause = {
            "open": "WHERE is_open = 1",
            "closed": "WHERE is_open = 0",
        }.get(status, "")
        order = "DESC" if sort == "desc" else "ASC"

        conn = _get_db()
        try:
            # id is a tie-breaker so rows sharing an open_date keep a stable
            # position across pages.
            rows = conn.execute(f"""
                SELECT id, pair, is_open, is_short, open_rate, close_rate,
                       close_profit, close_profit_abs, stake_amount, amount,
                       open_date, close_date, exit_reason, strategy,
                       max_rate, min_rate, stop_loss, initial_stop_loss,
                       enter_tag, leverage
                FROM trades
                {where_clause}
                ORDER BY open_date {order}, id {order}
                LIMIT ? OFFSET ?
            """, [limit, offset]).fetchall()
            total_count = conn.execute(
                f"SELECT COUNT(*) as cnt FROM trades {where_clause}"
            ).fetchone()["cnt"]
        finally:
            # Release the connection even when a query fails.
            conn.close()

        return {
            "trades": [_row_to_dict(r) for r in rows],
            "pagination": {
                "total": total_count,
                "limit": limit,
                "offset": offset,
                "has_more": offset + limit < total_count,
            },
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
@app.get("/api/trades/stats")
def trade_stats():
    """Aggregated statistics over closed trades.

    Returns:
        A dict with ``by_direction`` (LONG/SHORT), ``by_exit_reason`` (top
        10 by count), ``monthly`` (latest 12 months, grouped by open date),
        ``profit_distribution`` (trade counts per profit bucket) and a
        ``cached_at`` UTC timestamp.

    Raises:
        HTTPException: 500 with a ``Database error: ...`` detail when the
            database cannot be read.
    """
    try:
        conn = _get_db()
        try:
            # By direction
            by_direction = conn.execute("""
                SELECT
                    CASE WHEN is_short = 1 THEN 'SHORT' ELSE 'LONG' END as direction,
                    COUNT(*) as total,
                    SUM(CASE WHEN is_open = 0 AND close_profit > 0 THEN 1 ELSE 0 END) as wins,
                    SUM(CASE WHEN is_open = 0 AND close_profit <= 0 THEN 1 ELSE 0 END) as losses,
                    ROUND(AVG(CASE WHEN is_open = 0 THEN close_profit ELSE NULL END), 6) as avg_profit_pct,
                    ROUND(SUM(CASE WHEN is_open = 0 THEN close_profit_abs ELSE 0 END), 8) as total_pnl_abs
                FROM trades
                WHERE is_open = 0
                GROUP BY is_short
            """).fetchall()
            # By exit reason
            by_exit = conn.execute("""
                SELECT
                    exit_reason,
                    COUNT(*) as count,
                    ROUND(AVG(close_profit), 6) as avg_profit_pct,
                    ROUND(SUM(close_profit_abs), 8) as total_pnl_abs
                FROM trades
                WHERE is_open = 0 AND exit_reason IS NOT NULL
                GROUP BY exit_reason
                ORDER BY count DESC
                LIMIT 10
            """).fetchall()
            # Monthly performance
            monthly = conn.execute("""
                SELECT
                    strftime('%Y-%m', open_date) as month,
                    COUNT(*) as total,
                    SUM(CASE WHEN close_profit > 0 THEN 1 ELSE 0 END) as wins,
                    ROUND(AVG(close_profit), 6) as avg_profit_pct,
                    ROUND(SUM(close_profit_abs), 8) as total_pnl_abs
                FROM trades
                WHERE is_open = 0
                GROUP BY strftime('%Y-%m', open_date)
                ORDER BY month DESC
                LIMIT 12
            """).fetchall()
            # Profit distribution (bucketed counts). Rows without a
            # close_profit are excluded: a NULL fails every comparison and
            # would otherwise be counted in the '> 20%' bucket.
            distribution = conn.execute("""
                SELECT
                    CASE
                        WHEN close_profit <= -0.10 THEN '≤ -10%'
                        WHEN close_profit <= -0.05 THEN '-10% ~ -5%'
                        WHEN close_profit <= -0.02 THEN '-5% ~ -2%'
                        WHEN close_profit < 0 THEN '-2% ~ 0%'
                        WHEN close_profit = 0 THEN '0%'
                        WHEN close_profit <= 0.02 THEN '0% ~ 2%'
                        WHEN close_profit <= 0.05 THEN '2% ~ 5%'
                        WHEN close_profit <= 0.10 THEN '5% ~ 10%'
                        WHEN close_profit <= 0.20 THEN '10% ~ 20%'
                        ELSE '> 20%'
                    END as profit_range,
                    COUNT(*) as count
                FROM trades
                WHERE is_open = 0 AND close_profit IS NOT NULL
                GROUP BY 1
                ORDER BY MIN(close_profit)
            """).fetchall()
        finally:
            # Release the connection even when a query fails.
            conn.close()

        return {
            "by_direction": [_row_to_dict(r) for r in by_direction],
            "by_exit_reason": [_row_to_dict(r) for r in by_exit],
            "monthly": [_row_to_dict(r) for r in monthly],
            "profit_distribution": [_row_to_dict(r) for r in distribution],
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") from e
# =============================================
# 启动
# =============================================
if __name__ == "__main__":
    # Listening port comes from the PORT environment variable (default 8000).
    listen_port = int(os.environ.get("PORT", 8000))
    # Serve on all interfaces, without auto-reload.
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port, reload=False)