美文网首页
OBV MACD BOLL

OBV MACD BOLL

作者: Leoguo小哥 | 来源:发表于2024-08-06 09:55 被阅读0次
import time

import requests
from datetime import datetime,timezone,timedelta
import pytz
import re
from Utils.dingding import send_dingtalk_message

if __name__ == '__main__':
    webhook = "https://oapi.dingtalk.com/robot/send?access_token=8a6ddcf98d3b47c63333580bfe9d0bad55b17272eea05cc9c0af7f7be4de070d"
    # 初始化存储满足条件的时间列表
    # 初始化存储满足条件的时间列表
    bullish_green_hollow = []  # 空心绿色(多头增加)
    bearish_red_solid = []  # 实心红色(空头减少)
    previous_bullish_green_hollow = []  # 上一次的多头增加时间点
    previous_bearish_red_solid = []  # 上一次的空头减少时间点

    while True:
        time.sleep(2)




        def get_klines(symbol, interval, limit=500):
            url = "https://fapi.binance.com/fapi/v1/klines"
            params = {
                'symbol': symbol,
                'interval': interval,
                'limit': limit,
            }
            try:
                response = requests.get(url, params=params)
                response.raise_for_status()  # 将触发异常,如果请求返回非200响应
                return response.json()
            except requests.RequestException as e:
                print(f"请求失败: {e}")


        def calculate_obv(klines):
            obv = 0
            obv_values = [0]  # 第一日的OBV值设为0
            timestamps = [klines[0][0]]  # 存储时间戳

            for i in range(1, len(klines)):
                current_close = float(klines[i][4])  # 当前日收盘价
                previous_close = float(klines[i - 1][4])  # 前一日收盘价
                current_volume = float(klines[i][5])  # 当前日成交量
                timestamps.append(klines[i][0])  # 添加时间戳

                if current_close > previous_close:
                    obv += current_volume
                elif current_close < previous_close:
                    obv -= current_volume

                obv_values.append(obv)

            return obv_values, timestamps


        def calculate_sma(values, window):
            """计算简单移动平均线(SMA)"""
            sma = []
            for i in range(len(values)):
                if i < window:
                    sma.append(None)
                else:
                    sma.append(sum(values[i - window:i]) / window)
            return sma


        def convert_timestamp_to_datetime(timestamp):
            """将时间戳转换为UTC+8时间"""
            utc_time = datetime.utcfromtimestamp(timestamp / 1000)  # Binance的时间戳是毫秒级的
            utc_time = utc_time.replace(tzinfo=timezone.utc)
            local_time = utc_time.astimezone(pytz.timezone('Asia/Shanghai'))
            return local_time.strftime('%Y-%m-%d %H:%M:%S')

        symbol = "SOLUSDT"
        klines = get_klines(symbol=symbol, interval="3m")
        obv_values, timestamps = calculate_obv(klines)
        sma_obv = calculate_sma(obv_values, window=21)

        obv_cross_up, obv_cross_down = [], []
        for i in range(1, len(obv_values)):
            if sma_obv[i] is not None and sma_obv[i - 1] is not None:
                time_str = convert_timestamp_to_datetime(timestamps[i])
                if obv_values[i] > sma_obv[i] and obv_values[i - 1] <= sma_obv[i - 1]:
                    obv_cross_up.append((time_str, obv_values[i]))  # 上穿越
                elif obv_values[i] < sma_obv[i] and obv_values[i - 1] >= sma_obv[i - 1]:
                    obv_cross_down.append((time_str, obv_values[i]))  # 下穿越

        print("OBV上穿越:", list(reversed(obv_cross_up)))
        print("OBV下穿越:", list(reversed(obv_cross_down)))


        def calculate_boll(klines, window=21):
            close_prices = [float(kline[4]) for kline in klines]
            sma = calculate_sma(close_prices, window)
            std_deviation = []

            for i in range(len(close_prices)):
                if i < window - 1:
                    std_deviation.append(None)
                else:
                    windowed_prices = close_prices[i - window + 1:i + 1]
                    mean = sum(windowed_prices) / window
                    variance = sum((p - mean) ** 2 for p in windowed_prices) / window
                    std_deviation.append(variance ** 0.5)

            return sma, std_deviation


        # 在主程序部分增加BOLL穿越检测
        sma_obv_boll, _ = calculate_boll(klines, window=21)

        boll_cross_up, boll_cross_down = [], []
        for i in range(1, len(klines)):
            current_close = float(klines[i][4])
            previous_close = float(klines[i - 1][4])
            if sma_obv_boll[i] is not None and sma_obv_boll[i - 1] is not None:
                time_str = convert_timestamp_to_datetime(timestamps[i])
                if current_close > sma_obv_boll[i] and previous_close <= sma_obv_boll[i - 1]:
                    boll_cross_up.append((time_str, current_close))  # BOLL上穿越
                elif current_close < sma_obv_boll[i] and previous_close >= sma_obv_boll[i - 1]:
                    boll_cross_down.append((time_str, current_close))  # BOLL下穿越

        print("BOLL上穿越:", list(reversed(boll_cross_up)))
        print("BOLL下穿越:", list(reversed(boll_cross_down)))

        ###########
        '''
        比较当前时间与事件时间:
        然后,我们将检查当前时间是否在“OBV下穿且BOLL中线以下的时刻”加减1分钟的范围内。
        '''


        def is_within_one_minute(current_time, event_time_str):
            """检查当前时间是否在事件时间的1分钟内"""
            event_time = datetime.strptime(event_time_str, '%Y-%m-%d %H:%M:%S')
            one_minute_before = event_time - timedelta(minutes=1)
            one_minute_after = event_time + timedelta(minutes=1)
            return one_minute_before <= current_time <= one_minute_after


        def send_message_if_within_one_minute(obv_down_boll_below_times):
            """如果当前时间在OBV下穿且BOLL中线以下的时刻1分钟内,发送钉钉消息"""
            # 获取当前时间(UTC+8)
            current_time_utc8 = datetime.now(pytz.timezone('Asia/Shanghai'))
            for time_str in obv_down_boll_below_times:
                if is_within_one_minute(current_time_utc8, time_str):
                    message = f"当前K线在{time_str}(OBV下穿且BOLL中线以下)的1分钟范围内"
                    print(message)  # 这里打印消息用于调试,实际应用中可发送钉钉消息
                    send_dingtalk_message(webhook=webhook, secret="", message=message)
                    break  # 找到一个匹配即可,避免重复发送消息


        #########

        # 确定OBV下穿并且收盘价低于BOLL中线的时间点
        obv_down_boll_below = []
        for i in range(1, len(obv_values)):
            if sma_obv[i] is not None and sma_obv[i - 1] is not None:
                time_str = convert_timestamp_to_datetime(timestamps[i])
                if obv_values[i] < sma_obv[i] and obv_values[i - 1] >= sma_obv[i - 1]:
                    # OBV下穿检测
                    current_close = float(klines[i][4])
                    if sma_obv_boll[i] is not None and current_close < sma_obv_boll[i]:
                        # 检查收盘价是否低于BOLL中线
                        obv_down_boll_below.append((time_str, current_close))

        obv_down_boll_below_reversed = obv_down_boll_below[::-1]
        print("OBV下穿且BOLL中线以下的时刻:")
        for time_str, close in obv_down_boll_below_reversed[:1]:
            print(time_str, close)
            # time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
            # send_message_if_within_one_minute(time_str)

        # obv_down_boll_below_times = [time_str for time_str, close in obv_down_boll_below_reversed[::-1][:1]]  # 从之前的代码获取时刻
        # send_message_if_within_one_minute(obv_down_boll_below_times)

        # 确定OBV上穿并且收盘价高于BOLL中线的时间点
        obv_up_boll_above = []
        for i in range(1, len(obv_values)):
            if sma_obv[i] is not None and sma_obv[i - 1] is not None:
                time_str = convert_timestamp_to_datetime(timestamps[i])  # 将时间戳转换为UTC+8时间
                if obv_values[i] > sma_obv[i] and obv_values[i - 1] <= sma_obv[i - 1]:
                    # OBV上穿检测
                    current_close = float(klines[i][4])
                    if sma_obv_boll[i] is not None and current_close > sma_obv_boll[i]:
                        # 检查收盘价是否高于BOLL中线
                        obv_up_boll_above.append((time_str, current_close))

        # 倒序列表
        obv_up_boll_above_reversed = obv_up_boll_above[::-1]

        # 取最新的3条数据
        latest_3_entries = obv_up_boll_above_reversed[:1]

        print("最新的3条OBV上穿且BOLL中线以上的时刻:")
        for time_str, close in latest_3_entries:
            print(time_str, close)
            # time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
            # send_message_if_within_one_minute(time_str)


        # obv_up_boll_below_times = [time_str for time_str, close in obv_up_boll_above_reversed[::-1][:1]]  # 从之前的代码获取时刻
        # send_message_if_within_one_minute(obv_up_boll_below_times)

        ############macd
        def calculate_ema_macd(prices, window, smoothing=2):
            ema = [sum(prices[:window]) / window]
            for price in prices[window:]:
                ema.append((price * (smoothing / (1 + window))) + ema[-1] * (1 - (smoothing / (1 + window))))
            return ema


        def calculate_macd(prices, short_window=12, long_window=26, signal_window=9):
            short_ema = calculate_ema_macd(prices, short_window)[long_window - short_window:]
            long_ema = calculate_ema_macd(prices, long_window)
            macd_line = [s - l for s, l in zip(short_ema, long_ema)]
            signal_line = calculate_ema_macd(macd_line, signal_window)
            macd_histogram = [m - s for m, s in zip(macd_line, signal_line)]
            return macd_line, signal_line, macd_histogram


        # 假设klines数据已经获取,并且已经计算了MACD值
        close_prices = [float(kline[4]) for kline in klines]
        macd_line, signal_line, macd_histogram = calculate_macd(close_prices)

        for i in range(1, len(macd_histogram)):
            if macd_histogram[i] > 0 and macd_histogram[i] > macd_histogram[i - 1]:
                # 多头增加空心绿色
                time_str = convert_timestamp_to_datetime(klines[i][0])
                # print(f"{time_str} - 空心绿色(多头增加)")
            elif macd_histogram[i] < 0 and macd_histogram[i] < macd_histogram[i - 1]:
                # 空头减少实心红色
                time_str = convert_timestamp_to_datetime(klines[i][0])
                # print(f"{time_str} - 实心红色(空头减少)")

        ################
        '''
        增加逻辑并打印当前K线属于空心绿色(多头增加)并且最新的OBV上穿且BOLL中线以上的时刻和实心红色(空头减少)并且OBV下穿且BOLL中线以下的时刻:
        '''
        # 计算MACD
        close_prices = [float(kline[4]) for kline in klines]
        macd_line, signal_line, macd_histogram = calculate_macd(close_prices)

        # 检查多头增加的时间点是否有新的事件
        new_bullish_events = [time for time in bullish_green_hollow if time not in previous_bullish_green_hollow]
        if new_bullish_events:
            message = f"{symbol}新的穿越多头增加事件发生:{new_bullish_events}"
            send_dingtalk_message(webhook=webhook, secret="", message=message)
            print(message)

        # 检查空头减少的时间点是否有新的事件
        new_bearish_events = [time for time in bearish_red_solid if time not in previous_bearish_red_solid]
        if new_bearish_events:
            message = f"{symbol}新的穿越空头减少事件发生:{new_bearish_events}"
            send_dingtalk_message(webhook=webhook, secret="", message=message)
            print(message)

        # 更新记录的时间点列表以供下次循环使用
        previous_bullish_green_hollow = bullish_green_hollow[:]
        previous_bearish_red_solid = bearish_red_solid[:]


        for i in range(1, len(macd_histogram)):
            time_str = convert_timestamp_to_datetime(klines[i][0])

            # 检查多头增加的条件
            if macd_histogram[i] > 0 and macd_histogram[i] > macd_histogram[i - 1]:
                for j in range(1, len(obv_values)):
                    # 确保时间对齐并跳过SMA为None的情况
                    if timestamps[j] == klines[i][0] and sma_obv[j] is not None and sma_obv[j - 1] is not None:
                        if obv_values[j] > sma_obv[j] and obv_values[j - 1] <= sma_obv[j - 1]:
                            current_close = float(klines[j][4])
                            if sma_obv_boll[j] is not None and current_close > sma_obv_boll[j]:
                                bullish_green_hollow.append(time_str)

            # 检查空头减少的条件
            if macd_histogram[i] < 0 and macd_histogram[i] < macd_histogram[i - 1]:
                for j in range(1, len(obv_values)):
                    # 确保时间对齐并跳过SMA为None的情况
                    if timestamps[j] == klines[i][0] and sma_obv[j] is not None and sma_obv[j - 1] is not None:
                        if obv_values[j] < sma_obv[j] and obv_values[j - 1] >= sma_obv[j - 1]:
                            current_close = float(klines[j][4])
                            if sma_obv_boll[j] is not None and current_close < sma_obv_boll[j]:
                                bearish_red_solid.append(time_str)

        print("做多---空心绿色(多头增加)且OBV上穿BOLL中线的时刻:", bullish_green_hollow[::-1])
        print("做空---实心红色(空头减少)且OBV下穿BOLL中线的时刻:", bearish_red_solid[::-1])



相关文章

网友评论

      本文标题:OBV MACD BOLL

      本文链接:https://www.haomeiwen.com/subject/kumxkjtx.html