""" 指标计算模块 — 从 v2.2b 策略提取的独立版本 用于 APP 后端实时计算市场结构和信号诊断 """ import numpy as np import pandas as pd def detect_swing_points( high: pd.Series, low: pd.Series, window: int = 5, ) -> tuple[pd.Series, pd.Series]: """检测 Swing High / Swing Low,与 v2.2b 策略一致""" n = len(high) sh = pd.Series(np.nan, index=high.index, dtype=float) sl = pd.Series(np.nan, index=low.index, dtype=float) for i in range(window, n - window): if high.iloc[i] > high.iloc[i - window : i].max() and high.iloc[i] > high.iloc[i + 1 : i + window + 1].max(): sh.iloc[i] = high.iloc[i] if low.iloc[i] < low.iloc[i - window : i].min() and low.iloc[i] < low.iloc[i + 1 : i + window + 1].min(): sl.iloc[i] = low.iloc[i] return sh, sl def build_structure( high: pd.Series, low: pd.Series, close: pd.Series, swing_high: pd.Series, swing_low: pd.Series, ) -> pd.DataFrame: """构建价格结构(趋势方向、S/R 位、供需区),与 v2.2b 一致""" n = len(high) trend_up_arr = np.full(n, False) trend_down_arr = np.full(n, False) nearest_support = np.full(n, np.nan) nearest_resistance = np.full(n, np.nan) in_demand_zone = np.full(n, False) in_supply_zone = np.full(n, False) sh_prices = [] sl_prices = [] for i in range(n): if pd.notna(swing_high.iloc[i]): sh_prices.append(swing_high.iloc[i]) if len(sh_prices) > 4: sh_prices.pop(0) if pd.notna(swing_low.iloc[i]): sl_prices.append(swing_low.iloc[i]) if len(sl_prices) > 4: sl_prices.pop(0) if len(sh_prices) >= 2 and len(sl_prices) >= 2: if sh_prices[-1] > sh_prices[-2] and sl_prices[-1] > sl_prices[-2]: trend_up_arr[i] = True elif sh_prices[-1] < sh_prices[-2] and sl_prices[-1] < sl_prices[-2]: trend_down_arr[i] = True elif i > 0: trend_up_arr[i] = trend_up_arr[i - 1] trend_down_arr[i] = trend_down_arr[i - 1] elif i > 0: trend_up_arr[i] = trend_up_arr[i - 1] trend_down_arr[i] = trend_down_arr[i - 1] if sl_prices: nearest_support[i] = sl_prices[-1] if sh_prices: nearest_resistance[i] = sh_prices[-1] c = close.iloc[i] if not np.isnan(nearest_support[i]) and not np.isnan(nearest_resistance[i]): zone_range = nearest_resistance[i] - nearest_support[i] if zone_range > 0: pos_pct = (c - nearest_support[i]) / zone_range in_demand_zone[i] = pos_pct < 0.35 in_supply_zone[i] = pos_pct > 0.65 return pd.DataFrame({ "trend_up": trend_up_arr, "trend_down": trend_down_arr, "support": nearest_support, "resistance": nearest_resistance, "in_demand": in_demand_zone, "in_supply": in_supply_zone, }, index=high.index) def compute_trend_strength( high: pd.Series, low: pd.Series, swing_high: pd.Series, swing_low: pd.Series, min_strength: float = -0.20, ) -> pd.DataFrame: """计算趋势强度(4H Swing Point 间距变化),与 v2.2b 一致""" sh_prices = [] sl_prices = [] trend_strength_up = np.full(len(high), np.nan) trend_strength_down = np.full(len(high), np.nan) strong_uptrend = np.full(len(high), False) strong_downtrend = np.full(len(high), False) for i in range(len(high)): if pd.notna(swing_high.iloc[i]): sh_prices.append(swing_high.iloc[i]) if len(sh_prices) > 4: sh_prices.pop(0) if pd.notna(swing_low.iloc[i]): sl_prices.append(swing_low.iloc[i]) if len(sl_prices) > 4: sl_prices.pop(0) if len(sh_prices) >= 2 and len(sl_prices) >= 2: hh_dist = (sh_prices[-1] - sh_prices[-2]) / sh_prices[-2] if sh_prices[-2] > 0 else 0 hl_dist = (sl_prices[-1] - sl_prices[-2]) / sl_prices[-2] if sl_prices[-2] > 0 else 0 trend_strength_up[i] = hh_dist + hl_dist trend_strength_down[i] = -(hh_dist + hl_dist) strong_uptrend = pd.Series(trend_strength_up) > min_strength strong_downtrend = pd.Series(trend_strength_down) > min_strength return pd.DataFrame({ "trend_strength_up": pd.Series(trend_strength_up, index=high.index), "trend_strength_down": pd.Series(trend_strength_down, index=high.index), "strong_uptrend": strong_uptrend.values, "strong_downtrend": strong_downtrend.values, }, index=high.index) def compute_support_alive( low: pd.Series, close: pd.Series, support: pd.Series, ) -> pd.Series: """检查支撑是否活着(最近测试过没跌破),与 v2.2b 一致""" touched = ( (low <= support * 1.005) & (low >= support * 0.995) ) held = close > support tested_and_held = touched & held return tested_and_held.rolling(3, min_periods=1).max() > 0 def compute_resistance_alive( high: pd.Series, close: pd.Series, resistance: pd.Series, ) -> pd.Series: """检查阻力是否活着(最近测试过没突破),与 v2.2b 一致""" touched = ( (high >= resistance * 0.995) & (high <= resistance * 1.005) ) held = close < resistance tested_and_held = touched & held return tested_and_held.rolling(3, min_periods=1).max() > 0 def detect_candle_patterns( open_: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, pin_bar_wick_ratio: float = 0.6, ) -> dict: """检测 K 线形态,与 v2.2b 一致""" body = (close - open_).abs() total_range = (high - low).replace(0, 0.0001) upper_wick = high - close.where(close > open_, open_) lower_wick = open_.where(close > open_, close) - low is_pin = (upper_wick + lower_wick) / total_range > pin_bar_wick_ratio bullish_pin = is_pin & (close > open_) & (lower_wick > upper_wick) bearish_pin = is_pin & (close < open_) & (upper_wick > lower_wick) prev_open = open_.shift(1) prev_close = close.shift(1) bullish_engulf = (close > prev_open) & (open_ < prev_close) & (close > open_) bearish_engulf = (close < prev_open) & (open_ > prev_close) & (close < open_) return { "bullish_pinbar": bullish_pin, "bearish_pinbar": bearish_pin, "bullish_signal": bullish_pin | bullish_engulf, "bearish_signal": bearish_pin | bearish_engulf, } def compute_all_indicators( df_1h: pd.DataFrame, df_4h: pd.DataFrame, df_1d: pd.DataFrame, params: dict = None, ) -> dict: """ 完整计算所有指标的入口函数 返回 APP 需要的全部数据 params 可选参数: swing_lookback_d1: int (default 5) swing_lookback_h4: int (default 8) pin_bar_wick_ratio: float (default 0.6) max_stop_dist: float (default 0.50 = 5%) trend_strength_min: float (default -0.20) """ if params is None: params = {} swing_lookback_d1 = params.get("swing_lookback_d1", 5) swing_lookback_h4 = params.get("swing_lookback_h4", 8) swing_lookback_1h = params.get("swing_lookback_1h", 5) pin_bar_wick_ratio = params.get("pin_bar_wick_ratio", 0.6) max_stop_dist = params.get("max_stop_dist", 0.50) trend_strength_min = params.get("trend_strength_min", -0.20) # ---- D1 结构 ---- sh_d1, sl_d1 = detect_swing_points(df_1d["high"], df_1d["low"], swing_lookback_d1) struct_d1 = build_structure(df_1d["high"], df_1d["low"], df_1d["close"], sh_d1, sl_d1) trend_up_1d = bool(struct_d1["trend_up"].iloc[-1]) if len(struct_d1) > 0 else False trend_down_1d = bool(struct_d1["trend_down"].iloc[-1]) if len(struct_d1) > 0 else False # ---- 4H 结构(仅趋势强度) ---- sh_4h, sl_4h = detect_swing_points(df_4h["high"], df_4h["low"], swing_lookback_h4) struct_4h = build_structure(df_4h["high"], df_4h["low"], df_4h["close"], sh_4h, sl_4h) strength_4h = compute_trend_strength( df_4h["high"], df_4h["low"], sh_4h, sl_4h, min_strength=trend_strength_min, ) strong_uptrend_4h = bool(strength_4h["strong_uptrend"].iloc[-1]) if len(strength_4h) > 0 else False strong_downtrend_4h = bool(strength_4h["strong_downtrend"].iloc[-1]) if len(strength_4h) > 0 else False trend_strength_up_val = float(strength_4h["trend_strength_up"].iloc[-1]) if len(strength_4h) > 0 else 0 trend_strength_down_val = float(strength_4h["trend_strength_down"].iloc[-1]) if len(strength_4h) > 0 else 0 # ---- 1H 结构(S/R 适配 v2.2c) ---- sh_1h, sl_1h = detect_swing_points(df_1h["high"], df_1h["low"], swing_lookback_1h) struct_1h = build_structure(df_1h["high"], df_1h["low"], df_1h["close"], sh_1h, sl_1h) last_1h_s = struct_1h.iloc[-1] if len(struct_1h) > 0 else None support_1h = float(last_1h_s["support"]) if last_1h_s is not None and pd.notna(last_1h_s["support"]) else None resistance_1h = float(last_1h_s["resistance"]) if last_1h_s is not None and pd.notna(last_1h_s["resistance"]) else None in_demand_1h = bool(last_1h_s["in_demand"]) if last_1h_s is not None else False in_supply_1h = bool(last_1h_s["in_supply"]) if last_1h_s is not None else False # 1H 活 S/R 检查 support_alive_1h = False resistance_alive_1h = False if support_1h is not None: support_alive_1h = bool(compute_support_alive(df_1h["low"], df_1h["close"], struct_1h["support"]).iloc[-1]) if resistance_1h is not None: resistance_alive_1h = bool(compute_resistance_alive(df_1h["high"], df_1h["close"], struct_1h["resistance"]).iloc[-1]) # ---- 1H K线形态 ---- last_1h_candle = df_1h.iloc[-1] if len(df_1h) > 0 else None candle = detect_candle_patterns( df_1h["open"], df_1h["high"], df_1h["low"], df_1h["close"], pin_bar_wick_ratio, ) bullish_signal = bool(candle["bullish_signal"].iloc[-1]) if len(df_1h) > 0 else False bearish_signal = bool(candle["bearish_signal"].iloc[-1]) if len(df_1h) > 0 else False bullish_pinbar = bool(candle["bullish_pinbar"].iloc[-1]) if len(df_1h) > 0 else False bearish_pinbar = bool(candle["bearish_pinbar"].iloc[-1]) if len(df_1h) > 0 else False # ---- 入场距离(1H S/R) ---- current_price = float(df_1h["open"].iloc[-1]) if len(df_1h) > 0 else 0 long_stop_dist = None short_stop_dist = None if support_1h is not None and support_1h > 0: long_stop_dist = abs((current_price - support_1h) / current_price) if resistance_1h is not None and resistance_1h > 0: short_stop_dist = abs((resistance_1h - current_price) / current_price) long_dist_ok = long_stop_dist is not None and long_stop_dist <= max_stop_dist and long_stop_dist > 0.003 short_dist_ok = short_stop_dist is not None and short_stop_dist <= max_stop_dist and short_stop_dist > 0.003 # ---- Swing Point 历史(1H 用于画图) ---- swing_highs_1h = df_1h[sh_1h.notna()].index.strftime("%Y-%m-%d %H:%M").tolist() if len(sh_1h) > 0 else [] swing_high_prices_1h = [float(x) for x in sh_1h.dropna().tolist()] swing_lows_1h = df_1h[sl_1h.notna()].index.strftime("%Y-%m-%d %H:%M").tolist() if len(sl_1h) > 0 else [] swing_low_prices_1h = [float(x) for x in sl_1h.dropna().tolist()] # ---- 信号诊断结果(1H S/R) ---- diagnosis = { "market_price": round(current_price, 2), "support": round(support_1h, 2) if support_1h else None, "resistance": round(resistance_1h, 2) if resistance_1h else None, "zone_width_pct": round(abs(resistance_1h - support_1h) / support_1h * 100, 2) if support_1h and resistance_1h else None, "price_position_in_zone": round((current_price - support_1h) / (resistance_1h - support_1h) * 100, 1) if support_1h and resistance_1h and resistance_1h > support_1h else None, } filters = { "trend_up_1d": { "pass": trend_up_1d, "desc": "D1 上升趋势(HH+HL 都在抬高)", "value": "上升" if trend_up_1d else ("下降" if trend_down_1d else "震荡"), }, "in_demand_1h": { "pass": in_demand_1h, "desc": "价格在 1H 需求区(zone 底部 35%)", "value": f"位置 {diagnosis['price_position_in_zone']}%" if diagnosis['price_position_in_zone'] is not None else "N/A", }, "long_stop_distance": { "pass": long_dist_ok, "desc": f"入场距支撑距离 ({'%.2f' % (long_stop_dist*100) if long_stop_dist else 'N/A'}%)", "value": f"{'%.2f' % (long_stop_dist*100)}%" if long_stop_dist else "N/A", }, "support_alive_1h": { "pass": support_alive_1h, "desc": "1H 支撑最近被测试过且未跌破", "value": "有效" if support_alive_1h else "失效/未测试", }, "strong_uptrend_4h": { "pass": strong_uptrend_4h, "desc": "4H 趋势强度(扩张中)", "value": f"{'%.2f' % (trend_strength_up_val*100)}%", }, "trend_down_1d": { "pass": trend_down_1d, "desc": "D1 下降趋势(LH+LL 都在降低)", "value": "下降" if trend_down_1d else ("上升" if trend_up_1d else "震荡"), }, "in_supply_1h": { "pass": in_supply_1h, "desc": "价格在 1H 供给区(zone 顶部 65%+)", "value": f"位置 {diagnosis['price_position_in_zone']}%" if diagnosis['price_position_in_zone'] is not None else "N/A", }, "short_stop_distance": { "pass": short_dist_ok, "desc": f"入场距阻力距离 ({'%.2f' % (short_stop_dist*100) if short_stop_dist else 'N/A'}%)", "value": f"{'%.2f' % (short_stop_dist*100)}%" if short_stop_dist else "N/A", }, "resistance_alive_1h": { "pass": resistance_alive_1h, "desc": "1H 阻力最近被测试过且未突破", "value": "有效" if resistance_alive_1h else "失效/未测试", }, "strong_downtrend_4h": { "pass": strong_downtrend_4h, "desc": "4H 下降趋势强度(扩张中)", "value": f"{'%.2f' % (trend_strength_down_val*100)}%", }, } # 入场可行性结论(1H S/R + 4H 趋势) long_ok = all([ trend_up_1d, in_demand_1h, long_dist_ok, support_alive_1h, strong_uptrend_4h, ]) short_ok = all([ trend_down_1d, in_supply_1h, short_dist_ok, resistance_alive_1h, strong_downtrend_4h, ]) return { "timestamp": pd.Timestamp.now().isoformat(), "current_price": round(current_price, 2), "diagnosis": diagnosis, "filters": filters, "can_enter_long": long_ok, "can_enter_short": short_ok, "trend_1d": "up" if trend_up_1d else ("down" if trend_down_1d else "neutral"), "candle_1h": { "bullish_pinbar": bullish_pinbar, "bearish_pinbar": bearish_pinbar, "bullish_signal": bullish_signal, "bearish_signal": bearish_signal, }, "swing_points": { "highs": [{"time": t, "price": p} for t, p in zip(swing_highs_1h[-10:], swing_high_prices_1h[-10:])], "lows": [{"time": t, "price": p} for t, p in zip(swing_lows_1h[-10:], swing_low_prices_1h[-10:])], }, "trend_strength_4h": { "up": round(trend_strength_up_val * 100, 2), "down": round(trend_strength_down_val * 100, 2), }, }