"""
Structure Flow Swing Strategy v4.1
==================================
15m range-swing strategy built on "wall-touch verification" price-clustering
detection (short-window version).

Changelog:
    v4.1b (2026-06-10): 1H + 3-candle verification (profitable, but traded
        too rarely).
    v4.1c (2026-06-10): back to 15m, lookback shortened to 24 candles (6h),
        min_rejections=2 kept.

NOTE(review): the parameter defaults in the class below are lookback=12 and
min_rejections=1, which do not match the v4.1c changelog entry - confirm
which values are intended.
"""

from datetime import datetime

import numpy as np
import pandas as pd
from pandas import DataFrame

from freqtrade.strategy import IStrategy, IntParameter
from freqtrade.persistence import Trade


class StructureFlowSwingV41(IStrategy):
    """
    Structure Flow Swing Strategy v4.1.

    15m range-bound swing trading: a price zone is treated as a range once
    both of its boundaries have been touched and rejected (short window).
    Longs are opened near the lower boundary, shorts near the upper one.
    """

    can_short = True
    stoploss = -0.20
    use_custom_stoploss = True
    minimal_roi = {"0": 100}
    max_open_trades = 1
    timeframe = "15m"

    # =====================
    # Optimisable parameters
    # =====================
    # Detection window: default 12 x 15m candles = 3h.
    lookback = IntParameter(8, 48, default=12, space="buy")
    # Minimum verified rejections per boundary (default: one is enough).
    min_rejections = IntParameter(1, 4, default=1, space="buy")
    # Number of candles after a touch in which the rejection must show up.
    rejection_window = IntParameter(1, 6, default=3, space="buy")
    # Maximum zone width expressed as a multiple of ATR.
    zone_width_atr_mult = IntParameter(2, 6, default=4, space="buy")
    # Entry distance from the boundary, in tenths of a percent (value / 1000).
    entry_zone_pct = IntParameter(2, 8, default=5, space="buy")
    # Share of the zone height (in percent) to capture before taking profit.
    take_profit_pct = IntParameter(50, 80, default=70, space="sell")

    # Fixed parameters
    breakout_bars = 2
    cooldown = 2  # cooldown of 2 x 15m candles (30 minutes)

    # =====================
    # Helper: ATR
    # =====================
    @staticmethod
    def _calc_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
        """Return the simple-moving-average ATR over ``period`` bars.

        The true range of each bar is the largest of high-low,
        |high - previous close| and |low - previous close|.
        """
        prev_close = close.shift(1)
        true_range = pd.DataFrame({
            "hl": high - low,
            "hc": (high - prev_close).abs(),
            "lc": (low - prev_close).abs(),
        }).max(axis=1)
        return true_range.rolling(period).mean()

    # =====================
    # Helper: rejection confirmation without lookahead
    # =====================
    @staticmethod
    def _confirmed_rejection(touched: pd.Series, close: pd.Series, window: int, upward: bool) -> pd.Series:
        """Flag bars on which a boundary touch ``window`` bars earlier is confirmed.

        A touch at bar ``s`` counts as a rejection when at least one of the
        closes of bars ``s+1 .. s+window`` is above (``upward=True``) or below
        (``upward=False``) the close of bar ``s``. The flag is placed on bar
        ``s + window``, i.e. the first bar at which all of those closes are
        known, so only non-negative shifts are used and no future data leaks
        into the indicator.

        Args:
            touched: boolean Series, True where the bar touched the boundary.
            close: close prices, same index as ``touched``.
            window: number of bars after the touch to look for the reaction.
            upward: True for a bounce off support, False for a drop off
                resistance.

        Returns:
            Boolean Series aligned with ``close``.
        """
        touch_close = close.shift(window)
        reacted = pd.Series(False, index=close.index)
        for lag in range(window):
            later_close = close.shift(lag)
            if upward:
                reacted |= later_close > touch_close
            else:
                reacted |= later_close < touch_close
        # fill_value keeps the dtype boolean for the leading bars.
        return touched.shift(window, fill_value=False) & reacted

    # =====================
    # Main timeframe - 15m indicators
    # =====================
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Add zone, ATR, rejection-count and range-state columns.

        Columns written: ``zone_high``, ``zone_low``, ``zone_width_raw``,
        ``zone_width_pct``, ``atr``, ``support_rejection_count``,
        ``resistance_rejection_count``, ``is_ranging``, ``zone_position``,
        ``dist_to_low`` and ``dist_to_high``.

        Rejections are confirmed ``rejection_window`` bars after the touch
        (see ``_confirmed_rejection``) so that backtests only use data that
        would also be available on the live candle.
        """
        lookback = self.lookback.value
        rejection_window = self.rejection_window.value
        rej_threshold = 0.005  # within 0.5% of a boundary counts as a touch
        close = dataframe["close"]

        # -- Price clustering range --
        rolling_high = dataframe["high"].rolling(lookback).max()
        rolling_low = dataframe["low"].rolling(lookback).min()
        zone_width_raw = rolling_high - rolling_low

        dataframe["zone_high"] = rolling_high
        dataframe["zone_low"] = rolling_low
        dataframe["zone_width_raw"] = zone_width_raw
        dataframe["zone_width_pct"] = zone_width_raw / rolling_low

        # ATR
        dataframe["atr"] = self._calc_atr(dataframe["high"], dataframe["low"], close, 14)

        # -- Wall-touch verification --
        # Support: the low came close to the zone low and a later close
        # (within rejection_window bars) was higher than the touch close.
        near_low = dataframe["low"] <= rolling_low * (1 + rej_threshold)
        support_rejection = self._confirmed_rejection(near_low, close, rejection_window, upward=True)
        dataframe["support_rejection_count"] = support_rejection.rolling(lookback).sum()

        # Resistance: the high came close to the zone high and a later close
        # (within rejection_window bars) was lower than the touch close.
        near_high = dataframe["high"] >= rolling_high * (1 - rej_threshold)
        resistance_rejection = self._confirmed_rejection(near_high, close, rejection_window, upward=False)
        dataframe["resistance_rejection_count"] = resistance_rejection.rolling(lookback).sum()

        # -- Range decision --
        atr_mult = self.zone_width_atr_mult.value
        min_rej = self.min_rejections.value

        # Condition 1: zone width is reasonable (at most ATR x N).
        is_compact = zone_width_raw <= dataframe["atr"] * atr_mult

        # Condition 2: both boundaries were verified at least min_rej times.
        support_ok = dataframe["support_rejection_count"] >= min_rej
        resistance_ok = dataframe["resistance_rejection_count"] >= min_rej

        # Condition 3: none of the previous breakout_bars closes lies outside
        # the zone.
        # NOTE(review): rolling_high/rolling_low at a bar already include the
        # highs/lows of the previous lookback-1 bars, so (assuming
        # low <= close <= high) this check looks always true whenever
        # breakout_bars < lookback - confirm whether the boundaries were
        # meant to be taken from before the checked bars.
        no_break_high = pd.Series(True, index=dataframe.index)
        no_break_low = pd.Series(True, index=dataframe.index)
        for i in range(1, self.breakout_bars + 1):
            earlier_close = close.shift(i)
            no_break_high &= earlier_close <= rolling_high
            no_break_low &= earlier_close >= rolling_low

        dataframe["is_ranging"] = (
            is_compact & support_ok & resistance_ok & no_break_high & no_break_low
        )

        # -- Position of the close inside the zone --
        dataframe["zone_position"] = np.where(
            zone_width_raw > 0,
            (close - rolling_low) / zone_width_raw,
            np.nan,
        )
        dataframe["dist_to_low"] = np.where(
            rolling_low > 0,
            (close - rolling_low) / close,
            np.nan,
        )
        dataframe["dist_to_high"] = np.where(
            rolling_high > 0,
            (rolling_high - close) / close,
            np.nan,
        )

        # -- Fill gaps: 999 keeps undefined distances out of the entry zone --
        dataframe["is_ranging"] = dataframe["is_ranging"].fillna(False)
        for col in ("zone_position", "dist_to_low", "dist_to_high"):
            dataframe[col] = dataframe[col].fillna(999)

        return dataframe

    # ================================================================
    # Entry signals
    # ================================================================
    def _outside_cooldown(self, conds: pd.Series) -> pd.Series:
        """Return True where ``conds`` was False on all of the previous ``cooldown`` bars."""
        return conds.rolling(self.cooldown, min_periods=1).max().shift(1) == 0

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Set ``enter_long`` / ``enter_short`` near the zone boundaries.

        A signal requires ``is_ranging``, a close strictly inside the zone and
        within ``entry_zone_pct / 1000`` of the boundary, and that the same
        condition did not hold on any of the previous ``cooldown`` bars.
        """
        entry_zone = self.entry_zone_pct.value / 1000.0

        if "is_ranging" not in dataframe.columns:
            dataframe["is_ranging"] = False

        # -- Long: ranging and price close to the lower boundary --
        long_conds = (
            dataframe["is_ranging"]
            & (dataframe["dist_to_low"] < entry_zone)
            & (dataframe["dist_to_low"] > 0)
        )
        dataframe.loc[long_conds & self._outside_cooldown(long_conds), "enter_long"] = 1

        # -- Short: ranging and price close to the upper boundary --
        short_conds = (
            dataframe["is_ranging"]
            & (dataframe["dist_to_high"] < entry_zone)
            & (dataframe["dist_to_high"] > 0)
        )
        dataframe.loc[short_conds & self._outside_cooldown(short_conds), "enter_short"] = 1

        return dataframe

    # ================================================================
    # Exit signals
    # ================================================================
    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """No dataframe-based exits; exits come from the callbacks below."""
        return dataframe

    # ================================================================
    # Custom stoploss: fixed 2% away from the entry price (not trailing)
    # ================================================================
    def custom_stoploss(
        self,
        pair: str,
        trade: Trade,
        current_time: datetime,
        current_rate: float,
        current_profit: float,
        after_fill: bool,
        **kwargs,
    ) -> float:
        """Return the stop distance for a stop anchored at the entry price.

        The stop price is ``open_rate * 0.98`` for longs and
        ``open_rate * 1.02`` for shorts. It is anchored on ``trade.open_rate``
        rather than on ``current_rate`` so that it does not follow the price;
        the returned value is that stop price expressed relative to
        ``current_rate`` and clamped to -0.20 (long) / 0.20 (short).

        NOTE(review): an earlier comment on this method described the stop as
        3% from entry, but the factors in the code are 0.98 / 1.02 (2%) -
        confirm the intended distance.
        """
        if trade.is_short:
            return min(1.0 - (trade.open_rate * 1.02 / current_rate), 0.20)
        return max((trade.open_rate * 0.98 / current_rate) - 1.0, -0.20)

    # ================================================================
    # Custom exit: boundaries locked at entry + close-confirmed structural
    # stop + take profit at a share of the zone height
    # ================================================================
    def custom_exit(
        self,
        pair: str,
        trade: Trade,
        current_time: datetime,
        current_rate: float,
        current_profit: float,
        **kwargs,
    ) -> str | None:
        """Return an exit tag, or None to keep the trade open.

        The zone boundaries are taken from the analysed candle whose date is
        closest to the trade's open time, so they stay fixed for the life of
        the trade (as long as that candle is still in the dataframe).

        Returns:
            ``"stop_loss"`` when the latest close is below the locked zone low
            (long) or above the locked zone high (short);
            ``"take_profit"`` when ``current_profit`` reaches
            ``take_profit_pct`` percent of the relative zone height;
            otherwise ``None``.
        """
        tp_pct = self.take_profit_pct.value / 100.0

        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if dataframe is None or len(dataframe) == 0:
            return None

        last = dataframe.iloc[-1]

        # -- Lock the zone boundaries that were valid at entry --
        dates = dataframe["date"]
        open_time = trade.open_date_utc
        # Subtracting a tz-naive timestamp from a tz-aware column raises in
        # pandas, so only drop the timezone when the column itself is naive.
        if dates.dt.tz is None:
            open_time = open_time.replace(tzinfo=None)
        # argmin() is positional, which is what iloc needs (idxmin() would
        # return an index label instead).
        open_pos = (dates - open_time).abs().argmin()
        open_row = dataframe.iloc[open_pos]
        z_low_open = open_row.get("zone_low", np.nan)
        z_high_open = open_row.get("zone_high", np.nan)

        # -- Structural stop: close confirms a break of the entry boundary --
        if not trade.is_short:
            if pd.notna(z_low_open) and z_low_open > 0 and last["close"] < z_low_open:
                return "stop_loss"  # closed below support
        else:
            if pd.notna(z_high_open) and z_high_open > 0 and last["close"] > z_high_open:
                return "stop_loss"  # closed above resistance

        # -- Take profit: zone height x TP% (also on the locked boundaries) --
        if pd.notna(z_low_open) and pd.notna(z_high_open) and z_high_open > z_low_open:
            base = z_low_open if not trade.is_short else z_high_open
            zone_height = (z_high_open - z_low_open) / base
            if current_profit >= zone_height * tp_pct:
                return "take_profit"

        return None

    # ================================================================
    # Plot config
    # ================================================================
    @property
    def plot_config(self) -> dict:
        """Plot layout: zone boundaries on the main chart, counters below.

        Exposed as a property so that ``strategy.plot_config`` evaluates to
        the dict itself rather than to a function object.
        """
        return {
            "main_plot": {
                "zone_high": {"color": "red", "type": "line"},
                "zone_low": {"color": "green", "type": "line"},
            },
            "subplots": {
                "rejections": {
                    "support_rejection_count": {"color": "green", "type": "line"},
                    "resistance_rejection_count": {"color": "red", "type": "line"},
                },
                "zone": {
                    "is_ranging": {"color": "blue", "type": "line"},
                },
            },
        }