""" Structure Flow Scalp — 震荡市剥头皮策略 ========================================== 基于Al Brooks价格行为学: - 在已识别的震荡区间内,支撑位做多、阻力位做空 - 15m级别支撑/阻力决定交易区间,5m级别入场 - 100x全仓杠杆,每次10%仓位 - 区间高度40%止盈,15m支撑/阻力外侧0.3%止损 变更记录: v1 (2026-06-10): 初版,基于v2.2b核心逻辑重构 v1.1 (2026-06-10): 支撑阻力从4H改为15m v1.2 (2026-06-10): 去掉4H趋势强度判断(冗余);启用100x全仓杠杆,10%仓位 v1.3 (2026-06-10): 代码审查修复——移除populate_exit_trend死循环,NaN安全,杠杆上限 v1.4 (2026-06-10): EMA动态S/R + 入场锁定S/R——止损止盈使用入场时的锁定值,不追最新 v1.5 (2026-06-10): 扩展入场信号 + 追踪止损保护 + 延长活S/R窗口 v1.6 (2026-06-10): 止损改为ATR动态计算——绑入场价,不绑支撑位;追踪改为ATR×0.5自适应 """ from datetime import datetime import numpy as np import pandas as pd from pandas import DataFrame from freqtrade.strategy import IStrategy, IntParameter, informative from freqtrade.persistence import Trade class StructureFlowScalp(IStrategy): """ 震荡市剥头皮策略 — 5m框架,100x全仓杠杆。 去掉4H趋势强度判断——15m支撑阻力本身就是最好的过滤器。 """ can_short = True stoploss = -0.15 use_custom_stoploss = True use_custom_exit = True minimal_roi = {"0": 100} max_open_trades = 1 timeframe = "5m" # ===================== # 杠杆设置 - 全仓 100x # ===================== def leverage(self, pair: str, current_time: datetime, current_rate: float, proposed_leverage: float, max_leverage: float, side: str, **kwargs) -> float: """返回固定 100x 杠杆,不超过交易所允许的最大值""" return min(100.0, max_leverage) # ===================== # 工具:查找入场K线(锁定S/R用) # ===================== def _get_entry_row(self, dataframe: DataFrame, trade: Trade) -> pd.Series | None: """ 从 dataframe 中找到入场 trade 对应的 K 线行。 兼容 live/dry_run(DatetimeIndex)和 backtesting(RangeIndex + date 列)两种模式。 """ if 'date' in dataframe.columns: # Backtesting 模式:dataframe 有 date 列,index 是 int entry_mask = pd.to_datetime(dataframe['date']) <= trade.open_date if not entry_mask.any(): return None return dataframe[entry_mask].iloc[-1] else: # Live/Dry-run 模式:index 是 DatetimeIndex try: entry_idx = dataframe.index.get_indexer([trade.open_date], method="pad") if entry_idx[0] < 0 or entry_idx[0] >= len(dataframe): return None return dataframe.iloc[entry_idx[0]] except (TypeError, ValueError): return None # ===================== # 可优化参数 # ===================== # 15m支撑阻力计算窗口 swing_lookback_15m = IntParameter(5, 15, default=10, space="buy") pin_bar_wick_ratio = IntParameter(50, 70, default=60, space="buy") cooldown_bars = IntParameter(2, 8, default=3, space="buy") # 区间高度止盈比例(%) profit_zone_pct = IntParameter(20, 60, default=40, space="buy") # ===================== # 工具:Swing Point 检测 # ===================== @staticmethod def _detect_swing_points( high: pd.Series, low: pd.Series, window: int = 5, ) -> tuple[pd.Series, pd.Series]: n = len(high) sh = pd.Series(np.nan, index=high.index, dtype=float) sl = pd.Series(np.nan, index=low.index, dtype=float) for i in range(window, n - window): if high.iloc[i] > high.iloc[i - window : i].max() and high.iloc[i] > high.iloc[i + 1 : i + window + 1].max(): sh.iloc[i] = high.iloc[i] if low.iloc[i] < low.iloc[i - window : i].min() and low.iloc[i] < low.iloc[i + 1 : i + window + 1].min(): sl.iloc[i] = low.iloc[i] return sh, sl # ===================== # 工具:结构分析 # ===================== def _build_structure( self, high: pd.Series, low: pd.Series, close: pd.Series, swing_high: pd.Series, swing_low: pd.Series, ) -> DataFrame: n = len(high) trend_up_arr = np.full(n, False) trend_down_arr = np.full(n, False) nearest_support = np.full(n, np.nan) nearest_resistance = np.full(n, np.nan) sh_prices = [] sl_prices = [] for i in range(n): if pd.notna(swing_high.iloc[i]): sh_prices.append(swing_high.iloc[i]) if len(sh_prices) > 4: sh_prices.pop(0) if pd.notna(swing_low.iloc[i]): sl_prices.append(swing_low.iloc[i]) if len(sl_prices) > 4: sl_prices.pop(0) if len(sh_prices) >= 2 and len(sl_prices) >= 2: if sh_prices[-1] > sh_prices[-2] and sl_prices[-1] > sl_prices[-2]: trend_up_arr[i] = True elif sh_prices[-1] < sh_prices[-2] and sl_prices[-1] < sl_prices[-2]: trend_down_arr[i] = True elif i > 0: trend_up_arr[i] = trend_up_arr[i - 1] trend_down_arr[i] = trend_down_arr[i - 1] elif i > 0: trend_up_arr[i] = trend_up_arr[i - 1] trend_down_arr[i] = trend_down_arr[i - 1] if sl_prices: # EMA平滑:不取最后一个,而是对最近swing lows做指数加权 # alpha=0.3,每个新swing point向它移动30%,有"惯性"不跳变 ema_s = sl_prices[0] for p in sl_prices[1:]: ema_s = 0.3 * p + 0.7 * ema_s nearest_support[i] = ema_s if sh_prices: ema_r = sh_prices[0] for p in sh_prices[1:]: ema_r = 0.3 * p + 0.7 * ema_r nearest_resistance[i] = ema_r return DataFrame({ "trend_up": trend_up_arr, "trend_down": trend_down_arr, "support": nearest_support, "resistance": nearest_resistance, }, index=high.index) # ===================== # 工具:K线形态检测 # ===================== @staticmethod def _detect_candle_patterns( open_: pd.Series, high: pd.Series, low: pd.Series, close: pd.Series, pin_bar_wick_ratio: float = 0.6, ) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]: body = (close - open_).abs() total_range = (high - low).replace(0, 0.0001) upper_wick = high - close.where(close > open_, open_) lower_wick = open_.where(close > open_, close) - low is_pin = (upper_wick + lower_wick) / total_range > pin_bar_wick_ratio bullish_pin = is_pin & (close > open_) & (lower_wick > upper_wick) bearish_pin = is_pin & (close < open_) & (upper_wick > lower_wick) prev_open = open_.shift(1) prev_close = close.shift(1) bullish_engulf = (close > prev_open) & (open_ < prev_close) & (close > open_) bearish_engulf = (close < prev_open) & (open_ > prev_close) & (close < open_) return bullish_pin, bearish_pin, bullish_engulf, bearish_engulf # ================================================================ # 信息时间框架 — 15m 短期支撑阻力(核心过滤器) # ================================================================ @informative("15m") def populate_indicators_15m( self, dataframe: DataFrame, metadata: dict ) -> DataFrame: sh, sl = self._detect_swing_points( dataframe["high"], dataframe["low"], self.swing_lookback_15m.value, ) structure = self._build_structure( dataframe["high"], dataframe["low"], dataframe["close"], sh, sl, ) dataframe["support"] = structure["support"] dataframe["resistance"] = structure["resistance"] # ── 活支撑检查(15根15m ≈ 3.75小时,震荡市中支撑可长期有效)── touched_support = ( (dataframe["low"] <= dataframe["support"] * 1.005) & (dataframe["low"] >= dataframe["support"] * 0.995) ) held_support = dataframe["close"] > dataframe["support"] support_tested_and_held = touched_support & held_support dataframe["support_alive"] = support_tested_and_held.rolling(15, min_periods=1).max() > 0 # ── 活阻力检查(15根窗口)── touched_resistance = ( (dataframe["high"] >= dataframe["resistance"] * 0.995) & (dataframe["high"] <= dataframe["resistance"] * 1.005) ) held_resistance = dataframe["close"] < dataframe["resistance"] resistance_tested_and_held = touched_resistance & held_resistance dataframe["resistance_alive"] = resistance_tested_and_held.rolling(15, min_periods=1).max() > 0 # 区间高度(用于止盈计算) dataframe["zone_height"] = (dataframe["resistance"] - dataframe["support"]).fillna(0) return dataframe # ================================================================ # 主时间框架 — 5m 指标 # ================================================================ def populate_indicators( self, dataframe: DataFrame, metadata: dict ) -> DataFrame: """5m级别:ATR + K线形态 + 信号整合。""" # ── ATR(14) — 用于动态止损,根据市场波动自适应 ── high = dataframe["high"] low = dataframe["low"] close = dataframe["close"] prev_close = close.shift(1) tr = pd.concat([ high - low, (high - prev_close).abs(), (low - prev_close).abs(), ], axis=1).max(axis=1) dataframe["atr"] = tr.rolling(14).mean() bullish_pin, bearish_pin, bullish_engulf, bearish_engulf = ( self._detect_candle_patterns( dataframe["open"], dataframe["high"], dataframe["low"], dataframe["close"], self.pin_bar_wick_ratio.value / 100.0, ) ) dataframe["bullish_pinbar"] = bullish_pin dataframe["bearish_pinbar"] = bearish_pin dataframe["bullish_engulfing"] = bullish_engulf dataframe["bearish_engulfing"] = bearish_engulf # ── 扩展信号:长下影线(比pinbar更宽松,只要下影线>总范围50%) ── total_range = (dataframe["high"] - dataframe["low"]).replace(0, 0.0001) body = (dataframe["close"] - dataframe["open"]).abs() # 下影线 = min(open, close) - low lower_wick = ( dataframe[["open", "close"]].min(axis=1) - dataframe["low"] ) # 上影线 = high - max(open, close) upper_wick = ( dataframe["high"] - dataframe[["open", "close"]].max(axis=1) ) # 长下影线:下影线>总范围50% 且 下影线>上影线 long_lower_wick = ( (lower_wick / total_range > 0.5) & (lower_wick > upper_wick) ) dataframe["long_lower_wick"] = long_lower_wick # ── 扩展信号:支撑位附近的强力反弹阳线 ── # 条件:价格在支撑0.5%范围内 + 阳线 + 实体>0.2% if "support_15m" in dataframe.columns: near_support = ( (dataframe["low"] <= dataframe["support_15m"] * 1.005) & (dataframe["low"] >= dataframe["support_15m"] * 0.995) ) is_bullish = dataframe["close"] > dataframe["open"] body_pct = body / dataframe["open"] strong_recovery = near_support & is_bullish & (body_pct > 0.002) else: strong_recovery = pd.Series(False, index=dataframe.index) dataframe["strong_recovery"] = strong_recovery # ── 综合止跌/止涨信号(扩展后) ── dataframe["bullish_signal"] = ( bullish_pin | bullish_engulf | long_lower_wick | strong_recovery ) dataframe["bearish_signal"] = ( bearish_pin | bearish_engulf ) # 做空对称:阻力位附近的强力下跌阴线 if "resistance_15m" in dataframe.columns: near_resistance = ( (dataframe["high"] >= dataframe["resistance_15m"] * 0.995) & (dataframe["high"] <= dataframe["resistance_15m"] * 1.005) ) is_bearish = dataframe["close"] < dataframe["open"] body_pct = body / dataframe["open"] strong_rejection = near_resistance & is_bearish & (body_pct > 0.002) else: strong_rejection = pd.Series(False, index=dataframe.index) dataframe["strong_rejection"] = strong_rejection dataframe["bearish_signal"] = ( bearish_pin | bearish_engulf | strong_rejection ) # NaN 安全处理 bool_cols = [ "support_alive_15m", "resistance_alive_15m", "bullish_signal", "bearish_signal", ] for col in bool_cols: if col in dataframe.columns: dataframe[col] = dataframe[col].fillna(False) # ATR fillna(前14根无ATR值用均值填补) if "atr" in dataframe.columns: atr_mean = dataframe["atr"].mean() dataframe["atr"] = dataframe["atr"].fillna(atr_mean) return dataframe # ===================== # 入场信号 # ===================== def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ 入场逻辑(5m 时间框架)。 不做4H趋势判断——15m支撑阻力本身就是过滤器: - 趋势强时价格直接突破15m S/R,不会在支撑/阻力附近停留 - 在支撑/阻力附近停留 = 震荡市 入场条件(3个,去掉了冗余的4H趋势判断): - 做多:价格贴近15m支撑 + 支撑有效 + K线止跌信号 - 做空:价格贴近15m阻力 + 阻力有效 + K线止涨信号 出场只依赖 custom_stoploss 和 custom_exit,不需要 D1 结构反转退出。 (去掉 populate_exit_trend:震荡市入场 → D1 非上升趋势 → 立即出场 的死循环) """ cooldown = self.cooldown_bars.value # NaN 安全处理 — 如果 15m informative 列还没对齐,直接跳过本根 K 线 required_cols = ["support_15m", "resistance_15m", "support_alive_15m", "resistance_alive_15m"] for col in required_cols: if col not in dataframe.columns: return dataframe # 数据尚未就绪,跳过 for col in ["bullish_signal", "bearish_signal", "support_alive_15m", "resistance_alive_15m"]: dataframe[col] = dataframe[col].fillna(False) # ── 做多 ── # 条件:价格贴近15m支撑(0.5%范围内)- 使用 low 而非 open # 因为支撑测试看的是价格是否到达支撑位,不是开盘在哪 near_support = ( (dataframe["low"] <= dataframe["support_15m"] * 1.005) & (dataframe["low"] >= dataframe["support_15m"] * 0.995) ) long_conditions = ( near_support & dataframe["support_alive_15m"] & dataframe["bullish_signal"] ) long_recent = long_conditions.rolling(cooldown, min_periods=1).max().shift(1) == 0 dataframe.loc[long_conditions & long_recent, "enter_long"] = 1 # ── 做空 ── # 条件:价格贴近15m阻力(0.5%范围内)- 使用 high 而非 open near_resistance = ( (dataframe["high"] >= dataframe["resistance_15m"] * 0.995) & (dataframe["high"] <= dataframe["resistance_15m"] * 1.005) ) short_conditions = ( near_resistance & dataframe["resistance_alive_15m"] & dataframe["bearish_signal"] ) short_recent = short_conditions.rolling(cooldown, min_periods=1).max().shift(1) == 0 dataframe.loc[short_conditions & short_recent, "enter_short"] = 1 return dataframe # ===================== # exit_trend(freqtrade 2025.11 要求必须实现,即使 use_custom_exit=True) # ===================== def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """退出逻辑完全由 custom_stoploss + custom_exit 管理。""" return dataframe # ===================== # 动态止损 — 入场价 - ATR×2.0(基于市场波动,非固定比例) # ===================== def custom_stoploss( self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, after_fill: bool, **kwargs, ) -> float: """ 止损锚定入场价,宽度根据市场波动(ATR)动态计算,而非固定比例。 核心逻辑: - 做多止损 = entry_price - ATR_5m × 2.0 - 做空止损 = entry_price + ATR_5m × 2.0 - ATR值从入场时的K线锁定,持仓期间不漂移 为什么用ATR不用固定比例: - ATR自动适应市场:波动大时止损放宽免误扫,波动小时收紧控风险 - 固定比例是拍脑袋,ATR是算出来的 追踪保护(v1.6 ATR自适应版): - 利润达止盈目标50%:上移到保本(入场价) - 利润达止盈目标80%:启动ATR×0.5窄追踪 """ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or len(dataframe) == 0: return -0.02 if not trade.is_short else 0.02 # 查找入场时的 K 线,锁定当时的 ATR 值 entry_row = self._get_entry_row(dataframe, trade) if entry_row is None: return -0.02 if not trade.is_short else 0.02 # 锁定入场时的 ATR 值,用于全程止损/追踪计算(不追最新,防止漂移) atr_value = entry_row.get("atr", np.nan) if pd.isna(atr_value) or atr_value <= 0: return -0.02 if not trade.is_short else 0.02 if not trade.is_short: # 做多:止损 = 入场价 - ATR × 2.0 base_sl_price = trade.open_rate - (atr_value * 2.0) base_sl = (base_sl_price / trade.open_rate) - 1.0 base_sl = max(base_sl, -0.15) # 追踪保护:需要入场行计算止盈目标 support = entry_row.get("support_15m", np.nan) resistance = entry_row.get("resistance_15m", np.nan) if (not pd.isna(support) and not pd.isna(resistance) and resistance > support and current_profit > 0): zone_height = resistance - support tp_target = (zone_height * self.profit_zone_pct.value / 100.0) / trade.open_rate if current_profit >= tp_target * 0.8: # 利润达止盈80%:ATR自适应窄追踪 trail_price = current_rate - (atr_value * 0.5) trail_ratio = (trail_price / trade.open_rate) - 1.0 return max(trail_ratio, base_sl) elif current_profit >= tp_target * 0.5: # 利润达止盈50%:保本 return max(0.0, base_sl) return base_sl else: # 做空:止损 = 入场价 + ATR × 2.0 base_sl_price = trade.open_rate + (atr_value * 2.0) base_sl = 1.0 - (base_sl_price / trade.open_rate) base_sl = min(base_sl, 0.15) # 追踪保护(做空对称) support = entry_row.get("support_15m", np.nan) resistance = entry_row.get("resistance_15m", np.nan) if (not pd.isna(support) and not pd.isna(resistance) and resistance > support and current_profit > 0): zone_height = resistance - support tp_target = (zone_height * self.profit_zone_pct.value / 100.0) / trade.open_rate if current_profit >= tp_target * 0.8: # ATR自适应窄追踪(做空对称) trail_price = current_rate + (atr_value * 0.5) trail_ratio = (trail_price / trade.open_rate) - 1.0 return min(trail_ratio, base_sl) elif current_profit >= tp_target * 0.5: # 保本 return min(0.0, base_sl) return base_sl # ===================== # 区间高度止盈 # ===================== def custom_exit( self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, **kwargs, ) -> str | None: """ 当利润达到入场时锁定的15m区间高度的设定比例时止盈。 使用入场时锁定的S/R值计算区间高度(zone_height),而非最新的值: - 入场后如果区间收缩,止盈目标不会跟着变小 - 让入场时确定的止盈逻辑"钉死" - profit_zone_pct 默认40%,即锁定区间高度的40% """ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or len(dataframe) == 0: return None # 查找入场时的 K 线,锁定当时的 S/R 值 entry_row = self._get_entry_row(dataframe, trade) if entry_row is None: return None support = entry_row.get("support_15m", np.nan) resistance = entry_row.get("resistance_15m", np.nan) if pd.isna(support) or pd.isna(resistance) or resistance <= support: return None # 用锁定的区间高度计算止盈目标(不随市场漂移) locked_zone_height = resistance - support target_pct = (locked_zone_height * self.profit_zone_pct.value / 100.0) / trade.open_rate if current_profit >= target_pct: return "zone_tp" return None # ===================== # Plot config # ===================== @staticmethod def plot_config() -> dict: return { "main_plot": { "support_15m": {"color": "green", "type": "line"}, "resistance_15m": {"color": "red", "type": "line"}, }, "subplots": { "signals": { "bullish_pinbar": {"color": "green", "type": "scatter"}, "bearish_pinbar": {"color": "red", "type": "scatter"}, "bullish_signal": {"color": "lime", "type": "scatter"}, "bearish_signal": {"color": "orange", "type": "scatter"}, }, "filters": { "support_alive_15m": {"color": "green", "type": "line"}, "resistance_alive_15m": {"color": "red", "type": "line"}, }, }, }