"""
Structure Flow Swing Strategy v4.0
==================================
15m range-bound swing strategy based on price-clustering detection.

Key changes relative to v3.x:
1. Timeframe moved from 4H to 15m: detection and execution both happen on
   the small timeframe.
2. Range detection changed from "stability of swing-point width" to
   "price clustering + number of boundary tests".
3. A detection window of 4-8 hours is enough to identify a range, which
   covers mini-ranges lasting 1-3 days.

v3.1 diagnosis recap (full-period backtest, 2026-06-10):
- All 122 trades were shorts, +76%, CAGR 10.97%.
- is_ranging was true only 13.7% of the time; the 4H detection only caught
  large-timeframe ranges.
- Small 1-3 day ranges were missed entirely, and those are where the
  manual-trading profits come from.

Version history:
v3.0 (2026-06-10): initial version, 4H swing points + two-sided tests
v3.1 (2026-06-10): AND -> OR, lowered thresholds
v4.0 (2026-06-10): full rewrite, 15m price-clustering detection
"""

from datetime import datetime

import numpy as np
import pandas as pd
from pandas import DataFrame

from freqtrade.strategy import IStrategy, IntParameter
from freqtrade.persistence import Trade


class StructureFlowSwingV40(IStrategy):
    """
    Structure Flow Swing Strategy v4.0.

    Trades 15m ranges detected through price clustering: goes long near the
    lower boundary of a detected range and short near the upper boundary.
    Exits are produced by ``custom_exit`` (fraction of the zone height) and
    ``custom_stoploss`` (zone boundary plus an ATR buffer).
    """

    can_short = True
    stoploss = -0.20
    use_custom_stoploss = True
    # ROI target of 100 (i.e. 10000%) is effectively unreachable, so exits
    # come from custom_exit / custom_stoploss.
    minimal_roi = {"0": 100}
    max_open_trades = 1
    timeframe = "15m"

    # =====================
    # Optimizable parameters
    # =====================
    # Detection window: 24-96 candles of 15m (6h-24h).
    lookback = IntParameter(24, 96, default=48, space="buy")
    # Minimum number of tests required on each boundary.
    min_touches = IntParameter(1, 4, default=2, space="buy")
    # Upper limit on the zone width = ATR x N.
    zone_width_atr_mult = IntParameter(2, 6, default=4, space="buy")
    # Entry band: distance from the boundary in per-mille (default 5 = 0.5%).
    entry_zone_pct = IntParameter(2, 8, default=5, space="buy")
    # ATR stop multiplier, stored in tenths (15 -> 1.5 x ATR).
    atr_stop_mult = IntParameter(10, 25, default=15, space="buy")
    # Take-profit as a percentage of the zone height.
    take_profit_pct = IntParameter(50, 80, default=70, space="sell")

    # Fixed parameters
    breakout_bars = 2  # number of recent closes checked by the breakout filter
    cooldown = 4  # signal cooldown: 4 candles of 15m (1 hour)

    # =====================
    # Helper: ATR calculation
    # =====================
    @staticmethod
    def _calc_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
        """Return the simple rolling mean of the true range over ``period`` bars.

        The true range is the row-wise maximum of ``high - low``,
        ``|high - previous close|`` and ``|low - previous close|``.
        """
        prev_close = close.shift(1)
        tr = pd.DataFrame({
            "hl": high - low,
            "hc": (high - prev_close).abs(),
            "lc": (low - prev_close).abs(),
        }).max(axis=1)
        return tr.rolling(period).mean()

    # =====================
    # Main timeframe - 15m indicators
    # =====================
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Compute the range boundaries, touch counts and range flag.

        Adds the columns ``zone_high``, ``zone_low``, ``zone_width_raw``,
        ``zone_width_pct``, ``atr``, ``upper_touches``, ``lower_touches``,
        ``is_ranging``, ``zone_position``, ``dist_to_low`` and
        ``dist_to_high`` to ``dataframe`` and returns it.
        """
        window = self.lookback.value
        high = dataframe["high"]
        low = dataframe["low"]
        close = dataframe["close"]

        # -- Price clustering detection --
        rolling_high = high.rolling(window).max()
        rolling_low = low.rolling(window).min()

        # Zone width (absolute and relative to the lower boundary).
        zone_width = rolling_high - rolling_low

        dataframe["zone_high"] = rolling_high
        dataframe["zone_low"] = rolling_low
        dataframe["zone_width_raw"] = zone_width
        dataframe["zone_width_pct"] = zone_width / rolling_low

        # ATR
        dataframe["atr"] = self._calc_atr(high, low, close, 14)

        # -- Boundary test counting --
        # A bar whose high/low comes within 0.5% of the boundary counts as
        # one test of that boundary.
        touch_upper = high >= rolling_high * 0.995
        touch_lower = low <= rolling_low * 1.005

        # Number of tests inside the rolling window.
        dataframe["upper_touches"] = touch_upper.rolling(window).sum()
        dataframe["lower_touches"] = touch_lower.rolling(window).sum()

        # -- Range conditions --
        atr_mult = self.zone_width_atr_mult.value
        min_touches = self.min_touches.value

        # Condition 1: zone width is reasonable (no more than ATR x N).
        is_compact = zone_width <= dataframe["atr"] * atr_mult

        # Condition 2: both boundaries were tested at least min_touches times.
        is_tested = (
            (dataframe["upper_touches"] >= min_touches)
            & (dataframe["lower_touches"] >= min_touches)
        )

        # Condition 3: no breakout (the last breakout_bars closes are inside
        # the boundaries).
        # NOTE(review): the shifted closes belong to the same rolling window
        # that defines zone_high / zone_low (breakout_bars < lookback), so for
        # well-formed OHLC data this is true on every fully populated row and
        # cannot reject a breakout. Comparing against boundaries computed
        # before those bars would make it effective; the logic is left as-is
        # here so that generated signals do not change.
        no_break_high = True
        no_break_low = True
        for i in range(1, self.breakout_bars + 1):
            prev_close = close.shift(i)
            no_break_high = no_break_high & (prev_close <= rolling_high)
            no_break_low = no_break_low & (prev_close >= rolling_low)

        dataframe["is_ranging"] = is_compact & is_tested & no_break_high & no_break_low

        # -- Position of the close inside the zone (0 = low, 1 = high) --
        dataframe["zone_position"] = np.where(
            zone_width > 0,
            (close - rolling_low) / zone_width,
            np.nan,
        )

        # Distance to each boundary as a fraction of the close.
        dataframe["dist_to_low"] = np.where(
            rolling_low > 0,
            (close - rolling_low) / close,
            np.nan,
        )
        dataframe["dist_to_high"] = np.where(
            rolling_high > 0,
            (rolling_high - close) / close,
            np.nan,
        )

        # -- Fill gaps --
        # 999 is a sentinel far outside any entry band, so rows without a
        # valid zone can never satisfy the entry distance checks.
        dataframe["is_ranging"] = dataframe["is_ranging"].fillna(False)
        for col in ("zone_position", "dist_to_low", "dist_to_high"):
            dataframe[col] = dataframe[col].fillna(999)

        return dataframe

    # ================================================================
    # Entry signals
    # ================================================================
    def _no_recent_signal(self, conds: pd.Series) -> pd.Series:
        """Return True where ``conds`` was False on all of the previous ``cooldown`` bars.

        The rolling maximum is shifted by one bar so the current bar is not
        part of its own look-back; the first row compares NaN to 0 and is
        therefore False.
        """
        return conds.rolling(self.cooldown, min_periods=1).max().shift(1) == 0

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Set ``enter_long`` / ``enter_short`` near the zone boundaries.

        A signal needs ``is_ranging`` to be set, the close to lie strictly
        inside the zone within ``entry_zone_pct`` per-mille of the boundary,
        and the same raw condition to have been absent on the previous
        ``cooldown`` bars.
        """
        entry_zone = self.entry_zone_pct.value / 1000.0  # per-mille -> fraction

        if "is_ranging" not in dataframe.columns:
            dataframe["is_ranging"] = False

        # -- Long: ranging and price close to the lower boundary --
        long_conds = (
            dataframe["is_ranging"]
            & (dataframe["dist_to_low"] < entry_zone)
            & (dataframe["dist_to_low"] > 0)
        )
        dataframe.loc[long_conds & self._no_recent_signal(long_conds), "enter_long"] = 1

        # -- Short: ranging and price close to the upper boundary --
        short_conds = (
            dataframe["is_ranging"]
            & (dataframe["dist_to_high"] < entry_zone)
            & (dataframe["dist_to_high"] > 0)
        )
        dataframe.loc[short_conds & self._no_recent_signal(short_conds), "enter_short"] = 1

        return dataframe

    # ================================================================
    # Exit signals
    # ================================================================
    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Return ``dataframe`` unchanged; no exit signal columns are set here."""
        return dataframe

    # ================================================================
    # Custom stoploss: outside the zone boundary + ATR buffer
    # ================================================================
    def custom_stoploss(
        self,
        pair: str,
        trade: Trade,
        current_time: datetime,
        current_rate: float,
        current_profit: float,
        after_fill: bool,
        **kwargs,
    ) -> float:
        """Return the stop distance as a ratio of ``current_rate``.

        Longs place the stop at ``zone_low - ATR * atr_stop_mult / 10``;
        shorts at ``zone_high + ATR * atr_stop_mult / 10``. When the ATR is
        missing or non-positive, a fixed 1.5% offset from the boundary is
        used instead. When no analyzed data or no valid boundary is
        available, a 2% fallback is returned. The distance is limited to 20%
        in both directions.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if dataframe is None or len(dataframe) == 0:
            return -0.02 if not trade.is_short else 0.02

        last = dataframe.iloc[-1]
        atr_mult = self.atr_stop_mult.value / 10.0
        atr = last.get("atr", np.nan)
        has_atr = pd.notna(atr) and atr > 0

        if not trade.is_short:
            zone_low = last.get("zone_low", np.nan)
            if pd.isna(zone_low) or zone_low <= 0:
                return -0.02
            sl_price = zone_low - atr * atr_mult if has_atr else zone_low * 0.985
            sl_ratio = (sl_price / current_rate) - 1.0
            return max(sl_ratio, -0.20)

        zone_high = last.get("zone_high", np.nan)
        if pd.isna(zone_high) or zone_high <= 0:
            return 0.02
        sl_price = zone_high + atr * atr_mult if has_atr else zone_high * 1.015
        sl_ratio = 1.0 - (sl_price / current_rate)
        # The ratio is negative whenever the stop sits above the current
        # price, which is the normal case for a short. Clamp both sides so
        # the stop distance never exceeds 20%, mirroring the long branch.
        return max(min(sl_ratio, 0.20), -0.20)

    # ================================================================
    # Custom take-profit: zone height x TP%
    # ================================================================
    def custom_exit(
        self,
        pair: str,
        trade: Trade,
        current_time: datetime,
        current_rate: float,
        current_profit: float,
        **kwargs,
    ) -> str | None:
        """Return ``"take_profit"`` once profit reaches the zone-height target.

        The target is ``take_profit_pct`` percent of the relative zone
        height, measured against ``zone_low`` for longs and ``zone_high`` for
        shorts. Returns ``None`` when there is no analyzed data, the zone is
        missing or degenerate, or the target has not been reached.
        """
        tp_pct = self.take_profit_pct.value / 100.0

        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if dataframe is None or len(dataframe) == 0:
            return None

        last = dataframe.iloc[-1]
        zone_low = last.get("zone_low", np.nan)
        zone_high = last.get("zone_high", np.nan)
        if pd.isna(zone_low) or pd.isna(zone_high) or zone_high <= zone_low:
            return None

        # Longs measure the height from the entry-side boundary (zone_low),
        # shorts from zone_high.
        base = zone_high if trade.is_short else zone_low
        zone_height = (zone_high - zone_low) / base
        if current_profit >= zone_height * tp_pct:
            return "take_profit"

        return None

    # ================================================================
    # Plot config
    # ================================================================
    @property
    def plot_config(self) -> dict:
        """Plot layout: zone boundaries on the main chart, diagnostics below.

        Exposed as a property so that ``strategy.plot_config`` evaluates to
        the dict itself rather than to a callable.
        """
        return {
            "main_plot": {
                "zone_high": {"color": "red", "type": "line"},
                "zone_low": {"color": "green", "type": "line"},
            },
            "subplots": {
                "zone": {
                    "is_ranging": {"color": "blue", "type": "line"},
                    "zone_width_pct": {"color": "purple", "type": "line"},
                },
                "touches": {
                    "upper_touches": {"color": "red", "type": "line"},
                    "lower_touches": {"color": "green", "type": "line"},
                },
            },
        }