美文网首页
Python版商品期货多品种均线策略

Python版商品期货多品种均线策略

作者: 发明者量化 | 来源:发表于2020-06-16 10:02 被阅读0次

    完全移植自「CTP商品期货多品种均线策略」,由于Python版本商品期货策略还没有一个多品种的策略,所以就移植了JavaScript版本的「CTP商品期货多品种均线策略」。提供一些Python商品期货多品种策略的设计思路、例子。不论JavaScript版本还是Python版本,策略架构设计源于商品期货多品种海龟策略

    均线策略作为最简单的策略,是非常易于学习的,因为均线策略没有什么高深的算法,复杂的逻辑。思路清晰不绕弯,可以让初学者更专注于策略设计方面的学习,甚至可以把均线策略相关的代码剔除,留下一个多品种策略框架,可以很轻松的扩展成ATR、MACD、BOLL等策略。

    JavaScript版本相关文章:https://www.fmz.com/bbs-topic/5235

    策略源码

    '''backtest
    start: 2019-07-01 09:00:00
    end: 2020-03-25 15:00:00
    period: 1d
    exchanges: [{"eid":"Futures_CTP","currency":"FUTURES"}]
    '''
    
    import json
    import re
    import time
    
    _bot = ext.NewPositionManager()
    
    class Manager:
        '策略逻辑控制类'
    
        ACT_IDLE = 0
        ACT_LONG = 1
        ACT_SHORT = 2 
        ACT_COVER = 3
    
        ERR_SUCCESS = 0
        ERR_SET_SYMBOL = 1
        ERR_GET_ORDERS = 2
        ERR_GET_POS = 3
        ERR_TRADE = 4
        ERR_GET_DEPTH = 5
        ERR_NOT_TRADING = 6
        errMsg = ["成功", "切换合约失败", "获取订单失败", "获取持仓失败", "交易下单失败", "获取深度失败", "不在交易时间"]
        
        def __init__(self, needRestore, symbol, keepBalance, fastPeriod, slowPeriod):
            # 获取symbolDetail 
            symbolDetail = _C(exchange.SetContractType, symbol)
            if symbolDetail["VolumeMultiple"] == 0 or symbolDetail["MaxLimitOrderVolume"] == 0 or symbolDetail["MinLimitOrderVolume"] == 0 or symbolDetail["LongMarginRatio"] == 0 or symbolDetail["ShortMarginRatio"] == 0:
                Log(symbolDetail)
                raise Exception("合约信息异常")
            else :
                Log("合约", symbolDetail["InstrumentName"], "一手", symbolDetail["VolumeMultiple"], "份,最大下单量", symbolDetail["MaxLimitOrderVolume"], "保证金率:", _N(symbolDetail["LongMarginRatio"]), _N(symbolDetail["ShortMarginRatio"]), "交割日期", symbolDetail["StartDelivDate"])
    
            # 初始化    
            self.symbol = symbol
            self.keepBalance = keepBalance
            self.fastPeriod = fastPeriod
            self.slowPeriod = slowPeriod
    
            self.marketPosition = None
            self.holdPrice = None
            self.holdAmount = None 
            self.holdProfit = None
    
            self.task = {
                "action" : Manager.ACT_IDLE,
                "amount" : 0,
                "dealAmount" : 0,
                "avgPrice" : 0,
                "preCost" : 0,
                "preAmount" : 0,
                "init" : False,
                "retry" : 0,
                "desc" : "空闲",
                "onFinish" : None
            }
            
            self.lastPrice = 0 
            self.symbolDetail = symbolDetail
    
            # 持仓状态信息
            self.status = {
                "symbol" : symbol,
                "recordsLen" : 0,
                "vm" : [], 
                "open" : 0,
                "cover" : 0,
                "st" : 0,
                "marketPosition" : 0,
                "lastPrice" : 0,
                "holdPrice" : 0, 
                "holdAmount" : 0,
                "holdProfit" : 0, 
                "symbolDetail" : symbolDetail,
                "lastErr" : "",
                "lastErrTime" : "",
                "isTrading" : False   
            }
            
            # 对象构造时其他处理工作
            vm = None
            if RMode == 0:
                vm = _G(self.symbol)
            else:
                vm = json.loads(VMStatus)[self.symbol]
            if vm:
                Log("准备恢复进度,当前合约状态为", vm)
                self.reset(vm[0])
            else:
                if needRestore:
                    Log("没有找到" + self.symbol + "的进度恢复信息")
                self.reset()
    
        def setLastError(self, err=None):
            if err is None:
                self.status["lastErr"] = ""
                self.status["lastErrTime"] = ""
                return 
            t = _D()
            self.status["lastErr"] = err
            self.status["lastErrTime"] = t
        
        def reset(self, marketPosition=None):
            if marketPosition is not None:
                self.marketPosition = marketPosition
                pos = _bot.GetPosition(self.symbol, PD_LONG if marketPosition > 0 else PD_SHORT)
                if pos is not None:
                    self.holdPrice = pos["Price"]
                    self.holdAmount = pos["Amount"]
                    Log(self.symbol, "仓位", pos)
                else :
                    raise Exception("恢复" + self.symbol + "的持仓状态出错,没有找到仓位信息")
                Log("恢复", self.symbol, "持仓均价:", self.holdPrice, "持仓数量:", self.holdAmount)
                self.status["vm"] = [self.marketPosition]
            else :
                self.marketPosition = 0
                self.holdPrice = 0 
                self.holdAmount = 0 
                self.holdProfit = 0
            self.holdProfit = 0
            self.lastErr = ""
            self.lastErrTime = ""
    
        def Status(self):
            self.status["marketPosition"] = self.marketPosition
            self.status["holdPrice"] = self.holdPrice
            self.status["holdAmount"] = self.holdAmount
            self.status["lastPrice"] = self.lastPrice
            if self.lastPrice > 0 and self.holdAmount > 0 and self.marketPosition != 0:
                self.status["holdProfit"] = _N((self.lastPrice - self.holdPrice) * self.holdAmount * self.symbolDetail["VolumeMultiple"], 4) * (1 if self.marketPosition > 0 else -1)
            else :
                self.status["holdProfit"] = 0 
            return self.status
    
        def setTask(self, action, amount = None, onFinish = None):
            self.task["init"] = False 
            self.task["retry"] = 0
            self.task["action"] = action
            self.task["preAmount"] = 0
            self.task["preCost"] = 0
            self.task["amount"] = 0 if amount is None else amount
            self.task["onFinish"] = onFinish
            if action == Manager.ACT_IDLE:
                self.task["desc"] = "空闲"
                self.task["onFinish"] = None
            else:
                if action != Manager.ACT_COVER:
                    self.task["desc"] = ("加多仓" if action == Manager.ACT_LONG else "加空仓") + "(" + str(amount) + ")"
                else :
                    self.task["desc"] = "平仓"
                Log("接收到任务", self.symbol, self.task["desc"])
                self.Poll(True)
    
        def processTask(self):
            insDetail = exchange.SetContractType(self.symbol)
            if not insDetail:
                return Manager.ERR_SET_SYMBOL
            SlideTick = 1
            ret = False
            if self.task["action"] == Manager.ACT_COVER:
                hasPosition = False
                while True:
                    if not ext.IsTrading(self.symbol):
                        return Manager.ERR_NOT_TRADING
                    hasPosition = False
                    positions = exchange.GetPosition()
                    if positions is None:
                        return Manager.ERR_GET_POS
                    depth = exchange.GetDepth()
                    if depth is None:
                        return Manager.ERR_GET_DEPTH
                    orderId = None
                    for i in range(len(positions)):
                        if positions[i]["ContractType"] != self.symbol:
                            continue
                        amount = min(insDetail["MaxLimitOrderVolume"], positions[i]["Amount"])
                        if positions[i]["Type"] == PD_LONG or positions[i]["Type"] == PD_LONG_YD:
                            exchange.SetDirection("closebuy_today" if positions[i].Type == PD_LONG else "closebuy")
                            orderId = exchange.Sell(_N(depth["Bids"][0]["Price"] - (insDetail["PriceTick"] * SlideTick), 2), min(amount, depth["Bids"][0]["Amount"]), self.symbol, "平今" if positions[i]["Type"] == PD_LONG else "平昨", "Bid", depth["Bids"][0])
                            hasPosition = True
                        elif positions[i]["Type"] == PD_SHORT or positions[i]["Type"] == PD_SHORT_YD:
                            exchange.SetDirection("closesell_today" if positions[i]["Type"] == PD_SHORT else "closesell")
                            orderId = exchange.Buy(_N(depth["Asks"][0]["Price"] + (insDetail["PriceTick"] * SlideTick), 2), min(amount, depth["Asks"][0]["Amount"]), self.symbol, "平今" if positions[i]["Type"] == PD_SHORT else "平昨", "Ask", depth["Asks"][0])
                            hasPosition = True
                        if hasPosition:
                            if not orderId:
                                return Manager.ERR_TRADE
                            Sleep(1000)
                            while True:
                                orders = exchange.GetOrders()
                                if orders is None:
                                    return Manager.ERR_GET_ORDERS
                                if len(orders) == 0:
                                    break
                                for i in range(len(orders)):
                                    exchange.CancelOrder(orders[i]["Id"])
                                    Sleep(500)
                    if not hasPosition:
                        break
                ret = True
            elif self.task["action"] == Manager.ACT_LONG or self.task["action"] == Manager.ACT_SHORT:
                while True:
                    if not ext.IsTrading(self.symbol):
                        return Manager.ERR_NOT_TRADING
                    Sleep(1000)
                    while True:
                        orders = exchange.GetOrders()
                        if orders is None:
                            return Manager.ERR_GET_ORDERS
                        if len(orders) == 0:
                            break
                        for i in range(len(orders)):
                            exchange.CancelOrder(orders[i]["Id"])
                            Sleep(500)
                    positions = exchange.GetPosition()
                    if positions is None:
                        return Manager.ERR_GET_POS
                    pos = None
                    for i in range(len(positions)):
                        if positions[i]["ContractType"] == self.symbol and (((positions[i]["Type"] == PD_LONG or positions[i]["Type"] == PD_LONG_YD) and self.task["action"] == Manager.ACT_LONG) or ((positions[i]["Type"] == PD_SHORT) or positions[i]["Type"] == PD_SHORT_YD) and self.task["action"] == Manager.ACT_SHORT):
                            if not pos:
                                pos = positions[i]
                                pos["Cost"] = positions[i]["Price"] * positions[i]["Amount"]
                            else :
                                pos["Amount"] += positions[i]["Amount"]
                                pos["Profit"] += positions[i]["Profit"]
                                pos["Cost"] += positions[i]["Price"] * positions[i]["Amount"]
                    # records pre position 
                    if not self.task["init"]:
                        self.task["init"] = True
                        if pos:
                            self.task["preAmount"] = pos["Amount"]
                            self.task["preCost"] = pos["Cost"]
                        else:
                            self.task["preAmount"] = 0
                            self.task["preCost"] = 0
                    remain = self.task["amount"]
                    if pos:
                        self.task["dealAmount"] = pos["Amount"] - self.task["preAmount"]
                        remain = int(self.task["amount"] - self.task["dealAmount"])
                        if remain <= 0 or self.task["retry"] >= MaxTaskRetry:
                            ret = {
                                "price" : (pos["Cost"] - self.task["preCost"]) / (pos["Amount"] - self.task["preAmount"]),
                                "amount" : (pos["Amount"] - self.task["preAmount"]),
                                "position" : pos
                            }
                            break
                    elif self.task["retry"] >= MaxTaskRetry:
                        ret = None
                        break
    
                    depth = exchange.GetDepth()
                    if depth is None:
                        return Manager.ERR_GET_DEPTH
                    orderId = None
                    if self.task["action"] == Manager.ACT_LONG:
                        exchange.SetDirection("buy")
                        orderId = exchange.Buy(_N(depth["Asks"][0]["Price"] + (insDetail["PriceTick"] * SlideTick), 2), min(remain, depth["Asks"][0]["Amount"]), self.symbol, "Ask", depth["Asks"][0])
                    else:
                        exchange.SetDirection("sell")
                        orderId = exchange.Sell(_N(depth["Bids"][0]["Price"] - (insDetail["PriceTick"] * SlideTick), 2), min(remain, depth["Bids"][0]["Amount"]), self.symbol, "Bid", depth["Bids"][0])
                    if orderId is None:
                        self.task["retry"] += 1
                        return Manager.ERR_TRADE
            if self.task["onFinish"]:
                self.task["onFinish"](ret)
            self.setTask(Manager.ACT_IDLE)
            return Manager.ERR_SUCCESS
    
        def Poll(self, subroutine = False):
            # 判断交易时段
            self.status["isTrading"] = ext.IsTrading(self.symbol)
            if not self.status["isTrading"]:
                return 
    
            # 执行下单交易任务
            if self.task["action"] != Manager.ACT_IDLE:
                retCode = self.processTask()
                if self.task["action"] != Manager.ACT_IDLE:
                    self.setLastError("任务没有处理成功:" + Manager.errMsg[retCode] + ", " + self.task["desc"] + ", 重试:" + str(self.task["retry"]))
                else :
                    self.setLastError()
                return 
    
            if subroutine:
                return
    
            suffix = "@" if WXPush else ""
            # switch symbol
            _C(exchange.SetContractType, self.symbol)
    
            # 获取K线数据
            records = exchange.GetRecords()
            if records is None:
                self.setLastError("获取K线失败")
                return 
            self.status["recordsLen"] = len(records)
            if len(records) < self.fastPeriod + 2 or len(records) < self.slowPeriod + 2:
                self.setLastError("K线长度小于 均线周期:" + str(self.fastPeriod) + "或" + str(self.slowPeriod))
                return 
    
            opCode = 0   # 0 : IDLE , 1 : LONG , 2 : SHORT , 3 : CoverALL 
            lastPrice = records[-1]["Close"]
            self.lastPrice = lastPrice
    
            fastMA = TA.EMA(records, self.fastPeriod)
            slowMA = TA.EMA(records, self.slowPeriod)
    
            # 策略逻辑
            if self.marketPosition == 0:
                if fastMA[-3] < slowMA[-3] and fastMA[-2] > slowMA[-2]:
                    opCode = 1 
                elif fastMA[-3] > slowMA[-3] and fastMA[-2] < slowMA[-2]:
                    opCode = 2
            else:
                if self.marketPosition < 0 and fastMA[-3] < slowMA[-3] and fastMA[-2] > slowMA[-2]:
                    opCode = 3
                elif self.marketPosition > 0 and fastMA[-3] > slowMA[-3] and fastMA[-2] < slowMA[-2]:
                    opCode = 3
    
            # 如果不触发任何条件,操作码为0,返回
            if opCode == 0:
                return 
    
            # 执行平仓
            if opCode == 3:
                def coverCallBack(ret):
                    self.reset()
                    _G(self.symbol, None)
                self.setTask(Manager.ACT_COVER, 0, coverCallBack)
                return 
            
            account = _bot.GetAccount()
            canOpen = int((account["Balance"] - self.keepBalance) / (self.symbolDetail["LongMarginRatio"] if opCode == 1 else self.symbolDetail["ShortMarginRatio"]) / (lastPrice * 1.2) / self.symbolDetail["VolumeMultiple"])
            unit = min(1, canOpen)
    
            # 设置交易任务
            def setTaskCallBack(ret):
                if not ret:
                    self.setLastError("下单失败")
                    return 
                self.holdPrice = ret["position"]["Price"]
                self.holdAmount = ret["position"]["Amount"]
                self.marketPosition += 1 if opCode == 1 else -1
                self.status["vm"] = [self.marketPosition]
                _G(self.symbol, self.status["vm"])
    
            self.setTask(Manager.ACT_LONG if opCode == 1 else Manager.ACT_SHORT, unit, setTaskCallBack)
    
    def onexit():
        Log("已退出策略...")
    
    def main():
        if exchange.GetName().find("CTP") == -1:
            raise Exception("只支持商品期货CTP")
        SetErrorFilter("login|ready|流控|连接失败|初始|Timeout")
        mode = exchange.IO("mode", 0)
        if mode is None:
            raise Exception("切换模式失败,请更新到最新托管者!")
        while not exchange.IO("status"):
            Sleep(3000)
            LogStatus("正在等待与交易服务器连接," + _D())
        positions = _C(exchange.GetPosition)
        if len(positions) > 0:
            Log("检测到当前持有仓位,系统将开始尝试恢复进度...")
            Log("持仓信息:", positions)
    
        initAccount = _bot.GetAccount()
        initMargin = json.loads(exchange.GetRawJSON())["CurrMargin"]
        keepBalance = _N((initAccount["Balance"] + initMargin) * (KeepRatio / 100), 3)
        Log("资产信息", initAccount, "保留资金:", keepBalance)
    
        tts = []
        symbolFilter = {}
        arr = Instruments.split(",")
        arrFastPeriod = FastPeriodArr.split(",")
        arrSlowPeriod = SlowPeriodArr.split(",")
        if len(arr) != len(arrFastPeriod) or len(arr) != len(arrSlowPeriod):
            raise Exception("均线周期参数与添加合约数量不匹配,请检查参数!")
        for i in range(len(arr)):
            symbol = re.sub(r'/\s+$/g', "", re.sub(r'/^\s+/g', "", arr[i]))
            if symbol in symbolFilter.keys():
                raise Exception(symbol + "已经存在,请检查参数!")
            symbolFilter[symbol] = True
            hasPosition = False 
            for j in range(len(positions)):
                if positions[j]["ContractType"] == symbol:
                    hasPosition = True
                    break
            fastPeriod = int(arrFastPeriod[i])
            slowPeriod = int(arrSlowPeriod[i])
            obj = Manager(hasPosition, symbol, keepBalance, fastPeriod, slowPeriod)
            tts.append(obj)
        
        preTotalHold = -1
        lastStatus = ""
        while True:
            if GetCommand() == "暂停/继续":
                Log("暂停交易中...")
                while GetCommand() != "暂停/继续":
                    Sleep(1000)
                Log("继续交易中...")
            while not exchange.IO("status"):
                Sleep(3000)
                LogStatus("正在等待与交易服务器连接," + _D() + "\n" + lastStatus)
    
            tblStatus = {
                "type" : "table",
                "title" : "持仓信息",
                "cols" : ["合约名称", "持仓方向", "持仓均价", "持仓数量", "持仓盈亏", "加仓次数", "当前价格"],
                "rows" : [] 
            }
    
            tblMarket = {
                "type" : "table", 
                "title" : "运行状态", 
                "cols" : ["合约名称", "合约乘数", "保证金率", "交易时间", "柱线长度", "异常描述", "发生时间"], 
                "rows" : []
            }
    
            totalHold = 0
            vmStatus = {}
            ts = time.time()
            holdSymbol = 0
            for i in range(len(tts)):
                tts[i].Poll()
                d = tts[i].Status()
                if d["holdAmount"] > 0:
                    vmStatus[d["symbol"]] = d["vm"]
                    holdSymbol += 1
                tblStatus["rows"].append([d["symbolDetail"]["InstrumentName"], "--" if d["holdAmount"] == 0 else ("多" if d["marketPosition"] > 0 else "空"), d["holdPrice"], d["holdAmount"], d["holdProfit"], abs(d["marketPosition"]), d["lastPrice"]])
                tblMarket["rows"].append([d["symbolDetail"]["InstrumentName"], d["symbolDetail"]["VolumeMultiple"], str(_N(d["symbolDetail"]["LongMarginRatio"], 4)) + "/" + str(_N(d["symbolDetail"]["ShortMarginRatio"], 4)), "是#0000ff" if d["isTrading"] else "否#ff0000", d["recordsLen"], d["lastErr"], d["lastErrTime"]])
                totalHold += abs(d["holdAmount"])
    
            now = time.time()
            elapsed = now - ts
            tblAssets = _bot.GetAccount(True)
            nowAccount = _bot.Account()
    
            if len(tblAssets["rows"]) > 10:
                tblAssets["rows"][0] = ["InitAccount", "初始资产", initAccount]
            else:
                tblAssets["rows"].insert(0, ["NowAccount", "当前可用", nowAccount])
                tblAssets["rows"].insert(0, ["InitAccount", "初始资产", initAccount])
            
            lastStatus = "`" + json.dumps([tblStatus, tblMarket, tblAssets]) + "`\n轮询耗时:" + str(elapsed) + " 秒,当前时间:" + _D() + ", 持有品种个数:" + str(holdSymbol)
            if totalHold > 0:
                lastStatus += "\n手动恢复字符串:" + json.dumps(vmStatus)
            LogStatus(lastStatus)
            if preTotalHold > 0 and totalHold == 0:
                LogProfit(nowAccount.Balance - initAccount.Balance - initMargin)
            preTotalHold = totalHold
            Sleep(LoopInterval * 1000)
    

    策略地址:https://www.fmz.com/strategy/208512

    回测对比

    我们用该策略的JavaScript版本和Python版本回测进行对比。

    • Python版本回测
      我们使用公共服务器进行回测,可以看到Python版本的回测略微快了一点。


    • JavaScript版本回测


    可以看到回测结果一模一样,有兴趣的小伙伴可以钻研一下代码,会有不小的收获。

    花里胡哨的扩展

    我们来做个扩展示范,给策略扩展出图表功能,如图:

    主要增加代码部分:

    • Manager类增加一个成员:objChart
    • Manager类增加一个方法:PlotRecords

    其它的一些修改都是围绕这两点进行,可以对比两个版本区别,学习扩展功能的思路。
    python版商品期货多品种均线策略 (扩展图表)

    以上策略学习为主,实盘慎用。

    欢迎留言。

    相关文章

      网友评论

          本文标题:Python版商品期货多品种均线策略

          本文链接:https://www.haomeiwen.com/subject/zddlxktx.html