import time
import requests
from datetime import datetime,timezone,timedelta
import pytz
import re
from Utils.dingding import send_dingtalk_message
if __name__ == '__main__':
webhook = "https://oapi.dingtalk.com/robot/send?access_token=8a6ddcf98d3b47c63333580bfe9d0bad55b17272eea05cc9c0af7f7be4de070d"
# 初始化存储满足条件的时间列表
# 初始化存储满足条件的时间列表
bullish_green_hollow = [] # 空心绿色(多头增加)
bearish_red_solid = [] # 实心红色(空头减少)
previous_bullish_green_hollow = [] # 上一次的多头增加时间点
previous_bearish_red_solid = [] # 上一次的空头减少时间点
while True:
time.sleep(2)
def get_klines(symbol, interval, limit=500):
url = "https://fapi.binance.com/fapi/v1/klines"
params = {
'symbol': symbol,
'interval': interval,
'limit': limit,
}
try:
response = requests.get(url, params=params)
response.raise_for_status() # 将触发异常,如果请求返回非200响应
return response.json()
except requests.RequestException as e:
print(f"请求失败: {e}")
def calculate_obv(klines):
obv = 0
obv_values = [0] # 第一日的OBV值设为0
timestamps = [klines[0][0]] # 存储时间戳
for i in range(1, len(klines)):
current_close = float(klines[i][4]) # 当前日收盘价
previous_close = float(klines[i - 1][4]) # 前一日收盘价
current_volume = float(klines[i][5]) # 当前日成交量
timestamps.append(klines[i][0]) # 添加时间戳
if current_close > previous_close:
obv += current_volume
elif current_close < previous_close:
obv -= current_volume
obv_values.append(obv)
return obv_values, timestamps
def calculate_sma(values, window):
"""计算简单移动平均线(SMA)"""
sma = []
for i in range(len(values)):
if i < window:
sma.append(None)
else:
sma.append(sum(values[i - window:i]) / window)
return sma
def convert_timestamp_to_datetime(timestamp):
"""将时间戳转换为UTC+8时间"""
utc_time = datetime.utcfromtimestamp(timestamp / 1000) # Binance的时间戳是毫秒级的
utc_time = utc_time.replace(tzinfo=timezone.utc)
local_time = utc_time.astimezone(pytz.timezone('Asia/Shanghai'))
return local_time.strftime('%Y-%m-%d %H:%M:%S')
symbol = "SOLUSDT"
klines = get_klines(symbol=symbol, interval="3m")
obv_values, timestamps = calculate_obv(klines)
sma_obv = calculate_sma(obv_values, window=21)
obv_cross_up, obv_cross_down = [], []
for i in range(1, len(obv_values)):
if sma_obv[i] is not None and sma_obv[i - 1] is not None:
time_str = convert_timestamp_to_datetime(timestamps[i])
if obv_values[i] > sma_obv[i] and obv_values[i - 1] <= sma_obv[i - 1]:
obv_cross_up.append((time_str, obv_values[i])) # 上穿越
elif obv_values[i] < sma_obv[i] and obv_values[i - 1] >= sma_obv[i - 1]:
obv_cross_down.append((time_str, obv_values[i])) # 下穿越
print("OBV上穿越:", list(reversed(obv_cross_up)))
print("OBV下穿越:", list(reversed(obv_cross_down)))
def calculate_boll(klines, window=21):
close_prices = [float(kline[4]) for kline in klines]
sma = calculate_sma(close_prices, window)
std_deviation = []
for i in range(len(close_prices)):
if i < window - 1:
std_deviation.append(None)
else:
windowed_prices = close_prices[i - window + 1:i + 1]
mean = sum(windowed_prices) / window
variance = sum((p - mean) ** 2 for p in windowed_prices) / window
std_deviation.append(variance ** 0.5)
return sma, std_deviation
# 在主程序部分增加BOLL穿越检测
sma_obv_boll, _ = calculate_boll(klines, window=21)
boll_cross_up, boll_cross_down = [], []
for i in range(1, len(klines)):
current_close = float(klines[i][4])
previous_close = float(klines[i - 1][4])
if sma_obv_boll[i] is not None and sma_obv_boll[i - 1] is not None:
time_str = convert_timestamp_to_datetime(timestamps[i])
if current_close > sma_obv_boll[i] and previous_close <= sma_obv_boll[i - 1]:
boll_cross_up.append((time_str, current_close)) # BOLL上穿越
elif current_close < sma_obv_boll[i] and previous_close >= sma_obv_boll[i - 1]:
boll_cross_down.append((time_str, current_close)) # BOLL下穿越
print("BOLL上穿越:", list(reversed(boll_cross_up)))
print("BOLL下穿越:", list(reversed(boll_cross_down)))
###########
'''
比较当前时间与事件时间:
然后,我们将检查当前时间是否在“OBV下穿且BOLL中线以下的时刻”加减1分钟的范围内。
'''
def is_within_one_minute(current_time, event_time_str):
"""检查当前时间是否在事件时间的1分钟内"""
event_time = datetime.strptime(event_time_str, '%Y-%m-%d %H:%M:%S')
one_minute_before = event_time - timedelta(minutes=1)
one_minute_after = event_time + timedelta(minutes=1)
return one_minute_before <= current_time <= one_minute_after
def send_message_if_within_one_minute(obv_down_boll_below_times):
"""如果当前时间在OBV下穿且BOLL中线以下的时刻1分钟内,发送钉钉消息"""
# 获取当前时间(UTC+8)
current_time_utc8 = datetime.now(pytz.timezone('Asia/Shanghai'))
for time_str in obv_down_boll_below_times:
if is_within_one_minute(current_time_utc8, time_str):
message = f"当前K线在{time_str}(OBV下穿且BOLL中线以下)的1分钟范围内"
print(message) # 这里打印消息用于调试,实际应用中可发送钉钉消息
send_dingtalk_message(webhook=webhook, secret="", message=message)
break # 找到一个匹配即可,避免重复发送消息
#########
# 确定OBV下穿并且收盘价低于BOLL中线的时间点
obv_down_boll_below = []
for i in range(1, len(obv_values)):
if sma_obv[i] is not None and sma_obv[i - 1] is not None:
time_str = convert_timestamp_to_datetime(timestamps[i])
if obv_values[i] < sma_obv[i] and obv_values[i - 1] >= sma_obv[i - 1]:
# OBV下穿检测
current_close = float(klines[i][4])
if sma_obv_boll[i] is not None and current_close < sma_obv_boll[i]:
# 检查收盘价是否低于BOLL中线
obv_down_boll_below.append((time_str, current_close))
obv_down_boll_below_reversed = obv_down_boll_below[::-1]
print("OBV下穿且BOLL中线以下的时刻:")
for time_str, close in obv_down_boll_below_reversed[:1]:
print(time_str, close)
# time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
# send_message_if_within_one_minute(time_str)
# obv_down_boll_below_times = [time_str for time_str, close in obv_down_boll_below_reversed[::-1][:1]] # 从之前的代码获取时刻
# send_message_if_within_one_minute(obv_down_boll_below_times)
# 确定OBV上穿并且收盘价高于BOLL中线的时间点
obv_up_boll_above = []
for i in range(1, len(obv_values)):
if sma_obv[i] is not None and sma_obv[i - 1] is not None:
time_str = convert_timestamp_to_datetime(timestamps[i]) # 将时间戳转换为UTC+8时间
if obv_values[i] > sma_obv[i] and obv_values[i - 1] <= sma_obv[i - 1]:
# OBV上穿检测
current_close = float(klines[i][4])
if sma_obv_boll[i] is not None and current_close > sma_obv_boll[i]:
# 检查收盘价是否高于BOLL中线
obv_up_boll_above.append((time_str, current_close))
# 倒序列表
obv_up_boll_above_reversed = obv_up_boll_above[::-1]
# 取最新的3条数据
latest_3_entries = obv_up_boll_above_reversed[:1]
print("最新的3条OBV上穿且BOLL中线以上的时刻:")
for time_str, close in latest_3_entries:
print(time_str, close)
# time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
# send_message_if_within_one_minute(time_str)
# obv_up_boll_below_times = [time_str for time_str, close in obv_up_boll_above_reversed[::-1][:1]] # 从之前的代码获取时刻
# send_message_if_within_one_minute(obv_up_boll_below_times)
############macd
def calculate_ema_macd(prices, window, smoothing=2):
ema = [sum(prices[:window]) / window]
for price in prices[window:]:
ema.append((price * (smoothing / (1 + window))) + ema[-1] * (1 - (smoothing / (1 + window))))
return ema
def calculate_macd(prices, short_window=12, long_window=26, signal_window=9):
short_ema = calculate_ema_macd(prices, short_window)[long_window - short_window:]
long_ema = calculate_ema_macd(prices, long_window)
macd_line = [s - l for s, l in zip(short_ema, long_ema)]
signal_line = calculate_ema_macd(macd_line, signal_window)
macd_histogram = [m - s for m, s in zip(macd_line, signal_line)]
return macd_line, signal_line, macd_histogram
# 假设klines数据已经获取,并且已经计算了MACD值
close_prices = [float(kline[4]) for kline in klines]
macd_line, signal_line, macd_histogram = calculate_macd(close_prices)
for i in range(1, len(macd_histogram)):
if macd_histogram[i] > 0 and macd_histogram[i] > macd_histogram[i - 1]:
# 多头增加空心绿色
time_str = convert_timestamp_to_datetime(klines[i][0])
# print(f"{time_str} - 空心绿色(多头增加)")
elif macd_histogram[i] < 0 and macd_histogram[i] < macd_histogram[i - 1]:
# 空头减少实心红色
time_str = convert_timestamp_to_datetime(klines[i][0])
# print(f"{time_str} - 实心红色(空头减少)")
################
'''
增加逻辑并打印当前K线属于空心绿色(多头增加)并且最新的OBV上穿且BOLL中线以上的时刻和实心红色(空头减少)并且OBV下穿且BOLL中线以下的时刻:
'''
# 计算MACD
close_prices = [float(kline[4]) for kline in klines]
macd_line, signal_line, macd_histogram = calculate_macd(close_prices)
# 检查多头增加的时间点是否有新的事件
new_bullish_events = [time for time in bullish_green_hollow if time not in previous_bullish_green_hollow]
if new_bullish_events:
message = f"{symbol}新的穿越多头增加事件发生:{new_bullish_events}"
send_dingtalk_message(webhook=webhook, secret="", message=message)
print(message)
# 检查空头减少的时间点是否有新的事件
new_bearish_events = [time for time in bearish_red_solid if time not in previous_bearish_red_solid]
if new_bearish_events:
message = f"{symbol}新的穿越空头减少事件发生:{new_bearish_events}"
send_dingtalk_message(webhook=webhook, secret="", message=message)
print(message)
# 更新记录的时间点列表以供下次循环使用
previous_bullish_green_hollow = bullish_green_hollow[:]
previous_bearish_red_solid = bearish_red_solid[:]
for i in range(1, len(macd_histogram)):
time_str = convert_timestamp_to_datetime(klines[i][0])
# 检查多头增加的条件
if macd_histogram[i] > 0 and macd_histogram[i] > macd_histogram[i - 1]:
for j in range(1, len(obv_values)):
# 确保时间对齐并跳过SMA为None的情况
if timestamps[j] == klines[i][0] and sma_obv[j] is not None and sma_obv[j - 1] is not None:
if obv_values[j] > sma_obv[j] and obv_values[j - 1] <= sma_obv[j - 1]:
current_close = float(klines[j][4])
if sma_obv_boll[j] is not None and current_close > sma_obv_boll[j]:
bullish_green_hollow.append(time_str)
# 检查空头减少的条件
if macd_histogram[i] < 0 and macd_histogram[i] < macd_histogram[i - 1]:
for j in range(1, len(obv_values)):
# 确保时间对齐并跳过SMA为None的情况
if timestamps[j] == klines[i][0] and sma_obv[j] is not None and sma_obv[j - 1] is not None:
if obv_values[j] < sma_obv[j] and obv_values[j - 1] >= sma_obv[j - 1]:
current_close = float(klines[j][4])
if sma_obv_boll[j] is not None and current_close < sma_obv_boll[j]:
bearish_red_solid.append(time_str)
print("做多---空心绿色(多头增加)且OBV上穿BOLL中线的时刻:", bullish_green_hollow[::-1])
print("做空---实心红色(空头减少)且OBV下穿BOLL中线的时刻:", bearish_red_solid[::-1])
网友评论