Files
beast-trader-strategies/strategy.py

486 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Structure Flow Strategy v1.9
=======================
变更记录:
v1.0 (2026-06-07): 纯价格结构策略D1定方向→4H定位→1H入场
v1.1 (2026-06-07): 1H futures结构止损首次回测成功(+61.52%)
v1.2 (2026-06-07): Entry Candle止损bug导致50笔硬止损全亏
v1.3 (2026-06-07): ATR动态止损结果-63.72%胜率20.2%
v1.4 (2026-06-07): 回归纯价格结构止损,+140.71%胜率38.7%
v1.5 (2026-06-07): 参数调优(stoploss -5%→-15%, max_stop_dist 3%→5%)+140.83%
v1.6 (2026-06-07): 入场质量优化(冷却期+活支撑),+151.32% ⭐ 最优基线
v1.7 (2026-06-07): 止损优化(5%缓冲),❌ 失败
v1.8 (2026-06-07): 止损优化(2%缓冲),❌ 失败
v1.9 (2026-06-08): ===== 结构变化检测止损 =====
入场逻辑保持 v1.6 不变。
custom_stoploss 新增:当价格已穿越原 S/R 时,
提前退出而非等待原止损位。
核心假设:入场后 S/R 被否定 → 原止损逻辑无效。
"""
from datetime import datetime
import numpy as np
import pandas as pd
from pandas import DataFrame
from freqtrade.strategy import IStrategy, IntParameter, informative
from freqtrade.persistence import Trade
class StructureFlowStrategyV19(IStrategy):
"""
Structure Flow Strategy v1.9 — 结构变化检测止损
v1.9改动相对于v1.6
custom_stoploss 增加结构变化检测:
- 做多:如果当前 bar 的 close < support_4h支撑被跌破提前退出
- 做空:如果当前 bar 的 close > resistance_4h阻力被突破提前退出
- 否则保持 v1.6 的原有止损逻辑support * 0.999 / resistance * 1.001
"""
can_short = True
stoploss = -0.15
use_custom_stoploss = True
minimal_roi = {"0": 100}
max_open_trades = 1
timeframe = "1h"
# =====================
# 可优化参数
# =====================
swing_lookback_d1 = IntParameter(8, 14, default=10, space="buy")
swing_lookback_h4 = IntParameter(5, 10, default=8, space="buy")
pin_bar_wick_ratio = IntParameter(50, 70, default=60, space="buy")
max_stop_dist = IntParameter(20, 50, default=50, space="buy")
cooldown_bars = IntParameter(3, 12, default=6, space="buy")
# =====================
# 工具Swing Point 检测
# =====================
@staticmethod
def _detect_swing_points(
high: pd.Series,
low: pd.Series,
window: int = 5,
) -> tuple[pd.Series, pd.Series]:
n = len(high)
sh = pd.Series(np.nan, index=high.index, dtype=float)
sl = pd.Series(np.nan, index=low.index, dtype=float)
for i in range(window, n - window):
if high.iloc[i] > high.iloc[i - window : i].max() and high.iloc[i] > high.iloc[i + 1 : i + window + 1].max():
sh.iloc[i] = high.iloc[i]
if low.iloc[i] < low.iloc[i - window : i].min() and low.iloc[i] < low.iloc[i + 1 : i + window + 1].min():
sl.iloc[i] = low.iloc[i]
return sh, sl
# =====================
# 工具:结构分析
# =====================
def _build_structure(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
swing_high: pd.Series,
swing_low: pd.Series,
) -> DataFrame:
n = len(high)
trend_up_arr = np.full(n, False)
trend_down_arr = np.full(n, False)
nearest_support = np.full(n, np.nan)
nearest_resistance = np.full(n, np.nan)
in_demand_zone = np.full(n, False)
in_supply_zone = np.full(n, False)
sh_prices = []
sl_prices = []
for i in range(n):
if pd.notna(swing_high.iloc[i]):
sh_prices.append(swing_high.iloc[i])
if len(sh_prices) > 4:
sh_prices.pop(0)
if pd.notna(swing_low.iloc[i]):
sl_prices.append(swing_low.iloc[i])
if len(sl_prices) > 4:
sl_prices.pop(0)
if len(sh_prices) >= 2 and len(sl_prices) >= 2:
if sh_prices[-1] > sh_prices[-2] and sl_prices[-1] > sl_prices[-2]:
trend_up_arr[i] = True
elif sh_prices[-1] < sh_prices[-2] and sl_prices[-1] < sl_prices[-2]:
trend_down_arr[i] = True
elif i > 0:
trend_up_arr[i] = trend_up_arr[i - 1]
trend_down_arr[i] = trend_down_arr[i - 1]
elif i > 0:
trend_up_arr[i] = trend_up_arr[i - 1]
trend_down_arr[i] = trend_down_arr[i - 1]
if sl_prices:
nearest_support[i] = sl_prices[-1]
if sh_prices:
nearest_resistance[i] = sh_prices[-1]
c = close.iloc[i]
if not np.isnan(nearest_support[i]) and not np.isnan(nearest_resistance[i]):
zone_range = nearest_resistance[i] - nearest_support[i]
if zone_range > 0:
pos_pct = (c - nearest_support[i]) / zone_range
in_demand_zone[i] = pos_pct < 0.35
in_supply_zone[i] = pos_pct > 0.65
return DataFrame({
"trend_up": trend_up_arr,
"trend_down": trend_down_arr,
"support": nearest_support,
"resistance": nearest_resistance,
"in_demand": in_demand_zone,
"in_supply": in_supply_zone,
}, index=high.index)
# =====================
# 工具K线形态检测
# =====================
@staticmethod
def _detect_candle_patterns(
open_: pd.Series,
high: pd.Series,
low: pd.Series,
close: pd.Series,
pin_bar_wick_ratio: float = 0.6,
) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
body = (close - open_).abs()
total_range = (high - low).replace(0, 0.0001)
upper_wick = high - close.where(close > open_, open_)
lower_wick = open_.where(close > open_, close) - low
is_pin = (upper_wick + lower_wick) / total_range > pin_bar_wick_ratio
bullish_pin = is_pin & (close > open_) & (lower_wick > upper_wick)
bearish_pin = is_pin & (close < open_) & (upper_wick > lower_wick)
prev_open = open_.shift(1)
prev_close = close.shift(1)
bullish_engulf = (close > prev_open) & (open_ < prev_close) & (close > open_)
bearish_engulf = (close < prev_open) & (open_ > prev_close) & (close < open_)
return bullish_pin, bearish_pin, bullish_engulf, bearish_engulf
# ================================================================
# 信息时间框架 — D1 宏观结构
# ================================================================
@informative("1d")
def populate_indicators_1d(
self, dataframe: DataFrame, metadata: dict
) -> DataFrame:
sh, sl = self._detect_swing_points(
dataframe["high"], dataframe["low"],
self.swing_lookback_d1.value,
)
structure = self._build_structure(
dataframe["high"], dataframe["low"], dataframe["close"],
sh, sl,
)
dataframe["trend_up"] = structure["trend_up"]
dataframe["trend_down"] = structure["trend_down"]
return dataframe
# ================================================================
# 信息时间框架 — 4H 中期结构
# ================================================================
@informative("4h")
def populate_indicators_4h(
self, dataframe: DataFrame, metadata: dict
) -> DataFrame:
sh, sl = self._detect_swing_points(
dataframe["high"], dataframe["low"],
self.swing_lookback_h4.value,
)
structure = self._build_structure(
dataframe["high"], dataframe["low"], dataframe["close"],
sh, sl,
)
dataframe["trend_up"] = structure["trend_up"]
dataframe["trend_down"] = structure["trend_down"]
dataframe["support"] = structure["support"]
dataframe["resistance"] = structure["resistance"]
dataframe["in_demand"] = structure["in_demand"]
dataframe["in_supply"] = structure["in_supply"]
# ================================
# 活支撑/阻力检查 (v1.6)
# ================================
touched_support = (
(dataframe["low"] <= dataframe["support"] * 1.005) &
(dataframe["low"] >= dataframe["support"] * 0.995)
)
held_support = dataframe["close"] > dataframe["support"]
support_tested_and_held = touched_support & held_support
dataframe["support_alive"] = support_tested_and_held.rolling(3, min_periods=1).max() > 0
touched_resistance = (
(dataframe["high"] >= dataframe["resistance"] * 0.995) &
(dataframe["high"] <= dataframe["resistance"] * 1.005)
)
held_resistance = dataframe["close"] < dataframe["resistance"]
resistance_tested_and_held = touched_resistance & held_resistance
dataframe["resistance_alive"] = resistance_tested_and_held.rolling(3, min_periods=1).max() > 0
# ================================
# v1.9 新增S/R 突破检测
# ================================
# 正确逻辑:检测当前 4H close 是否跌破了前一个 Swing Low结构破坏
# 而不是检测是否跌破了当前 support当前 support 永远在价格下方)
sl_prices_tmp = []
sh_prices_tmp = []
sl_prev = np.full(len(dataframe), np.nan) # 前一个 Swing Low 的价格
sh_prev = np.full(len(dataframe), np.nan) # 前一个 Swing High 的价格
for i in range(len(dataframe)):
if pd.notna(sl.iloc[i]):
sl_prices_tmp.append(sl.iloc[i])
if len(sl_prices_tmp) > 4:
sl_prices_tmp.pop(0)
if pd.notna(sh.iloc[i]):
sh_prices_tmp.append(sh.iloc[i])
if len(sh_prices_tmp) > 4:
sh_prices_tmp.pop(0)
# 前一个 Swing Low倒数第二个
if len(sl_prices_tmp) >= 2:
sl_prev[i] = sl_prices_tmp[-2]
# 前一个 Swing High倒数第二个
if len(sh_prices_tmp) >= 2:
sh_prev[i] = sh_prices_tmp[-2]
# support_broken: 4H close < 前一个 Swing Low价格跌破了之前的支撑
dataframe["support_broken"] = (dataframe["close"] < pd.Series(sl_prev, index=dataframe.index)).fillna(False)
# resistance_broken: 4H close > 前一个 Swing High价格突破了之前的阻力
dataframe["resistance_broken"] = (dataframe["close"] > pd.Series(sh_prev, index=dataframe.index)).fillna(False)
return dataframe
# ================================================================
# 主时间框架 — 1H 指标
# ================================================================
def populate_indicators(
self, dataframe: DataFrame, metadata: dict
) -> DataFrame:
"""1H 级别K线形态零指标"""
bullish_pin, bearish_pin, bullish_engulf, bearish_engulf = (
self._detect_candle_patterns(
dataframe["open"],
dataframe["high"],
dataframe["low"],
dataframe["close"],
self.pin_bar_wick_ratio.value / 100.0,
)
)
dataframe["bullish_pinbar"] = bullish_pin
dataframe["bearish_pinbar"] = bearish_pin
dataframe["bullish_engulfing"] = bullish_engulf
dataframe["bearish_engulfing"] = bearish_engulf
dataframe["bullish_signal"] = bullish_pin | bullish_engulf
dataframe["bearish_signal"] = bearish_pin | bearish_engulf
# NaN 安全处理
bool_cols = [
"trend_up_1d", "trend_down_1d",
"trend_up_4h", "trend_down_4h",
"in_demand_4h", "in_supply_4h",
"support_alive_4h", "resistance_alive_4h",
"support_broken_4h", "resistance_broken_4h",
"bullish_signal", "bearish_signal",
]
for col in bool_cols:
if col in dataframe.columns:
dataframe[col] = dataframe[col].fillna(False)
return dataframe
# =====================
# 入场信号 (与 v1.6 完全一致)
# =====================
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
入场逻辑1H 时间框架)— 与 v1.6 完全一致。
做多条件:
1. D1 上升结构trend_up_1d
2. 4H 需求区域in_demand_4h
3. 1H 看涨 K 线形态bullish_signal
4. 止损距离 ≤ max_stop_dist%
5. 支撑位是""support_alive_4h
6. 6h内没有过同方向入场信号冷却期
做空条件对称。
"""
max_dist = self.max_stop_dist.value / 100.0
cooldown = self.cooldown_bars.value
# NaN 安全处理
bool_cols = [
"trend_up_1d", "trend_down_1d",
"trend_up_4h", "trend_down_4h",
"in_demand_4h", "in_supply_4h",
"support_alive_4h", "resistance_alive_4h",
"bullish_signal", "bearish_signal",
]
for col in bool_cols:
if col in dataframe.columns:
dataframe[col] = dataframe[col].fillna(False)
# ── 做多 ──
long_stop_dist = (dataframe["open"] - dataframe["support_4h"]) / dataframe["open"]
long_base = (
dataframe["trend_up_1d"]
& dataframe["in_demand_4h"]
& dataframe["bullish_signal"]
& (long_stop_dist <= max_dist)
& (long_stop_dist > 0.003)
)
long_base = long_base & dataframe["support_alive_4h"]
long_recent = long_base.rolling(cooldown, min_periods=1).max().shift(1) == 0
long_conditions = long_base & long_recent
dataframe.loc[long_conditions, "enter_long"] = 1
# ── 做空 ──
short_stop_dist = (dataframe["resistance_4h"] - dataframe["open"]) / dataframe["open"]
short_base = (
dataframe["trend_down_1d"]
& dataframe["in_supply_4h"]
& dataframe["bearish_signal"]
& (short_stop_dist <= max_dist)
& (short_stop_dist > 0.003)
)
short_base = short_base & dataframe["resistance_alive_4h"]
short_recent = short_base.rolling(cooldown, min_periods=1).max().shift(1) == 0
short_conditions = short_base & short_recent
dataframe.loc[short_conditions, "enter_short"] = 1
return dataframe
# =====================
# 出场信号
# =====================
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""出场逻辑 — 由结构反转触发。"""
exit_long = ~dataframe["trend_up_1d"].fillna(True)
dataframe.loc[exit_long, "exit_long"] = 1
exit_short = dataframe["trend_up_1d"].fillna(False)
dataframe.loc[exit_short, "exit_short"] = 1
return dataframe
# =====================
# 动态止损 — v1.9 结构变化检测
# =====================
def custom_stoploss(
self,
pair: str,
trade: Trade,
current_time: datetime,
current_rate: float,
current_profit: float,
after_fill: bool,
**kwargs,
) -> float:
"""
v1.9 止损逻辑:结构变化检测 + 原有价格结构止损。
新增逻辑(结构变化检测):
做多时:如果 support_broken_4h == True4H close 已跌破支撑),
说明原支撑逻辑已失效,返回 0 立即平仓。
做空时:如果 resistance_broken_4h == True4H close 已突破阻力),
说明原阻力逻辑已失效,返回 0 立即平仓。
原有逻辑(保持不变):
做多 → support_4h * 0.999
做空 → resistance_4h * 1.001
"""
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
if dataframe is None or len(dataframe) == 0:
return -0.02 if not trade.is_short else 0.02
last = dataframe.iloc[-1]
if not trade.is_short:
# ===== v1.9 新增:结构变化检测 =====
support_broken = last.get("support_broken_4h", False)
if support_broken:
# 支撑已被跌破 → 原止损逻辑失效,立即退出
return 0.0
# ===== 原有逻辑 =====
support = last.get("support_4h", np.nan)
if pd.isna(support) or support <= 0:
return -0.02
sl_price = support * 0.999
sl_ratio = (sl_price / current_rate) - 1.0
return max(sl_ratio, -0.15)
else:
# ===== v1.9 新增:结构变化检测 =====
resistance_broken = last.get("resistance_broken_4h", False)
if resistance_broken:
# 阻力已被突破 → 原止损逻辑失效,立即退出
return 0.0
# ===== 原有逻辑 =====
resistance = last.get("resistance_4h", np.nan)
if pd.isna(resistance) or resistance <= 0:
return 0.02
sl_price = resistance * 1.001
sl_ratio = 1.0 - (sl_price / current_rate)
return min(sl_ratio, 0.15)
# =====================
# Plot config
# =====================
@staticmethod
def plot_config() -> dict:
return {
"main_plot": {
"support_4h": {"color": "green", "type": "line"},
"resistance_4h": {"color": "red", "type": "line"},
},
"subplots": {
"signals": {
"bullish_pinbar": {"color": "green", "type": "scatter"},
"bearish_pinbar": {"color": "red", "type": "scatter"},
},
"filters": {
"support_alive_4h": {"color": "green", "type": "line"},
"resistance_alive_4h": {"color": "red", "type": "line"},
},
"structure_check": {
"support_broken_4h": {"color": "red", "type": "scatter"},
"resistance_broken_4h": {"color": "green", "type": "scatter"},
},
},
}