""" Structure Flow Strategy v2.2c — 冷却期修复版 ============================================== 变更记录: v2.2c (2026-06-11): 1H S/R 替代 4H S/R v2.2c-coolfix (2026-06-11): 修复冷却期无限阻止下单 bug """ from datetime import datetime import numpy as np import pandas as pd from pandas import DataFrame from freqtrade.strategy import IStrategy, IntParameter, informative from freqtrade.persistence import Trade class StructureFlowStrategyV22d(IStrategy): can_short = True stoploss = -0.15 use_custom_stoploss = True minimal_roi = {"0": 100} max_open_trades = 1 timeframe = "1h" # ===================== # 可优化参数 # ===================== swing_lookback_d1 = IntParameter(8, 14, default=10, space="buy") swing_lookback_h4 = IntParameter(5, 10, default=8, space="buy") swing_lookback_1h = IntParameter(3, 7, default=5, space="buy") # 新增:1H swing参数 pin_bar_wick_ratio = IntParameter(50, 70, default=60, space="buy") max_stop_dist = IntParameter(20, 50, default=50, space="buy") cooldown_bars = IntParameter(3, 12, default=6, space="buy") trend_strength_min = IntParameter(-50, 20, default=-20, space="buy") # ===================== # 工具:Swing Point 检测 # ===================== @staticmethod def _detect_swing_points( high: pd.Series, low: pd.Series, window: int = 5, ) -> tuple[pd.Series, pd.Series]: n = len(high) sh = pd.Series(np.nan, index=high.index, dtype=float) sl = pd.Series(np.nan, index=low.index, dtype=float) for i in range(window, n - window): if high.iloc[i] > high.iloc[i - window : i].max() and high.iloc[i] > high.iloc[i + 1 : i + window + 1].max(): sh.iloc[i] = high.iloc[i] if low.iloc[i] < low.iloc[i - window : i].min() and low.iloc[i] < low.iloc[i + 1 : i + window + 1].min(): sl.iloc[i] = low.iloc[i] return sh, sl # ===================== # 工具:结构分析 # ===================== def _build_structure( self, high: pd.Series, low: pd.Series, close: pd.Series, swing_high: pd.Series, swing_low: pd.Series, ) -> DataFrame: n = len(high) trend_up_arr = np.full(n, False) trend_down_arr = np.full(n, False) nearest_support = np.full(n, np.nan) nearest_resistance = np.full(n, np.nan) in_demand_zone = np.full(n, False) in_supply_zone = np.full(n, False) sh_prices = [] sl_prices = [] for i in range(n): if pd.notna(swing_high.iloc[i]): sh_prices.append(swing_high.iloc[i]) if len(sh_prices) > 4: sh_prices.pop(0) if pd.notna(swing_low.iloc[i]): sl_prices.append(swing_low.iloc[i]) if len(sl_prices) > 4: sl_prices.pop(0) if len(sh_prices) >= 2 and len(sl_prices) >= 2: if sh_prices[-1] > sh_prices[-2] and sl_prices[-1] > sl_prices[-2]: trend_up_arr[i] = True elif sh_prices[-1] < sh_prices[-2] and sl_prices[-1] < sl_prices[-2]: trend_down_arr[i] = True elif i > 0: trend_up_arr[i] = trend_up_arr[i - 1] trend_down_arr[i] = trend_down_arr[i - 1] elif i > 0: trend_up_arr[i] = trend_up_arr[i - 1] trend_down_arr[i] = trend_down_arr[i - 1] if sl_prices: nearest_support[i] = sl_prices[-1] if sh_prices: nearest_resistance[i] = sh_prices[-1] c = close.iloc[i] if not np.isnan(nearest_support[i]) and not np.isnan(nearest_resistance[i]): zone_range = nearest_resistance[i] - nearest_support[i] if zone_range > 0: pos_pct = (c - nearest_support[i]) / zone_range in_demand_zone[i] = pos_pct < 0.35 in_supply_zone[i] = pos_pct > 0.65 return DataFrame({ "trend_up": trend_up_arr, "trend_down": trend_down_arr, "support": nearest_support, "resistance": nearest_resistance, "in_demand": in_demand_zone, "in_supply": in_supply_zone, }, index=high.index) # ===================== # 工具:K线形态检测 # ===================== @staticmethod def _detect_candle_patterns( open_: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, pin_bar_wick_ratio: float = 0.6, ) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]: body = (close - open_).abs() total_range = (high - low).replace(0, 0.0001) upper_wick = high - close.where(close > open_, open_) lower_wick = open_.where(close > open_, close) - low is_pin = (upper_wick + lower_wick) / total_range > pin_bar_wick_ratio bullish_pin = is_pin & (close > open_) & (lower_wick > upper_wick) bearish_pin = is_pin & (close < open_) & (upper_wick > lower_wick) prev_open = open_.shift(1) prev_close = close.shift(1) bullish_engulf = (close > prev_open) & (open_ < prev_close) & (close > open_) bearish_engulf = (close < prev_open) & (open_ > prev_close) & (close < open_) return bullish_pin, bearish_pin, bullish_engulf, bearish_engulf # ===================== # 工具:冷却期正确实现(修复 bug) # ===================== def _apply_cooldown(self, signal: pd.Series, cooldown_bars: int) -> pd.Series: """ 正确应用冷却期:入场后才冷却,而非条件满足就冷却。 原逻辑 bug:long_base.rolling(cooldown).max().shift(1) == 0 - 当市场持续满足入场条件时,rolling window 里永远有 True - 导致冷却期无限阻止下单 修复逻辑:遍历 K 线,模拟"入场 -> 冷却"过程。 - 满足条件 + 距离上次入场 > cooldown -> 允许入场 - 入场后 cooldown 根 K 线内不再入场 """ n = len(signal) result = [False] * n last_entry = -99999 # 上次入场的 bar 索引 # 遍历(对 numpy array 操作,O(n) 约几毫秒) values = signal.values # numpy array,快速访问 for i in range(n): if values[i] and (i - last_entry) > cooldown_bars: result[i] = True last_entry = i return pd.Series(result, index=signal.index) # ================================================================ # 信息时间框架 — D1 宏观结构 # ================================================================ @informative("1d") def populate_indicators_1d( self, dataframe: DataFrame, metadata: dict ) -> DataFrame: sh, sl = self._detect_swing_points( dataframe["high"], dataframe["low"], self.swing_lookback_d1.value, ) structure = self._build_structure( dataframe["high"], dataframe["low"], dataframe["close"], sh, sl, ) dataframe["trend_up"] = structure["trend_up"] dataframe["trend_down"] = structure["trend_down"] return dataframe # ================================================================ # 信息时间框架 — 4H 趋势强度(原版保留) # ================================================================ @informative("4h") def populate_indicators_4h( self, dataframe: DataFrame, metadata: dict ) -> DataFrame: sh, sl = self._detect_swing_points( dataframe["high"], dataframe["low"], self.swing_lookback_h4.value, ) structure = self._build_structure( dataframe["high"], dataframe["low"], dataframe["close"], sh, sl, ) # 趋势强度计算(原版逻辑) sh_prices = [] sl_prices = [] trend_strength_up = np.full(len(dataframe), np.nan) trend_strength_down = np.full(len(dataframe), np.nan) for i in range(len(dataframe)): if pd.notna(sh.iloc[i]): sh_prices.append(sh.iloc[i]) if len(sh_prices) > 4: sh_prices.pop(0) if pd.notna(sl.iloc[i]): sl_prices.append(sl.iloc[i]) if len(sl_prices) > 4: sl_prices.pop(0) if len(sh_prices) >= 2 and len(sl_prices) >= 2: hh_dist = (sh_prices[-1] - sh_prices[-2]) / sh_prices[-2] if sh_prices[-2] > 0 else 0 hl_dist = (sl_prices[-1] - sl_prices[-2]) / sl_prices[-2] if sl_prices[-2] > 0 else 0 trend_strength_up[i] = hh_dist + hl_dist trend_strength_down[i] = -(hh_dist + hl_dist) dataframe["trend_strength_up"] = pd.Series(trend_strength_up, index=dataframe.index) dataframe["trend_strength_down"] = pd.Series(trend_strength_down, index=dataframe.index) min_strength = self.trend_strength_min.value / 100.0 dataframe["strong_uptrend"] = dataframe["trend_strength_up"] > min_strength dataframe["strong_downtrend"] = dataframe["trend_strength_down"] > min_strength return dataframe # ================================================================ # 主时间框架 — 1H 指标(含 1H S/R + 活支撑/阻力) # ================================================================ def populate_indicators( self, dataframe: DataFrame, metadata: dict ) -> DataFrame: # ── K线形态 ── bullish_pin, bearish_pin, bullish_engulf, bearish_engulf = ( self._detect_candle_patterns( dataframe["open"], dataframe["high"], dataframe["low"], dataframe["close"], self.pin_bar_wick_ratio.value / 100.0, ) ) dataframe["bullish_pinbar"] = bullish_pin dataframe["bearish_pinbar"] = bearish_pin dataframe["bullish_engulfing"] = bullish_engulf dataframe["bearish_engulfing"] = bearish_engulf dataframe["bullish_signal"] = bullish_pin | bullish_engulf dataframe["bearish_signal"] = bearish_pin | bearish_engulf # ── 1H级别 Swing Point + 结构(替代原4H S/R) ── sh_1h, sl_1h = self._detect_swing_points( dataframe["high"], dataframe["low"], self.swing_lookback_1h.value, ) structure_1h = self._build_structure( dataframe["high"], dataframe["low"], dataframe["close"], sh_1h, sl_1h, ) dataframe["trend_up_1h"] = structure_1h["trend_up"] dataframe["trend_down_1h"] = structure_1h["trend_down"] dataframe["support"] = structure_1h["support"] dataframe["resistance"] = structure_1h["resistance"] dataframe["in_demand"] = structure_1h["in_demand"] dataframe["in_supply"] = structure_1h["in_supply"] # ── 1H 活支撑/阻力检查 ── touched_support = ( (dataframe["low"] <= dataframe["support"] * 1.005) & (dataframe["low"] >= dataframe["support"] * 0.995) ) held_support = dataframe["close"] > dataframe["support"] support_tested_and_held = touched_support & held_support dataframe["support_alive"] = support_tested_and_held.rolling(3, min_periods=1).max() > 0 touched_resistance = ( (dataframe["high"] >= dataframe["resistance"] * 0.995) & (dataframe["high"] <= dataframe["resistance"] * 1.005) ) held_resistance = dataframe["close"] < dataframe["resistance"] resistance_tested_and_held = touched_resistance & held_resistance dataframe["resistance_alive"] = resistance_tested_and_held.rolling(3, min_periods=1).max() > 0 # ── NaN 安全处理 ── bool_cols = [ "trend_up_1d", "trend_down_1d", "trend_up_4h", "trend_down_4h", "in_demand", "in_supply", "support_alive", "resistance_alive", "strong_uptrend_4h", "strong_downtrend_4h", "bullish_signal", "bearish_signal", ] for col in bool_cols: if col in dataframe.columns: dataframe[col] = dataframe[col].fillna(False) return dataframe # ===================== # 入场信号(修复冷却期逻辑) # ===================== def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: max_dist = self.max_stop_dist.value / 100.0 cooldown = self.cooldown_bars.value bool_cols = [ "trend_up_1d", "trend_down_1d", "trend_up_4h", "trend_down_4h", "in_demand", "in_supply", "support_alive", "resistance_alive", "strong_uptrend_4h", "strong_downtrend_4h", "bullish_signal", "bearish_signal", ] for col in bool_cols: if col in dataframe.columns: dataframe[col] = dataframe[col].fillna(False) # ── 做多(使用1H S/R) ── long_stop_dist = (dataframe["open"] - dataframe["support"]) / dataframe["open"] long_base = ( dataframe["trend_up_1d"] & dataframe["in_demand"] & (long_stop_dist <= max_dist) & (long_stop_dist > 0.003) & dataframe["support_alive"] & dataframe["strong_uptrend_4h"] ) # ✅ 修复:正确应用冷却期(基于实际入场,而非条件满足) long_entries = self._apply_cooldown(long_base, cooldown) dataframe.loc[long_entries, "enter_long"] = 1 # ── 做空(使用1H S/R) ── short_stop_dist = (dataframe["resistance"] - dataframe["open"]) / dataframe["open"] short_base = ( dataframe["trend_down_1d"] & dataframe["in_supply"] & (short_stop_dist <= max_dist) & (short_stop_dist > 0.003) & dataframe["resistance_alive"] & dataframe["strong_downtrend_4h"] ) # ✅ 修复:正确应用冷却期(基于实际入场,而非条件满足) short_entries = self._apply_cooldown(short_base, cooldown) dataframe.loc[short_entries, "enter_short"] = 1 return dataframe # ===================== # 出场信号 # ===================== def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: exit_long = ~dataframe["trend_up_1d"].fillna(True) dataframe.loc[exit_long, "exit_long"] = 1 exit_short = dataframe["trend_up_1d"].fillna(False) dataframe.loc[exit_short, "exit_short"] = 1 return dataframe # ===================== # 动态止损(基于1H S/R) # ===================== def custom_stoploss( self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, after_fill: bool, **kwargs, ) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or len(dataframe) == 0: return -0.02 if not trade.is_short else 0.02 last = dataframe.iloc[-1] if not trade.is_short: support = last.get("support", np.nan) if pd.isna(support) or support <= 0: return -0.02 sl_price = support * 0.999 sl_ratio = (sl_price / current_rate) - 1.0 return max(sl_ratio, -0.15) else: resistance = last.get("resistance", np.nan) if pd.isna(resistance) or resistance <= 0: return 0.02 sl_price = resistance * 1.001 sl_ratio = 1.0 - (sl_price / current_rate) return min(sl_ratio, 0.15) # ===================== # Plot config # ===================== @staticmethod def plot_config() -> dict: return { "main_plot": { "support": {"color": "green", "type": "line"}, "resistance": {"color": "red", "type": "line"}, }, "subplots": { "signals": { "bullish_pinbar": {"color": "green", "type": "scatter"}, "bearish_pinbar": {"color": "red", "type": "scatter"}, }, "filters": { "support_alive": {"color": "green", "type": "line"}, "resistance_alive": {"color": "red", "type": "line"}, "strong_uptrend_4h": {"color": "blue", "type": "line"}, "strong_downtrend_4h": {"color": "orange", "type": "line"}, }, }, }