import re
import time
import requests
from datetime import datetime, timezone
import pytz
from Utils.dingding import send_dingtalk_message # 确保这个路径和函数名称与您的实际情况匹配
def get_klines(symbol, interval, limit=500):
"""获取K线数据"""
url = "https://fapi.binance.com/fapi/v1/klines"
params = {
'symbol': symbol,
'interval': interval,
'limit': limit,
}
response = requests.get(url, params=params)
return response.json()
def calculate_sma(prices):
"""计算简单移动平均线(SMA)"""
return sum(prices) / len(prices)
def collect_cross_boll_midline(klines):
"""收集穿越BOLL中线的信息"""
closes = [float(kline[4]) for kline in klines] # 获取收盘价
volumes = [float(kline[5]) for kline in klines] # 获取成交量
total_traded = [float(kline[7]) for kline in klines] # 获取成交额
buy_volumes = [float(kline[9]) for kline in klines] # 获取买单成交量
up_crosses = [] # 存储向上穿越的信息
down_crosses = [] # 存储向下穿越的信息
previous_state_above = None # 初始状态未知
for i in range(20, len(closes)):
sma20 = calculate_sma(closes[i - 20:i]) # 计算BOLL中线
current_state_above = closes[i] > sma20 # 判断当前价格是否在BOLL中线之上
active_buy_value = buy_volumes[i] * closes[i] # 主动买入成交额的近似计算
if current_state_above != previous_state_above:
if previous_state_above is not None:
direction = "上穿越" if current_state_above else "下穿越"
time = datetime.utcfromtimestamp(klines[i][0] / 1000)
time = time.replace(tzinfo=timezone.utc).astimezone(pytz.timezone('Asia/Shanghai'))
# info = f"{time.strftime('%Y-%m-%d %H:%M:%S')} {direction} BOLL中线的价格: {closes[i]}, 成交量: {volumes[i]}, 成交额: {total_traded[i]}, 主动买入成交额: {active_buy_value}"
# 修改info的构建方式,包含更多信息
info = {
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'timestamp': klines[i][0], # 添加Unix时间戳
'direction': direction,
'price': closes[i],
'volume': volumes[i],
'total_traded': total_traded[i],
'active_buy_value': active_buy_value,
}
if current_state_above:
up_crosses.append(info)
else:
down_crosses.append(info)
previous_state_above = current_state_above
return up_crosses, down_crosses
if __name__ == "__main__":
webhook = "https://oapi.dingtalk.com/robot/send?access_token=8a6ddcf98d3b47c63333580bfe9d0bad55b17272eea05cc9c0af7f7be4de070d"
symbol = "BTCUSDT"
interval = "5m"
# 初始化上一次穿越信息和发送标志位
last_up_cross = None
last_down_cross = None
last_up_cross_15m = None
last_down_cross_15m = None
up_cross_5m_sent = False
down_cross_5m_sent = False
up_cross_15m_sent = False
down_cross_15m_sent = False
up_5m_last_sent_time = 0 #穿越5分钟中线,成交额增加,限频,5分钟内只发送1次
down_5m_last_sent_time = 0 #穿越5分钟中线,成交额增加,限频,5分钟内只发送1次
up_5min =0 #5分钟向上穿越信息 5分钟内只发送1次
down_5min=0 #5分钟向下穿越信息5分钟内只发送1次
up_15min=0 #15分钟向上穿越信息 1分钟内只发送1次
down_15min = 0 # 15分钟向下穿越信息1分钟内只发送1次
# 在主循环之前初始化变量以跟踪同时穿越事件的发送状态
last_sent_up_cross_5m_15m = None
last_sent_down_cross_5m_15m = None
sent_up_cross_5m_15m_notification = False
sent_down_cross_5m_15m_notification = False
last_cross_direction_5m = None
last_cross_direction_15m = None
last_cross_notification_time = 0 # 记录上一次发送通知的时间
while True:
current_time = time.time()
# 5分钟K线
klines = get_klines(symbol, interval)
up_crosses, down_crosses = collect_cross_boll_midline(klines)
current_up_cross = up_crosses[-1:] if up_crosses else None
current_down_cross = down_crosses[-1:] if down_crosses else None
# 获取最新的上穿越信息
current_up_cross = up_crosses[-1] if up_crosses else None
# 获取最新的下穿越信息
current_down_cross = down_crosses[-1] if down_crosses else None
# 15分钟K线
klines_15m = get_klines(symbol, "15m")
up_crosses_15m, down_crosses_15m = collect_cross_boll_midline(klines_15m)
current_up_cross_15m = up_crosses_15m[-1:] if up_crosses_15m else None
current_down_cross_15m = down_crosses_15m[-1:] if down_crosses_15m else None
current_time = int(time.time() * 1000) # 获取当前时间的Unix时间戳(毫秒级)
# 处理5分钟穿越事件
if current_up_cross and current_up_cross != last_up_cross:
message = "5分钟向上穿越信息:" + str(current_up_cross)
print(message)
if time.time() - up_5min > 300: # 限频,5分钟内只发送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
up_5min = time.time() # 更新发送时间
print("5分钟向上穿越信息,记录的发送时间是:{}".format(up_5min))
last_up_cross = current_up_cross
if current_down_cross and current_down_cross != last_down_cross:
message = "5分钟向下穿越信息:" + str(current_down_cross)
print(message)
if time.time() - down_5min > 120: # 限频,5分钟内只发送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
down_5min = time.time() # 更新发送时间
print("5分钟向下穿越信息,记录的发送时间是:{}".format(down_5min))
last_down_cross = current_down_cross
# 处理15分钟穿越事件
if current_up_cross_15m and current_up_cross_15m != last_up_cross_15m:
message = "15分钟向上穿越信息:" + str(current_up_cross_15m)
print(message)
if time.time() - up_15min > 120: # 限频,1分钟内只发送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
up_15min=time.time() # 更新发送时间
last_up_cross_15m = current_up_cross_15m
if current_down_cross_15m and current_down_cross_15m != last_down_cross_15m:
message = "15分钟向下穿越信息:" + str(current_down_cross_15m)
print(message)
if time.time() - down_15min > 60: # 限频,1分钟内只发送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
down_15min=time.time() # 更新发送时间
last_down_cross_15m = current_down_cross_15m
#处理5分钟和15分钟同时向上的事件
# 5分钟K线
klines_5m = get_klines(symbol, "5m")
up_crosses_5m, down_crosses_5m = collect_cross_boll_midline(klines_5m)
# 获取最新的5分钟穿越信息
if up_crosses_5m:
current_up_cross_5m = up_crosses_5m[-1]
last_cross_direction_5m = "up"
elif down_crosses_5m:
current_down_cross_5m = down_crosses_5m[-1]
last_cross_direction_5m = "down"
# 15分钟K线
klines_15m = get_klines(symbol, "15m")
up_crosses_15m, down_crosses_15m = collect_cross_boll_midline(klines_15m)
# 获取最新的15分钟穿越信息
if up_crosses_15m:
current_up_cross_15m = up_crosses_15m[-1]
last_cross_direction_15m = "up"
elif down_crosses_15m:
current_down_cross_15m = down_crosses_15m[-1]
last_cross_direction_15m = "down"
current_time_seconds = time.time() # 获取当前时间的Unix时间戳(秒级)
# 检查最近的5分钟和15分钟穿越方向是否一致
if last_cross_direction_5m and last_cross_direction_15m and ((current_time_seconds - last_cross_notification_time) > 120):
if last_cross_direction_5m == last_cross_direction_15m:
direction = "向上" if last_cross_direction_5m == "up" else "向下"
message = f"同时在5分钟和15分钟K线上检测到{direction}穿越事件"
print(message)
# 发送消息
send_dingtalk_message(webhook=webhook, secret="", message=message)
last_cross_notification_time = current_time_seconds # 更新最后一次发送通知的时间
# 重置方向,避免重复发送相同方向的消息(可选)
last_cross_direction_5m = None
last_cross_direction_15m = None
# 比较时间戳找到最接近当前时间的事件
time_diff_up = abs(current_time - last_up_cross['timestamp'])
time_diff_down = abs(current_time - last_down_cross['timestamp'])
if time_diff_up < time_diff_down:
current_total_traded = current_up_cross['total_traded']
print("最近的事件是上穿越,发生在:", last_up_cross['time'])
# 检查并发送钉钉消息的逻辑需要调整
if last_up_cross is None or current_total_traded > last_up_cross['total_traded']:
print("上一次的成交额是:{}".format(last_up_cross['total_traded']))
if time.time() - up_5m_last_sent_time > 60: # 限频,5分钟内只发送1次
message = "5分钟向上穿越信息,并且成交额增加:" + str(current_up_cross)
print(message)
send_dingtalk_message(webhook=webhook, secret="", message=message)
up_5m_last_sent_time = time.time() # 更新发送时间
else:
print("最近的事件是下穿越,发生在:", last_down_cross['time'])
if not last_down_cross or current_down_cross['total_traded'] > last_down_cross['total_traded']:
message = "5分钟向下穿越,并且成交额增加:" + str(current_down_cross)
if time.time() - down_5m_last_sent_time > 60: # 限频,5分钟内只发送1次
print(message)
send_dingtalk_message(webhook=webhook, secret="", message=message)
down_5m_last_sent_time = time.time() # 更新发送时间
time.sleep(50)
网友评论