386 lines
15 KiB
Python
386 lines
15 KiB
Python
"""
|
||
指标计算模块 — 从 v2.2b 策略提取的独立版本
|
||
用于 APP 后端实时计算市场结构和信号诊断
|
||
"""
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
|
||
def detect_swing_points(
    high: pd.Series,
    low: pd.Series,
    window: int = 5,
) -> tuple[pd.Series, pd.Series]:
    """Mark swing highs and swing lows (same rule as the v2.2b strategy).

    A bar is a swing high when its high is strictly greater than the highest
    high of the ``window`` bars before it and of the ``window`` bars after it.
    Swing lows use the mirrored rule on ``low``.

    Args:
        high: Bar highs.
        low: Bar lows.
        window: Number of bars inspected on each side of a candidate bar.

    Returns:
        ``(swing_highs, swing_lows)``: float Series indexed like ``high`` and
        ``low`` respectively, holding the swing price at swing bars and NaN
        everywhere else. The first and last ``window`` bars are never marked
        because they lack a full window on one side.
    """
    swing_highs = pd.Series(np.nan, index=high.index, dtype=float)
    swing_lows = pd.Series(np.nan, index=low.index, dtype=float)

    first_candidate = window
    end_candidate = len(high) - window
    for pos in range(first_candidate, end_candidate):
        before = slice(pos - window, pos)
        after = slice(pos + 1, pos + window + 1)

        bar_high = high.iloc[pos]
        if bar_high > high.iloc[before].max() and bar_high > high.iloc[after].max():
            swing_highs.iloc[pos] = bar_high

        bar_low = low.iloc[pos]
        if bar_low < low.iloc[before].min() and bar_low < low.iloc[after].min():
            swing_lows.iloc[pos] = bar_low

    return swing_highs, swing_lows
|
||
|
||
|
||
def build_structure(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    swing_high: pd.Series,
    swing_low: pd.Series,
) -> pd.DataFrame:
    """Build the per-bar price structure (same rules as v2.2b).

    Walks the bars in order while remembering the most recent swing highs and
    swing lows (at most four of each), and derives for every bar:

    * ``trend_up`` / ``trend_down``: up when both the latest swing high and the
      latest swing low are above their predecessors, down when both are below;
      otherwise the previous bar's flags are carried forward.
    * ``support`` / ``resistance``: the latest swing low / swing high seen so
      far (NaN until one exists).
    * ``in_demand`` / ``in_supply``: whether ``close`` sits in the bottom 35% /
      top 35% of the support-resistance range (only when resistance > support).

    Args:
        high: Bar highs; only its length and index are used.
        low: Bar lows; not read by this function.
        close: Bar closes.
        swing_high: Swing-high prices, NaN where the bar is not a swing high.
        swing_low: Swing-low prices, NaN where the bar is not a swing low.

    Returns:
        DataFrame indexed like ``high`` with columns ``trend_up``,
        ``trend_down``, ``support``, ``resistance``, ``in_demand``,
        ``in_supply``.
    """
    bar_count = len(high)

    up_flags = np.zeros(bar_count, dtype=bool)
    down_flags = np.zeros(bar_count, dtype=bool)
    support_levels = np.full(bar_count, np.nan)
    resistance_levels = np.full(bar_count, np.nan)
    demand_flags = np.zeros(bar_count, dtype=bool)
    supply_flags = np.zeros(bar_count, dtype=bool)

    recent_highs = []
    recent_lows = []

    for idx in range(bar_count):
        swing_high_here = swing_high.iloc[idx]
        if pd.notna(swing_high_here):
            recent_highs.append(swing_high_here)
            del recent_highs[:-4]  # keep only the four most recent

        swing_low_here = swing_low.iloc[idx]
        if pd.notna(swing_low_here):
            recent_lows.append(swing_low_here)
            del recent_lows[:-4]

        # Classify the trend; None means "no fresh verdict, inherit".
        verdict = None
        if len(recent_highs) >= 2 and len(recent_lows) >= 2:
            if recent_highs[-1] > recent_highs[-2] and recent_lows[-1] > recent_lows[-2]:
                verdict = "up"
            elif recent_highs[-1] < recent_highs[-2] and recent_lows[-1] < recent_lows[-2]:
                verdict = "down"

        if verdict == "up":
            up_flags[idx] = True
        elif verdict == "down":
            down_flags[idx] = True
        elif idx > 0:
            up_flags[idx] = up_flags[idx - 1]
            down_flags[idx] = down_flags[idx - 1]

        if recent_lows:
            support_levels[idx] = recent_lows[-1]
        if recent_highs:
            resistance_levels[idx] = recent_highs[-1]

        price = close.iloc[idx]
        floor = support_levels[idx]
        ceiling = resistance_levels[idx]
        if np.isnan(floor) or np.isnan(ceiling):
            continue

        span = ceiling - floor
        if span <= 0:
            continue

        relative_position = (price - floor) / span
        demand_flags[idx] = relative_position < 0.35
        supply_flags[idx] = relative_position > 0.65

    columns = {
        "trend_up": up_flags,
        "trend_down": down_flags,
        "support": support_levels,
        "resistance": resistance_levels,
        "in_demand": demand_flags,
        "in_supply": supply_flags,
    }
    return pd.DataFrame(columns, index=high.index)
