美文网首页
Python版商品期货跨期对冲策略

Python版商品期货跨期对冲策略

作者: 发明者量化 | 来源:发表于2020-07-08 16:52 被阅读0次

    移植自JavaScript版本的「商品期货跨期对冲 - 百行代码实现」,本策略为简单的教学策略,意图展示Python语言的商品期货策略设计。主要用于学习策略编写、参考设计思路。

    class Hedge:
        '对冲控制类'
        def __init__(self, q, e, initAccount, symbolA, symbolB, hedgeSpread, coverSpread):
            self.q = q 
            self.initAccount = initAccount
            self.status = 0
            self.symbolA = symbolA
            self.symbolB = symbolB
            self.e = e
            self.isBusy = False 
            self.hedgeSpread = hedgeSpread
            self.coverSpread = coverSpread
            self.opAmount = OpAmount 
            
        def poll(self):
            if (self.isBusy or not exchange.IO("status")) or not ext.IsTrading(self.symbolA):
                Sleep(1000)
                return 
    
            insDetailA = exchange.SetContractType(self.symbolA)
            if not insDetailA:
                return 
    
            tickerA = exchange.GetTicker()
            if not tickerA:
                return 
    
            insDetailB = exchange.SetContractType(self.symbolB)
            if not insDetailB:
                return 
    
            tickerB = exchange.GetTicker()
            if not tickerB:
                return 
    
            LogStatus(_D(), "A卖B买", _N(tickerA["Buy"] - tickerB["Sell"]), "A买B卖", _N(tickerA["Sell"] - tickerB["Buy"]))
            action = 0
    
            if self.status == 0:
                if (tickerA["Buy"] - tickerB["Sell"]) > self.hedgeSpread:
                    Log("开仓 A卖B买", tickerA["Buy"], tickerB["Sell"], "#FF0000")
                    action = 1
                elif (tickerB["Buy"] - tickerA["Sell"]) > self.hedgeSpread:
                    Log("开仓 B卖A买", tickerB["Buy"], tickerA["Sell"], "#FF0000")
                    action = 2
            elif self.status == 1 and (tickerA["Sell"] - tickerB["Buy"]) <= self.coverSpread:
                Log("平仓 A买B卖", tickerA["Sell"], tickerB["Buy"], "#FF0000")
                action = 2
            elif self.status == 2 and (tickerB["Sell"] - tickerA["Buy"]) <= self.coverSpread:
                Log("平仓 B买A卖", tickerB["Sell"] - tickerA["Buy"], "#FF0000")
                action = 1 
    
            if action == 0:
                return 
            
            self.isBusy = True
            tasks = []
            if action == 1:
                tasks.append([self.symbolA, "sell" if self.status == 0 else "closebuy"])
                tasks.append([self.symbolB, "buy" if self.status == 0 else "closesell"])
            elif action == 2:
                tasks.append([self.symbolA, "buy" if self.status == 0 else "closesell"])
                tasks.append([self.symbolB, "sell" if self.status == 0 else "closebuy"])
    
            def callBack(task, ret):
                def callBack(task, ret):
                    self.isBusy = False
                    if task["action"] == "sell":
                        self.status = 2
                    elif task["action"] == "buy":
                        self.status = 1
                    else:
                        self.status = 0
                        account = _C(exchange.GetAccount)
                        LogProfit(account["Balance"] - self.initAccount["Balance"], account)
                self.q.pushTask(self.e, tasks[1][0], tasks[1][1], self.opAmount, callBack)
    
            self.q.pushTask(self.e, tasks[0][0], tasks[0][1], self.opAmount, callBack)
    
    
    def main():
        SetErrorFilter("ready|login|timeout")
        Log("正在与交易服务器连接...")
        while not exchange.IO("status"):
            Sleep(1000)
    
        Log("与交易服务器连接成功")
        initAccount = _C(exchange.GetAccount)
        Log(initAccount)
        n = 0 
    
        def callBack(task, ret):
            Log(task["desc"], "成功" if ret else "失败")
    
        q = ext.NewTaskQueue(callBack)
    
        if CoverAll:
            Log("开始平掉所有残余仓位...")
            ext.NewPositionManager().CoverAll()
            Log("操作完成")
    
        t = Hedge(q, exchange, initAccount, SA, SB, HedgeSpread, CoverSpread)
        while True:
            q.poll()
            t.poll()
    

    只是移植一下代码,感觉有点太简单了,我们继续来做一些改造,给策略加上图表。

    LogStatus函数调用的位置之前加上以下代码,把实时的价格差做成K线统计出来,self.preBarTimeHedge类增加的一个成员,用来记录最新BAR的时间戳,画图我们使用「画线类库」,直接调用画图接口,很简单就可以画出图表。

    
            # 计算差价K线
            r = exchange.GetRecords()
            if not r:
                return 
            diff = tickerB["Last"] - tickerA["Last"]
            if r[-1]["Time"] != self.preBarTime:
                # 更新
                self.records.append({"Time": r[-1]["Time"], "High": diff, "Low": diff, "Open": diff, "Close": diff, "Volume": 0})
                self.preBarTime = r[-1]["Time"]
            if diff > self.records[-1]["High"]:
                self.records[-1]["High"] = diff
            if diff < self.records[-1]["Low"]:
                self.records[-1]["Low"] = diff
            self.records[-1]["Close"] = diff
            ext.PlotRecords(self.records, "diff:B-A")
            ext.PlotHLine(self.hedgeSpread if diff > 0 else -self.hedgeSpread, "hedgeSpread")
            ext.PlotHLine(self.coverSpread if diff > 0 else -self.coverSpread, "coverSpread")
    

    回测时的效果:


    接下来,我们再加入交互功能,让策略在运行时可以修改HedgeSpreadCoverSpread参数,控制对冲开仓差价、平仓差价。还需要一个一键平仓的按钮。我们在策略编辑页面增加这几个控件。


    然后在策略的主循环中,q.poll(),t.poll()调用之后,加上交互控制代码。

        while True:
            q.poll()
            t.poll()
            # 以下交互控制代码
            cmd = GetCommand()
            if cmd:
                arr = cmd.split(":")
                if arr[0] == "AllCover":
                    p.CoverAll()
                elif arr[0] == "SetHedgeSpread":
                    t.SetHedgeSpread(float(arr[1]))
                elif arr[0] == "SetCoverSpread":
                    t.SetCoverSpread(float(arr[1]))
    

    Python版商品期货跨期对冲策略 (升级图表、交互功能)

    策略用于教学,实盘根据自身需求优化调整。
    如有问题,欢迎留言。

    相关文章

      网友评论

          本文标题:Python版商品期货跨期对冲策略

          本文链接:https://www.haomeiwen.com/subject/euiccktx.html