美文网首页简友广场多才多艺
量化 | 一键歇菜(下)

量化 | 一键歇菜(下)

作者: opcc | 来源:发表于2021-09-15 18:39 被阅读0次

    上代码。

    from pytdx.hq import TdxHq_API
    from pytdx.reader import TdxDailyBarReader, TdxFileNotFoundException
    import numpy as np
    import pandas as pd
    import akshare as ak
    import time,os,talib,threading,requests
    from queue import Queue
    from lxml import etree
    from selenium import webdriver
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.common.action_chains import ActionChains
    
    class YJXC():
        def __init__(self):
            print("开始工作!!")
    
        #策略指标1
        def STKM(self,data):
            #略
    
        #策略指标2
        def ZJTJ(self,data):
            #略
    
        #条件1买卖点
        def Ymm(self,Y,n):
            #略
    
        #条件2买卖点
        def Ydd(self,Y):
            #略
    
        #两种获取数据的方式
        def getGPData(self,code,api):
            if code[0]=="6":
                c = [1,'sh']
            else:
                c = [0,'sz']
            try:
                data = api.to_df(api.get_security_bars(9,c[0], code, 0, 70))
                if len(data)==0:
                    pass
                else:
                    data.index = pd.DatetimeIndex(data.datetime)
            except SystemError as err:
                print(err)
                reader = TdxDailyBarReader()
                data = reader.get_df("I:/Program Files (x86)/zd_ciccwm/vipdoc/%s/lday/%s.day"%(c[1],c[1]+code)[-70:])
            #hs300 = ak.stock_zh_index_daily_em(symbol="sh000300")[-60:]
            data = data.rename(columns={"vol":"volume"})
            return data
    
        #选股,负责多线程
        def XG(self):
            df = pd.read_excel("个股.xlsx",dtype = object)
            r = []
            result_queue = Queue()
            threads = []
            
            param = df.代码.values.tolist()
            part = [0,1100,2200,3300,len(param)]
            for i in range(4):
                p = threading.Thread(target=self.xg,args=(param[part[i]:part[i+1]],result_queue, ))
                p.start()
                threads.append(p)
            for i in threads:
                i.join()
            r = [[] for i in range(10)]
            for _ in range(4):
                rr = result_queue.get()
                for i in range(10):
                    r[i] += rr[i]
            return r
    ##            p = multiprocessing.Pool(4)
    ##            for i in param:
    ##                r.append(xg(i,api))
    ##            r = p.map(self.xg,param)
    ##            p.close()
    ##            p.join()
    ##        r = np.array(r).T
    ##        rr = []
    ##        for i in range(10):
    ##            rr.append(np.delete(np.unique(r[i]),0).tolist())
    ##        return rr
    
        #选股,负责真的选股
        def xg(self,codes,r):
            Scodes = [[] for i in range(10)]
            api = TdxHq_API(multithread=True)
            with api.connect('119.147.212.81', 7709):
                for code in codes:
                    try:
                        data = self.getGPData(code,api)
                        if len(data)<70:
                            pass
                        else:
                            Y1 = self.STKM(data)
                            m1,m2 = self.Ymm(Y1[4],0)
                            if m1[-1]==1:
                                Scodes[0].append(code)
                            if m1[-2]==1:
                                Scodes[1].append(code)
                            if m1[-3]==1:
                                Scodes[2].append(code)
                            if m1[-4]==1:
                                Scodes[3].append(code)
                            if m1[-5]==1:
                                Scodes[4].append(code)
                            Y2 = self.ZJTJ(data.close.values)
                            m1,m2 = self.Ymm(Y2,0)
                            if m1[-1]==1:
                                Scodes[5].append(code)
                            if m1[-2]==1:
                                Scodes[6].append(code)
                            if m1[-3]==1:
                                Scodes[7].append(code)
                            if m1[-4]==1:
                                Scodes[8].append(code)
                            if m1[-5]==1:
                                Scodes[9].append(code)
                    except SystemError as err:
                        print(err)
                r.put(Scodes)
    
        #通过首页爬取研报url
        def getUrls(self):
            urls = []
            URL = "https://"
            xpath = "/html/body/div[2]/div/div[4]/div[1]/div[1]/div/div[2]/div/div[1]/div/div/div"
            o = webdriver.ChromeOptions()
            o.add_argument('headless')
            d = webdriver.Chrome(options=o)
            d.get(URL)
            a = ActionChains(d) 
            a.send_keys(Keys.TAB * 4)
            a.perform()
            time.sleep(1)
            shtml = d.page_source
            ehtml = etree.HTML(shtml)
            for i in range(25):
                url = ehtml.xpath(xpath+"[%s]/div/@url"%(i+1))
                urls.append(url[0])
            d.quit()
            return urls
    
        #爬取研报页并获取选股代码
        def getCodes(self,urls):
            df = ak.stock_info_a_code_name()
            yb = []
            for url in urls:
                r = requests.get(url)
                rr = r.text.replace(" ","")
                #f = rr.find("发布时间:")
                #st = time.strftime("%m-%d",time.localtime())
                df.name = df.name.replace(" ","",regex=True)
                for i in df.name.values.tolist():
                    if rr.find(i)>0:
                        code = df[df.name==i].code.values[0]
                        #if rr[f+5:f+10] == st:
                        yb.append(code)
            return yb
    
        #选股结果写入自定义板块
        def toBLK(self,data,r):
            data = self.addNum(data)
            r = self.addNum(r)
            st = int(time.strftime("%w",time.localtime()))
            if st == 0 or st == 6:
                st = 5
            path = "I:/Program Files (x86)/zd_ciccwm/T0002/blocknew/"
            with open(path+"YB.blk","w") as f:
                for i in data[0]:
                    f.write("%s\n"%i)
            print("研报板块已更新完毕。")
            rs = r[:5]
            rz = r[-5:]
            w = [1,2,3,4,5]
            for i in range(st,st-5,-1):
                with open(path+"Z%s.blk"%w[i-1],"w") as f:
                    for j in rz[i-1]:
                        f.write("%s\n"%j)
                with open(path+"S%s.blk"%w[i-1],"w") as f:
                    for i in rs[i-1]:
                        f.write("%s\n"%j)
            print("庄+S板块已更新完毕。")
    
        #股票标记
        def markGP(self):
            blk = ['S1','S2','S3','S4','S5']
            path = "I:/Program Files (x86)/zd_ciccwm/T0002/"
            mark = []
            for i in range(5):
                with open(path+"blocknew/"+blk[i]+".blk","r") as f:
                    mark += [ "0"+c[:-1]+"=%s\n"%(i+1) for c in f.readlines() ]
            with open(path+"mark.dat","w") as f:
                f.write("[mark]\n"+"".join(mark))
            print("S板块标记完毕。")
    
        #添加市场编号
        def addNum(self,data):
            for li in data:
                for i in range(len(li)):
                    if li[i][0]=="6":
                        li[i] = "1"+li[i]
                    else:
                        li[i] = "0"+li[i]
            return data
    
    if __name__ == '__main__':
        start = time.perf_counter()
        a = YJXC()
        r = a.XG()
        urls = a.getUrls()
        yb = a.getCodes(urls)
        a.toBLK([list(set(yb))],r)
        a.markGP()
        end = time.perf_counter()
        m, s = divmod(round(end-start), 60)
        print("共用时:"+"%02d分%02d秒"%(m,s))
    
    图片

    这是测试的结果,实际差不多5分钟。
    火急火燎的把文章赶完,感觉之前写代码时的一些心得没想起来太多,就是有点唐突,不过也没关系,来日方长。
    工欲善其事,必先自宫。来一起练绝世武功啊。

    相关文章

      网友评论

        本文标题:量化 | 一键歇菜(下)

        本文链接:https://www.haomeiwen.com/subject/izklgltx.html