|
||
|
||
|
||
def compute_trend_strength(
    high: pd.Series,
    low: pd.Series,
    swing_high: pd.Series,
    swing_low: pd.Series,
    min_strength: float = -0.20,
) -> pd.DataFrame:
    """Compute trend strength from the change between consecutive swing points.

    For every bar, the relative change of the latest swing high versus the
    previous one is added to the relative change of the latest swing low
    versus the previous one. That sum is the up-strength; the down-strength is
    its negation. Until at least two swing highs and two swing lows have been
    seen, both strengths are NaN.

    Args:
        high: Bar highs; only its length and index are used.
        low: Bar lows; not read by this function.
        swing_high: Swing-high prices, NaN where the bar is not a swing high.
        swing_low: Swing-low prices, NaN where the bar is not a swing low.
        min_strength: A strength strictly greater than this value counts as a
            "strong" trend.

    Returns:
        DataFrame indexed like ``high`` with columns ``trend_strength_up``,
        ``trend_strength_down`` (floats, NaN while undefined) and
        ``strong_uptrend``, ``strong_downtrend`` (bools, False while the
        strength is NaN).
    """
    bar_count = len(high)
    trend_strength_up = np.full(bar_count, np.nan)
    trend_strength_down = np.full(bar_count, np.nan)

    sh_prices = []
    sl_prices = []
    # The strength only depends on the remembered swing points, so it is
    # recomputed when a new swing arrives and carried forward otherwise.
    strength = np.nan

    for i in range(bar_count):
        new_swing = False

        swing_high_here = swing_high.iloc[i]
        if pd.notna(swing_high_here):
            sh_prices.append(swing_high_here)
            del sh_prices[:-4]  # keep only the four most recent
            new_swing = True

        swing_low_here = swing_low.iloc[i]
        if pd.notna(swing_low_here):
            sl_prices.append(swing_low_here)
            del sl_prices[:-4]
            new_swing = True

        if new_swing and len(sh_prices) >= 2 and len(sl_prices) >= 2:
            # A non-positive reference price makes the relative change
            # meaningless, so that component contributes 0.
            hh_dist = (sh_prices[-1] - sh_prices[-2]) / sh_prices[-2] if sh_prices[-2] > 0 else 0
            hl_dist = (sl_prices[-1] - sl_prices[-2]) / sl_prices[-2] if sl_prices[-2] > 0 else 0
            strength = hh_dist + hl_dist

        trend_strength_up[i] = strength
        trend_strength_down[i] = -strength

    # Compare through pandas so NaN entries quietly evaluate to False.
    strong_uptrend = (pd.Series(trend_strength_up) > min_strength).to_numpy()
    strong_downtrend = (pd.Series(trend_strength_down) > min_strength).to_numpy()

    return pd.DataFrame({
        "trend_strength_up": trend_strength_up,
        "trend_strength_down": trend_strength_down,
        "strong_uptrend": strong_uptrend,
        "strong_downtrend": strong_downtrend,
    }, index=high.index)
