美文网首页简友广场多才多艺
量化 | 一键歇菜(下)

量化 | 一键歇菜(下)

作者: opcc | 来源:发表于2021-09-15 18:39 被阅读0次

上代码。

from pytdx.hq import TdxHq_API
from pytdx.reader import TdxDailyBarReader, TdxFileNotFoundException
import numpy as np
import pandas as pd
import akshare as ak
import time,os,talib,threading,requests
from queue import Queue
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains

class YJXC():
    def __init__(self):
        print("开始工作!!")

    #策略指标1
    def STKM(self,data):
        #略

    #策略指标2
    def ZJTJ(self,data):
        #略

    #条件1买卖点
    def Ymm(self,Y,n):
        #略

    #条件2买卖点
    def Ydd(self,Y):
        #略

    #两种获取数据的方式
    def getGPData(self,code,api):
        if code[0]=="6":
            c = [1,'sh']
        else:
            c = [0,'sz']
        try:
            data = api.to_df(api.get_security_bars(9,c[0], code, 0, 70))
            if len(data)==0:
                pass
            else:
                data.index = pd.DatetimeIndex(data.datetime)
        except SystemError as err:
            print(err)
            reader = TdxDailyBarReader()
            data = reader.get_df("I:/Program Files (x86)/zd_ciccwm/vipdoc/%s/lday/%s.day"%(c[1],c[1]+code)[-70:])
        #hs300 = ak.stock_zh_index_daily_em(symbol="sh000300")[-60:]
        data = data.rename(columns={"vol":"volume"})
        return data

    #选股,负责多线程
    def XG(self):
        df = pd.read_excel("个股.xlsx",dtype = object)
        r = []
        result_queue = Queue()
        threads = []
        
        param = df.代码.values.tolist()
        part = [0,1100,2200,3300,len(param)]
        for i in range(4):
            p = threading.Thread(target=self.xg,args=(param[part[i]:part[i+1]],result_queue, ))
            p.start()
            threads.append(p)
        for i in threads:
            i.join()
        r = [[] for i in range(10)]
        for _ in range(4):
            rr = result_queue.get()
            for i in range(10):
                r[i] += rr[i]
        return r
##            p = multiprocessing.Pool(4)
##            for i in param:
##                r.append(xg(i,api))
##            r = p.map(self.xg,param)
##            p.close()
##            p.join()
##        r = np.array(r).T
##        rr = []
##        for i in range(10):
##            rr.append(np.delete(np.unique(r[i]),0).tolist())
##        return rr

    #选股,负责真的选股
    def xg(self,codes,r):
        Scodes = [[] for i in range(10)]
        api = TdxHq_API(multithread=True)
        with api.connect('119.147.212.81', 7709):
            for code in codes:
                try:
                    data = self.getGPData(code,api)
                    if len(data)<70:
                        pass
                    else:
                        Y1 = self.STKM(data)
                        m1,m2 = self.Ymm(Y1[4],0)
                        if m1[-1]==1:
                            Scodes[0].append(code)
                        if m1[-2]==1:
                            Scodes[1].append(code)
                        if m1[-3]==1:
                            Scodes[2].append(code)
                        if m1[-4]==1:
                            Scodes[3].append(code)
                        if m1[-5]==1:
                            Scodes[4].append(code)
                        Y2 = self.ZJTJ(data.close.values)
                        m1,m2 = self.Ymm(Y2,0)
                        if m1[-1]==1:
                            Scodes[5].append(code)
                        if m1[-2]==1:
                            Scodes[6].append(code)
                        if m1[-3]==1:
                            Scodes[7].append(code)
                        if m1[-4]==1:
                            Scodes[8].append(code)
                        if m1[-5]==1:
                            Scodes[9].append(code)
                except SystemError as err:
                    print(err)
            r.put(Scodes)

    #通过首页爬取研报url
    def getUrls(self):
        urls = []
        URL = "https://"
        xpath = "/html/body/div[2]/div/div[4]/div[1]/div[1]/div/div[2]/div/div[1]/div/div/div"
        o = webdriver.ChromeOptions()
        o.add_argument('headless')
        d = webdriver.Chrome(options=o)
        d.get(URL)
        a = ActionChains(d) 
        a.send_keys(Keys.TAB * 4)
        a.perform()
        time.sleep(1)
        shtml = d.page_source
        ehtml = etree.HTML(shtml)
        for i in range(25):
            url = ehtml.xpath(xpath+"[%s]/div/@url"%(i+1))
            urls.append(url[0])
        d.quit()
        return urls

    #爬取研报页并获取选股代码
    def getCodes(self,urls):
        df = ak.stock_info_a_code_name()
        yb = []
        for url in urls:
            r = requests.get(url)
            rr = r.text.replace(" ","")
            #f = rr.find("发布时间:")
            #st = time.strftime("%m-%d",time.localtime())
            df.name = df.name.replace(" ","",regex=True)
            for i in df.name.values.tolist():
                if rr.find(i)>0:
                    code = df[df.name==i].code.values[0]
                    #if rr[f+5:f+10] == st:
                    yb.append(code)
        return yb

    #选股结果写入自定义板块
    def toBLK(self,data,r):
        data = self.addNum(data)
        r = self.addNum(r)
        st = int(time.strftime("%w",time.localtime()))
        if st == 0 or st == 6:
            st = 5
        path = "I:/Program Files (x86)/zd_ciccwm/T0002/blocknew/"
        with open(path+"YB.blk","w") as f:
            for i in data[0]:
                f.write("%s\n"%i)
        print("研报板块已更新完毕。")
        rs = r[:5]
        rz = r[-5:]
        w = [1,2,3,4,5]
        for i in range(st,st-5,-1):
            with open(path+"Z%s.blk"%w[i-1],"w") as f:
                for j in rz[i-1]:
                    f.write("%s\n"%j)
            with open(path+"S%s.blk"%w[i-1],"w") as f:
                for i in rs[i-1]:
                    f.write("%s\n"%j)
        print("庄+S板块已更新完毕。")

    #股票标记
    def markGP(self):
        blk = ['S1','S2','S3','S4','S5']
        path = "I:/Program Files (x86)/zd_ciccwm/T0002/"
        mark = []
        for i in range(5):
            with open(path+"blocknew/"+blk[i]+".blk","r") as f:
                mark += [ "0"+c[:-1]+"=%s\n"%(i+1) for c in f.readlines() ]
        with open(path+"mark.dat","w") as f:
            f.write("[mark]\n"+"".join(mark))
        print("S板块标记完毕。")

    #添加市场编号
    def addNum(self,data):
        for li in data:
            for i in range(len(li)):
                if li[i][0]=="6":
                    li[i] = "1"+li[i]
                else:
                    li[i] = "0"+li[i]
        return data

