美文网首页
julia GPU计算之CUDA编程——用日k线来计算周k线

julia GPU计算之CUDA编程——用日k线来计算周k线

作者: 昵称违法 | 来源:发表于2021-06-20 11:28 被阅读0次

    前言

    本文用周k线的计算来举例,如何通过CUDA来实现GPU计算。

    1、用日线来计算周线的方法:

    (1)大盘(沪深300指数)如何计算周k线?

    大盘不涉及停牌,所以直接按照week来划分数据,然后分别计算open、 high、 low、 close、 vol、 money几个值

    (2)个股如何计算周线?

    先调用大盘的周线信息,用大盘周线的起止日期去查询个股的日线数据,然后分别计算open、 high、 low、 close、 vol、 money以及停牌信息。

    2、GPU计算相关

    (1)julia中GPU计算的实现

    a、直接用矢量化的计算

     a_gpu .+ b_gpu
    

    b、用map reduce等类似的函数

    c_gpu = CUDA.map(+,a_gpu,b_gpu)
    

    c、自己编写cuda核函数
    CPU和GPU之间数据的传输
    用Array和CuArray

    CPU在GPU上启动核函数后,如何等待GPU计算结束,并取回计算结果
    用【sync】来实现

    在GPU上创建的数组,操作完毕后,如何释放变量
    用CUDA.unsafe_free!()来释放

    blocks中线程数如何设置
    每个block中thread最大数量不超过1024,具体查询CUDA官网,针对GPU计算能力值来设定

    blocks块数如何设置
    任务量 / 每块线程数,然后取ceil整数,就是blocks的数量

    日线计算周线的CUDA核函数实现

    """
    GPU核函数:生成周k线
    #大盘信息
    dapan_idx_d,
    dapan_date_start_d,
    dapan_date_end_d,
    dapan_cnt_d,
    
    #个股信息
    arg_day_d,
    arg_open_d,
    arg_high_d,
    arg_low_d,
    arg_close_d,
    arg_vol_d,
    arg_money_d,
    
    #返回值
    rtn_idx_d,
    rtn_day_d,
    rtn_open_d,
    rtn_high_d,
    rtn_low_d,
    rtn_close_d,
    rtn_volume_d,
    rtn_money_d,
    rtn_cnt_dapan_d,
    rtn_cnt_d,
    """
    function gen_weekly_kline_kernel!(
        dapan_idx_d,
        dapan_date_start_d,
        dapan_date_end_d,
        dapan_cnt_d,
        arg_day_d,
        arg_open_d,
        arg_high_d,
        arg_low_d,
        arg_close_d,
        arg_vol_d,
        arg_money_d,
        rtn_idx_d,
        rtn_day_d,
        rtn_open_d,
        rtn_high_d,
        rtn_low_d,
        rtn_close_d,
        rtn_volume_d,
        rtn_money_d,
        rtn_cnt_dapan_d,
        rtn_cnt_d,
    )
        index = (blockIdx().x - 1) * blockDim().x + threadIdx().x
    
        if index <= length(dapan_idx_d) #线程的数量多余任务的数量,防止越界
    
            #初始化
            rtn_idx_d[index] = dapan_idx_d[index]        #序号
            rtn_day_d[index] = dapan_date_end_d[index]   #周k线的日期
            rtn_cnt_dapan_d[index] = dapan_cnt_d[index]  #大盘在该周有几天交易日,用于跟rtn_cnt_d比对,看看个股在该周是否有停牌
            rtn_open_d[index] = -Inf      #开盘价    open = day_num == 1 的 open
            rtn_high_d[index] = -Inf      #最高价    取较大的值
            rtn_low_d[index] = Inf        #最低价    取较小的值
            rtn_close_d[index] = NaN      #收盘价    用当前close覆盖
            rtn_volume_d[index] = 0.0     #成交量    累加
            rtn_money_d[index] = 0.0      #成交资金  累加
            rtn_cnt_d[index] = 0          #交易日数  个股在每个周的交易日数量统计
    
            #遍历个股的数据,找到符合条件的,则进行处理
            for i = 1:length(arg_day_d)
                if dapan_date_start_d[index] <= arg_day_d[i] <= dapan_date_end_d[index]
                    #交易日数量
                    rtn_cnt_d[index] += 1
    
                    #开盘价
                    if rtn_cnt_d[index] == 1
                        rtn_open_d[index] = arg_open_d[i]
                    end
    
                    #最高价
                    if arg_high_d[i] > rtn_high_d[index]
                        rtn_high_d[index] = arg_high_d[i]
                    end
    
                    #最低价
                    if arg_low_d[i] < rtn_low_d[index]
                        rtn_low_d[index] = arg_low_d[i]
                    end
    
                    #收盘价,每天更新覆盖
                    rtn_close_d[index] = arg_close_d[i]
    
                    #成交量
                    rtn_volume_d[index] += arg_vol_d[i]
    
                    #成交资金
                    rtn_money_d[index] += arg_money_d[i]
                end
            end
        else
            #
        end
        return nothing
    end
    

    3、计算时间对比

    (1)用CPU计算周线的时间

    一共4000多只个股,用单线程计算,耗时101秒

    (2)用CUDA计算周线的时间

    一共4000多只个股,用单线程计算,耗时11秒;用CPU多线程去调用核函数,耗时3秒。

    (3)小技巧:如何在主存里面管理显存的变量?

    下面代码功能:把GPU中的变量映射到内存中,用命名元组来管理

    """
    在GPU中生成CuArray,用于缓存计算结果
    """
    function gen_rtn_cu(N::Int64)
        return(
        idx = CUDA.ones(Int64,N),       #k线的序号
        day = CUDA.ones(Int64,N),       #日期
        open = CUDA.ones(Float64,N),    #开盘价
        high = CUDA.ones(Float64,N),    #最高价
        low = CUDA.ones(Float64,N),     #最低价
        close = CUDA.ones(Float64,N),   #收盘价
        volume = CUDA.ones(Float64,N),  #成交量
        money = CUDA.ones(Float64,N),   #成交额
        cnt_dapan = CUDA.ones(Int64,N), #本周大盘有多少个交易日
        cnt = CUDA.ones(Int64,N)        #本周个股有多少个交易日
        )
    end
    

    通过命名元组来获取GPU中的数组对象,并释放GPU空间

    """
    释放gpu中的rtn_cu内存
    """
    function free_rtn_cu(rtn)
        """
        如何编程自己循环捕捉对象,然后释放对象
        """
        rtn.idx |> CUDA.unsafe_free!       #k线的序号
        rtn.day |> CUDA.unsafe_free!       #日期
        rtn.open |> CUDA.unsafe_free!      #开盘价
        rtn.high |> CUDA.unsafe_free!      #最高价
        rtn.low |> CUDA.unsafe_free!       #最低价
        rtn.close |> CUDA.unsafe_free!     #收盘价
        rtn.volume |> CUDA.unsafe_free!    #成交量
        rtn.money |> CUDA.unsafe_free!     #成交额
        rtn.cnt_dapan |> CUDA.unsafe_free! #本周大盘有多少个交易日
        rtn.cnt |> CUDA.unsafe_free!       #本周个股有多少个交易日
        rtn = nothing
    
    end
    

    4、附录:代码

    本文用到的k线数据是存于本地的,附录代码只供参考,不能直接运行。你可以用量化接口来读取数据,保存到本地,然后再进行实验。

    (1)CPU计算的代码

    #用CPU来生成k线,日线->周线,与GPU计算结果互相验证
    using DataFrames
    using DataFramesMeta
    using CSV
    using Dates
    using BenchmarkTools
    
    
    """
    用大盘的日k线,生成每周交易概要信息
    
    输入:
    df300 大盘的日k线
    
    输出:
            k线序号         起始日期           结束日期           本周大盘交易日数量
    DataFrame(idx = [...], date_start = [...], date_end = [...], cnt = [...])
    """
    function gen_dapan_week_line_info(df300)
        #df300 |> display
        df300.week = df300.day .|> week
    
        #把week逐个计数,相同的week,它的idx相同
        df300.weekidx = let
            last_week = 0
            idx = 1
            idx_ary = []
            for (i, r) in zip(1:size(df300, 1), eachrow(df300))
                if i == 1
                    idx = 1
                    last_week = r.week
                elseif i > 1
                    if last_week != r.week
                        idx += 1
                        last_week = r.week
                    else
                        last_week = r.week
                    end
                end
                push!(idx_ary, idx)
            end
            idx_ary
        end
    
    
        #生成周k线中每根k线的概要信息,用于计算详细的k线
        info_df = DataFrame(idx = [], date_start = [], date_end = [], cnt = [])
        gd = groupby(df300, "weekidx")
    
        for weekidx in df300.weekidx |> unique |> sort
            #println(weekidx)
            sub_df = gd[weekidx] |> df -> sort(df, "day")
            idx = weekidx
            sub_df = gd[weekidx] |> df -> sort(df, "day")
            date_start = first(sub_df, 1)[1, "day"]
            date_end = last(sub_df, 1)[1, "day"]
            cnt = size(sub_df, 1)
    
            push!(info_df, [idx, date_start, date_end, cnt])
        end
    
        return info_df
    end
    
    function  gen_dapan_week_line_info_test()
        df300 = CSV.read("data/399300.csv",DataFrame)
        df300 |> display
        info_df = gen_dapan_week_line_info(df300)
        info_df |> display
    end
    
    gen_dapan_week_line_info_test()
    
    """
    帮个股生成周k线
    
    输入:
    info_df  大盘每周交易日信息
    df       个股日k线
    
    输出:
    DataFrame
              序号        日期        开盘价       最高价        最低价      收盘价         成交量        成交金额        本周大盘交易日数量 本周个股交易日数量
    --------------------------------------------------------------------------------------------------------------------------------------------------------
    DataFrame(idx = [...],day = [...],open = [...],high = [...],low = [...],close = [...],volume = [...],money = [...],dapan_cnt=[...],cnt = [...])
    """
    function gen_weekly_kline(info_df,df)
        rtn_df = DataFrame(idx = [],day = [],open = [],high = [],low = [],close = [],volume = [],money = [],dapan_cnt=[],cnt = [])
    
        for r in eachrow(info_df)
            idx = r.idx
            date_start = r.date_start
            date_end = r.date_end
    
            q_df = @linq df |> where(date_start .<= :day .<= date_end)
            if size(q_df,1) > 0
                sort!(q_df,"day")
    
                day = date_end
                open = q_df.open[1]
                high = q_df.high |> maximum
                low = q_df.low |> minimum
                close = q_df.close[end]
                volume = q_df.vol |> sum
                money = q_df.money |> sum
                dapan_cnt = r.cnt
                cnt = size(q_df,1)
    
                push!(rtn_df,[idx,day,open,high,low,close,volume,money,dapan_cnt,cnt])
            else  #本周个股停牌
                day = date_end
                open = 0.0
                high = 0.0
                low = 0.0
                close = 0.0
                volume = 0.0
                money = 0.0
                dapan_cnt = r.cnt
                cnt = 0
                push!(rtn_df,[idx,day,open,high,low,close,volume,money,dapan_cnt,cnt])
            end
        end
        rtn_df
    end
    
    function gen_weekly_kline_test()
        #读取大盘数据
        df300 = CSV.read("data/399300.csv",DataFrame)
        df300 |> display
        info_df = gen_dapan_week_line_info(df300)
        info_df |> display
    
        #读取个股信息
        df = CSV.read("data/股票/000001.csv",DataFrame)
        gen_weekly_kline(info_df,df)
    end
    
    gen_weekly_kline_test()
    
    #读取所有的股票
    stocks = readdir("data/股票") .|> item->split(item,".")[1]
    
    """
    读取所有的股票k线,存到一个字典中备用
    """
    function get_stocks_dict(stocks)
        mydict = Dict()
        for s in stocks
            mydict[s] = CSV.read("data/股票/$(s).csv",DataFrame)
        end
        mydict
    end
    
    stocks_dict = get_stocks_dict(stocks)
    
    """
    计算多只个股的周线表
    """
    function main_multi_stocks(stocks_dict)
        #读取大盘数据
        df300 = CSV.read("data/399300.csv",DataFrame)
        #df300 |> display
        info_df = gen_dapan_week_line_info(df300)
        #info_df |> display
    
        t1 = time()
        #逐只个股计算
        for (stock,df) in stocks_dict
            week_df = gen_weekly_kline(info_df,df)
            CSV.write("data/cpu周线/$(stock).csv",week_df)
        end
        t2 = time()
    
        println(t2-t1)
    end
    
    main_multi_stocks(stocks_dict)
    
    #单线程 101s
    
    

    (2)GPU计算的代码

    #用GPU来生成k线,日线->周线
    using DataFrames
    using CSV
    using XLSX
    using Dates
    
    using BenchmarkTools
    using CUDA
    using Test
    
    if capability(device()) < v"7.0"
        println("你的显卡计算能力没达到7.0,退出程序!!")
        exit()
    end
    
    
    """
    【月份】和【日期】字符串格式化为两位
    01 =  01
    1  =  01
    
    输入:"0"-"31" 或者 "00"-"31"
    """
    function format_str_00(str::String)
        rtn = ""
        if length(str) == 1
            rtn = string(0,str)
        else
            rtn = str
        end
        return rtn
    end
    
    format_str_00("9")
    format_str_00("12")
    
    
    """
    把一个Date日期转成同形式字面量的Int,注意,月份和日期会补足成两位
    例如:
    __________________________
          Date   => Int64
    ==========================
    "2021-01-09" => 20210109
    ==========================
    """
    function get_int_day(mydate::Date)
        day_str = ""
        y = string(mydate |> year)
        m = string(mydate |> month) |> format_str_00
        d = string(mydate |> day) |> format_str_00
        parse(Int64, string(y, m, d))
        return parse(Int64, string(y, m, d))
    end
    
    get_int_day(Date("2021-01-09"))
    get_int_day(Date("2021-2-1"))
    get_int_day(Date("2021-12-14"))
    
    
    """
    从日k线中,生成每根周线k线的【起始日期】和【终止日期】信息。作用:【个股日线生成周线的时候,用这两个日期,进行检索】
    
    ==输出==
         |序号  开始日期    结束日期   交易天数
    Row  │ idx  date_start  date_end  cnt
         │ Any  Any         Any       Any
    ─────┼────────────────────────────────
       1 │ 1    20091203    20091204  2
       2 │ 2    20091207    20091211  5
       3 │ 3    20091214    20091218  5
       4 │ 4    20091221    20091225  5
       ⋮  │  ⋮       ⋮          ⋮       ⋮
     562 │ 562  20201116    20201120  5
     563 │ 563  20201123    20201127  5
     564 │ 564  20201130    20201202  3
    
    """
    function gen_week_kline_info(df300)
        df = DataFrame(idx = [],date_start = [],date_end = [],cnt = [])
        idx = 0                 #每根k线的序号
        date_start = 19700101   #起始日期【用int来存储日期】
        date_end = 19700101     #结束日期
        cnt = 0                 #本周交易日的天数
    
        last_week = -1          #上一根k线的week值,计算周线的时候,week跳变是标志事件
        for i in 1:size(df300,1)
            current_week = df300[i,"week"]
            if current_week != last_week  #新的一周开始了
                #把上一周的信息假如到df中
                if idx > 0
                    date_end = df300[i-1,"day"] |> get_int_day
                    push!(df,[idx,date_start,date_end,cnt])
                end
    
                #新的一周的计数
                date_start = df300[i,"day"] |> get_int_day
                idx += 1
                cnt = 1
            else                        #同一个周
                cnt += 1
            end
    
            last_week = current_week
    
            if i == size(df300,1) #最后在生成一根k线
                date_end = df300[i,"day"] |> get_int_day
                push!(df,[idx,date_start,date_end,cnt])
            end
        end
        df
    end
    
    function gen_week_kline_info_test()
        #读取hs300大盘数据
        df300 = CSV.read("data/399300.csv",DataFrame)
        df300 |> display
    
        #计算week
        df300[!,"week"] = df300.day .|> week
        kline_info = gen_week_kline_info(df300)
        kline_info |> display
    end
    
    gen_week_kline_info_test()
    
    """
    在GPU中生成大盘周线的概要信息
    
    输入:
    kline_info   周k线的概要信息df
    
    输出:
    命名元组:
    (
    idx = CuArray(kline_info.idx .|> Int64),               #序号
    start_date = CuArray(kline_info.date_start .|> Int64), #开始日期
    end_date =  CuArray(kline_info.date_end .|> Int64),    #结束日期
    cnt = CuArray(kline_info.cnt .|> Int64)                #本周交易日天数
    )
    
    """
    function gen_cu_dapan_kline_info(kline_info)
        return(
        idx = CuArray(kline_info.idx .|> Int64),               #序号
        start_date = CuArray(kline_info.date_start .|> Int64), #开始日期
        end_date =  CuArray(kline_info.date_end .|> Int64),    #结束日期
        cnt = CuArray(kline_info.cnt .|> Int64)                #本周交易日天数
        )
    end
    
    """
    释放cu_dapan_kline_info在GPU上的内存
    """
    function free_cu_dapan_kline_info(cu_info)
        cu_info.idx |> CUDA.unsafe_free!         #序号
        cu_info.start_date |> CUDA.unsafe_free!  #开始日期
        cu_info.end_date |> CUDA.unsafe_free!    #结束日期
        cu_info.cnt |> CUDA.unsafe_free!         #本周交易日天数
    
        cu_info = nothing
    end
    
    function gen_cu_dapan_kline_info_test()
        #读取日k线数据
        df300 = CSV.read("data/399300.csv",DataFrame)
        df300 |> display
    
        #计算week
        df300[!,"week"] = df300.day .|> week
    
        #生成k线概要信息
        kline_info = gen_week_kline_info(df300)
        kline_info |> display
    
        #在gpu中生成k线概要信息
        dapan_cu = gen_cu_dapan_kline_info(kline_info)
        dapan_cu |> display
    
        dapan_cu.idx |> typeof |> display
    
        dapan_cu
        free_cu_dapan_kline_info(dapan_cu)
    
        try
            dapan_cu.idx |> display
        catch e
    
            println("执行代码[dapan_cu.idx |> display] 出错,info = ",e)
        end
    
    end
    
    gen_cu_dapan_kline_info_test()
    
    """
    在GPU中生成CuArray,用于缓存计算结果
    """
    function gen_rtn_cu(N::Int64)
        return(
        idx = CUDA.ones(Int64,N),       #k线的序号
        day = CUDA.ones(Int64,N),       #日期
        open = CUDA.ones(Float64,N),    #开盘价
        high = CUDA.ones(Float64,N),    #最高价
        low = CUDA.ones(Float64,N),     #最低价
        close = CUDA.ones(Float64,N),   #收盘价
        volume = CUDA.ones(Float64,N),  #成交量
        money = CUDA.ones(Float64,N),   #成交额
        cnt_dapan = CUDA.ones(Int64,N), #本周大盘有多少个交易日
        cnt = CUDA.ones(Int64,N)        #本周个股有多少个交易日
        )
    end
    
    """
    释放gpu中的rtn_cu内存
    """
    function free_rtn_cu(rtn)
        """
        如何编程自己循环捕捉对象,然后释放对象
        """
        rtn.idx |> CUDA.unsafe_free!       #k线的序号
        rtn.day |> CUDA.unsafe_free!       #日期
        rtn.open |> CUDA.unsafe_free!      #开盘价
        rtn.high |> CUDA.unsafe_free!      #最高价
        rtn.low |> CUDA.unsafe_free!       #最低价
        rtn.close |> CUDA.unsafe_free!     #收盘价
        rtn.volume |> CUDA.unsafe_free!    #成交量
        rtn.money |> CUDA.unsafe_free!     #成交额
        rtn.cnt_dapan |> CUDA.unsafe_free! #本周大盘有多少个交易日
        rtn.cnt |> CUDA.unsafe_free!       #本周个股有多少个交易日
        rtn = nothing
    
    end
    
    function gen_rtn_cu_test()
        #生成第一个rtn
        res1 = gen_rtn_cu(1024)
        res1.idx .+= 1
        res1.idx |> display
    
        #生成第二个rtn
        res2 = gen_rtn_cu(1024)
        res2.idx |> display
    
        #释放对象并测试
        free_rtn_cu(res1)
        try
            res1.idx |> println
        catch
            println("执行代码:res1.idx |> println ,因为res1已经销毁了!")
        end
    end
    
    gen_rtn_cu_test()
    
    
    """
    把k线的数据载入到GPU中
    ====输入====
    df k线数据df
    ====输出===
    命名元组
    (
    day = CuArray(df.day .|> get_int_day)  #k线的日期
    open = CuArray(df.open)    #开盘价
    high = CuArray(df.high)    #最高价
    low = CuArray(df.low)      #最低价
    close = CuArray(df.close)  #收盘价
    vol = CuArray(df.vol)      #成交量
    money = CuArray(df.money)  #成交额
    )
    
    """
    function gen_cu_stock_kline(df)
        return(
        day = CuArray(df.day .|> get_int_day),  #k线的日期
        open = CuArray(df.open),    #开盘价
        high = CuArray(df.high),    #最高价
        low = CuArray(df.low),      #最低价
        close = CuArray(df.close),  #收盘价
        vol = CuArray(df.vol),      #成交量
        money = CuArray(df.money)  #成交额
        )
    end
    
    """
    释放cu_stock_kline对象在GPU上的内存
    """
    function free_cu_stock_kline(stock_cu)
        stock_cu.day |> CUDA.unsafe_free!    #k线的日期
        stock_cu.open |> CUDA.unsafe_free!   #开盘价
        stock_cu.high |> CUDA.unsafe_free!   #最高价
        stock_cu.low |> CUDA.unsafe_free!    #最低价
        stock_cu.close |> CUDA.unsafe_free!  #收盘价
        stock_cu.vol |> CUDA.unsafe_free!    #成交量
        stock_cu.money |> CUDA.unsafe_free!  #成交额
        stock_cu = nothing
    end
    
    function gen_cu_stock_kline_test()
        df = CSV.read("data/股票/000001.csv",DataFrame)
        stock_cu = gen_cu_stock_kline(df)
    
        stock_cu.open |> display
    
        free_cu_stock_kline(stock_cu)
        try
            stock_cu.day |> display
        catch e
            println("执行代码【stock_cu.day |> display】出错",e)
        end
    
    end
    
    gen_cu_stock_kline_test()
    
    
    
    """
    GPU核函数:生成周k线
    
    dapan_idx_d,
    dapan_date_start_d,
    dapan_date_end_d,
    dapan_cnt_d,
    
    arg_day_d,
    arg_open_d,
    arg_high_d,
    arg_low_d,
    arg_close_d,
    arg_vol_d,
    arg_money_d,
    
    rtn_idx_d,
    rtn_day_d,
    rtn_open_d,
    rtn_high_d,
    rtn_low_d,
    rtn_close_d,
    rtn_volume_d,
    rtn_money_d,
    rtn_cnt_dapan_d,
    rtn_cnt_d,
    """
    function gen_weekly_kline_kernel!(
        dapan_idx_d,
        dapan_date_start_d,
        dapan_date_end_d,
        dapan_cnt_d,
        arg_day_d,
        arg_open_d,
        arg_high_d,
        arg_low_d,
        arg_close_d,
        arg_vol_d,
        arg_money_d,
        rtn_idx_d,
        rtn_day_d,
        rtn_open_d,
        rtn_high_d,
        rtn_low_d,
        rtn_close_d,
        rtn_volume_d,
        rtn_money_d,
        rtn_cnt_dapan_d,
        rtn_cnt_d,
    )
        index = (blockIdx().x - 1) * blockDim().x + threadIdx().x
    
        if index <= length(dapan_idx_d) #线程的数量多余任务的数量,防止越界
    
            #初始化
            rtn_idx_d[index] = dapan_idx_d[index]        #序号
            rtn_day_d[index] = dapan_date_end_d[index]   #周k线的日期
            rtn_cnt_dapan_d[index] = dapan_cnt_d[index]  #大盘在该周有几天交易日,用于跟rtn_cnt_d比对,看看个股在该周是否有停牌
            rtn_open_d[index] = -Inf      #开盘价    open = day_num == 1 的 open
            rtn_high_d[index] = -Inf      #最高价    取较大的值
            rtn_low_d[index] = Inf        #最低价    取较小的值
            rtn_close_d[index] = NaN      #收盘价    用当前close覆盖
            rtn_volume_d[index] = 0.0     #成交量    累加
            rtn_money_d[index] = 0.0      #成交资金  累加
            rtn_cnt_d[index] = 0          #交易日数  个股在每个周的交易日数量统计
    
            #遍历个股的数据,找到符合条件的,则进行处理
            for i = 1:length(arg_day_d)
                if dapan_date_start_d[index] <= arg_day_d[i] <= dapan_date_end_d[index]
                    #交易日数量
                    rtn_cnt_d[index] += 1
    
                    #开盘价
                    if rtn_cnt_d[index] == 1
                        rtn_open_d[index] = arg_open_d[i]
                    end
    
                    #最高价
                    if arg_high_d[i] > rtn_high_d[index]
                        rtn_high_d[index] = arg_high_d[i]
                    end
    
                    #最低价
                    if arg_low_d[i] < rtn_low_d[index]
                        rtn_low_d[index] = arg_low_d[i]
                    end
    
                    #收盘价,每天更新覆盖
                    rtn_close_d[index] = arg_close_d[i]
    
                    #成交量
                    rtn_volume_d[index] += arg_vol_d[i]
    
                    #成交资金
                    rtn_money_d[index] += arg_money_d[i]
                end
            end
        else
            #
        end
        return nothing
    end
    
    
    """
    把GPU中的rtn复制到host中,组成一个df
    """
    function gen_df_from_gpu(rtn_cu)
        #从GPU复制数据
        idx = rtn_cu.idx |> Array
        day = rtn_cu.day |> Array
        open = rtn_cu.open |> Array
        high = rtn_cu.high |> Array
        low = rtn_cu.low |> Array
        close = rtn_cu.close |> Array
        volume = rtn_cu.volume |> Array
        money = rtn_cu.money |> Array
        dapan_cnt = rtn_cu.cnt_dapan |> Array
        cnt = rtn_cu.cnt |> Array
    
        df = DataFrame(idx = idx,day = day,open = open,high = high,low = low,close = close,volume = volume,money = money,dapan_cnt=dapan_cnt,cnt = cnt)
    end
    
    
    function main2(stocks_dict)
        #加载大盘数据
        df300 = CSV.read("data/399300.csv", DataFrame)
        df300[!, "week"] = df300.day .|> week
    
        #生成k线概要信息
        dapan_kline_info = gen_week_kline_info(df300)
    
        #在gpu中生成k线概要信息
        dapan_cu = gen_cu_dapan_kline_info(dapan_kline_info)
    
        #free_cu_dapan_kline_info(dapan_cu)
    
        t1 = time()
    
        #Threads.@threads for stock in stocks_dict |> keys |> collect
        for stock in stocks_dict |> keys |> collect
            df = stocks_dict[stock]
            #println(stock)
            stock_cu = gen_cu_stock_kline(df)
    
            N = size(dapan_kline_info, 1)
            rtn_cu = gen_rtn_cu(N)
    
            #启动CUDA内核进行计算
            numblocks = ceil(Int, N / 256)
            CUDA.@sync begin
                @cuda threads = 256 blocks = numblocks gen_weekly_kline_kernel!(
                    dapan_cu.idx,
                    dapan_cu.start_date,
                    dapan_cu.end_date,
                    dapan_cu.cnt,
                    stock_cu.day,
                    stock_cu.open,
                    stock_cu.high,
                    stock_cu.low,
                    stock_cu.close,
                    stock_cu.vol,
                    stock_cu.money,
                    rtn_cu.idx,
                    rtn_cu.day,
                    rtn_cu.open,
                    rtn_cu.high,
                    rtn_cu.low,
                    rtn_cu.close,
                    rtn_cu.volume,
                    rtn_cu.money,
                    rtn_cu.cnt_dapan,
                    rtn_cu.cnt,
                )
    
                #从GPU上获取数据
                df = gen_df_from_gpu(rtn_cu)
                #CSV.write("data/gpu周线/$(stock).csv", df)
    
                #清空gpu变量
                #free_cu_dapan_kline_info(dapan_cu)
                free_cu_stock_kline(stock_cu)
                free_rtn_cu(rtn_cu)
            end
        end
    
        t2 = time()
    
        println(t2 - t1)
    
    end
    
    #读取所有的股票
    stocks = readdir("data/股票") .|> item->split(item,".")[1]
    
    """
    读取所有的股票k线,存到一个字典中备用
    """
    function get_stocks_dict(stocks)
        mydict = Dict()
        for s in stocks
            mydict[s] = CSV.read("data/股票/$(s).csv",DataFrame)
        end
        mydict
    end
    
    stocks_dict = get_stocks_dict(stocks)
    
    main2(stocks_dict)
    

    相关文章

      网友评论

          本文标题:julia GPU计算之CUDA编程——用日k线来计算周k线

          本文链接:https://www.haomeiwen.com/subject/ggwjyltx.html