美文网首页
Python读取通达信本地数据

Python读取通达信本地数据

作者: Curious1990 | 来源:发表于2022-11-19 20:34 被阅读0次

    获取内容

    股票名称、拼音缩写、所属行业(一级行业及细分行业),以及各周期K线(日、周、月、季、年)

    import os
    import struct
    import pandas as pd
    
    
    # 负责读取本地通达信数据
    class TDXLoader:
        """Load stock data from a local TongDaXin (TDX) installation.

        The installation root is read from the ``Tdx`` environment variable.
        The merged basic-information table is built once in ``__init__``;
        daily K-line files are read from disk on every request.
        """

        # One 32-byte record of a ``.day`` file: five integers, one float, two
        # integers. The leading "<" forces little-endian standard sizes, so
        # each "l" is 4 bytes on every platform. A bare "l" is the native C
        # long, which is 8 bytes on 64-bit Linux/macOS and makes unpacking a
        # 32-byte record fail there.
        _KLINE_RECORD = struct.Struct("<lllllfll")

        # Byte layout of a ``.tnf`` file: a header followed by fixed-size records.
        _TNF_HEADER_SIZE = 50
        _TNF_RECORD_SIZE = 314

        # Code prefixes kept from each market file, keyed by the file-name tag.
        _TNF_PREFIXES = {"szm": ("00", "30"), "shm": ("60", "68")}

        def __init__(self):
            # TDX_DIR is the TDX installation root directory.
            self.TDX_DIR = os.environ.get("Tdx")
            self.__base_all = self._get_base_all()

        def resolve(self, *args, **kwargs):
            """Join ``args`` onto the TDX root directory and return the path."""
            return os.path.join(self.TDX_DIR, *args, **kwargs)

        def _get_tdxhy(self):
            """Read the per-stock industry assignment from ``tdxhy.cfg``.

            Returns:
                DataFrame with columns ``market``, ``symbol``,
                ``industry_code`` and ``industry_second_code``; ``symbol`` is
                kept as a string so leading zeros survive.
            """
            tdxhy_path = self.resolve(r"T0002\hq_cache\tdxhy.cfg")
            names = "market symbol industry_code idontcare1 idontcare2 industry_second_code".split()
            usecols = "market symbol industry_code industry_second_code".split()
            return pd.read_csv(
                tdxhy_path,
                sep="|",
                names=names,
                usecols=usecols,
                dtype={"symbol": str},
            )

        def _get_incon(self):
            """Read ``incon.dat`` and return the industry code/name table.

            Lines of the form ``code|name`` are split on "|"; a line without a
            "|" yields empty strings in both columns.

            Returns:
                DataFrame with columns ``industry_code`` and ``industry_name``.
            """
            incon_path = self.resolve(r"incon.dat")
            incon = pd.read_csv(incon_path, encoding="gb2312", names=["index"])

            def field(position):
                # Build an extractor for the given "|"-separated field.
                return lambda line: line.split("|")[position] if "|" in line else ""

            incon["industry_code"] = incon["index"].apply(field(0))
            incon["industry_name"] = incon["index"].apply(field(1))
            return incon[["industry_code", "industry_name"]]

        def _read_tnf(self, path):
            """Parse a ``.tnf`` file into ``[code, name, shortcode]`` rows.

            The last three characters of the file name (before the first dot)
            select which code prefixes are kept: ``szm`` keeps codes starting
            with 00/30, ``shm`` keeps codes starting with 60/68.

            Args:
                path: Path of the ``szm.tnf`` or ``shm.tnf`` file.

            Returns:
                List of ``[code, name, shortcode]`` lists, in file order.

            Raises:
                KeyError: If the file name does not end in ``szm`` or ``shm``.
            """
            # Use the file name only: splitting the whole path on "." picks the
            # wrong piece as soon as a directory name contains a dot.
            market = os.path.basename(path).split(".")[0][-3:]

            with open(path, "rb") as f:
                buff = f.read()

            prefixes = self._TNF_PREFIXES[market]
            data = buff[self._TNF_HEADER_SIZE:]
            size = self._TNF_RECORD_SIZE
            count = len(data) // size  # a trailing partial record is ignored

            def decode(raw):
                # Fields are GBK text padded with NUL bytes.
                return str(raw, encoding="gbk").strip("\x00")

            stocks = []
            for start in range(0, count * size, size):
                record = data[start:start + size]
                code = decode(record[:6])
                if code.startswith(prefixes):
                    name = decode(record[23:41])
                    shortcode = decode(record[285:293])
                    stocks.append([code, name, shortcode])
            return stocks

        def _get_szshm(self):
            """Return code, name and short code of the stocks in both .tnf files.

            Returns:
                DataFrame with columns ``symbol``, ``name``, ``shortcode``;
                rows from ``szm.tnf`` come first, then ``shm.tnf``.
            """
            szm = self._read_tnf(self.resolve(r"T0002\hq_cache\szm.tnf"))
            shm = self._read_tnf(self.resolve(r"T0002\hq_cache\shm.tnf"))
            return pd.DataFrame(szm + shm, columns=["symbol", "name", "shortcode"])

        def _get_base_all(self):
            """Merge the stock list with industry data into one table.

            Returns:
                DataFrame with columns ``ts_code``, ``symbol``, ``name``,
                ``shortcode``, ``industry_name`` and ``industry_detail``.
            """
            tdxhy = self._get_tdxhy()
            incon = self._get_incon()
            szshm = self._get_szshm()

            base = pd.merge(szshm, tdxhy, how="left", on="symbol")
            base = pd.merge(base, incon, how="left", on="industry_code")
            # The second join looks up the name of the secondary industry code;
            # the clashing columns receive the _x / _y suffixes renamed below.
            base = pd.merge(
                base,
                incon,
                how="left",
                left_on="industry_second_code",
                right_on="industry_code",
            )

            # A truthy market value means ".sh", a falsy one ".sz". Stocks that
            # are missing from tdxhy.cfg have market == NaN after the left
            # merge; NaN is truthy, so they would all be labelled ".sh". For
            # those rows fall back to the code prefix used for the shm file.
            sh_by_prefix = base["symbol"].str[:2].isin(self._TNF_PREFIXES["shm"])
            is_sh = base["market"].fillna(sh_by_prefix.astype(int)).astype(bool)
            base["ts_code"] = base["symbol"] + is_sh.map({True: ".sh", False: ".sz"})

            base = base.rename(
                columns={
                    "industry_name_x": "industry_name",
                    "industry_name_y": "industry_detail",
                }
            )
            usecols = "ts_code symbol name shortcode industry_name industry_detail".split()
            return base[usecols]

        def _read_kline(self, filepath):
            """Read a K-line source file made of 32-byte records.

            Args:
                filepath: Path of the binary K-line file.

            Returns:
                DataFrame with columns ``trade_date`` (string), ``open``,
                ``high``, ``low``, ``close`` (stored integers divided by 100),
                ``amount``, ``vol`` and ``openinterest``.

            Raises:
                struct.error: If the file size is not a multiple of 32 bytes.
            """
            columns = "trade_date open high low close amount vol openinterest".split()
            with open(filepath, "rb") as f:
                raw = f.read()

            records = list(self._KLINE_RECORD.iter_unpack(raw))
            kline = pd.DataFrame(records, columns=columns)

            kline["trade_date"] = kline["trade_date"].astype(str)

            price_columns = ["open", "high", "low", "close"]
            kline[price_columns] = kline[price_columns] / 100
            return kline

        def get_base_all(self):
            """Return the basic-information table built at construction time."""
            return self.__base_all

        def get_kline_daily(self, ts_code):
            """Return the daily K-line of one stock.

            Args:
                ts_code: Code in ``<symbol>.<market>`` form, e.g. ``600645.sh``.

            Returns:
                DataFrame indexed by the parsed trade date (index name
                ``index``) with columns ``ts_code``, ``trade_date``, ``open``,
                ``high``, ``low``, ``close``, ``amount``, ``volume`` and
                ``openinterest``.
            """
            parts = ts_code.split(".")
            symbol, market = parts[0], parts[1]
            filepath = self.resolve("vipdoc", market, "lday", market + symbol + ".day")

            kline = self._read_kline(filepath)
            kline["ts_code"] = ts_code
            kline.index = pd.to_datetime(kline["trade_date"])
            kline.index.name = "index"
            kline = kline.rename(columns={"vol": "volume"})
            usecols = (
                "ts_code trade_date open high low close amount volume openinterest".split()
            )
            return kline[usecols]
    
    
    if __name__ == "__main__":
        # Demo run: constructing the loader already reads the basic stock data.
        loader = TDXLoader()
        # Basic information of every stock, then the daily K-line of one stock.
        base_all, kline = loader.get_base_all(), loader.get_kline_daily("600645.sh")
        print(kline)
    

    走缓存,并加工出K线的各个周期

    注意:缓存只保存在进程内存中,单独运行一次脚本,进程退出后缓存即失效;需要把它放在常驻进程(如 Web 服务或桌面应用)中才能发挥作用

    from store.loader import TDXLoader as Loader
    from cacheout import LFUCache
    
    from loguru import logger as log
    
    
    # 负责缓存
    class Cache(object):
        """Cache daily K-lines in memory and derive other periods from them.

        Daily data is fetched from the loader on the first request for a stock
        and stored in ``self.cache`` under its ``ts_code``; every other period
        is aggregated from that daily frame on each call.
        """

        # Default per-column aggregation used when several daily rows are
        # merged into one bar. ``amount`` is summed like ``volume``; without an
        # entry here the column would be dropped from every aggregated result.
        KLINE_AGG_RULE = {
            "ts_code": "last",
            "trade_date": "last",
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "amount": "sum",
            "volume": "sum",
            "openinterest": "last",
        }

        def __init__(self):
            self.__loader = Loader()
            self.cache = LFUCache()

            # Per-instance copy so callers may adjust the rule of one instance.
            self.rule = dict(self.KLINE_AGG_RULE)

        def _get_kline_daily(self, ts_code):
            """Return the daily K-line from the cache, loading it on a miss."""
            if self.cache.has(ts_code):
                kline = self.cache.get(ts_code)
            else:
                kline = self.__loader.get_kline_daily(ts_code)
                self.cache.set(ts_code, kline)
                log.info(f"{ts_code} K线数据已缓存")
            return kline

        def _get_kline_by_period(self, ts_code, period="D"):
            """Return the K-line resampled to a calendar period.

            Args:
                ts_code: Stock code used as cache key and passed to the loader.
                period: Resample rule handed to ``DataFrame.resample``; the
                    intended values are D, W, M, Q, A (day, week, month,
                    quarter, annual).

            Returns:
                DataFrame aggregated with ``self.rule``, rows containing
                missing values removed, sorted by ``trade_date`` descending
                with a fresh integer index.
            """
            kline = self._get_kline_daily(ts_code)

            kline = kline.resample(period).agg(self.rule).dropna()
            kline = kline.sort_values(by="trade_date", ascending=False)
            kline = kline.reset_index(drop=True)

            return kline

        def _get_kline_by_days(self, ts_code, days=1):
            """Return the K-line grouped into bars of ``days`` daily rows.

            Groups are counted from the most recent row backwards, so only the
            oldest bar can be shorter than ``days``.

            Args:
                ts_code: Stock code used as cache key and passed to the loader.
                days: Number of daily rows per bar; must be at least 1.

            Returns:
                DataFrame aggregated with ``self.rule``, sorted by
                ``trade_date`` descending.

            Raises:
                ValueError: If ``days`` is smaller than 1.
            """
            if days < 1:
                raise ValueError(f"days must be a positive integer, got {days!r}")

            kline = self._get_kline_daily(ts_code)

            # Number the rows 0, 1, 2, ... starting at the newest one, then put
            # them back in chronological order so that "first"/"last" inside a
            # group mean oldest/newest.
            kline = kline.sort_values(by="trade_date", ascending=False)
            kline = kline.reset_index(drop=True).sort_values(by="trade_date")
            kline = kline.groupby(lambda x: x // days).agg(self.rule)
            kline = kline.sort_values(by="trade_date", ascending=False)

            return kline

        def get_base(self, ts_code=None):
            """Return basic data for one stock, or for all stocks.

            Args:
                ts_code: Stock code to filter on; when omitted (or falsy) the
                    whole table is returned.
            """
            data = self.__loader.get_base_all()
            if ts_code:
                base = data[data["ts_code"] == ts_code]
            else:
                base = data
            return base

        def get_kline(self, ts_code, param):
            """Return the K-line for a period name or a bar length in days.

            Args:
                ts_code: Stock code.
                param: An ``int`` selects grouping by that many daily rows;
                    anything else is used as a resample period.
            """
            if isinstance(param, int):
                return self._get_kline_by_days(ts_code, days=param)
            else:
                return self._get_kline_by_period(ts_code, period=param)
    
    
    datas = Cache()


    if __name__ == "__main__":
        demo_code = "600571.sh"

        # Basic data: first for every stock, then for the demo stock only
        # (the second result is the one left in ``base_all``).
        base_all = datas.get_base()
        base_all = datas.get_base(demo_code)

        # K-lines of the demo stock, requested in this order: daily, weekly,
        # monthly, quarterly, annual, then bars of 5 and of 30 daily rows.
        for period in ("D", "W", "M", "Q", "A", 5, 30):
            kline = datas.get_kline(demo_code, period)
    

    相关文章

      网友评论

          本文标题:Python读取通达信本地数据

          本文链接:https://www.haomeiwen.com/subject/gcfixdtx.html