import json
import requests
import numpy as np
from datetime import datetime, timedelta, timezone
import requests
import pandas as pd
import time
import hmac
import hashlib
import base64
import urllib.parse
import json
from tools.read_write import read_last_status, update_last_status
# DingTalk robot webhook URL that trend alerts are posted to.
# NOTE(review): the access token is hard-coded in source — consider loading it
# from configuration or the environment instead.
webhook="https://oapi.dingtalk.com/robot/send?access_token=8a6ddcf98d3b47c63333580bfe9d0bad55b17272eea05cc9c0af7f7be4de070d"
# Signing secret passed to send_dingtalk_message (empty in this file).
secret=""
last_status_up_down=None # Status of the previous trend
def send_dingtalk_message(webhook, secret, message):
    """Post a plain-text message to a DingTalk robot webhook using a signed URL.

    The signature is the URL-quoted, base64-encoded HMAC-SHA256 digest of the
    millisecond timestamp, a newline, and the secret, keyed with the secret.
    Both the timestamp and the signature are appended to the webhook URL as
    query parameters.

    Args:
        webhook: Robot webhook URL. It must already contain a query string
            (for example ``?access_token=...``) because the extra parameters
            are joined with ``&``.
        secret: Signing secret used as the HMAC key.
        message: Text sent as the message content.

    Returns:
        None. The response body is printed.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    timestamp = str(round(time.time() * 1000))
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    hmac_code = hmac.new(
        secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

    # The parameter name must be spelled "&timestamp"; the previous text had
    # been corrupted into "×tamp", so no timestamp parameter was ever sent.
    signed_url = '{}&timestamp={}&sign={}'.format(webhook, timestamp, sign)

    headers = {'Content-Type': 'application/json'}
    data = {"msgtype": "text", "text": {"content": message}}
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.post(
        url=signed_url,
        headers=headers,
        data=json.dumps(data),
        timeout=10,
    )
    print(response.text)
def get_klines(symbol, interval, limit=100):
    """Fetch kline (candlestick) data from the Binance futures klines endpoint.

    Args:
        symbol: Trading pair symbol, e.g. ``'BTCUSDT'``.
        interval: Kline interval string, e.g. ``'5m'``.
        limit: Number of klines to request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    # Spot-market alternative: "https://api.binance.com/api/v3/klines"
    url = "https://fapi.binance.com/fapi/v1/klines"  # futures (contract) market
    params = {'symbol': symbol, 'interval': interval, 'limit': limit}
    print(url)
    print(json.dumps(params, indent=4))
    # A timeout prevents a stalled connection from hanging the caller forever.
    response = requests.get(url, params=params, timeout=10)
    # Surface HTTP errors here instead of handing an error payload to callers
    # that expect a list of klines.
    response.raise_for_status()
    return response.json()
def calculate_sma(prices, window):
    """Return the simple moving average of ``prices`` over ``window`` samples.

    The average is computed as a 'valid'-mode convolution with a uniform
    kernel, so the result only contains positions where the kernel and the
    series fully overlap.
    """
    uniform_kernel = np.ones(window) / window
    moving_average = np.convolve(prices, uniform_kernel, mode='valid')
    return moving_average
def calculate_ema(prices, period, smoothing=2):
    """Return the exponential moving average of ``prices`` as a list.

    The first value is the sum of the first ``period`` prices divided by
    ``period``. Every later price is blended into the running value with
    weight ``smoothing / (1 + period)``.
    """
    seed = sum(prices[:period]) / period
    averages = [seed]
    for current_price in prices[period:]:
        weight = smoothing / (1 + period)
        previous = averages[-1]
        averages.append((current_price * weight) + previous * (1 - weight))
    return averages
def calculate_macd(prices, fast_period=6, slow_period=7, signal_period=4):
    """Compute the MACD line and its signal line for a price series.

    Args:
        prices: Sequence of prices.
        fast_period: Period of the fast EMA (default 6, the value this
            function previously hard-coded).
        slow_period: Period of the slow EMA (default 7, previously
            hard-coded).
        signal_period: Period of the EMA applied to the MACD line
            (default 4, previously hard-coded).

    Returns:
        A ``(macd, signal)`` tuple. ``macd`` is a NumPy array holding the
        fast EMA minus the slow EMA over the most recent samples both
        series share. ``signal`` is the list returned by ``calculate_ema``
        for that MACD line.
    """
    fast_ema = calculate_ema(prices, fast_period)
    slow_ema = calculate_ema(prices, slow_period)
    # The two EMA series differ in length; keep only the trailing samples they
    # share so the subtraction works whichever period is longer.
    overlap = min(len(fast_ema), len(slow_ema))
    macd = np.subtract(fast_ema[-overlap:], slow_ema[-overlap:])
    signal = calculate_ema(macd, signal_period)
    return macd, signal
