美文网首页
股票数据分析(一)数据获取

股票数据分析(一)数据获取

作者: 转工 | 来源:发表于2017-01-11 00:09 被阅读0次

    前段时间开始做股票数据分析的业余项目,希望能提高自己对大型数据量的处理能力。目前大致的想法是用python的tushare模块获取数据,用Java的框架做发布。

    1.Tushare模块的说明

    tushare是国内金融从业者@JimmyTu(挖地兔)搜集国内各个公开渠道的股票数据接口,并把这些接口整理后用python写的一个模块。文档地址为:http://tushare.org/index.html

    首先说明一下tushare获取到的数据格式,这里以分笔历史纪录交易为例:

    import tushare as ts
    df = ts.get_tick_data('600848',date='2014-01-09')
    df.head(4)
    
         time       price change  volume  amount  type
    0    15:00:00   6.05     --       8    4840   卖盘
    1    14:59:55   6.05     --      50   30250   卖盘
    2    14:59:35   6.05     --      20   12100   卖盘
    3    14:59:30   6.05  -0.01     165   99825   卖盘
    

    请求得到的结果是pandas模块中的一种基本数据结构类——DataFrame,这是一种类似二维表的数据结构。DataFrame类提供了多种数据处理、存储的方法,其中也包括了将数据存入数据库的to_sql方法,更多用法可查阅pandas的官方文档。

    2.数据库设计

    目前每日分笔数据采用动态建表的方式保存,即将每天所有股票的分笔数据存储在一张以当天日期命名的表中,并且间隔一定时间周期后再分库。

    3.代码

    以下为一个初步的使用多线程请求历史分笔记录的python3代码

    # -*- coding: utf-8 -*-
    """
    Created on Fri Jan  2 12:37:43 2017
    
    @author: jerry
    """
    
    import tushare as ts
    import time
    import queue
    import threading
    import pandas as ps
    from sqlalchemy import create_engine
    
    THREADS_NUM = 25  # 采集线程数
    THREADS_EXITFLAG = 0  # 线程退出标志
    TICKS_DATA_DATE = '2017-01-04'  # 指定采集日期
    MYSQL_ENGINE = 'mysql://root:pwd@ip:port/dbname?charset=utf8'
    
    class GetStockData(threading.Thread):
        def __init__(self, threadID, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.q = q
    
        def run(self):
            print ('线程%s开始下载' % (self.threadID))
            self._process_data()
    
        def _process_data(self):
            engine = create_engine(MYSQL_ENGINE)
            while not THREADS_EXITFLAG:
                if not self.q.empty():
                    code = self.q.get()
                    remain_num = self.q.qsize()
                    tick_data = get_tick(code, TICKS_DATA_DATE)
                    #根据当请求的股票当日停牌时,返回的数据有三行
                    if len(tick_data) > 3:
                        save_to_mysql(TICKS_DATA_DATE, tick_data, engine, code, remain_num)
                    time.sleep(0.05)
                else:
                    break
    
    def get_stock_basics():
        """
        获取当日股票列表
        Return
        --------
        DataFrame
        """
        basics = ts.get_stock_basics()
        return basics
    
    def get_tick(stockCode=None, date=None):
        """
        根据股票列表的股票代码获取当日/指定日期历史分笔
        Return
        --------
        DataFrame
        """
        tick_data = ''
        if date != None and date != '':
            tick_data = ts.get_tick_data(stockCode, date)
        else:
            tick_data = ts.get_today_ticks(stockCode)
        if not tick_data.dropna(axis=0, how='any', thresh=None).empty:
            tick_data.insert(0, 'code', stockCode) #插入股票代码字段
        return tick_data
    
    
    def save_to_mysql(tablename=None, data=None, engine=None, code=None, num=None):
        """
        保存获取的数据到MySQL数据库中
        Return
        --------
        """
        for i in range(3):
            try:
                data.to_sql(tablename, engine, if_exists='append')
                print('save %s %s' % (code, num))
                break
            except BaseException as e:
                print ('Save Error %s ' % (code))
        return
    
    
    def main():
        #    reload(sys)
        #    sys.setdefaultencoding('utf8')
    
        stock_codes = get_stock_basics()
        threads = []
        try:
            """
            根据股票代码列表创建队列
            """
            stocks = queue.Queue(len(stock_codes))
            for code in stock_codes.index:
                code = str(code)
                if (len(code) != 6):
                    code = (6 - len(code)) * '0' + code
                stocks.put(code)
    
            """
            创建并运行线程
            """
            for n in range(THREADS_NUM):
                thread = GetStockData(n, stocks)
                thread.start()
                threads.append(thread)
    
            while not stocks.empty():
                pass
            print ('数据请求完毕。')
            THREADS_EXITFLAG = 1
    
            for t in threads:
                t.join()
        except BaseException as e:
            print ('Error', e)
        return
    
    if __name__ == '__main__':
        print ('开始请求%s的数据' % (TICKS_DATA_DATE))
        main()
    
    

    代码很简单,请求当日的股票列表,并将列表中的股票代码放入队列中,尔后开启指定数量的线程并根据队列中的股票代码请求数据,请求完毕后将数据保存至mysql数据库。
    当然,以上只是个初步版本,还需要修改一些纰漏之处,并增加日志等功能。

    相关文章

      网友评论

          本文标题:股票数据分析(一)数据获取

          本文链接:https://www.haomeiwen.com/subject/gjlkbttx.html