一、背景说明
最近得到一批字典文件需要处理,当然原始的字典文件格式是不能直接当做字典来用的,而且这一批字典格式也不同一。刚开始都是一批压缩文件,经过脚本处理后解压出一批文本文件:
[hadoop@master dict]$ ll TXT
总用量 6676700
-rw-r--r--. 1 hadoop hadoop 102532669 3月 28 16:05 11-15.txt
-rw-r--r--. 1 hadoop hadoop 12515529 3月 28 16:05 123.txt
-rw-r--r--. 1 hadoop hadoop 180052 3月 28 16:05 133127.txt
-rw-r--r--. 1 hadoop hadoop 8537098 3月 28 16:05 142183.txt
-rw-r--r--. 1 hadoop hadoop 156511269 3月 28 16:05 14365003.txt
-rw-r--r--. 1 hadoop hadoop 121252829 3月 28 16:05 1-5.txt
-rw-r--r--. 1 hadoop hadoop 97542553 3月 28 16:05 16-20.txt
...
可以看一下几个文件的格式内容:
[hadoop@master dict]$ cat TXT/acg | tail -3
zzzzzzzzyy
zzzzzzzzz
zzzzzzzzzz
[hadoop@master dict]$ cat TXT/www.csdn.net | tail -3
suwei2007 suwei2007 love_flyweiwei@163.com
fangchengli 19860601 fangchengli@gmail.com
jxjaxa 05040603 jxjaxa@hotmail.com
[hadoop@master dict]$ cat TXT/hak5-withcount.txt | tail -3
1 #th1992#
1 #hjkyui67
1 b55273236542107
[hadoop@master dict]$ cat cred | tail -3
78450578-|--|-jbrueneman@cox.net-|-u22m/i9KQBI=-|-new coffee mug|--
78450579-|--|-matthew_j_gould@hotmail.com-|-hLctj76LgIHioxG6CatHBw==-|-It goes 1x2x3x|--
78450580-|--|-marialali@telefonica.net-|-qMtZ8ulqWL1fr+u4/BG3[hadoop@master dict]$
可以看到,文件内容格式是不统一的,而且不是每个字段都可以直接作为密码,比如邮箱字段,最好是将邮箱的用户名作为可参考的密码,比如123456@qq.com中的123456.
二、技术思路
整个技术处理流程可分为以下几个阶段:
- 预处理
将所有的文本文件的格式略处理一下,形成比较统一的、可方便集中处理的文本格式。
然后可以选择将所有的文本文件内容抽出几行,组合成一个包含了所有文本文件内容格式的文件,用于观察和测试。 - map阶段
map阶段可以用于提取一行文本的各个字段,并做处理,输出所需字段。 - reduce阶段
reduce阶段可用来去重。因为map阶段已经默认自动排序过了,所以这个阶段去下重就可以了。
三、技术实现
- 预处理
- 用脚本形成包含所有文本部分内容的文件
经过分析look.txt文件可发现,预处理主要需要处理一下cred文件:#! /bin/sh . ./shell.h check_shell_dir rm -f look.txt for file in TXT/* do if [ -f "${file}" ];then txt=$(head -3 "${file}") echo -e "${txt}" >> look.txt fi done
使用awk很容易得到需要的字段(处理邮箱字段,其他字段都不需要)[hadoop@master dict]$ cat cred | tail -3 78450578-|--|-jbrueneman@cox.net-|-u22m/i9KQBI=-|-new coffee mug|-- 78450579-|--|-matthew_j_gould@hotmail.com-|-hLctj76LgIHioxG6CatHBw==-|-It goes 1x2x3x|-- 78450580-|--|-marialali@telefonica.net-|-qMtZ8ulqWL1fr+u4/BG3[hadoop@master dict]$
重新形成cred文件,最后重新形成look.txt。[hadoop@master dict]$ cat cred | tail -3 | awk -F '-' '{print $5}' jbrueneman@cox.net matthew_j_gould@hotmail.com marialali@telefonica.net
- 编写map程序
从look.txt文件内容格式来看,各个需要的字段都是以空格或者tab分割的。邮箱需要特殊处理,mapper脚本如下:#!/usr/bin/env python3.6 import sys import re # input comes from STDIN (standard input) for line in sys.stdin: line = line.strip() words = filter(lambda word: word, line.split()) for word in words: word = re.sub(r'@[^.]*.com','',word) #只需要输出key就可以了,hadoop会自动按照key排序 print(word)
- 编写reduce程序
reduce阶段重点在于去重,可使用Python的字典来去重。#! /usr/bin/env python3.6 import sys import subprocess import os dict = {} for line in sys.stdin: word = line.strip('\n') if word not in dict.keys(): print(word) else: dict[word]=1
- 单机测试
直接执行如下命令进行单机测试:
显然,执行结果正确。[hadoop@master dict]$ cat look.txt | ./mapper.py | ./reducer.py zangzt1985 88300667 sunguangjin2006 sunguangjin shanwenyi3427.cn 1103887448 !@#$%^&* !@#$%^&*( !@#$%^&*() %null% %username% !@#$ !8zj39le
- 集群测试
可将测试文件look.txt上传至hadoop文件系统运行mapreduce来测试,具体方法略。 - 真正运行程序
程序最终成功执行,从运行日志来看,耗时26分钟,原始文件6.4G,输出文件6.7G。[hadoop@master dict]$ hadoop jar ../hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -input dict -output output -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
17/04/01 20:14:31 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.230:8032 17/04/01 20:14:31 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.230:8032 ... 17/04/01 20:40:31 INFO mapreduce.Job: Job job_1491015080715_0001 completed successfully 17/04/01 20:40:31 INFO mapreduce.Job: Counters: 51
- reducer简单的优化
由于reducer接收到的key/value已经根据key排序了,在java程序中可以得到key-valuelist对:
但是在streaming应用中,貌似得到的是key/value对:keyA value1 value2 value3 ... keyB value1 value2 value3 ... keyC value1 value2 value3 ...
所以,其实不用Python字典也可以进行去重:keyA value1 keyA value2 keyA value3 keyB value1 keyB value2 keyB value3
但是实际仍然运行了26分钟。#! /usr/bin/env python3.6 import sys old="" for line in sys.stdin: word = line.strip('\n') if word == old : continue else : print(word) old = word
进一步而言,reducer其实也可以分布式执行。 - 问题
在mapper.py处理输入文件时,Python报错:
```
'utf-8' codec can't decode byte 0xb4 in position 0:invalid start byte
```
然而若Python版本换为2.6就没有问题。具体原因不在这里分析,最后通过将源文件转码解决问题:
[hadoop@master dict]$ iconv -f gb2312 -t utf-8 -c all.txt -o all.t
网友评论