Recommender System - Ngrel

Author: 左心Chris | Published 2019-11-05 12:05
    import sys
    
    from pyspark.sql import SparkSession
    
    def append(a, b):
        # seqOp for aggregateByKey: fold one [video id, play time] record into the
        # per-user dict, keeping only the first play time seen for each video
        if b[0] not in a:
            a[b[0]] = b[1]
        return a
    
    def extend(a, b):
        # combOp for aggregateByKey: merge two per-user dicts built on different partitions
        z = a.copy()
        z.update(b)
        return z
    
    def ngram(a):
        # For one user's ordered play list, emit (cid_i, cid_j) pairs where cid_j was played
        # after cid_i within a window of up to n_gram_limit - 1 positions. A pair is emitted
        # once for every n_gram size that covers it, so nearer pairs are emitted more often
        # and end up with a higher weight in the counts.
        result_list = []
        udid = a[0]  # user id, not needed for the pair counts
        cid_list = a[1]
        n_gram_limit = 4
        for n_gram in range(2, n_gram_limit+1):
            for i in range(len(cid_list)-1):
                for j in range(1, n_gram):
                    if i + j < len(cid_list):
                        result_list.append((cid_list[i], cid_list[i+j]))
        return result_list
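    # For example, with cid_list = ["A", "B", "C", "D"] and n_gram_limit = 4, ngram emits
    # ("A","B") 3 times, ("A","C") 2 times, ("A","D") once, ("B","C") 3 times,
    # ("B","D") 2 times and ("C","D") 3 times, so adjacent plays dominate the counts.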
    
    def appendNgram(a, b):
        # seqOp: count how many times video b was played after the key video
        if b not in a:
            a[b] = 1
        else:
            a[b] += 1
        return a
    
    def extendNgram(a, b):
        # combOp: merge two partial count dicts by summing the counts per video
        z = a.copy()
        for key, value in b.items():
            if key in z:
                z[key] += value
            else:
                z[key] = value
        return z
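    # For example:
    #   extendNgram({"vidB": 2}, {"vidB": 1, "vidC": 1}) == {"vidB": 3, "vidC": 1}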
    
    def mapToResult(k):
        # Keep only followers seen at least count_limit times, sort them by count in
        # descending order, truncate to the top 100 and format one output line:
        # "cid \t cid1_count1,cid2_count2,..."
        cid = k[0]
        count_limit = 16
        result_dict = {}
        for key, value in k[1].items():
            if value >= count_limit:
                result_dict[key] = value
        top_items = sorted(result_dict.items(), key=lambda x: int(x[1]), reverse=True)[:100]
        cid_list = ",".join([i[0] + '_' + str(i[1]) for i in top_items])
        if not cid_list:
            return []
        output = [cid + '\t' + cid_list]
        return output
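    # For example:
    #   mapToResult(("vidA", {"vidB": 800, "vidC": 600, "vidD": 3}))
    #   == ["vidA\tvidB_800,vidC_600"]   (vidD is dropped because 3 < count_limit)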
    
    ss = SparkSession\
            .builder\
            .appName("NGL")\
            .getOrCreate()
    
    sc = ss.sparkContext
    # Sample record: 0006C44152A5B97426D45336DB766670 is the user id, 6406151472878191617
    # is the video id, and the last field (12) is the play time, used only to recover the
    # order in which the user played the videos.
    # 0006C44152A5B97426D45336DB766670        6406151472878191617     120#334#0#0#0#0#0#0#0#0#0       1553691542      12
    #text_file = sc.textFile("hdfs:///user/hdfs/zhanggaochao/test/rl.dat")
    #text_file = sc.textFile("/data/zhanggaochao/sparkdemo/rl.dat")
    #text_file = sc.textFile("hdfs://nameservice1/user/hdfs/zhanggaochao/common/rl/history_90d_20190627")
    text_file = sc.textFile(sys.argv[1])
    sample = text_file.map(lambda line: (line.split("\t")[0], [line.split("\t")[1], line.split("\t")[4]]))
    # (user_id, [vidA, "12"])
    combined_sample = sample.aggregateByKey({}, append, extend)
    # (user_id, {vidA: "12", vidB: "13"})
    # sort each user's plays numerically by play time to recover the viewing order
    sorted_sample = combined_sample.map(lambda a: (a[0], [i[0] for i in sorted(a[1].items(), key=lambda x: int(x[1]))]))
    # (user_id, [vidA, vidB])
    ngram_sample = sorted_sample.flatMap(ngram)
    # (vidA, vidB)
    ngramcount_sample = ngram_sample.aggregateByKey({}, appendNgram, extendNgram)
    # (vidA, {vidB: 12, vidC: 13})
    ngramsorted_sample = ngramcount_sample.flatMap(mapToResult)
    # vidA    vidB_800,vidC_600
    
    #ngramsorted_sample.saveAsTextFile("hdfs://nameservice1/user/hdfs/zhanggaochao/ngchain/core_20190627")
    ngramsorted_sample.saveAsTextFile(sys.argv[2])
    
    sc.stop()
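
To sanity-check the pair generation and counting logic without a Spark cluster, the same steps can be replayed in plain Python on a toy history. The sketch below is a minimal, standalone mirror of the per-user logic with made-up video ids; it does not reproduce the distributed aggregation or the count_limit filter.

    from collections import defaultdict
    
    # Toy histories: user id -> list of (video id, play time) records, deliberately unsorted.
    histories = {
        "user1": [("vidB", 2), ("vidA", 1), ("vidC", 3)],
        "user2": [("vidA", 1), ("vidB", 2), ("vidD", 3)],
    }
    
    n_gram_limit = 4
    counts = defaultdict(lambda: defaultdict(int))  # cid -> {followed cid -> weighted count}
    
    for udid, plays in histories.items():
        # Same ordering step as the Spark job: sort each user's plays by play time.
        cid_list = [cid for cid, _ in sorted(plays, key=lambda x: x[1])]
        # Same windowed pair emission as ngram(): nearer pairs are emitted more often.
        for n_gram in range(2, n_gram_limit + 1):
            for i in range(len(cid_list) - 1):
                for j in range(1, n_gram):
                    if i + j < len(cid_list):
                        counts[cid_list[i]][cid_list[i + j]] += 1
    
    for cid, followed in counts.items():
        ranked = sorted(followed.items(), key=lambda x: x[1], reverse=True)
        print(cid, ",".join("%s_%d" % (k, v) for k, v in ranked))

With these two toy histories, vidB comes out as the strongest follower of vidA (weight 6) because the two are adjacent in both sequences, while vidC and vidD only follow vidA at a distance (weight 2 each).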
    

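Each output line has the form "cid <TAB> cid1_count1,cid2_count2,...", with the related videos already sorted by descending count. A downstream consumer only needs a small parser; the helper below is a hypothetical sketch, not part of the original job.

    def parse_ngram_line(line):
        # "vidA\tvidB_800,vidC_600" -> ("vidA", [("vidB", 800), ("vidC", 600)])
        cid, related = line.rstrip("\n").split("\t")
        pairs = []
        for item in related.split(","):
            other, count = item.rsplit("_", 1)
            pairs.append((other, int(count)))
        return cid, pairs
    
    print(parse_ngram_line("vidA\tvidB_800,vidC_600"))
    # ('vidA', [('vidB', 800), ('vidC', 600)])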