Files
beast-trader/strategies/v2.2d/structure_flow_strategy_v2_2d.py

452 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Structure Flow Strategy v2.2c — 冷却期修复版
==============================================
变更记录:
v2.2c (2026-06-11): 1H S/R 替代 4H S/R
v2.2c-coolfix (2026-06-11): 修复冷却期无限阻止下单 bug
"""
from datetime import datetime
import numpy as np
import pandas as pd
from pandas import DataFrame
from freqtrade.strategy import IStrategy, IntParameter, informative
from freqtrade.persistence import Trade
class StructureFlowStrategyV22d(IStrategy):
    """Market-structure strategy combining 1d, 4h and 1h swing analysis.

    Signals are produced on the 1h timeframe and can be long or short.
    The integer parameters below are expressed in bars or in percent; the
    percent-valued ones are divided by 100 where they are consumed.
    """

    # ---------------------
    # Framework settings
    # ---------------------
    can_short = True
    stoploss = -0.15
    use_custom_stoploss = True
    # NOTE(review): presumably set this high so the ROI table never closes a
    # trade and exits are left to the exit signal / stoploss - confirm.
    minimal_roi = {"0": 100}
    max_open_trades = 1
    timeframe = "1h"

    # =====================
    # Optimizable parameters
    # =====================
    # Swing-detection half-window (in bars) per timeframe.
    swing_lookback_d1 = IntParameter(8, 14, default=10, space="buy")
    swing_lookback_h4 = IntParameter(5, 10, default=8, space="buy")
    swing_lookback_1h = IntParameter(3, 7, default=5, space="buy")  # 1h swing window
    # Percent of the candle range that the wicks must exceed (used as value / 100).
    pin_bar_wick_ratio = IntParameter(50, 70, default=60, space="buy")
    # Maximum distance from open to support/resistance, in percent (value / 100).
    max_stop_dist = IntParameter(20, 50, default=50, space="buy")
    # Number of bars after an accepted entry during which new entries are blocked.
    cooldown_bars = IntParameter(3, 12, default=6, space="buy")
    # Minimum 4h trend strength, in percent (value / 100); may be negative.
    trend_strength_min = IntParameter(-50, 20, default=-20, space="buy")
# =====================
# 工具: Swing Point 检测
# =====================
@staticmethod
def _detect_swing_points(
high: pd.Series,
low: pd.Series,
window: int = 5,
) -> tuple[pd.Series, pd.Series]:
n = len(high)
sh = pd.Series(np.nan, index=high.index, dtype=float)
sl = pd.Series(np.nan, index=low.index, dtype=float)
for i in range(window, n - window):
if high.iloc[i] > high.iloc[i - window : i].max() and high.iloc[i] > high.iloc[i + 1 : i + window + 1].max():
sh.iloc[i] = high.iloc[i]
if low.iloc[i] < low.iloc[i - window : i].min() and low.iloc[i] < low.iloc[i + 1 : i + window + 1].min():
sl.iloc[i] = low.iloc[i]
return sh, sl
# =====================
# 工具:结构分析
# =====================
def _build_structure(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
swing_high: pd.Series,
swing_low: pd.Series,
) -> DataFrame:
n = len(high)
trend_up_arr = np.full(n, False)
trend_down_arr = np.full(n, False)
nearest_support = np.full(n, np.nan)
nearest_resistance = np.full(n, np.nan)
in_demand_zone = np.full(n, False)
in_supply_zone = np.full(n, False)
sh_prices = []
sl_prices = []
for i in range(n):
if pd.notna(swing_high.iloc[i]):
sh_prices.append(swing_high.iloc[i])
if len(sh_prices) > 4:
sh_prices.pop(0)
if pd.notna(swing_low.iloc[i]):
sl_prices.append(swing_low.iloc[i])
if len(sl_prices) > 4:
sl_prices.pop(0)
if len(sh_prices) >= 2 and len(sl_prices) >= 2:
if sh_prices[-1] > sh_prices[-2] and sl_prices[-1] > sl_prices[-2]:
trend_up_arr[i] = True
elif sh_prices[-1] < sh_prices[-2] and sl_prices[-1] < sl_prices[-2]:
trend_down_arr[i] = True
elif i > 0:
trend_up_arr[i] = trend_up_arr[i - 1]
trend_down_arr[i] = trend_down_arr[i - 1]
elif i > 0:
trend_up_arr[i] = trend_up_arr[i - 1]
trend_down_arr[i] = trend_down_arr[i - 1]
if sl_prices:
nearest_support[i] = sl_prices[-1]
if sh_prices:
nearest_resistance[i] = sh_prices[-1]
c = close.iloc[i]
if not np.isnan(nearest_support[i]) and not np.isnan(nearest_resistance[i]):
zone_range = nearest_resistance[i] - nearest_support[i]
if zone_range > 0:
pos_pct = (c - nearest_support[i]) / zone_range
in_demand_zone[i] = pos_pct < 0.35
in_supply_zone[i] = pos_pct > 0.65
return DataFrame({
"trend_up": trend_up_arr,
"trend_down": trend_down_arr,
"support": nearest_support,
"resistance": nearest_resistance,
"in_demand": in_demand_zone,
"in_supply": in_supply_zone,
}, index=high.index)
# =====================
# 工具: K线形态检测
# =====================
@staticmethod
def _detect_candle_patterns(
open_: pd.Series,
high: pd.Series,
low: pd.Series,
close: pd.Series,
pin_bar_wick_ratio: float = 0.6,
) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
body = (close - open_).abs()
total_range = (high - low).replace(0, 0.0001)
upper_wick = high - close.where(close > open_, open_)
lower_wick = open_.where(close > open_, close) - low
is_pin = (upper_wick + lower_wick) / total_range > pin_bar_wick_ratio
bullish_pin = is_pin & (close > open_) & (lower_wick > upper_wick)
bearish_pin = is_pin & (close < open_) & (upper_wick > lower_wick)
prev_open = open_.shift(1)
prev_close = close.shift(1)
bullish_engulf = (close > prev_open) & (open_ < prev_close) & (close > open_)
bearish_engulf = (close < prev_open) & (open_ > prev_close) & (close < open_)
return bullish_pin, bearish_pin, bullish_engulf, bearish_engulf
# =====================
# 工具:冷却期正确实现 (修复 bug)
# =====================
def _apply_cooldown(self, signal: pd.Series, cooldown_bars: int) -> pd.Series:
"""
正确应用冷却期:入场后才冷却,而非条件满足就冷却。
原逻辑 buglong_base.rolling(cooldown).max().shift(1) == 0
- 当市场持续满足入场条件时rolling window 里永远有 True
- 导致冷却期无限阻止下单
修复逻辑:遍历 K 线,模拟"入场 -> 冷却"过程。
- 满足条件 + 距离上次入场 > cooldown -> 允许入场
- 入场后 cooldown 根 K 线内不再入场
"""
n = len(signal)
result = [False] * n
last_entry = -99999 # 上次入场的 bar 索引
# 遍历(对 numpy array 操作O(n) 约几毫秒)
values = signal.values # numpy array快速访问
for i in range(n):
if values[i] and (i - last_entry) > cooldown_bars:
result[i] = True
last_entry = i
return pd.Series(result, index=signal.index)
# ================================================================
# 信息时间框架 — D1 宏观结构
# ================================================================
@informative("1d")
def populate_indicators_1d(
    self, dataframe: DataFrame, metadata: dict
) -> DataFrame:
    """Add the daily macro-structure trend flags to the 1d dataframe.

    Swing points are detected with the ``swing_lookback_d1`` window and fed
    to ``_build_structure``; only its ``trend_up`` and ``trend_down``
    columns are kept.

    Args:
        dataframe: 1d candles with ``high``, ``low`` and ``close`` columns.
        metadata: Pair metadata; not used here.

    Returns:
        The same dataframe with ``trend_up`` and ``trend_down`` added.
    """
    highs = dataframe["high"]
    lows = dataframe["low"]
    lookback = self.swing_lookback_d1.value

    swing_highs, swing_lows = self._detect_swing_points(highs, lows, lookback)
    daily_structure = self._build_structure(
        highs, lows, dataframe["close"], swing_highs, swing_lows
    )

    for column in ("trend_up", "trend_down"):
        dataframe[column] = daily_structure[column]
    return dataframe
# ================================================================
# 信息时间框架 — 4H 趋势强度(原版保留)
# ================================================================
@informative("4h")
def populate_indicators_4h(
    self, dataframe: DataFrame, metadata: dict
) -> DataFrame:
    """Add 4h trend-strength columns.

    Strength is the sum of the relative change between the two most recent
    swing highs and the relative change between the two most recent swing
    lows; ``trend_strength_down`` is its negation.  Both stay NaN until two
    swings of each kind have been seen.  ``strong_uptrend`` and
    ``strong_downtrend`` compare the respective strength against
    ``trend_strength_min / 100``.

    Args:
        dataframe: 4h candles with ``high`` and ``low`` columns.
        metadata: Pair metadata; not used here.

    Returns:
        The same dataframe with ``trend_strength_up``,
        ``trend_strength_down``, ``strong_uptrend`` and ``strong_downtrend``.
    """
    sh, sl = self._detect_swing_points(
        dataframe["high"], dataframe["low"],
        self.swing_lookback_h4.value,
    )
    # The previous version also ran _build_structure here, but its result was
    # never read; the call has been dropped.

    n = len(dataframe)
    swing_high_values = sh.to_numpy(dtype=float)
    swing_low_values = sl.to_numpy(dtype=float)
    trend_strength_up = np.full(n, np.nan)
    trend_strength_down = np.full(n, np.nan)

    # Only the two most recent swings of each kind enter the calculation.
    last_high = prev_high = np.nan
    last_low = prev_low = np.nan
    for i in range(n):
        if not np.isnan(swing_high_values[i]):
            prev_high, last_high = last_high, swing_high_values[i]
        if not np.isnan(swing_low_values[i]):
            prev_low, last_low = last_low, swing_low_values[i]
        if np.isnan(prev_high) or np.isnan(prev_low):
            continue  # fewer than two swing highs or swing lows so far

        # A non-positive reference price contributes 0 instead of dividing.
        hh_dist = (last_high - prev_high) / prev_high if prev_high > 0 else 0
        hl_dist = (last_low - prev_low) / prev_low if prev_low > 0 else 0
        trend_strength_up[i] = hh_dist + hl_dist
        trend_strength_down[i] = -(hh_dist + hl_dist)

    dataframe["trend_strength_up"] = pd.Series(trend_strength_up, index=dataframe.index)
    dataframe["trend_strength_down"] = pd.Series(trend_strength_down, index=dataframe.index)

    # NaN strength compares as False, i.e. "not strong".
    min_strength = self.trend_strength_min.value / 100.0
    dataframe["strong_uptrend"] = dataframe["trend_strength_up"] > min_strength
    dataframe["strong_downtrend"] = dataframe["trend_strength_down"] > min_strength
    return dataframe
# ================================================================
# 主时间框架 — 1H 指标(含 1H S/R + 活支撑/阻力)
# ================================================================
def populate_indicators(
    self, dataframe: DataFrame, metadata: dict
) -> DataFrame:
    """Compute the 1h indicators: candle patterns, 1h structure and live S/R.

    Adds the candle-pattern flags, the 1h structure columns (``trend_up_1h``,
    ``trend_down_1h``, ``support``, ``resistance``, ``in_demand``,
    ``in_supply``) and ``support_alive`` / ``resistance_alive``, which are
    True when the level was touched within +/-0.5 % and held on close in any
    of the last three bars.  Finally, every flag column that is present is
    normalised to a NaN-free boolean column.

    Args:
        dataframe: 1h candles.  The ``*_1d`` / ``*_4h`` columns are
            normalised only if they are present.
        metadata: Pair metadata; not used here.

    Returns:
        The same dataframe with the columns above added.
    """
    # -- Candle patterns --
    bullish_pin, bearish_pin, bullish_engulf, bearish_engulf = (
        self._detect_candle_patterns(
            dataframe["open"],
            dataframe["high"],
            dataframe["low"],
            dataframe["close"],
            self.pin_bar_wick_ratio.value / 100.0,
        )
    )
    dataframe["bullish_pinbar"] = bullish_pin
    dataframe["bearish_pinbar"] = bearish_pin
    dataframe["bullish_engulfing"] = bullish_engulf
    dataframe["bearish_engulfing"] = bearish_engulf
    dataframe["bullish_signal"] = bullish_pin | bullish_engulf
    dataframe["bearish_signal"] = bearish_pin | bearish_engulf

    # -- 1h swing points and structure (replaces the former 4h S/R) --
    sh_1h, sl_1h = self._detect_swing_points(
        dataframe["high"], dataframe["low"],
        self.swing_lookback_1h.value,
    )
    structure_1h = self._build_structure(
        dataframe["high"], dataframe["low"], dataframe["close"],
        sh_1h, sl_1h,
    )
    dataframe["trend_up_1h"] = structure_1h["trend_up"]
    dataframe["trend_down_1h"] = structure_1h["trend_down"]
    for column in ("support", "resistance", "in_demand", "in_supply"):
        dataframe[column] = structure_1h[column]

    # -- Live support: low came within 0.5 % of support and close stayed above --
    touched_support = (
        (dataframe["low"] <= dataframe["support"] * 1.005) &
        (dataframe["low"] >= dataframe["support"] * 0.995)
    )
    held_support = dataframe["close"] > dataframe["support"]
    support_tested_and_held = touched_support & held_support
    # Cast explicitly so the rolling window operates on numbers, not booleans.
    dataframe["support_alive"] = (
        support_tested_and_held.astype(float).rolling(3, min_periods=1).max() > 0
    )

    # -- Live resistance: mirror image on the high side --
    touched_resistance = (
        (dataframe["high"] >= dataframe["resistance"] * 0.995) &
        (dataframe["high"] <= dataframe["resistance"] * 1.005)
    )
    held_resistance = dataframe["close"] < dataframe["resistance"]
    resistance_tested_and_held = touched_resistance & held_resistance
    dataframe["resistance_alive"] = (
        resistance_tested_and_held.astype(float).rolling(3, min_periods=1).max() > 0
    )

    # -- NaN-safe flags --
    # fillna(False) alone can leave an object-dtype column (pandas is
    # deprecating the implicit downcast), on which `&` and `~` are fragile;
    # cast to a real boolean column.
    bool_cols = [
        "trend_up_1d", "trend_down_1d",
        "trend_up_4h", "trend_down_4h",
        "in_demand", "in_supply",
        "support_alive", "resistance_alive",
        "strong_uptrend_4h", "strong_downtrend_4h",
        "bullish_signal", "bearish_signal",
    ]
    for col in bool_cols:
        if col in dataframe.columns:
            dataframe[col] = dataframe[col].fillna(False).astype(bool)
    return dataframe
# =====================
# 入场信号(修复冷却期逻辑)
# =====================
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Set ``enter_long`` / ``enter_short`` from structure filters plus cooldown.

    A long candidate needs: daily uptrend, price in the demand zone, a stop
    distance (open to 1h support, relative to open) above 0.3 % and at most
    ``max_stop_dist / 100``, a live support and a strong 4h uptrend.  Short
    candidates mirror this with resistance.  Candidates are then thinned by
    ``_apply_cooldown`` so the cooldown counts from accepted entries.

    NOTE(review): the ``bullish_signal`` / ``bearish_signal`` candle flags
    are normalised here but are not part of either condition - confirm that
    this is intended.

    Args:
        dataframe: 1h dataframe carrying the indicator columns.
        metadata: Pair metadata; not used here.

    Returns:
        The same dataframe with ``enter_long`` / ``enter_short`` set to 1 on
        accepted bars.
    """
    max_dist = self.max_stop_dist.value / 100.0
    cooldown = self.cooldown_bars.value

    # Guarantee NaN-free boolean columns: fillna(False) alone may leave an
    # object-dtype column, which is fragile in the `&` chains below.
    bool_cols = [
        "trend_up_1d", "trend_down_1d",
        "trend_up_4h", "trend_down_4h",
        "in_demand", "in_supply",
        "support_alive", "resistance_alive",
        "strong_uptrend_4h", "strong_downtrend_4h",
        "bullish_signal", "bearish_signal",
    ]
    for col in bool_cols:
        if col in dataframe.columns:
            dataframe[col] = dataframe[col].fillna(False).astype(bool)

    # -- Long: stop sits at the 1h support --
    long_stop_dist = (dataframe["open"] - dataframe["support"]) / dataframe["open"]
    long_base = (
        dataframe["trend_up_1d"]
        & dataframe["in_demand"]
        & (long_stop_dist <= max_dist)
        & (long_stop_dist > 0.003)
        & dataframe["support_alive"]
        & dataframe["strong_uptrend_4h"]
    )
    long_entries = self._apply_cooldown(long_base, cooldown)
    dataframe.loc[long_entries, "enter_long"] = 1

    # -- Short: stop sits at the 1h resistance --
    short_stop_dist = (dataframe["resistance"] - dataframe["open"]) / dataframe["open"]
    short_base = (
        dataframe["trend_down_1d"]
        & dataframe["in_supply"]
        & (short_stop_dist <= max_dist)
        & (short_stop_dist > 0.003)
        & dataframe["resistance_alive"]
        & dataframe["strong_downtrend_4h"]
    )
    short_entries = self._apply_cooldown(short_base, cooldown)
    dataframe.loc[short_entries, "enter_short"] = 1
    return dataframe