|
||
|
||
|
||
def compute_support_alive(
    low: pd.Series,
    close: pd.Series,
    support: pd.Series,
) -> pd.Series:
    """Flag bars where support was recently tested and held (same as v2.2b).

    A bar "tests" support when its low lies within ±0.5% of the support level,
    and support "holds" when that bar closes above the level. The result is
    True for a bar if any of the last three bars (itself included) tested and
    held.

    Args:
        low: Bar lows.
        close: Bar closes.
        support: Support level per bar.

    Returns:
        Boolean Series, one value per bar.
    """
    band_floor = support * 0.995
    band_ceiling = support * 1.005

    wick_reached_level = (low >= band_floor) & (low <= band_ceiling)
    closed_above = close > support
    successful_test = wick_reached_level & closed_above

    recent_success = successful_test.rolling(3, min_periods=1).max()
    return recent_success > 0
|
||
|
||
|
||
def compute_resistance_alive(
    high: pd.Series,
    close: pd.Series,
    resistance: pd.Series,
) -> pd.Series:
    """Flag bars where resistance was recently tested and held (same as v2.2b).

    A bar "tests" resistance when its high lies within ±0.5% of the resistance
    level, and resistance "holds" when that bar closes below the level. The
    result is True for a bar if any of the last three bars (itself included)
    tested and held.

    Args:
        high: Bar highs.
        close: Bar closes.
        resistance: Resistance level per bar.

    Returns:
        Boolean Series, one value per bar.
    """
    band_floor = resistance * 0.995
    band_ceiling = resistance * 1.005

    wick_reached_level = (high <= band_ceiling) & (high >= band_floor)
    closed_below = close < resistance
    successful_test = wick_reached_level & closed_below

    recent_success = successful_test.rolling(3, min_periods=1).max()
    return recent_success > 0
|
||
|
||
|
||
def detect_candle_patterns(
    open_: pd.Series,
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    pin_bar_wick_ratio: float = 0.6,
) -> dict[str, pd.Series]:
    """Detect pin-bar and engulfing candle patterns (same rules as v2.2b).

    * Pin bar: the two wicks together make up more than ``pin_bar_wick_ratio``
      of the bar's high-low range. It is bullish when the bar closes up and
      the lower wick is the longer one, bearish when the bar closes down and
      the upper wick is the longer one.
    * Bullish engulfing: an up bar that opens below the previous close and
      closes above the previous open. Bearish engulfing is the mirror image.

    Args:
        open_: Bar opens.
        high: Bar highs.
        low: Bar lows.
        close: Bar closes.
        pin_bar_wick_ratio: Minimum share of the range taken by the wicks for
            a bar to count as a pin bar.

    Returns:
        Dict of boolean Series with keys ``bullish_pinbar``,
        ``bearish_pinbar``, ``bullish_signal`` (bullish pin bar or bullish
        engulfing) and ``bearish_signal`` (bearish pin bar or bearish
        engulfing).
    """
    is_up_bar = close > open_
    is_down_bar = close < open_

    # A zero-range bar would divide by zero; substitute a tiny range instead.
    total_range = (high - low).replace(0, 0.0001)

    body_top = close.where(is_up_bar, open_)
    body_bottom = open_.where(is_up_bar, close)
    upper_wick = high - body_top
    lower_wick = body_bottom - low
    is_pin = (upper_wick + lower_wick) / total_range > pin_bar_wick_ratio

    bullish_pin = is_pin & is_up_bar & (lower_wick > upper_wick)
    bearish_pin = is_pin & is_down_bar & (upper_wick > lower_wick)

    # The first bar has no predecessor, so the shifted values are NaN and the
    # engulfing comparisons evaluate to False there.
    prev_open = open_.shift(1)
    prev_close = close.shift(1)
    bullish_engulf = (close > prev_open) & (open_ < prev_close) & is_up_bar
    bearish_engulf = (close < prev_open) & (open_ > prev_close) & is_down_bar

    return {
        "bullish_pinbar": bullish_pin,
        "bearish_pinbar": bearish_pin,
        "bullish_signal": bullish_pin | bullish_engulf,
        "bearish_signal": bearish_pin | bearish_engulf,
    }
