Files
beast-trader-strategies/strategy.py

332 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
多时间框架价格行为策略 — ETH/USDT 中低频交易
==============================================
设计理念 (v0.2)
1. 反转大多会失败 → 不做反转预测,只做趋势延续。
在 S/R 位入场不是赌反弹,是赌"回调结束、趋势恢复"
2. 移动止损优先 → 放弃固定止盈,用 ATR 追踪止损让利润在趋势中奔跑。
3. 多时间框架自上而下分析:
D1 → 判断宏观方向(能不能做)
1H → 识别中期结构 + S/R 区域(在哪做)
5M → 确认入场时机(什么时候做)
核心原则:只在大趋势方向上,在关键位置,等确认信号入场。
版本v0.2.0 — 多时间框架重构
回测日期2026-06-07
回测结果1253笔 / 胜率17.4% / -0.36% / 平均持仓24min
已知问题(诊断见 docs/backtest-pitfalls.md
1. 成交量 surge 计算了但未用于入场过滤 → 信号过多
2. 1H 只要求"非反向"而非"同向" → 过滤太弱
3. 止损太紧保本0.5ATR/追踪1.0ATR → 持仓仅24min
4. 缺少最低波动率过滤
注意以下属性在首次回测时缺失后补stoploss/use_custom_stoploss/minimal_roi/NaN清理
"""
from functools import reduce
from typing import Optional
import numpy as np
import pandas as pd
import talib.abstract as ta
from pandas import DataFrame
from freqtrade.strategy import IStrategy, merge_informative_pair
from freqtrade.strategy import IntParameter, DecimalParameter
def detect_swing_points(df: DataFrame, window: int, col_high="high", col_low="low"):
    """Flag swing highs/lows and publish the most recent *confirmed* swing levels.

    A bar is a swing high when its high equals the centred rolling maximum over
    ``window`` bars and is strictly above both neighbouring highs; swing lows
    mirror this on the low column.

    Both tests read bars to the right of the pivot, so a swing cannot be known
    on the pivot bar itself. ``last_swing_high`` / ``last_swing_low`` are
    therefore delayed until those right-hand bars exist; without the delay a
    backtest would see each level earlier than live trading can.

    Args:
        df: Candle frame; modified in place.
        window: Size of the centred rolling window (converted with ``int``).
        col_high: Name of the column holding bar highs.
        col_low: Name of the column holding bar lows.

    Returns:
        The same ``df`` with four added columns:
        ``is_swing_high`` / ``is_swing_low`` (bool, set on the pivot bar, so
        they are retrospective labels and not usable as live signals) and
        ``last_swing_high`` / ``last_swing_low`` (price of the latest swing
        that was already confirmed at that bar, NaN before the first one).
    """
    w = int(window)
    # A centred window of size w reaches (w - 1) // 2 bars to the right of the
    # pivot, and the neighbour comparison always needs one bar to the right.
    confirm_lag = max((w - 1) // 2, 1)

    highs = df[col_high]
    lows = df[col_low]
    roll_max = highs.rolling(window=w, center=True).max()
    roll_min = lows.rolling(window=w, center=True).min()

    df["is_swing_high"] = (
        (highs == roll_max)
        & (highs > highs.shift(1))
        & (highs > highs.shift(-1))
    )
    df["is_swing_low"] = (
        (lows == roll_min)
        & (lows < lows.shift(1))
        & (lows < lows.shift(-1))
    )

    # Shift before forward-filling so a level only appears once it is confirmed.
    df["last_swing_high"] = highs.where(df["is_swing_high"]).shift(confirm_lag).ffill()
    df["last_swing_low"] = lows.where(df["is_swing_low"]).shift(confirm_lag).ffill()
    return df
def detect_candle_patterns(df: DataFrame, pin_body_ratio=0.3, engulf_ratio=1.5):
    """Add boolean pin-bar and engulfing pattern columns to an OHLC frame.

    Args:
        df: Frame with ``open``, ``high``, ``low`` and ``close`` columns;
            modified in place.
        pin_body_ratio: A pin bar's body must be smaller than this fraction of
            the candle's high-low range.
        engulf_ratio: An engulfing body must exceed the previous candle's body
            by this factor.

    Returns:
        The same ``df`` with ``bullish_pinbar``, ``bearish_pinbar``,
        ``bullish_engulfing`` and ``bearish_engulfing`` columns added.
    """
    opens = df["open"]
    closes = df["close"]
    open_close = df[["open", "close"]]

    body_size = (closes - opens).abs()
    top_wick = df["high"] - open_close.max(axis=1)
    bottom_wick = open_close.min(axis=1) - df["low"]
    # Zero-range candles become NaN so the body/range comparison is False.
    full_range = (df["high"] - df["low"]).replace(0, np.nan)

    is_green = closes > opens
    is_red = closes < opens
    small_body = body_size < pin_body_ratio * full_range

    # Pin bars: small body, dominant wick on one side, close in the wick's favour.
    df["bullish_pinbar"] = (
        small_body
        & (bottom_wick > 2 * body_size)
        & (bottom_wick > top_wick)
        & is_green
    )
    df["bearish_pinbar"] = (
        small_body
        & (top_wick > 2 * body_size)
        & (top_wick > bottom_wick)
        & is_red
    )

    # Engulfing: opposite-coloured previous candle whose body is fully covered
    # by a current body at least engulf_ratio times larger.
    last_open = opens.shift(1)
    last_close = closes.shift(1)
    outsized_body = body_size > engulf_ratio * (last_close - last_open).abs()

    df["bullish_engulfing"] = (
        (last_close < last_open)
        & is_green
        & (opens < last_close)
        & (closes > last_open)
        & outsized_body
    )
    df["bearish_engulfing"] = (
        (last_close > last_open)
        & is_red
        & (opens > last_close)
        & (closes < last_open)
        & outsized_body
    )
    return df
class PriceActionStrategy(IStrategy):
    """Top-down price-action strategy: 1d trend filter, 1h swing levels, 5m entries.

    A long entry requires a bullish 1d trend, a 1h trend that is not bearish,
    price less than 3% above the last 1h swing low, a bullish 5m pin bar or
    engulfing candle, and an ATR ratio below 2. Shorts mirror this against the
    last 1h swing high. Positions are closed by the 1d trend flags and by an
    ATR-based custom stoploss.
    """

    INTERFACE_VERSION = 3
    timeframe = "5m"
    can_short = True
    max_open_trades = 1
    startup_candle_count = 200
    process_only_new_candles = True
    use_exit_signal = True
    stoploss = -0.10  # added after the first backtest (was missing)
    use_custom_stoploss = True  # added after the first backtest (was missing)
    minimal_roi = {"0": 100}  # added after the first backtest (was missing)

    # 1d layer parameters
    ema_fast_daily = IntParameter(10, 30, default=20, space="buy")
    ema_slow_daily = IntParameter(40, 80, default=50, space="buy")
    swing_window_daily = IntParameter(3, 10, default=5, space="buy")
    # 1h layer parameters
    ema_fast_h1 = IntParameter(10, 30, default=20, space="buy")
    ema_slow_h1 = IntParameter(40, 80, default=50, space="buy")
    swing_window_h1 = IntParameter(3, 10, default=5, space="buy")
    # 5m layer and risk parameters
    atr_period = IntParameter(10, 28, default=14, space="buy")
    atr_stop_multiplier = DecimalParameter(1.0, 3.0, default=1.5, space="sell")
    pin_bar_body_ratio = DecimalParameter(0.15, 0.40, default=0.30, space="buy")
    engulfing_body_ratio = DecimalParameter(1.2, 3.0, default=1.5, space="buy")
    volume_surge_multiplier = DecimalParameter(1.2, 3.0, default=1.5, space="buy")

    def informative_pairs(self):
        """Request 1h and 1d candles for every pair in the current whitelist."""
        return [
            (pair, timeframe)
            for pair in self.dp.current_whitelist()
            for timeframe in ("1h", "1d")
        ]

    def _trend_layer(self, dataframe: DataFrame, pair: str, timeframe: str,
                     ema_fast: int, ema_slow: int, swing_window: int) -> DataFrame:
        """Build the EMA-trend / swing-point frame for one informative timeframe."""
        informative = self.dp.get_pair_dataframe(pair=pair, timeframe=timeframe)
        if informative.empty:
            # No informative candles: fall back to a copy of the base frame with
            # every layer column set to NaN so the merged columns still exist.
            informative = dataframe.copy()
            for col in ("ema_fast", "ema_slow", "is_swing_high", "is_swing_low",
                        "last_swing_high", "last_swing_low", "trend_up", "trend_down"):
                informative[col] = np.nan
            return informative

        informative["ema_fast"] = ta.EMA(informative, timeperiod=ema_fast)
        informative["ema_slow"] = ta.EMA(informative, timeperiod=ema_slow)
        informative = detect_swing_points(informative, swing_window)
        informative["trend_up"] = (
            (informative["ema_fast"] > informative["ema_slow"])
            & (informative["close"] > informative["ema_fast"])
        )
        informative["trend_down"] = (
            (informative["ema_fast"] < informative["ema_slow"])
            & (informative["close"] < informative["ema_fast"])
        )
        return informative

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Merge the 1d and 1h layers into the 5m frame and add the 5m indicators."""
        pair = metadata["pair"]

        # Layer 1: D1
        daily = self._trend_layer(
            dataframe, pair, "1d",
            self.ema_fast_daily.value, self.ema_slow_daily.value,
            self.swing_window_daily.value,
        )
        dataframe = merge_informative_pair(dataframe, daily, self.timeframe, "1d", ffill=True)

        # Layer 2: 1H
        hourly = self._trend_layer(
            dataframe, pair, "1h",
            self.ema_fast_h1.value, self.ema_slow_h1.value,
            self.swing_window_h1.value,
        )
        dataframe = merge_informative_pair(dataframe, hourly, self.timeframe, "1h", ffill=True)

        # Layer 3: 5M
        dataframe["atr"] = ta.ATR(dataframe, timeperiod=self.atr_period.value)
        # ATR relative to its own 20-bar mean; used as a volatility ceiling on entry.
        dataframe["atr_ratio"] = dataframe["atr"] / dataframe["atr"].rolling(20).mean()
        dataframe["ema_20_5m"] = ta.EMA(dataframe, timeperiod=20)
        dataframe = detect_candle_patterns(
            dataframe,
            pin_body_ratio=self.pin_bar_body_ratio.value,
            engulf_ratio=self.engulfing_body_ratio.value,
        )
        dataframe["volume_ma20"] = dataframe["volume"].rolling(20).mean()
        dataframe["volume_surge"] = (
            dataframe["volume"] > self.volume_surge_multiplier.value * dataframe["volume_ma20"]
        )

        # Signed distance (in % of close) to the last 1h swing low / high.
        # A missing level fails the `> 0` test and yields NaN.
        support = dataframe["last_swing_low_1h"]
        resistance = dataframe["last_swing_high_1h"]
        dataframe["dist_to_support_pct"] = np.where(
            support > 0,
            (dataframe["close"] - support) / dataframe["close"] * 100,
            np.nan,
        )
        dataframe["dist_to_resistance_pct"] = np.where(
            resistance > 0,
            (resistance - dataframe["close"]) / dataframe["close"] * 100,
            np.nan,
        )

        # NaN cleanup (added after the first backtest): flag columns must be
        # real booleans before they are combined with & and ~.
        bool_cols = [
            "trend_up_1d", "trend_down_1d", "trend_up_1h", "trend_down_1h",
            "bullish_pinbar", "bearish_pinbar",
            "bullish_engulfing", "bearish_engulfing", "volume_surge",
        ]
        for col in bool_cols:
            if col in dataframe.columns:
                dataframe[col] = dataframe[col].fillna(False).infer_objects(copy=False)
        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Set ``enter_long`` / ``enter_short`` where all entry filters agree."""
        daily_bullish = dataframe["trend_up_1d"] & (dataframe["close"] > dataframe["ema_fast_1d"])
        daily_bearish = dataframe["trend_down_1d"] & (dataframe["close"] < dataframe["ema_fast_1d"])
        # The 1h layer only has to be "not opposed", not aligned.
        h1_not_bearish = ~dataframe["trend_down_1h"]
        h1_not_bullish = ~dataframe["trend_up_1h"]
        price_near_support = (
            (dataframe["dist_to_support_pct"] < 3.0) & (dataframe["dist_to_support_pct"] > 0)
        )
        price_near_resistance = (
            (dataframe["dist_to_resistance_pct"] < 3.0) & (dataframe["dist_to_resistance_pct"] > 0)
        )
        bullish_pattern = dataframe["bullish_pinbar"] | dataframe["bullish_engulfing"]
        bearish_pattern = dataframe["bearish_pinbar"] | dataframe["bearish_engulfing"]
        normal_vol = dataframe["atr_ratio"] < 2.0

        long_mask = (
            daily_bullish & h1_not_bearish & price_near_support & bullish_pattern & normal_vol
        )
        short_mask = (
            daily_bearish & h1_not_bullish & price_near_resistance & bearish_pattern & normal_vol
        )
        dataframe.loc[long_mask, "enter_long"] = 1
        dataframe.loc[short_mask, "enter_short"] = 1
        return dataframe

    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Signal an exit once the 1d trend flag that justified the side is gone."""
        dataframe.loc[~dataframe["trend_up_1d"], "exit_long"] = 1
        dataframe.loc[~dataframe["trend_down_1d"], "exit_short"] = 1
        return dataframe

    def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit,
                        after_fill, **kwargs) -> Optional[float]:
        """Three-stage ATR stop: initial stop, break-even, then a 1-ATR trail.

        Stages, with profit measured in units of ATR / entry price:
          * profit > 2.0 ATR: trail 1 ATR behind the current price;
          * profit > 0.5 ATR: move the stop to the entry price;
          * otherwise: ``atr_stop_multiplier`` ATR behind the current price.

        freqtrade reports ``current_profit`` as positive for a winning trade on
        either side, so it is not negated for shorts.

        Returns:
            A negative stop distance relative to ``current_rate``, or ``None``
            to leave the stop unchanged when no usable ATR is available.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
        if dataframe.empty:
            return None

        atr = dataframe.iloc[-1].get("atr", current_rate * 0.005)
        entry_price = trade.open_rate
        if not np.isfinite(atr) or atr <= 0 or not entry_price or current_rate <= 0:
            return None

        # Stop values and profit are leverage-scaled by freqtrade; convert to
        # plain price terms for the thresholds and back for the return value.
        leverage = getattr(trade, "leverage", None) or 1.0
        atr_ratio = atr / entry_price
        price_profit = current_profit / leverage

        if price_profit > atr_ratio * 2.0:
            return -atr_ratio * 1.0 * leverage

        if price_profit > atr_ratio * 0.5:
            # Break-even: express "stop at entry price" as a distance from the
            # current rate. A bare 0 would not place the stop at the entry.
            if trade.is_short:
                gap = (entry_price - current_rate) / current_rate
            else:
                gap = (current_rate - entry_price) / current_rate
            return -gap * leverage if gap > 0 else None

        return -atr_ratio * self.atr_stop_multiplier.value * leverage

    def custom_exit(self, pair, trade, current_time, current_rate, current_profit,
                    **kwargs) -> Optional[str]:
        """Exit with tag ``daily_trend_reversed`` when the 1d trend opposes the trade."""
        dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
        if dataframe.empty:
            return None

        last_candle = dataframe.iloc[-1]
        opposing_flag = "trend_up_1d" if trade.is_short else "trend_down_1d"
        if last_candle.get(opposing_flag, False):
            return "daily_trend_reversed"
        return None

    def custom_stake_amount(self, pair, current_time, current_rate, proposed_stake,
                            min_stake, max_stake, leverage, entry_tag, side, **kwargs) -> float:
        """Size the stake so that hitting the ATR stop loses about 1% of the balance.

        The stop distance (ATR * ``atr_stop_multiplier``) is converted to a
        fraction of the current price; the position value is the risk budget
        divided by that fraction, and the stake is the position value divided by
        leverage. The result is capped at ``max_stake``; ``0`` is returned when
        it falls below ``min_stake``. ``proposed_stake`` is used when no valid
        ATR is available.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if dataframe.empty:
            return min_stake or proposed_stake

        atr = dataframe.iloc[-1].get("atr", current_rate * 0.005)
        stop_distance = atr * self.atr_stop_multiplier.value

        if np.isfinite(stop_distance) and stop_distance > 0 and current_rate > 0:
            risk_amount = self.wallets.get_total_stake_amount() * 0.01
            # Dividing by a price distance alone would give a base-asset
            # quantity; the stake must be expressed in stake currency.
            stop_fraction = stop_distance / current_rate
            position_size = risk_amount / stop_fraction / (leverage or 1.0)
        else:
            position_size = proposed_stake

        position_size = min(position_size, max_stake or float("inf"))
        if min_stake and position_size < min_stake:
            return 0
        return position_size

    def confirm_trade_entry(self, pair, order_type, amount, rate, time_in_force,
                            current_time, entry_tag, side, **kwargs) -> bool:
        """Accept every entry; no additional confirmation is applied."""
        return True