if __name__ == '__main__':
    start = time.perf_counter()
    a = YJXC()
    r = a.XG()
    urls = a.getUrls()
    yb = a.getCodes(urls)
    a.toBLK([list(set(yb))],r)
    a.markGP()
    end = time.perf_counter()
    m, s = divmod(round(end-start), 60)
    print("共用时:"+"%02d分%02d秒"%(m,s))
图片

这是测试的结果,实际差不多5分钟。
火急火燎的把文章赶完,感觉之前写代码时的一些心得没想起来太多,就是有点唐突,不过也没关系,来日方长。
工欲善其事,必先自宫。来一起练绝世武功啊。

相关文章

  • 量化 | 一键歇菜(下)

    上代码。 这是测试的结果,实际差不多5分钟。火急火燎的把文章赶完,感觉之前写代码时的一些心得没想起来太多,就是有点...

  • 量化 | 一键歇菜(上)

    紧赶慢赶,今天终于是开发完了。用时不到一个星期,就是上次文章提到的想要一键选股到炒股软件上。虽然有lh的部分...

  • 量化 | 一键歇菜(中)

    策略S和策略Z,各有千秋,股票用的是我那老套牢股。可惜我没用策略去经营它,不然也应该能回三分之一本了。这里是...

  • 1807W3周检视0709-0715

    一、健康 听饮食课程,并自己做健康不增脂早餐,下一步准备了解食物能量值,并对自己进行量化饮食。体脂秤终于也在歇菜一...

  • 股记 | 21-09-16

    这周继一键歇菜之后,我又改进了回测系统,加入了仓控功能。逻辑就是把一只股票预期投入的资金三等分,建仓一份,加...

  • 歇菜了

    上午小同事打电话,让帮忙往经开区送货,麻利的答应了。一路上也是谨小慎微的前行。 雨一直下,去的时...

  • C爷日常||歇菜一下

    自1月4日回来这边,已经一个月有余了。 这是一个月一来第一次状态歇菜。 那种不知道自己在做什么的无意义感又涌了上来...

  • 力软敏捷开发框架-轻量化app一键快速开发

    力软敏捷开发框架-轻量化app一键快速开发 APP怎么快速开发?2018年,力软敏捷开发框架的APP在线制作平台已...

  • Day11 - 基于Scrum的量化驱动改进

    基于量化的工程效能管理 从感觉到量化 工程效能下的度量指标 量化质量构建持续交付 工程效能下的度量指标 体系化指标...

  • 数字资产量化小白成长记---基础学习篇

    昨天和大家简单介绍了一下数字资产量化交易的情况,今天和大家分享下从0开始,开始量化交易需要怎么做。 量化的学习像是...

网友评论

    本文标题:量化 | 一键歇菜(下)

    本文链接:https://www.haomeiwen.com/subject/izklgltx.html