""" Structure Flow Swing Strategy v3.2 ================================== 波段交易策略 — 基于4H震荡区间,v3.1优化版 v3.2 改动(基于v3.1诊断结果 — 三大市场感知不足): 1. D1趋势强度过滤:D1处于强趋势时拒绝入场,防假区间陷阱 - 计算 D1 EMA20/EMA50 间距作为趋势强度指标 - 趋势强度超过阈值 → 不交易(即使4H出现区间形态) 2. 区间质量评分:从二分法升级为多维度评分 - 边界测试次数(测试越多越可靠) - 区间持续时长(越长越成熟) - 区间宽度适配度(3-8%最优) - 总分>=阈值才入场 3. 主动退出机制:确认转趋势后提前离场 - 3根连续K线收盘在入场时区间外 → 结构破坏 - 不等止损,主动离场(仅在损失<2%时) - 避免浮盈变亏损 保留:纯震荡定位、ATR×1.5止损、区间70%止盈、OR双边测试、冷却期1根 版本历史: v3.0 (2026-06-10): 初版,基于冯总波段交易新思路 v3.1 (2026-06-10): 降低条件门槛,AND→OR等4项 v3.2 (2026-06-10): 三大市场感知改进 """ from datetime import datetime import numpy as np import pandas as pd from pandas import DataFrame from freqtrade.strategy import IStrategy, IntParameter, informative from freqtrade.persistence import Trade class StructureFlowSwingV32(IStrategy): """ Structure Flow Swing Strategy v3.2 4H震荡区间波段交易 — 市场感知增强版 """ can_short = True stoploss = -0.20 use_custom_stoploss = True minimal_roi = {"0": 100} max_open_trades = 1 timeframe = "4h" # ===================== # 核心参数(沿用v3.1默认值) # ===================== swing_lookback = IntParameter(4, 8, default=5, space="buy") zone_stability_threshold = IntParameter(15, 40, default=25, space="buy") entry_zone_pct = IntParameter(1, 5, default=3, space="buy") atr_stop_mult = IntParameter(10, 25, default=15, space="buy") take_profit_pct = IntParameter(50, 80, default=70, space="sell") # v3.2 新增参数 d1_trend_strength_max = IntParameter(6, 15, default=10, space="buy") # D1趋势强度上限%,默认10%(极端趋势才触发) zone_quality_min = IntParameter(20, 60, default=30, space="buy") # 区间质量最低分,默认30 # 固定参数 zone_touch_lookback = 10 breakout_bars = 2 early_exit_bars = 3 # v3.2新增:连续N根在区间外触发主动退出 # ===================== # 工具:Swing Point 检测 # ===================== @staticmethod def _detect_swing_points( high: pd.Series, low: pd.Series, window: int = 5, ) -> tuple[pd.Series, pd.Series]: n = len(high) sh = pd.Series(np.nan, index=high.index, dtype=float) sl = pd.Series(np.nan, index=low.index, dtype=float) for i in range(window, n - window): if high.iloc[i] > high.iloc[i - window:i].max() and high.iloc[i] > high.iloc[i + 1:i + window + 1].max(): sh.iloc[i] = high.iloc[i] if low.iloc[i] < low.iloc[i - window:i].min() and low.iloc[i] < low.iloc[i + 1:i + window + 1].min(): sl.iloc[i] = low.iloc[i] return sh, sl # ===================== # 工具:区间震荡检测(增强版:加入质量评分数据) # ===================== def _detect_range( self, sh: pd.Series, sl: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, ) -> DataFrame: n = len(high) is_ranging = np.full(n, False) support_arr = np.full(n, np.nan) resistance_arr = np.full(n, np.nan) zone_width_arr = np.full(n, np.nan) touch_count_arr = np.full(n, 0) # v3.2新增 sh_prices = [] sl_prices = [] in_range = False touch_count = 0 for i in range(n): if pd.notna(sh.iloc[i]): sh_prices.append(sh.iloc[i]) if len(sh_prices) > 5: sh_prices.pop(0) if pd.notna(sl.iloc[i]): sl_prices.append(sl.iloc[i]) if len(sl_prices) > 5: sl_prices.pop(0) if len(sh_prices) < 3 or len(sl_prices) < 3: # 不在区间中 if in_range: in_range = False touch_count = 0 continue current_sh = sh_prices[-1] current_sl = sl_prices[-1] if current_sh <= current_sl: if in_range: in_range = False touch_count = 0 continue zone_width = (current_sh - current_sl) / current_sl support_arr[i] = current_sl resistance_arr[i] = current_sh zone_width_arr[i] = zone_width # 条件1:区间宽度稳定性 widths = [] for j in range(min(len(sh_prices), len(sl_prices)) - 1, -1, -1): w = (sh_prices[j] - sl_prices[j]) / sl_prices[j] widths.append(w) if len(widths) >= 3: break if len(widths) >= 3: mean_width = np.mean(widths) if mean_width > 0: max_dev = max(abs(w - mean_width) / mean_width for w in widths) stability_threshold = self.zone_stability_threshold.value / 100.0 is_stable = max_dev <= stability_threshold else: is_stable = False else: is_stable = False if not is_stable: if in_range: in_range = False touch_count = 0 continue # 条件2:价格测试过边界 — v3.1: AND→OR start_idx = max(0, i - self.zone_touch_lookback) support_zone_upper = current_sl * 1.01 touched_support = any( low.iloc[j] <= support_zone_upper for j in range(start_idx, i + 1) ) resistance_zone_lower = current_sh * 0.99 touched_resistance = any( high.iloc[j] >= resistance_zone_lower for j in range(start_idx, i + 1) ) if not (touched_support or touched_resistance): if in_range: in_range = False touch_count = 0 continue # 条件3:无突破 consecutive_outside = 0 for j in range(i, max(0, i - self.breakout_bars) - 1, -1): if close.iloc[j] > current_sh or close.iloc[j] < current_sl: consecutive_outside += 1 else: break if consecutive_outside >= self.breakout_bars: if in_range: in_range = False touch_count = 0 continue # === 通过所有条件 → 在区间中 === is_ranging[i] = True # v3.2: 跟踪区间内的边界触碰次数(质量评分数据) if not in_range: in_range = True touch_count = 0 c = close.iloc[i] if (c <= current_sl * 1.015) or (c >= current_sh * 0.985): touch_count += 1 touch_count_arr[i] = touch_count return DataFrame({ "is_ranging": is_ranging, "support": support_arr, "resistance": resistance_arr, "zone_width": zone_width_arr, "touch_count": touch_count_arr, # v3.2新增 }, index=high.index) # ===================== # 工具:ATR计算 # ===================== @staticmethod def _calc_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: tr = pd.DataFrame({ "hl": high - low, "hc": (high - close.shift(1)).abs(), "lc": (low - close.shift(1)).abs(), }).max(axis=1) return tr.rolling(period).mean() # ================================================================ # D1 信息时间框架 — v3.2: 新增趋势强度计算 # ================================================================ @informative("1d") def populate_indicators_1d(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # 原有:D1趋势方向(swing point比较) sh, sl = self._detect_swing_points( dataframe["high"], dataframe["low"], window=5 ) sh_vals = sh.dropna() sl_vals = sl.dropna() is_uptrend = pd.Series(False, index=dataframe.index) is_downtrend = pd.Series(False, index=dataframe.index) if len(sh_vals) >= 2 and len(sl_vals) >= 2: if sh_vals.iloc[-1] > sh_vals.iloc[-2] and sl_vals.iloc[-1] > sl_vals.iloc[-2]: is_uptrend[:] = True elif sh_vals.iloc[-1] < sh_vals.iloc[-2] and sl_vals.iloc[-1] < sl_vals.iloc[-2]: is_downtrend[:] = True dataframe["d1_uptrend"] = is_uptrend dataframe["d1_downtrend"] = is_downtrend # v3.2新增:D1趋势强度 = EMA20与EMA50的偏离程度 ema_20 = dataframe["close"].ewm(span=20, adjust=False).mean() ema_50 = dataframe["close"].ewm(span=50, adjust=False).mean() dataframe["trend_strength"] = abs(ema_20 - ema_50) / ema_50 return dataframe # ================================================================ # 主时间框架 — 4H 指标(v3.2: 新增区间质量评分) # ================================================================ def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: sh, sl = self._detect_swing_points( dataframe["high"], dataframe["low"], self.swing_lookback.value, ) range_info = self._detect_range(sh, sl, dataframe["high"], dataframe["low"], dataframe["close"]) dataframe["is_ranging"] = range_info["is_ranging"] dataframe["range_support"] = range_info["support"] dataframe["range_resistance"] = range_info["resistance"] dataframe["zone_width_pct"] = range_info["zone_width"] dataframe["range_touch_count"] = range_info["touch_count"] dataframe["atr"] = self._calc_atr(dataframe["high"], dataframe["low"], dataframe["close"], 14) # 价格在区间内的位置 denom = dataframe["range_resistance"] - dataframe["range_support"] dataframe["zone_position"] = np.where( denom > 0, (dataframe["close"] - dataframe["range_support"]) / denom, np.nan, ) # 距离边界百分比 dataframe["dist_to_support"] = np.where( dataframe["range_support"] > 0, (dataframe["close"] - dataframe["range_support"]) / dataframe["close"], np.nan, ) dataframe["dist_to_resistance"] = np.where( dataframe["range_resistance"] > 0, (dataframe["range_resistance"] - dataframe["close"]) / dataframe["close"], np.nan, ) # ── v3.2新增:区间质量评分 ── self._compute_zone_quality(dataframe) # ── v3.2新增:区间连续计数 ── is_ranging_int = dataframe["is_ranging"].astype(int) consecutive = np.zeros(len(dataframe), dtype=int) for i in range(1, len(dataframe)): if is_ranging_int.iloc[i] and is_ranging_int.iloc[i-1]: consecutive[i] = consecutive[i-1] + 1 elif is_ranging_int.iloc[i]: consecutive[i] = 1 dataframe["range_consecutive"] = consecutive for col in ["is_ranging", "zone_position", "dist_to_support", "dist_to_resistance"]: if col in dataframe.columns: dataframe[col] = dataframe[col].fillna(False if col == "is_ranging" else 999) return dataframe def _compute_zone_quality(self, dataframe: DataFrame) -> None: """ v3.2新增:区间质量三因子评分 - 边界测试次数(0-45分):0→15, 1→20, 2→32, 3+→45 - 区间持续时长(0-30分):<5→0, 5-9→12, 10-19→22, 20+→30 - 区间宽度适配(0-25分):3-8%→25, 2-3%→15, 8-12%→15, 其他→0 满分100,合格线默认30 """ touch_count = dataframe["range_touch_count"].fillna(0).values zone_width = dataframe["zone_width_pct"].fillna(0).values is_ranging = dataframe["is_ranging"].values quality = np.zeros(len(dataframe)) # 因子1:边界测试次数(放宽:0次触碰也有基础分) quality += np.where( touch_count >= 3, 45, np.where(touch_count >= 2, 32, np.where(touch_count >= 1, 20, 15)) ) # 因子2:区间持续时长(用连续计数表示暂存,后续由 populate_indicators 补充) # 这里先按最少给分,populate_indicators 中会基于 range_consecutive 二次修正 # 实际上 touche_count > 0 就意味着至少有一些持续性 # 因子3:区间宽度适配度 quality += np.where( (zone_width >= 0.03) & (zone_width <= 0.08), 25, np.where( ((zone_width >= 0.02) & (zone_width < 0.03)) | ((zone_width > 0.08) & (zone_width <= 0.12)), 15, 0 ) ) # 只在区间内有效 quality = np.where(is_ranging, quality, 0) dataframe["zone_quality_base"] = quality # ================================================================ # 入场信号 — v3.2: D1趋势强度 + 区间质量过滤 + 持续时间因子 # ================================================================ def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: entry_zone = self.entry_zone_pct.value / 100.0 d1_downtrend_col = "d1_downtrend_1d" d1_uptrend_col = "d1_uptrend_1d" d1_strength_col = "trend_strength_1d" # v3.2新增 for col in ["is_ranging", d1_uptrend_col, d1_downtrend_col, d1_strength_col]: if col in dataframe.columns: dataframe[col] = dataframe[col].fillna(False) else: dataframe[col] = False # ── v3.2: 计算完整区间质量评分(加入持续性因子) ── range_consec = dataframe.get("range_consecutive", pd.Series(0, index=dataframe.index)) quality_base = dataframe.get("zone_quality_base", pd.Series(0, index=dataframe.index)) # 持续性因子:<5→0, 5-9→12, 10-19→22, 20+→30 duration_score = np.where( range_consec >= 20, 30, np.where(range_consec >= 10, 22, np.where(range_consec >= 5, 12, 0)) ) # 完整质量分 = 基础分(测试+宽度,max=70)+ 持续性分(max=30) dataframe["zone_quality"] = quality_base + duration_score dataframe["zone_quality"] = np.where(dataframe["is_ranging"], dataframe["zone_quality"], 0) # ── v3.2: D1趋势强度过滤(方向感知) ── # 逻辑:只有在极端趋势中,同向的4H区间才有"假区间"风险 # - 做多:D1处于极端上升趋势 → 回调可能很深 → 不进场 # - 做空:D1处于极端下降趋势 → 反弹可能很高 → 不进场 threshold = self.d1_trend_strength_max.value / 100.0 d1_strength_strong = dataframe[d1_strength_col] > threshold long_d1_ok = ~(dataframe[d1_uptrend_col] & d1_strength_strong) # 极端上升趋势不做多 short_d1_ok = ~(dataframe[d1_downtrend_col] & d1_strength_strong) # 极端下降趋势不做空 # ── v3.2: 区间质量过滤 ── quality_min = self.zone_quality_min.value zone_quality_ok = dataframe["zone_quality"] >= quality_min # ── 做多:震荡市中,价格靠近支撑位 ── long_conds = ( dataframe["is_ranging"] & (dataframe["dist_to_support"] <= entry_zone) & (dataframe["dist_to_support"] > 0) & (~dataframe[d1_downtrend_col]) # 原有:D1不能是下降趋势 & long_d1_ok # v3.2新增:极端上升趋势不做多 & zone_quality_ok # v3.2新增:区间质量达标 ) cooldown = 1 long_recent = long_conds.rolling(cooldown, min_periods=1).max().shift(1) == 0 dataframe.loc[long_conds & long_recent, "enter_long"] = 1 # ── 做空:震荡市中,价格靠近阻力位 ── short_conds = ( dataframe["is_ranging"] & (dataframe["dist_to_resistance"] <= entry_zone) & (dataframe["dist_to_resistance"] > 0) & (~dataframe[d1_uptrend_col]) # 原有:D1不能是上升趋势 & short_d1_ok # v3.2新增:极端下降趋势不做空 & zone_quality_ok # v3.2新增:区间质量达标 ) short_recent = short_conds.rolling(cooldown, min_periods=1).max().shift(1) == 0 dataframe.loc[short_conds & short_recent, "enter_short"] = 1 return dataframe # ================================================================ # 出场信号 # ================================================================ def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: return dataframe # ================================================================ # 自定义止损:支撑/阻力外侧,ATR*1.5 缓冲(v3.1逻辑保持不变) # ================================================================ def custom_stoploss( self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, after_fill: bool, **kwargs, ) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or len(dataframe) == 0: return -0.02 if not trade.is_short else 0.02 last = dataframe.iloc[-1] atr_mult = self.atr_stop_mult.value / 10.0 if not trade.is_short: support = last.get("range_support", np.nan) atr = last.get("atr", np.nan) if pd.isna(support) or support <= 0: return -0.02 if pd.notna(atr) and atr > 0: sl_price = support - atr * atr_mult else: sl_price = support * 0.985 sl_ratio = (sl_price / current_rate) - 1.0 return max(sl_ratio, -0.20) else: resistance = last.get("range_resistance", np.nan) atr = last.get("atr", np.nan) if pd.isna(resistance) or resistance <= 0: return 0.02 if pd.notna(atr) and atr > 0: sl_price = resistance + atr * atr_mult else: sl_price = resistance * 1.015 sl_ratio = 1.0 - (sl_price / current_rate) return min(sl_ratio, 0.20) # ================================================================ # 自定义止盈:区间70% + v3.2主动退出机制 # ================================================================ def custom_exit( self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, **kwargs, ) -> str | None: tp_pct = self.take_profit_pct.value / 100.0 dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or len(dataframe) == 0: return None last = dataframe.iloc[-1] # ── 原有:区间70%止盈 ── if not trade.is_short: support = last.get("range_support", np.nan) resistance = last.get("range_resistance", np.nan) if pd.notna(support) and pd.notna(resistance) and resistance > support: zone_height = (resistance - support) / support tp_target = zone_height * tp_pct if current_profit >= tp_target: return "take_profit" else: support = last.get("range_support", np.nan) resistance = last.get("range_resistance", np.nan) if pd.notna(support) and pd.notna(resistance) and resistance > support: zone_height = (resistance - support) / resistance tp_target = zone_height * tp_pct if current_profit >= tp_target: return "take_profit" # ── v3.2新增:主动退出机制 ── # 区间结构破坏 → 提前离场 # 条件:连续3根K线收盘在入场时区间外,且当前亏损<2% if current_profit > -0.02: # 找到入场时的K线(取最后一根确认的K线,不是当前正在形成的) entry_date = trade.open_date entry_mask = dataframe["date"] <= entry_date if entry_mask.any(): entry_idx = dataframe[entry_mask].index[-1] entry_support = dataframe.loc[entry_idx, "range_support"] entry_resistance = dataframe.loc[entry_idx, "range_resistance"] if pd.notna(entry_support) and pd.notna(entry_resistance) and entry_resistance > entry_support: # 取最后3根已完成的K线 check_bars = min(self.early_exit_bars, len(dataframe) - 1) recent = dataframe.iloc[-(check_bars + 1):-1] # 排除当前正在形成的K线 if len(recent) >= self.early_exit_bars: outside_count = 0 for _, bar in recent.iterrows(): c = bar["close"] # 缓冲0.5%避免噪音触发 if c < entry_support * 0.995 or c > entry_resistance * 1.005: outside_count += 1 if outside_count >= self.early_exit_bars: return "early_exit_structure_broken" return None # ================================================================ # Plot config # ================================================================ @staticmethod def plot_config() -> dict: return { "main_plot": { "range_support": {"color": "green", "type": "line"}, "range_resistance": {"color": "red", "type": "line"}, }, "subplots": { "range": { "is_ranging": {"color": "blue", "type": "line"}, "zone_width_pct": {"color": "purple", "type": "line"}, "zone_quality": {"color": "orange", "type": "line"}, }, "position": { "dist_to_support": {"color": "green", "type": "line"}, "dist_to_resistance": {"color": "red", "type": "line"}, }, }, }