美文网首页
手把手教你实现一个行情收集器

手把手教你实现一个行情收集器

作者: 发明者量化 | 来源:发表于2020-04-30 08:24 被阅读0次

    在程序化交易、量化交易中研究策略、设计策略、回测分析时离不开行情数据的支持。市面上的所有数据都收集也不现实,毕竟数据量太大。对于数字货币市场来说,发明者量化交易平台上支持有限的交易所、交易对的回测数据。如果想回测一些暂时不支持数据的交易所、交易对。可以使用自定义数据源来进行回测,但是这个前提是要自己有数据才行。所以就迫切需要一个行情收集程序,并且能持久化保存,最好还能实时获取。

    这样可以解决几个需求,例如:

    • 可以给多个机器人提供数据源,可以缓解每个机器人访问交易所接口的频率。
    • 可以让机器人启动时,获取一个K线BAR数量足够多的K线数据,再也不用担心机器人起始的时候K线BAR数量不足了。
    • 可以收集小币种行情数据,用来给发明者量化交易平台回测系统提供自定义数据源,从而使用回测系统回测策略。
    • 等等..

    计划使用python实现,为什么?因为很方便 :)
    有了需求,动手!

    准备

    python的pymongo库

    因为要用到数据库,做持久化保存。数据选择使用MongoDB,使用Python语言写收集程序,所以需要这个数据库的驱动库。
    在Python上安装pymongo即可。

    在托管者所在设备安装MongoDB

    例如:MAC安装MongoDB,当然WIN系统安装MongoDB也差不多,网上有很多教程,以在苹果MAC系统安装为例:

    下载
    下载链接:https://www.mongodb.com/download-center?jmp=nav#community

    解压缩
    下载后,解压缩到目录:/usr/local

    配置环境变量
    终端输入:open -e .bash_profile,打开文件后,写入:export PATH=${PATH}:/usr/local/MongoDB/bin
    保存后,终端使用source .bash_profile使修改生效。

    手动配置数据库文件目录和日志目录
    创建目录/usr/local/data/db中对应的文件夹。
    创建目录/usr/local/data/logs中对应的文件夹。

    编辑配置文件```mongo.conf```:
    ```
    #bind_ip_all = true                  # 任何机器可以连接
    bind_ip = 127.0.0.1                  # 本机和192.168.0.3可以访问
    port = 27017                         # 实例运行在27017端口(默认)
    dbpath = /usr/local/data/db # 数据文件夹存放地址(db要预先创建)
    logpath = /usr/local/data/logs/mongodb.log # 日志文件地址
    logappend = false                    # 启动时 添加还是重写日志文件
    fork = false                         # 是否后台运行
    auth = false                         # 开启校验用户
    ```
    

    运行MongoDB服务

    命令:
    ```
    ./mongod -f mongo.conf
    ```
    

    停止服务

    ```
    use admin;
    db.shutdownServer();
    ```
    

    实现收集器程序

    收集器以发明者量化交易平台上的Python机器人策略形式运行。由于本人Python水平有限,只是实现了一个简单的例子,用于展示本文的思路。

    收集器策略代码:

    import pymongo
    import json
    
    def main():
        Log("测试数据收集")
        
        # 连接数据库服务
        myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   # mongodb://127.0.0.1:27017
        # 创建数据库
        huobi_DB = myDBClient["huobi"]
        
        # 打印目前数据库表
        collist = huobi_DB.list_collection_names()
        Log("collist:", collist)
        
        # 检测是否删除表
        arrDropNames = json.loads(dropNames)
        if isinstance(arrDropNames, list):
            for i in range(len(arrDropNames)):
                dropName = arrDropNames[i]
                if isinstance(dropName, str):
                    if not dropName in collist:
                        continue
                    tab = huobi_DB[dropName]
                    Log("dropName:", dropName, "删除:", dropName)
                    ret = tab.drop()
                    collist = huobi_DB.list_collection_names()
                    if dropName in collist:
                        Log(dropName, "删除失败")
                    else :
                        Log(dropName, "删除成功")
        
        # 创建records表
        huobi_DB_Records = huobi_DB["records"]
        
        # 请求数据
        preBarTime = 0
        index = 1
        while True:
            r = _C(exchange.GetRecords)
            if len(r) < 2:
                Sleep(1000)
                continue
            if preBarTime == 0:
                # 首次写入所有BAR数据
                for i in range(len(r) - 1):
                    # 逐根写入
                    bar = r[i]
                    huobi_DB_Records.insert_one({"index": index, "High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                    index += 1
                preBarTime = r[-1]["Time"]
            elif preBarTime != r[-1]["Time"]:
                bar = r[-2]
                huobi_DB_Records.insert_one({"index": index, "High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
                index += 1
                preBarTime = r[-1]["Time"]
            LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
            Sleep(10000)
            
    
    

    完整策略地址:链接

    使用数据

    创建使用数据的策略机器人。
    注意:需要勾选上「画线类库」,没有的话可以去复制一个到自己策略库。

    import pymongo
    import json
    
    def main():
        Log("测试使用数据库数据")
        
        # 连接数据库服务
        myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   # mongodb://127.0.0.1:27017
        # 创建数据库
        huobi_DB = myDBClient["huobi"]
        
        # 打印目前数据库表
        collist = huobi_DB.list_collection_names()
        Log("collist:", collist)
        
        # 查询数据打印
        huobi_DB_Records = huobi_DB["records"]
        
        while True:
            arrRecords = []
            for x in huobi_DB_Records.find():
                bar = {
                    "High": x["High"], 
                    "Low": x["Low"], 
                    "Close": x["Close"], 
                    "Open": x["Open"], 
                    "Time": x["Time"], 
                    "Volume": x["Volume"]
                }
                arrRecords.append(bar)
            
            # 使用画线类库,把取到的K线数据画出来
            ext.PlotRecords(arrRecords, "K")
            LogStatus(_D(), "records length:", len(arrRecords))
            Sleep(10000)
    

    可以看到使用数据的策略机器人代码中没有访问任何交易所接口,通过访问数据库获取数据,行情收集器程序没有记录当前BAR的数据,收集的是已经完成状态的K线BAR,如果需要当前BAR实时数据,稍加修改即可。
    当前的例子代码,只是为了演示,在访问数据库中表内的数据记录时是全部获取,这样随着收集数据时间增长,收集数据越来越多,全部查询出来会一定程度上影响性能,可以设计成只查询比当前数据新的数据,添加到当前数据中。

    运行

    运行托管者程序


    在托管者所在设备,运行起来MongoDB数据库服务
    ./mongod -f mongo.conf

    收集器运行,收集发明者量化交易平台的模拟盘wexAppBTC_USDT交易对:
    地址:wexApp

    使用数据库数据的机器人A:


    使用数据库数据的机器人B:


    wexApp页面:

    图中可以看到,不同ID的机器人,共享使用一个数据源的K线数据。

    收集任意周期的K线数据

    依托于发明者量化交易平台的强大功能,我们可以轻松实现收集任意周期的K线数据。
    比如,我要收集3分钟K线,交易所没有3分钟K线怎么办?没关系,可以轻松实现。

    我们修改收集器机器人的配置,K线周期设置为3分钟,发明者量化交易平台会自动合成3分钟K线给收集器程序。


    我们使用参数删除表的名称,设置:["records"]删除之前收集的1分钟K线数据表。准备收集3分钟K线数据。

    启动收集器程序,再启动使用数据的策略机器人

    可以看到画出的K线图表,BAR之间间隔时间就是3分钟了,每根BAR就是3分钟周期的K线柱。

    下期我们尝试实现自定义数据源的需求实现。
    感谢阅读

    相关文章

      网友评论

          本文标题:手把手教你实现一个行情收集器

          本文链接:https://www.haomeiwen.com/subject/cosnwhtx.html