上代码。
from pytdx.hq import TdxHq_API
from pytdx.reader import TdxDailyBarReader, TdxFileNotFoundException
import numpy as np
import pandas as pd
import akshare as ak
import time,os,talib,threading,requests
from queue import Queue
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
class YJXC():
def __init__(self):
print("开始工作!!")
#策略指标1
def STKM(self,data):
#略
#策略指标2
def ZJTJ(self,data):
#略
#条件1买卖点
def Ymm(self,Y,n):
#略
#条件2买卖点
def Ydd(self,Y):
#略
#两种获取数据的方式
def getGPData(self,code,api):
if code[0]=="6":
c = [1,'sh']
else:
c = [0,'sz']
try:
data = api.to_df(api.get_security_bars(9,c[0], code, 0, 70))
if len(data)==0:
pass
else:
data.index = pd.DatetimeIndex(data.datetime)
except SystemError as err:
print(err)
reader = TdxDailyBarReader()
data = reader.get_df("I:/Program Files (x86)/zd_ciccwm/vipdoc/%s/lday/%s.day"%(c[1],c[1]+code)[-70:])
#hs300 = ak.stock_zh_index_daily_em(symbol="sh000300")[-60:]
data = data.rename(columns={"vol":"volume"})
return data
#选股,负责多线程
def XG(self):
df = pd.read_excel("个股.xlsx",dtype = object)
r = []
result_queue = Queue()
threads = []
param = df.代码.values.tolist()
part = [0,1100,2200,3300,len(param)]
for i in range(4):
p = threading.Thread(target=self.xg,args=(param[part[i]:part[i+1]],result_queue, ))
p.start()
threads.append(p)
for i in threads:
i.join()
r = [[] for i in range(10)]
for _ in range(4):
rr = result_queue.get()
for i in range(10):
r[i] += rr[i]
return r
## p = multiprocessing.Pool(4)
## for i in param:
## r.append(xg(i,api))
## r = p.map(self.xg,param)
## p.close()
## p.join()
## r = np.array(r).T
## rr = []
## for i in range(10):
## rr.append(np.delete(np.unique(r[i]),0).tolist())
## return rr
#选股,负责真的选股
def xg(self,codes,r):
Scodes = [[] for i in range(10)]
api = TdxHq_API(multithread=True)
with api.connect('119.147.212.81', 7709):
for code in codes:
try:
data = self.getGPData(code,api)
if len(data)<70:
pass
else:
Y1 = self.STKM(data)
m1,m2 = self.Ymm(Y1[4],0)
if m1[-1]==1:
Scodes[0].append(code)
if m1[-2]==1:
Scodes[1].append(code)
if m1[-3]==1:
Scodes[2].append(code)
if m1[-4]==1:
Scodes[3].append(code)
if m1[-5]==1:
Scodes[4].append(code)
Y2 = self.ZJTJ(data.close.values)
m1,m2 = self.Ymm(Y2,0)
if m1[-1]==1:
Scodes[5].append(code)
if m1[-2]==1:
Scodes[6].append(code)
if m1[-3]==1:
Scodes[7].append(code)
if m1[-4]==1:
Scodes[8].append(code)
if m1[-5]==1:
Scodes[9].append(code)
except SystemError as err:
print(err)
r.put(Scodes)
#通过首页爬取研报url
def getUrls(self):
urls = []
URL = "https://"
xpath = "/html/body/div[2]/div/div[4]/div[1]/div[1]/div/div[2]/div/div[1]/div/div/div"
o = webdriver.ChromeOptions()
o.add_argument('headless')
d = webdriver.Chrome(options=o)
d.get(URL)
a = ActionChains(d)
a.send_keys(Keys.TAB * 4)
a.perform()
time.sleep(1)
shtml = d.page_source
ehtml = etree.HTML(shtml)
for i in range(25):
url = ehtml.xpath(xpath+"[%s]/div/@url"%(i+1))
urls.append(url[0])
d.quit()
return urls
#爬取研报页并获取选股代码
def getCodes(self,urls):
df = ak.stock_info_a_code_name()
yb = []
for url in urls:
r = requests.get(url)
rr = r.text.replace(" ","")
#f = rr.find("发布时间:")
#st = time.strftime("%m-%d",time.localtime())
df.name = df.name.replace(" ","",regex=True)
for i in df.name.values.tolist():
if rr.find(i)>0:
code = df[df.name==i].code.values[0]
#if rr[f+5:f+10] == st:
yb.append(code)
return yb
#选股结果写入自定义板块
def toBLK(self,data,r):
data = self.addNum(data)
r = self.addNum(r)
st = int(time.strftime("%w",time.localtime()))
if st == 0 or st == 6:
st = 5
path = "I:/Program Files (x86)/zd_ciccwm/T0002/blocknew/"
with open(path+"YB.blk","w") as f:
for i in data[0]:
f.write("%s\n"%i)
print("研报板块已更新完毕。")
rs = r[:5]
rz = r[-5:]
w = [1,2,3,4,5]
for i in range(st,st-5,-1):
with open(path+"Z%s.blk"%w[i-1],"w") as f:
for j in rz[i-1]:
f.write("%s\n"%j)
with open(path+"S%s.blk"%w[i-1],"w") as f:
for i in rs[i-1]:
f.write("%s\n"%j)
print("庄+S板块已更新完毕。")
#股票标记
def markGP(self):
blk = ['S1','S2','S3','S4','S5']
path = "I:/Program Files (x86)/zd_ciccwm/T0002/"
mark = []
for i in range(5):
with open(path+"blocknew/"+blk[i]+".blk","r") as f:
mark += [ "0"+c[:-1]+"=%s\n"%(i+1) for c in f.readlines() ]
with open(path+"mark.dat","w") as f:
f.write("[mark]\n"+"".join(mark))
print("S板块标记完毕。")
#添加市场编号
def addNum(self,data):
for li in data:
for i in range(len(li)):
if li[i][0]=="6":
li[i] = "1"+li[i]
else:
li[i] = "0"+li[i]
return data
if __name__ == '__main__':
start = time.perf_counter()
a = YJXC()
r = a.XG()
urls = a.getUrls()
yb = a.getCodes(urls)
a.toBLK([list(set(yb))],r)
a.markGP()
end = time.perf_counter()
m, s = divmod(round(end-start), 60)
print("共用时:"+"%02d分%02d秒"%(m,s))
图片
这是测试的结果,实际差不多5分钟。
火急火燎的把文章赶完,感觉之前写代码时的一些心得没想起来太多,就是有点唐突,不过也没关系,来日方长。
工欲善其事,必先自宫。来一起练绝世武功啊。
网友评论