|
||
|
||
|
||
def _finite_or_default(value, default: float = 0.0) -> float:
    """Return ``value`` as a float, or ``default`` when it is NaN or infinite."""
    number = float(value)
    return number if np.isfinite(number) else default


def _pct_text(fraction) -> str:
    """Render a fraction as a two-decimal percentage number, or "N/A" for None."""
    return "N/A" if fraction is None else f"{fraction * 100:.2f}"


def _recent_swing_points(swings: pd.Series, limit: int = 10) -> list:
    """List the last ``limit`` non-NaN swing points as ``{"time", "price"}`` dicts."""
    found = swings.dropna()
    if found.empty:
        return []
    found = found.iloc[-limit:]
    times = found.index.strftime("%Y-%m-%d %H:%M").tolist()
    return [{"time": t, "price": float(p)} for t, p in zip(times, found.tolist())]


def compute_all_indicators(
    df_1h: pd.DataFrame,
    df_4h: pd.DataFrame,
    df_1d: pd.DataFrame,
    params: dict = None,
) -> dict:
    """Compute every indicator the APP needs and return it as one dict.

    Combines the D1 trend direction, the 4H trend strength and the 1H
    support/resistance structure and candle patterns into a set of entry
    filters, plus the final long/short entry verdicts.

    Args:
        df_1h: 1-hour bars with ``open``, ``high``, ``low``, ``close`` columns.
            The index must support ``strftime`` when swing points exist.
        df_4h: 4-hour bars with ``high`` and ``low`` columns.
        df_1d: Daily bars with ``high``, ``low``, ``close`` columns.
        params: Optional overrides:
            swing_lookback_d1: int (default 5)
            swing_lookback_h4: int (default 8)
            swing_lookback_1h: int (default 5)
            pin_bar_wick_ratio: float (default 0.6)
            max_stop_dist: float, a fraction of price (default 0.50, i.e. 50%).
                NOTE(review): the original note read "0.50 = 5%", which does
                not match the value — confirm the intended default.
            trend_strength_min: float (default -0.20)

    Returns:
        Dict with keys ``timestamp``, ``current_price``, ``diagnosis``,
        ``filters``, ``can_enter_long``, ``can_enter_short``, ``trend_1d``,
        ``candle_1h``, ``swing_points`` and ``trend_strength_4h``.
    """
    if params is None:
        params = {}

    swing_lookback_d1 = params.get("swing_lookback_d1", 5)
    swing_lookback_h4 = params.get("swing_lookback_h4", 8)
    swing_lookback_1h = params.get("swing_lookback_1h", 5)
    pin_bar_wick_ratio = params.get("pin_bar_wick_ratio", 0.6)
    max_stop_dist = params.get("max_stop_dist", 0.50)
    trend_strength_min = params.get("trend_strength_min", -0.20)

    has_1h = len(df_1h) > 0

    # ---- D1 structure: trend direction ----
    sh_d1, sl_d1 = detect_swing_points(df_1d["high"], df_1d["low"], swing_lookback_d1)
    struct_d1 = build_structure(df_1d["high"], df_1d["low"], df_1d["close"], sh_d1, sl_d1)
    has_d1 = len(struct_d1) > 0
    trend_up_1d = bool(struct_d1["trend_up"].iloc[-1]) if has_d1 else False
    trend_down_1d = bool(struct_d1["trend_down"].iloc[-1]) if has_d1 else False

    # ---- 4H: trend strength only (no S/R structure is needed here) ----
    sh_4h, sl_4h = detect_swing_points(df_4h["high"], df_4h["low"], swing_lookback_h4)
    strength_4h = compute_trend_strength(
        df_4h["high"], df_4h["low"], sh_4h, sl_4h,
        min_strength=trend_strength_min,
    )
    has_4h = len(strength_4h) > 0
    strong_uptrend_4h = bool(strength_4h["strong_uptrend"].iloc[-1]) if has_4h else False
    strong_downtrend_4h = bool(strength_4h["strong_downtrend"].iloc[-1]) if has_4h else False
    # The strength is NaN until two swing highs and two swing lows exist; fall
    # back to 0.0 (same as the empty-frame case) so NaN never reaches the
    # formatted strings or the numeric payload.
    trend_strength_up_val = (
        _finite_or_default(strength_4h["trend_strength_up"].iloc[-1]) if has_4h else 0.0
    )
    trend_strength_down_val = (
        _finite_or_default(strength_4h["trend_strength_down"].iloc[-1]) if has_4h else 0.0
    )

    # ---- 1H structure: support / resistance (adapted for v2.2c) ----
    sh_1h, sl_1h = detect_swing_points(df_1h["high"], df_1h["low"], swing_lookback_1h)
    struct_1h = build_structure(df_1h["high"], df_1h["low"], df_1h["close"], sh_1h, sl_1h)

    last_1h_s = struct_1h.iloc[-1] if len(struct_1h) > 0 else None
    support_1h = None
    resistance_1h = None
    in_demand_1h = False
    in_supply_1h = False
    if last_1h_s is not None:
        if pd.notna(last_1h_s["support"]):
            support_1h = float(last_1h_s["support"])
        if pd.notna(last_1h_s["resistance"]):
            resistance_1h = float(last_1h_s["resistance"])
        in_demand_1h = bool(last_1h_s["in_demand"])
        in_supply_1h = bool(last_1h_s["in_supply"])

    # 1H "alive" check: was the level tested recently without breaking?
    support_alive_1h = False
    resistance_alive_1h = False
    if support_1h is not None:
        support_alive_1h = bool(
            compute_support_alive(df_1h["low"], df_1h["close"], struct_1h["support"]).iloc[-1]
        )
    if resistance_1h is not None:
        resistance_alive_1h = bool(
            compute_resistance_alive(df_1h["high"], df_1h["close"], struct_1h["resistance"]).iloc[-1]
        )

    # ---- 1H candle patterns ----
    candle = detect_candle_patterns(
        df_1h["open"], df_1h["high"], df_1h["low"], df_1h["close"],
        pin_bar_wick_ratio,
    )
    bullish_signal = bool(candle["bullish_signal"].iloc[-1]) if has_1h else False
    bearish_signal = bool(candle["bearish_signal"].iloc[-1]) if has_1h else False
    bullish_pinbar = bool(candle["bullish_pinbar"].iloc[-1]) if has_1h else False
    bearish_pinbar = bool(candle["bearish_pinbar"].iloc[-1]) if has_1h else False

    # ---- Entry distance to the 1H S/R levels ----
    # The market price is taken from the open of the latest 1H bar.
    current_price = float(df_1h["open"].iloc[-1]) if has_1h else 0
    long_stop_dist = None
    short_stop_dist = None
    if current_price > 0:  # guards the divisions below
        if support_1h is not None and support_1h > 0:
            long_stop_dist = abs((current_price - support_1h) / current_price)
        if resistance_1h is not None and resistance_1h > 0:
            short_stop_dist = abs((resistance_1h - current_price) / current_price)

    long_dist_ok = long_stop_dist is not None and 0.003 < long_stop_dist <= max_stop_dist
    short_dist_ok = short_stop_dist is not None and 0.003 < short_stop_dist <= max_stop_dist

    # ---- Signal diagnosis (1H S/R) ----
    have_both_levels = support_1h is not None and resistance_1h is not None
    zone_width_pct = None
    price_position_in_zone = None
    if have_both_levels and support_1h != 0:
        zone_width_pct = round(abs(resistance_1h - support_1h) / support_1h * 100, 2)
    if have_both_levels and resistance_1h > support_1h:
        price_position_in_zone = round(
            (current_price - support_1h) / (resistance_1h - support_1h) * 100, 1
        )

    diagnosis = {
        "market_price": round(current_price, 2),
        "support": round(support_1h, 2) if support_1h is not None else None,
        "resistance": round(resistance_1h, 2) if resistance_1h is not None else None,
        "zone_width_pct": zone_width_pct,
        "price_position_in_zone": price_position_in_zone,
    }

    position_text = f"位置 {price_position_in_zone}%" if price_position_in_zone is not None else "N/A"
    long_dist_text = _pct_text(long_stop_dist)
    short_dist_text = _pct_text(short_stop_dist)

    filters = {
        "trend_up_1d": {
            "pass": trend_up_1d,
            "desc": "D1 上升趋势(HH+HL 都在抬高)",
            "value": "上升" if trend_up_1d else ("下降" if trend_down_1d else "震荡"),
        },
        "in_demand_1h": {
            "pass": in_demand_1h,
            "desc": "价格在 1H 需求区(zone 底部 35%)",
            "value": position_text,
        },
        "long_stop_distance": {
            "pass": long_dist_ok,
            "desc": f"入场距支撑距离 ({long_dist_text}%)",
            "value": f"{long_dist_text}%" if long_stop_dist is not None else "N/A",
        },
        "support_alive_1h": {
            "pass": support_alive_1h,
            "desc": "1H 支撑最近被测试过且未跌破",
            "value": "有效" if support_alive_1h else "失效/未测试",
        },
        "strong_uptrend_4h": {
            "pass": strong_uptrend_4h,
            "desc": "4H 趋势强度(扩张中)",
            "value": f"{trend_strength_up_val * 100:.2f}%",
        },
        "trend_down_1d": {
            "pass": trend_down_1d,
            "desc": "D1 下降趋势(LH+LL 都在降低)",
            "value": "下降" if trend_down_1d else ("上升" if trend_up_1d else "震荡"),
        },
        "in_supply_1h": {
            "pass": in_supply_1h,
            "desc": "价格在 1H 供给区(zone 顶部 65%+)",
            "value": position_text,
        },
        "short_stop_distance": {
            "pass": short_dist_ok,
            "desc": f"入场距阻力距离 ({short_dist_text}%)",
            "value": f"{short_dist_text}%" if short_stop_dist is not None else "N/A",
        },
        "resistance_alive_1h": {
            "pass": resistance_alive_1h,
            "desc": "1H 阻力最近被测试过且未突破",
            "value": "有效" if resistance_alive_1h else "失效/未测试",
        },
        "strong_downtrend_4h": {
            "pass": strong_downtrend_4h,
            "desc": "4H 下降趋势强度(扩张中)",
            "value": f"{trend_strength_down_val * 100:.2f}%",
        },
    }

    # Entry verdict: every filter of a side must pass (1H S/R + 4H trend).
    long_ok = all([
        trend_up_1d, in_demand_1h, long_dist_ok,
        support_alive_1h, strong_uptrend_4h,
    ])
    short_ok = all([
        trend_down_1d, in_supply_1h, short_dist_ok,
        resistance_alive_1h, strong_downtrend_4h,
    ])

    return {
        # NOTE(review): naive local time; confirm whether consumers expect UTC.
        "timestamp": pd.Timestamp.now().isoformat(),
        "current_price": round(current_price, 2),
        "diagnosis": diagnosis,
        "filters": filters,
        "can_enter_long": long_ok,
        "can_enter_short": short_ok,
        "trend_1d": "up" if trend_up_1d else ("down" if trend_down_1d else "neutral"),
        "candle_1h": {
            "bullish_pinbar": bullish_pinbar,
            "bearish_pinbar": bearish_pinbar,
            "bullish_signal": bullish_signal,
            "bearish_signal": bearish_signal,
        },
        # Last ten 1H swing points, used for charting.
        "swing_points": {
            "highs": _recent_swing_points(sh_1h),
            "lows": _recent_swing_points(sl_1h),
        },
        "trend_strength_4h": {
            "up": round(trend_strength_up_val * 100, 2),
            "down": round(trend_strength_down_val * 100, 2),
        },
    }
|