Files
beast-trader/dashboard/indicators.py

400 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
指标计算模块 — 从 v2.2b 策略提取的独立版本
用于 APP 后端实时计算市场结构和信号诊断
"""
import numpy as np
import pandas as pd
def detect_swing_points(
    high: pd.Series,
    low: pd.Series,
    window: int = 5,
) -> tuple[pd.Series, pd.Series]:
    """Locate swing highs and swing lows (kept consistent with the v2.2b strategy).

    A bar is a swing high when its high is strictly greater than the highest
    high among the ``window`` bars before it and among the ``window`` bars
    after it. Swing lows mirror that rule with strict "less than" on the lows.
    The first and last ``window`` bars are never evaluated.

    Args:
        high: Bar highs.
        low: Bar lows.
        window: Number of bars inspected on each side of a candidate bar.

    Returns:
        ``(swing_high, swing_low)``: float Series indexed like ``high`` and
        ``low`` respectively, holding the swing price on swing bars and NaN
        everywhere else.
    """
    swing_high = pd.Series(np.nan, index=high.index, dtype=float)
    swing_low = pd.Series(np.nan, index=low.index, dtype=float)

    for pos in range(window, len(high) - window):
        before = slice(pos - window, pos)
        after = slice(pos + 1, pos + window + 1)

        bar_high = high.iloc[pos]
        if bar_high > high.iloc[before].max() and bar_high > high.iloc[after].max():
            swing_high.iloc[pos] = bar_high

        bar_low = low.iloc[pos]
        if bar_low < low.iloc[before].min() and bar_low < low.iloc[after].min():
            swing_low.iloc[pos] = bar_low

    return swing_high, swing_low
def build_structure(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    swing_high: pd.Series,
    swing_low: pd.Series,
) -> pd.DataFrame:
    """Build the price structure bar by bar (kept consistent with v2.2b).

    For every bar the function tracks the most recent swing highs/lows seen so
    far and derives:

    * trend direction: up when both the latest swing high and the latest swing
      low are above their predecessors, down when both are below; otherwise
      the previous bar's flags are carried forward;
    * nearest support / resistance: the latest swing low / swing high;
    * demand / supply zone: whether the close sits in the bottom 35% or the
      top 35% of the support-resistance range.

    Args:
        high: Bar highs; only its length and index are used here.
        low: Bar lows; not read by this function.
        close: Bar closes, used for the zone position.
        swing_high: Swing-high prices, NaN on non-swing bars.
        swing_low: Swing-low prices, NaN on non-swing bars.

    Returns:
        DataFrame indexed like ``high`` with columns ``trend_up``,
        ``trend_down``, ``support``, ``resistance``, ``in_demand`` and
        ``in_supply``.
    """
    bar_count = len(high)
    up_flags = np.zeros(bar_count, dtype=bool)
    down_flags = np.zeros(bar_count, dtype=bool)
    support_levels = np.full(bar_count, np.nan)
    resistance_levels = np.full(bar_count, np.nan)
    demand_flags = np.zeros(bar_count, dtype=bool)
    supply_flags = np.zeros(bar_count, dtype=bool)

    # Rolling memory of the last (at most) four swing prices of each kind.
    recent_highs = []
    recent_lows = []

    for idx in range(bar_count):
        new_high = swing_high.iloc[idx]
        if pd.notna(new_high):
            recent_highs.append(new_high)
            del recent_highs[:-4]
        new_low = swing_low.iloc[idx]
        if pd.notna(new_low):
            recent_lows.append(new_low)
            del recent_lows[:-4]

        # Trend needs two swings of each kind; compare the latest pair.
        rising = falling = False
        if len(recent_highs) >= 2 and len(recent_lows) >= 2:
            higher_high = recent_highs[-1] > recent_highs[-2]
            higher_low = recent_lows[-1] > recent_lows[-2]
            lower_high = recent_highs[-1] < recent_highs[-2]
            lower_low = recent_lows[-1] < recent_lows[-2]
            rising = higher_high and higher_low
            falling = lower_high and lower_low

        if rising:
            up_flags[idx] = True
        elif falling:
            down_flags[idx] = True
        elif idx > 0:
            # Undecided (mixed swings or not enough data): inherit the previous bar.
            up_flags[idx] = up_flags[idx - 1]
            down_flags[idx] = down_flags[idx - 1]

        if recent_lows:
            support_levels[idx] = recent_lows[-1]
        if recent_highs:
            resistance_levels[idx] = recent_highs[-1]

        price = close.iloc[idx]
        floor = support_levels[idx]
        ceiling = resistance_levels[idx]
        if np.isnan(floor) or np.isnan(ceiling):
            continue
        span = ceiling - floor
        if span > 0:
            # Position of the close inside the range: 0 = support, 1 = resistance.
            position = (price - floor) / span
            demand_flags[idx] = position < 0.35
            supply_flags[idx] = position > 0.65

    columns = {
        "trend_up": up_flags,
        "trend_down": down_flags,
        "support": support_levels,
        "resistance": resistance_levels,
        "in_demand": demand_flags,
        "in_supply": supply_flags,
    }
    return pd.DataFrame(columns, index=high.index)
def compute_trend_strength(
    high: pd.Series,
    low: pd.Series,
    swing_high: pd.Series,
    swing_low: pd.Series,
    min_strength: float = -0.20,
) -> pd.DataFrame:
    """Measure trend strength from the change between consecutive swing points.

    Kept consistent with v2.2b. For each bar, once at least two swing highs
    and two swing lows have been seen, the strength is the relative change of
    the latest swing high versus the previous one plus the relative change of
    the latest swing low versus the previous one. The downtrend strength is
    the negation of that sum. Bars without enough swings stay NaN.

    Args:
        high: Bar highs; only its length and index are used here.
        low: Bar lows; not read by this function.
        swing_high: Swing-high prices, NaN on non-swing bars.
        swing_low: Swing-low prices, NaN on non-swing bars.
        min_strength: A trend counts as "strong" when its strength is strictly
            greater than this value.

    Returns:
        DataFrame indexed like ``high`` with columns ``trend_strength_up``,
        ``trend_strength_down``, ``strong_uptrend`` and ``strong_downtrend``.
        NaN strengths compare as not strong.
    """
    bar_count = len(high)
    up_strength = np.full(bar_count, np.nan)
    down_strength = np.full(bar_count, np.nan)

    # Rolling memory of the last (at most) four swing prices of each kind.
    recent_highs = []
    recent_lows = []

    for idx in range(bar_count):
        new_high = swing_high.iloc[idx]
        if pd.notna(new_high):
            recent_highs.append(new_high)
            del recent_highs[:-4]
        new_low = swing_low.iloc[idx]
        if pd.notna(new_low):
            recent_lows.append(new_low)
            del recent_lows[:-4]

        if len(recent_highs) < 2 or len(recent_lows) < 2:
            continue

        prev_high, last_high = recent_highs[-2], recent_highs[-1]
        prev_low, last_low = recent_lows[-2], recent_lows[-1]
        # A non-positive reference price yields a zero contribution instead of dividing.
        high_change = (last_high - prev_high) / prev_high if prev_high > 0 else 0
        low_change = (last_low - prev_low) / prev_low if prev_low > 0 else 0
        combined = high_change + low_change
        up_strength[idx] = combined
        down_strength[idx] = -combined

    strong_up = pd.Series(up_strength) > min_strength
    strong_down = pd.Series(down_strength) > min_strength

    return pd.DataFrame({
        "trend_strength_up": pd.Series(up_strength, index=high.index),
        "trend_strength_down": pd.Series(down_strength, index=high.index),
        "strong_uptrend": strong_up.values,
        "strong_downtrend": strong_down.values,
    }, index=high.index)
def compute_support_alive(
    low: pd.Series,
    close: pd.Series,
    support: pd.Series,
    tolerance: float = 0.005,
    lookback: int = 3,
) -> pd.Series:
    """Check whether support is "alive": recently tested and not broken.

    Kept consistent with v2.2b at the default settings. A bar tests support
    when its low falls inside the band ``support * (1 ± tolerance)``; the test
    holds when the bar closes strictly above support. Support is alive on a
    bar if any of the last ``lookback`` bars (including itself) was a held test.

    Args:
        low: Bar lows.
        close: Bar closes.
        support: Support level per bar (NaN where no support is known).
        tolerance: Half-width of the touch band as a fraction of the level
            (default 0.005, i.e. 0.5%).
        lookback: Number of most recent bars searched for a held test
            (default 3).

    Returns:
        Boolean Series; bars with NaN inputs evaluate to False.
    """
    band_top = support * (1 + tolerance)
    band_bottom = support * (1 - tolerance)
    touched = (low <= band_top) & (low >= band_bottom)
    held = close > support
    # Cast to float so the rolling window operates on a plain numeric 0/1 series.
    tested_and_held = (touched & held).astype(float)
    return tested_and_held.rolling(lookback, min_periods=1).max() > 0
def compute_resistance_alive(
    high: pd.Series,
    close: pd.Series,
    resistance: pd.Series,
    tolerance: float = 0.005,
    lookback: int = 3,
) -> pd.Series:
    """Check whether resistance is "alive": recently tested and not broken.

    Kept consistent with v2.2b at the default settings. A bar tests resistance
    when its high falls inside the band ``resistance * (1 ± tolerance)``; the
    test holds when the bar closes strictly below resistance. Resistance is
    alive on a bar if any of the last ``lookback`` bars (including itself) was
    a held test.

    Args:
        high: Bar highs.
        close: Bar closes.
        resistance: Resistance level per bar (NaN where none is known).
        tolerance: Half-width of the touch band as a fraction of the level
            (default 0.005, i.e. 0.5%).
        lookback: Number of most recent bars searched for a held test
            (default 3).

    Returns:
        Boolean Series; bars with NaN inputs evaluate to False.
    """
    band_bottom = resistance * (1 - tolerance)
    band_top = resistance * (1 + tolerance)
    touched = (high >= band_bottom) & (high <= band_top)
    held = close < resistance
    # Cast to float so the rolling window operates on a plain numeric 0/1 series.
    tested_and_held = (touched & held).astype(float)
    return tested_and_held.rolling(lookback, min_periods=1).max() > 0
def detect_candle_patterns(
    open_: pd.Series,
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    pin_bar_wick_ratio: float = 0.6,
) -> dict:
    """Detect pin-bar and engulfing candles (kept consistent with v2.2b).

    A pin bar is a candle whose combined upper and lower wicks exceed
    ``pin_bar_wick_ratio`` of its high-low range. It is bullish when the
    candle closes up and the lower wick is the longer one, bearish when it
    closes down and the upper wick is the longer one. An engulfing candle
    closes beyond the previous open, opens beyond the previous close, and
    closes in its own direction.

    Args:
        open_: Bar opens.
        high: Bar highs.
        low: Bar lows.
        close: Bar closes.
        pin_bar_wick_ratio: Minimum wick share of the range for a pin bar.

    Returns:
        Dict of boolean Series: ``bullish_pinbar``, ``bearish_pinbar``,
        ``bullish_signal`` (pin bar or engulfing) and ``bearish_signal``.
    """
    closes_up = close > open_
    closes_down = close < open_

    # Zero-range candles get a tiny range so the ratio below never divides by zero.
    candle_range = (high - low).replace(0, 0.0001)

    body_top = close.where(closes_up, open_)
    body_bottom = open_.where(closes_up, close)
    upper_wick = high - body_top
    lower_wick = body_bottom - low

    wick_share = (upper_wick + lower_wick) / candle_range
    is_pin = wick_share > pin_bar_wick_ratio
    bullish_pin = is_pin & closes_up & (lower_wick > upper_wick)
    bearish_pin = is_pin & closes_down & (upper_wick > lower_wick)

    prior_open = open_.shift(1)
    prior_close = close.shift(1)
    bullish_engulf = (close > prior_open) & (open_ < prior_close) & closes_up
    bearish_engulf = (close < prior_open) & (open_ > prior_close) & closes_down

    return {
        "bullish_pinbar": bullish_pin,
        "bearish_pinbar": bearish_pin,
        "bullish_signal": bullish_pin | bullish_engulf,
        "bearish_signal": bearish_pin | bearish_engulf,
    }
def _finite_or_zero(value) -> float:
    """Convert ``value`` to float, mapping NaN/inf to 0.0 so the payload stays JSON-safe."""
    number = float(value)
    return number if np.isfinite(number) else 0.0


def _recent_swing_markers(swings: pd.Series, limit: int = 10) -> list:
    """Return the last ``limit`` non-NaN swing points as ``{"time", "price"}`` dicts."""
    found = swings.dropna()
    if found.empty:
        return []
    # strftime requires a datetime-like index on the input frame.
    times = found.index.strftime("%Y-%m-%d %H:%M").tolist()
    prices = [float(price) for price in found.tolist()]
    return [{"time": t, "price": p} for t, p in zip(times[-limit:], prices[-limit:])]


def compute_all_indicators(
    df_1h: pd.DataFrame,
    df_4h: pd.DataFrame,
    df_1d: pd.DataFrame,
    params: dict = None,
) -> dict:
    """Compute every indicator and return the full payload the app needs.

    Combines the D1 trend direction, the 4H trend strength and the 1H
    support/resistance structure, candle patterns and stop distances into a
    signal diagnosis with per-filter pass flags.

    Args:
        df_1h: 1H bars with ``open``, ``high``, ``low``, ``close`` columns.
            The swing-point timestamps are formatted with ``strftime``, so a
            datetime-like index is required once swing points exist.
        df_4h: 4H bars with ``high`` and ``low`` columns.
        df_1d: D1 bars with ``high``, ``low`` and ``close`` columns.
        params: Optional overrides:
            swing_lookback_d1: int (default 10)
            swing_lookback_h4: int (default 8)
            swing_lookback_1h: int (default 5)
            pin_bar_wick_ratio: float (default 0.6)
            max_stop_dist: float (default 0.50). Compared directly against the
                fractional stop distance, so 0.50 means 50%.
                NOTE(review): the previous docstring described this default as
                "5%"; confirm whether 0.05 was intended.
            trend_strength_min: float (default -0.20)

    Returns:
        Dict with keys ``timestamp``, ``current_price``, ``diagnosis``,
        ``filters``, ``can_enter_long``, ``can_enter_short``, ``trend_1d``,
        ``candle_1h``, ``swing_points`` and ``trend_strength_4h``.
    """
    if params is None:
        params = {}
    swing_lookback_d1 = params.get("swing_lookback_d1", 10)  # v2.2d strategy default is 10
    swing_lookback_h4 = params.get("swing_lookback_h4", 8)
    swing_lookback_1h = params.get("swing_lookback_1h", 5)
    pin_bar_wick_ratio = params.get("pin_bar_wick_ratio", 0.6)
    max_stop_dist = params.get("max_stop_dist", 0.50)
    trend_strength_min = params.get("trend_strength_min", -0.20)

    # ---- D1 structure ----
    sh_d1, sl_d1 = detect_swing_points(df_1d["high"], df_1d["low"], swing_lookback_d1)
    struct_d1 = build_structure(df_1d["high"], df_1d["low"], df_1d["close"], sh_d1, sl_d1)
    has_d1 = len(struct_d1) > 0
    trend_up_1d = bool(struct_d1["trend_up"].iloc[-1]) if has_d1 else False
    trend_down_1d = bool(struct_d1["trend_down"].iloc[-1]) if has_d1 else False

    # ---- D1 range detection ----
    # build_structure carries the previous trend forward when the swings are
    # mixed; re-check the latest two swing points so a ranging market is not
    # reported with a stale trend.
    sh_d1_valid = sh_d1.dropna()
    sl_d1_valid = sl_d1.dropna()
    if len(sh_d1_valid) >= 2 and len(sl_d1_valid) >= 2:
        last_sh, prev_sh = sh_d1_valid.iloc[-1], sh_d1_valid.iloc[-2]
        last_sl, prev_sl = sl_d1_valid.iloc[-1], sl_d1_valid.iloc[-2]
        is_uptrend = last_sh > prev_sh and last_sl > prev_sl
        is_downtrend = last_sh < prev_sh and last_sl < prev_sl
        if not is_uptrend and not is_downtrend:
            trend_up_1d = False
            trend_down_1d = False

    # ---- 4H structure (trend strength only) ----
    sh_4h, sl_4h = detect_swing_points(df_4h["high"], df_4h["low"], swing_lookback_h4)
    strength_4h = compute_trend_strength(
        df_4h["high"], df_4h["low"], sh_4h, sl_4h,
        min_strength=trend_strength_min,
    )
    if len(strength_4h) > 0:
        strong_uptrend_4h = bool(strength_4h["strong_uptrend"].iloc[-1])
        strong_downtrend_4h = bool(strength_4h["strong_downtrend"].iloc[-1])
        # The strength is NaN until two swing highs and two swing lows exist;
        # report 0 (same as the empty-frame fallback) instead of leaking NaN,
        # which is not valid in strict JSON.
        trend_strength_up_val = _finite_or_zero(strength_4h["trend_strength_up"].iloc[-1])
        trend_strength_down_val = _finite_or_zero(strength_4h["trend_strength_down"].iloc[-1])
    else:
        strong_uptrend_4h = False
        strong_downtrend_4h = False
        trend_strength_up_val = 0
        trend_strength_down_val = 0

    # ---- 1H structure (S/R, adapted for v2.2c) ----
    sh_1h, sl_1h = detect_swing_points(df_1h["high"], df_1h["low"], swing_lookback_1h)
    struct_1h = build_structure(df_1h["high"], df_1h["low"], df_1h["close"], sh_1h, sl_1h)
    last_1h_s = struct_1h.iloc[-1] if len(struct_1h) > 0 else None
    support_1h = None
    resistance_1h = None
    in_demand_1h = False
    in_supply_1h = False
    if last_1h_s is not None:
        if pd.notna(last_1h_s["support"]):
            support_1h = float(last_1h_s["support"])
        if pd.notna(last_1h_s["resistance"]):
            resistance_1h = float(last_1h_s["resistance"])
        in_demand_1h = bool(last_1h_s["in_demand"])
        in_supply_1h = bool(last_1h_s["in_supply"])

    # 1H live S/R check
    support_alive_1h = False
    resistance_alive_1h = False
    if support_1h is not None:
        support_alive_1h = bool(
            compute_support_alive(df_1h["low"], df_1h["close"], struct_1h["support"]).iloc[-1]
        )
    if resistance_1h is not None:
        resistance_alive_1h = bool(
            compute_resistance_alive(df_1h["high"], df_1h["close"], struct_1h["resistance"]).iloc[-1]
        )

    # ---- 1H candle patterns ----
    has_1h = len(df_1h) > 0
    candle = detect_candle_patterns(
        df_1h["open"], df_1h["high"], df_1h["low"], df_1h["close"],
        pin_bar_wick_ratio,
    )
    bullish_signal = bool(candle["bullish_signal"].iloc[-1]) if has_1h else False
    bearish_signal = bool(candle["bearish_signal"].iloc[-1]) if has_1h else False
    bullish_pinbar = bool(candle["bullish_pinbar"].iloc[-1]) if has_1h else False
    bearish_pinbar = bool(candle["bearish_pinbar"].iloc[-1]) if has_1h else False

    # ---- Entry distance to 1H S/R ----
    # The open of the latest 1H bar is used as the market price.
    current_price = float(df_1h["open"].iloc[-1]) if has_1h else 0
    long_stop_dist = None
    short_stop_dist = None
    if current_price != 0:  # a zero price would otherwise raise ZeroDivisionError
        if support_1h is not None and support_1h > 0:
            long_stop_dist = abs((current_price - support_1h) / current_price)
        if resistance_1h is not None and resistance_1h > 0:
            short_stop_dist = abs((resistance_1h - current_price) / current_price)
    # The stop must be neither too far (max_stop_dist) nor too tight (0.3%).
    long_dist_ok = long_stop_dist is not None and 0.003 < long_stop_dist <= max_stop_dist
    short_dist_ok = short_stop_dist is not None and 0.003 < short_stop_dist <= max_stop_dist

    # ---- Signal diagnosis (1H S/R) ----
    has_zone = bool(support_1h and resistance_1h)
    diagnosis = {
        "market_price": round(current_price, 2),
        "support": round(support_1h, 2) if support_1h else None,
        "resistance": round(resistance_1h, 2) if resistance_1h else None,
        "zone_width_pct": (
            round(abs(resistance_1h - support_1h) / support_1h * 100, 2) if has_zone else None
        ),
        "price_position_in_zone": (
            round((current_price - support_1h) / (resistance_1h - support_1h) * 100, 1)
            if has_zone and resistance_1h > support_1h else None
        ),
    }

    zone_position = diagnosis["price_position_in_zone"]
    position_text = f"位置 {zone_position}%" if zone_position is not None else "N/A"
    # Use explicit None checks so a distance of exactly 0.0 is still shown as a number.
    long_dist_text = f"{long_stop_dist * 100:.2f}" if long_stop_dist is not None else "N/A"
    short_dist_text = f"{short_stop_dist * 100:.2f}" if short_stop_dist is not None else "N/A"

    filters = {
        "trend_up_1d": {
            "pass": trend_up_1d,
            "desc": "D1 上升趋势HH+HL 都在抬高)",
            "value": "上升" if trend_up_1d else ("下降" if trend_down_1d else "震荡"),
        },
        "in_demand_1h": {
            "pass": in_demand_1h,
            "desc": "价格在 1H 需求区zone 底部 35%",
            "value": position_text,
        },
        "long_stop_distance": {
            "pass": long_dist_ok,
            "desc": f"入场距支撑距离 ({long_dist_text}%)",
            "value": f"{long_dist_text}%" if long_stop_dist is not None else "N/A",
        },
        "support_alive_1h": {
            "pass": support_alive_1h,
            "desc": "1H 支撑最近被测试过且未跌破",
            "value": "有效" if support_alive_1h else "失效/未测试",
        },
        "strong_uptrend_4h": {
            "pass": strong_uptrend_4h,
            "desc": "4H 趋势强度(扩张中)",
            "value": f"{trend_strength_up_val * 100:.2f}%",
        },
        "trend_down_1d": {
            "pass": trend_down_1d,
            "desc": "D1 下降趋势LH+LL 都在降低)",
            "value": "下降" if trend_down_1d else ("上升" if trend_up_1d else "震荡"),
        },
        "in_supply_1h": {
            "pass": in_supply_1h,
            "desc": "价格在 1H 供给区zone 顶部 65%+",
            "value": position_text,
        },
        "short_stop_distance": {
            "pass": short_dist_ok,
            "desc": f"入场距阻力距离 ({short_dist_text}%)",
            "value": f"{short_dist_text}%" if short_stop_dist is not None else "N/A",
        },
        "resistance_alive_1h": {
            "pass": resistance_alive_1h,
            "desc": "1H 阻力最近被测试过且未突破",
            "value": "有效" if resistance_alive_1h else "失效/未测试",
        },
        "strong_downtrend_4h": {
            "pass": strong_downtrend_4h,
            "desc": "4H 下降趋势强度(扩张中)",
            "value": f"{trend_strength_down_val * 100:.2f}%",
        },
    }

    # Entry verdict: every filter on a side must pass (1H S/R + 4H trend).
    long_ok = all([
        trend_up_1d, in_demand_1h, long_dist_ok,
        support_alive_1h, strong_uptrend_4h,
    ])
    short_ok = all([
        trend_down_1d, in_supply_1h, short_dist_ok,
        resistance_alive_1h, strong_downtrend_4h,
    ])

    return {
        "timestamp": pd.Timestamp.now().isoformat(),
        "current_price": round(current_price, 2),
        "diagnosis": diagnosis,
        "filters": filters,
        "can_enter_long": long_ok,
        "can_enter_short": short_ok,
        "trend_1d": "up" if trend_up_1d else ("down" if trend_down_1d else "neutral"),
        "candle_1h": {
            "bullish_pinbar": bullish_pinbar,
            "bearish_pinbar": bearish_pinbar,
            "bullish_signal": bullish_signal,
            "bearish_signal": bearish_signal,
        },
        # Last 10 swing points of the 1H chart, used for plotting.
        "swing_points": {
            "highs": _recent_swing_markers(sh_1h),
            "lows": _recent_swing_markers(sl_1h),
        },
        "trend_strength_4h": {
            "up": round(trend_strength_up_val * 100, 2),
            "down": round(trend_strength_down_val * 100, 2),
        },
    }