# =====================
# 出场信号
# =====================
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Set exit signals from the daily trend flag.

    ``exit_long`` is set where ``trend_up_1d`` is explicitly False and
    ``exit_short`` where it is explicitly True; bars where the flag is
    missing trigger neither exit.

    NOTE(review): shorts are closed by a daily *uptrend*, not by the end of
    the daily downtrend, so a neutral daily trend closes longs but keeps
    shorts open - confirm this asymmetry is intended.

    Args:
        dataframe: 1h dataframe with a ``trend_up_1d`` column.
        metadata: Pair metadata; not used here.

    Returns:
        The same dataframe with ``exit_long`` / ``exit_short`` set to 1 where
        triggered.
    """
    uptrend = dataframe["trend_up_1d"]
    # Explicit equality instead of `~...fillna(True)`: on an object-dtype
    # column `~` inverts Python bools bitwise (~True == -2) and would break
    # the mask, while eq() always yields a real boolean mask and treats NaN
    # as "no exit".
    exit_long = uptrend.eq(False)
    exit_short = uptrend.eq(True)
    dataframe.loc[exit_long, "exit_long"] = 1
    dataframe.loc[exit_short, "exit_short"] = 1
    return dataframe
# =====================
# 动态止损 (基于 1H S/R)
# =====================
def custom_stoploss(
    self,
    pair: str,
    trade: Trade,
    current_time: datetime,
    current_rate: float,
    current_profit: float,
    after_fill: bool,
    **kwargs,
) -> float:
    """Place the stop just beyond the latest 1h support / resistance.

    Longs use a stop 0.1 % below the last bar's ``support``; shorts use one
    0.1 % above its ``resistance``.  The stop is expressed relative to
    ``current_rate`` and clamped to at most 15 % in either direction.  A
    2 % fallback is returned when no analysed dataframe, no valid level or
    no positive ``current_rate`` is available.

    The short branch previously used ``min(ratio, 0.15)``, which never
    bounded the (normally negative) ratio; both directions now share one
    symmetric clamp.

    NOTE(review): freqtrade documents that only the absolute value of the
    returned ratio is used - confirm against the installed version.

    Args:
        pair: Pair being evaluated.
        trade: Open trade; only ``is_short`` is read.
        current_time: Not used here.
        current_rate: Current price the stop distance is measured from.
        current_profit: Not used here.
        after_fill: Not used here.
        **kwargs: Accepted and ignored.

    Returns:
        Stop distance as a ratio of ``current_rate``, within [-0.15, 0.15].
    """
    max_distance = 0.15
    fallback = 0.02 if trade.is_short else -0.02

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    if dataframe is None or len(dataframe) == 0:
        return fallback
    if current_rate <= 0:
        # Would divide by zero (or flip signs) below.
        return fallback

    last = dataframe.iloc[-1]
    if not trade.is_short:
        support = last.get("support", np.nan)
        if pd.isna(support) or support <= 0:
            return fallback
        sl_price = support * 0.999
        sl_ratio = (sl_price / current_rate) - 1.0
    else:
        resistance = last.get("resistance", np.nan)
        if pd.isna(resistance) or resistance <= 0:
            return fallback
        sl_price = resistance * 1.001
        sl_ratio = 1.0 - (sl_price / current_rate)

    return max(-max_distance, min(sl_ratio, max_distance))
# =====================
# Plot config
# =====================
@property
def plot_config(self) -> dict:
    """Plot layout: S/R lines on the main chart, signals and filters below.

    Exposed as a property so that reading ``strategy.plot_config`` yields
    the dict itself; as a ``@staticmethod`` the attribute was a function
    object rather than a dict.
    """
    return {
        "main_plot": {
            "support": {"color": "green", "type": "line"},
            "resistance": {"color": "red", "type": "line"},
        },
        "subplots": {
            "signals": {
                "bullish_pinbar": {"color": "green", "type": "scatter"},
                "bearish_pinbar": {"color": "red", "type": "scatter"},
            },
            "filters": {
                "support_alive": {"color": "green", "type": "line"},
                "resistance_alive": {"color": "red", "type": "line"},
                "strong_uptrend_4h": {"color": "blue", "type": "line"},
                "strong_downtrend_4h": {"color": "orange", "type": "line"},
            },
        },
    }