import sys
from pyspark.sql import SparkSession
def append(a, b):
    # seqOp for aggregateByKey: fold one [video_id, play_order] record into
    # the per-user dict, keeping the first play order seen for a duplicate id.
    if b[0] not in a:
        a[b[0]] = b[1]
    return a
def extend(a, b):
    # combOp for aggregateByKey: merge two partial per-user dicts.
    z = a.copy()
    z.update(b)
    return z
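# Illustration only (placeholder values): aggregateByKey runs append within a
# partition and extend across partitions, e.g.
#   append({}, ["6406151472878191617", 12]) -> {"6406151472878191617": 12}
#   extend({"a": 1}, {"b": 2})              -> {"a": 1, "b": 2}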
def ngram(a):
    # Emit (cid, later_cid) skip-gram pairs from one user's ordered watch
    # history. A pair at distance j (1 <= j < n_gram_limit) is emitted
    # n_gram_limit - j times, so nearby pairs carry more weight downstream.
    result_list = []
    cid_list = a[1]  # a[0] is the user id, not needed for pair generation
    n_gram_limit = 4
    for n_gram in range(2, n_gram_limit + 1):
        for i in range(len(cid_list) - 1):
            for j in range(1, n_gram):
                if i + j < len(cid_list):
                    result_list.append((cid_list[i], cid_list[i + j]))
    return result_list
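# Worked example (placeholder ids): ngram(("u", ["A", "B", "C"])) yields
# ("A", "B") and ("B", "C") three times each and ("A", "C") twice, i.e.
# adjacent pairs are weighted 3x and distance-2 pairs 2x.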
def appendNgram(a, b):
    # seqOp: count how many times each later cid follows the key cid.
    if b not in a:
        a[b] = 1
    else:
        a[b] += 1
    return a
def extendNgram(a, b):
    # combOp: merge two partial count dicts by summing counts per key.
    z = a.copy()
    for key, value in b.items():
        if key in z:
            z[key] += value
        else:
            z[key] = value
    return z
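# Illustration only (placeholder keys):
#   extendNgram({"x": 2}, {"x": 1, "y": 3}) -> {"x": 3, "y": 3}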
def mapToResult(k):
    # Keep later cids seen at least count_limit times, take the top 100 by
    # count, and format one output line: "cid\tcid1_count1,cid2_count2,...".
    cid = k[0]
    count_limit = 16
    result_dict = {key: value for key, value in k[1].items() if value >= count_limit}
    top_items = sorted(result_dict.items(), key=lambda x: x[1], reverse=True)[:100]
    cid_list = ",".join(i[0] + '_' + str(i[1]) for i in top_items)
    if not cid_list:
        return []
    return [cid + '\t' + cid_list]
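# Worked example (placeholder ids): with count_limit = 16,
#   mapToResult(("A", {"B": 800, "C": 600, "D": 3})) -> ["A\tB_800,C_600"]
# "D" is dropped because its count of 3 is below the threshold.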
ss = SparkSession\
.builder\
.appName("NGL")\
.getOrCreate()
sc = ss.sparkContext
# Sample record layout: 0006C44152A5B97426D45336DB766670 is the user id,
# 6406151472878191617 is the video id, and the trailing 12 is the play-order
# field used to sort one user's videos into watch sequence.
# 0006C44152A5B97426D45336DB766670 6406151472878191617 120#334#0#0#0#0#0#0#0#0#0 1553691542 12
# The input path comes from the command line, e.g.
# hdfs://nameservice1/user/hdfs/zhanggaochao/common/rl/history_90d_20190627
text_file = sc.textFile(sys.argv[1])
# Parse each line once; cast the play-order field to int so the later sort is
# numeric rather than lexicographic (as strings, "12" would sort before "9").
sample = text_file.map(lambda line: line.split("\t")).map(lambda f: (f[0], [f[1], int(f[4])]))
# (0006C44152A5B97426D45336DB766670, [6406151472878191617, 12])
combined_sample = sample.aggregateByKey({}, append, extend)
# (0006C44152A5B97426D45336DB766670, {6406151472878191617: 12, 6406151472878191618: 13})
# Sort each user's videos by play order and keep only the ordered cid list.
sorted_sample = combined_sample.map(lambda a: (a[0], [i[0] for i in sorted(a[1].items(), key=lambda x: x[1])]))
# (0006C44152A5B97426D45336DB766670, [6406151472878191617, 6406151472878191618])
ngram_sample = sorted_sample.flatMap(ngram)
# (6406151472878191617, 6406151472878191618)
ngramcount_sample = ngram_sample.aggregateByKey({}, appendNgram, extendNgram)
# (6406151472878191617, {6406151472878191618: 12, 6406151472878191619: 13})
ngramsorted_sample = ngramcount_sample.flatMap(mapToResult)
# 6406151472878191617	6406151472878191618_800,6406151472878191619_600
# The output path comes from the command line, e.g.
# hdfs://nameservice1/user/hdfs/zhanggaochao/ngchain/core_20190627
ngramsorted_sample.saveAsTextFile(sys.argv[2])
sc.stop()
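# Example submission (hypothetical script name; paths are placeholders):
#   spark-submit --master yarn ngram_chain.py \
#       hdfs://nameservice1/user/hdfs/zhanggaochao/common/rl/history_90d_20190627 \
#       hdfs://nameservice1/user/hdfs/zhanggaochao/ngchain/core_20190627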