def calculate_boll(prices, window=21, num_std=2):
    """Compute Bollinger Bands for a price series.

    The middle band is the rolling mean over ``window`` samples. The upper
    and lower bands sit ``num_std`` rolling population standard deviations
    above and below it. The earlier implementation used the absolute
    distance of a single price from the mean in place of the rolling
    standard deviation, which produced incorrect outer bands.

    Args:
        prices: One-dimensional sequence of prices.
        window: Number of samples in each rolling window.
        num_std: Band width in standard deviations.

    Returns:
        An ``(upper_band, middle_band, lower_band)`` tuple of NumPy arrays,
        each with ``len(prices) - window + 1`` entries. All three are empty
        when fewer than ``window`` prices are supplied.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")

    series = np.asarray(prices, dtype=np.float64)
    if series.size < window:
        # No complete window exists, so there is nothing to report.
        return (
            np.array([], dtype=np.float64),
            np.array([], dtype=np.float64),
            np.array([], dtype=np.float64),
        )

    # Each row is one complete window; row k ends at prices[k + window - 1].
    windows = np.lib.stride_tricks.sliding_window_view(series, window)
    sma = windows.mean(axis=1)
    std = windows.std(axis=1)

    upper_band = sma + (std * num_std)
    lower_band = sma - (std * num_std)
    return upper_band, sma, lower_band
# def find_special_moments(symbol, interval):
# global last_status_up_down
# klines = get_klines(symbol, interval, 100)
# close_prices = np.array([float(kline[4]) for kline in klines], dtype=np.float64)
# high_prices = np.array([float(kline[2]) for kline in klines], dtype=np.float64) # 获取最高价格
# low_prices = np.array([float(kline[3]) for kline in klines], dtype=np.float64) # 获取最低价格
# dates = [datetime.fromtimestamp(int(kline[0]) / 1000, tz=timezone.utc) for kline in klines] # 时间戳转换为datetime对象
#
# upper_band, middle_band, lower_band = calculate_boll(close_prices)
# macd_line, _ = calculate_macd(close_prices)
#
#
# boll_reduction = len(close_prices) - len(middle_band)
# macd_reduction = len(close_prices) - len(macd_line)
# start_index = max(boll_reduction, macd_reduction) + 2
#
# for i in range(start_index, len(close_prices)):
# adjusted_index = i - boll_reduction
# macd_index = i - macd_reduction
# prev_kline_time = dates[i - 1].astimezone(timezone(timedelta(hours=8))) # 前一个K线的时间,转换为UTC+8时区
# if close_prices[i] < middle_band[adjusted_index - 2] and macd_line[macd_index] < 0 and macd_line[macd_index] < \
# macd_line[macd_index - 1] < macd_line[macd_index - 2]:
# mess = f"下跌趋势穿越--中线下方DIF减小: {dates[i].astimezone(timezone(timedelta(hours=8)))}, 前一个K线时间: {prev_kline_time}, 最高价: {high_prices[i - 1]}, 最低价: {low_prices[i - 1]}"
# print(mess)
# last_status_up_down = read_last_status()
# print("1"*88)
# print(last_status_up_down)
# if last_status_up_down !="down":
# print("发送钉钉消息拉 down down down ")
# send_dingtalk_message(webhook,secret,message=mess)
# last_status_up_down="down"
# elif close_prices[i] > middle_band[adjusted_index - 2] and macd_line[macd_index] > 0 and macd_line[macd_index] > \
# macd_line[macd_index - 1] > macd_line[macd_index - 2]:
# mess = f"上涨趋势穿越--中线上方DIF增大: {dates[i].astimezone(timezone(timedelta(hours=8)))}, 前一个K线时间: {prev_kline_time}, 最高价: {high_prices[i - 1]}, 最低价: {low_prices[i - 1]}"
# print(mess)
# last_status_up_down = read_last_status()
# print("2"*88)
# print(last_status_up_down)
# if last_status_up_down !="up":
# print("发送钉钉消息拉 up up up ")
# send_dingtalk_message(webhook,secret,message=mess)
# last_status_up_down="up"
# Active implementation of find_special_moments (the earlier version is kept commented out above).
def find_special_moments(symbol, interval):
    """Check the newest kline for a trend signal and alert on a status change.

    Fetches the latest 100 klines, computes the Bollinger middle band and the
    MACD line from the close prices, and classifies the most recent kline:

    * "down": close below the middle band, MACD negative and lower than the
      previous value, which is in turn lower than the one before it.
    * "up": close above the middle band, MACD positive and higher than the
      previous value, which is in turn higher than the one before it.

    When a signal is found, the message is printed and the stored status is
    read with ``read_last_status``. A DingTalk message is sent and the new
    status is saved with ``update_last_status`` only if the stored status
    differs from the detected one.

    NOTE(review): the close is compared with the middle-band value from two
    bars before the latest one — confirm this offset is intended.

    Args:
        symbol: Trading pair symbol passed to ``get_klines``.
        interval: Kline interval passed to ``get_klines``.

    Returns:
        None.
    """
    global last_status_up_down

    klines = get_klines(symbol, interval, 100)  # latest 100 klines
    # Anything other than a non-empty list (e.g. an error payload) cannot be
    # analysed; skip this cycle rather than fail on indexing.
    if not isinstance(klines, list) or not klines:
        print(f"Unexpected kline response, skipping this check: {klines!r}")
        return

    close_prices = np.array([float(kline[4]) for kline in klines], dtype=np.float64)
    high_prices = np.array([float(kline[2]) for kline in klines], dtype=np.float64)
    low_prices = np.array([float(kline[3]) for kline in klines], dtype=np.float64)
    # Kline open timestamps are in milliseconds.
    dates = [datetime.fromtimestamp(int(kline[0]) / 1000, tz=timezone.utc) for kline in klines]

    _, middle_band, _ = calculate_boll(close_prices)
    macd_line, _ = calculate_macd(close_prices)

    # Only the most recent kline is evaluated. The indicator arrays are shorter
    # than the price array, so shift the index by the difference in length.
    latest = len(close_prices) - 1
    band_index = latest - (len(close_prices) - len(middle_band)) - 2
    macd_index = latest - (len(close_prices) - len(macd_line))
    # Negative indices would silently wrap to the wrong end of the arrays.
    if latest < 1 or band_index < 0 or macd_index < 2:
        print("Not enough kline data to evaluate the trend, skipping this check")
        return

    close = close_prices[latest]
    band_value = middle_band[band_index]
    dif = macd_line[macd_index]
    dif_prev = macd_line[macd_index - 1]
    dif_prev2 = macd_line[macd_index - 2]

    if close < band_value and dif < 0 and dif < dif_prev < dif_prev2:
        status = "down"
        headline = "下跌趋势穿越--中线下方DIF减小"
        notice = "发送钉钉消息拉 down down down "
    elif close > band_value and dif > 0 and dif > dif_prev > dif_prev2:
        status = "up"
        headline = "上涨趋势穿越--中线上方DIF增大"
        notice = "发送钉钉消息拉 up up up "
    else:
        return

    utc_plus_8 = timezone(timedelta(hours=8))
    kline_time = dates[latest].astimezone(utc_plus_8)
    prev_kline_time = dates[latest - 1].astimezone(utc_plus_8)
    # High and low are reported for the kline before the latest one.
    mess = f"{headline}: {kline_time}, 前一个K线时间: {prev_kline_time}, 最高价: {high_prices[latest - 1]}, 最低价: {low_prices[latest - 1]}"
    print(mess)

    last_status_up_down = read_last_status()
    if last_status_up_down != status:
        # Notify only when the status changes, so repeated polls stay quiet.
        print(notice)
        send_dingtalk_message(webhook, secret, message=mess)
        last_status_up_down = status
        update_last_status(last_status_up_down)
symbol = 'BTCUSDT'
interval = '5m'

# Poll indefinitely: run the analysis, then pause before the next check.
while True:
    try:
        find_special_moments(symbol, interval)
    except requests.RequestException as exc:
        # A transient network or HTTP failure should not stop the monitor;
        # report it and try again on the next cycle.
        print(f"Request failed, retrying on the next cycle: {exc}")
    time.sleep(10)  # wait 10 seconds between checks
# Stray page text from the original post ("网友评论" / "reader comments"); not part of the script.