MapReduce

作者: Bounty_Hunter | 来源:发表于2019-07-26 15:32 被阅读0次

    Week 8 MapReduce

    MapReduce

    思想

    分而治之

    把一个复杂的任务划分为若干个简单的任务分别来做

    原因

    在现实情况下,我们要分析的数据数据量会相当大,这样一台计算机就不足以做这种数据的处理,原因有二:

    • 内存(memory) 不足
    • 算力(CPU)不够

    对于大规模的数据处理任务,需要许多计算机/超算同时做一件任务(并行计算)

    组成

    image

    数据

    分析需要的海量数据,随机地存储于这些计算机上。

    不需要/不现实 :统一地把数据一起存到一个超大的硬盘上。

    数据直接分散在这些计算机上,他们不仅充当数据的处理器,也是充当数据存储的硬盘。

    分工

    • Master,Master是负责调度的,相当于工地的工头。

    • Worker,相当于干活儿的工人。

      • Woker进一步分为两种

        • Mapper 执行处理数据函数

        • Reducer 汇总数据,交付输出

    Master将M分成许多小份,然后每一份分给一个Mapper来做,Mapper干完活儿(执行完函数),将自己那一份儿活儿的结果传给Reducer。Reducer之后统计汇总各个Mapper传过来的结果,得到最后的任务的答案。

    假设原始任务的Input个数为M,output个数为N。Mapper的个数为P,Reducer的个数为R。

    • 每个output有一个编号,假设为o1,o2,o3…oN。
    • 每个Mapper要做M/P个input的处理任务

      当一个Mapper处理完自己那一份儿input之后,每个input i被处理后转化为一个中间结果m。

      每个中间结果m很自然地会若干output (如:m1对应o1,o3,o5) 会有贡献。

    • 每个Reducer要做N/R个output的汇总工作。

      每个Reducer负责一个或多个o的汇总处理。

      假如某个Reducer负责o1,o2,o3,那么凡是对应到o1,o2,o3的被处理过的m都会传给这个Reducer做汇总处理。

    过程

    以 word count 为例

    MapReduce 有六个步骤:

    1. 输入 input

      Hello Java
      Hello C
      Hello Java
      Hello C++
      
    2. 拆分 split ,将上述文档中每一行的内容转换为key-value对

      0 - Hello Java
      1 - Hello C
      2 – Hello Java
      3 - Hello C++
      
    3. 映射 map,将拆分之后的内容转换成新的key-value对

      #mapper0
      (Hello , 1)
      (Java , 1)
      
      #mapper1
      (Hello , 1)
      (C , 1)
      
      #mapper2
      (Hello , 1)
      (Java , 1)
      
      #mapper3
      (Hello , 1)
      (C++ , 1)
      
    4. 派发 shuffle,将key相同的放到一起

      这一步需要移动数据,原来的数据可能在不同的datanode上,这一步过后,相同key的数据,会被移动到同一台机器上。最终,它会返回一个list包含各种k-value对。

      {Hello: 1,1,1,1}
      {Java: 1,1}
      {C: 1}
      {C++: 1}
      
    5. 缩减 reduce,把同一个key的结果加在一起

      (Hello , 4)
      (Java , 2)
      (C , 1)
      (C++,1)
      
    6. 输出 output,输出缩减之后的所有结果

    模拟 MapReduce 实现过程( 以 词频 分析为例 )

    input / spilt

    输入需要处理的文本,将其分割成若干份,交给不同的 Mapper 处理

    input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"
    
    # Split the string into lines and store in a list
    lines_of_text = input_str.split("\n")
    
    print(lines_of_text)
    
    • str.split(sep=None, maxsplit=-1)

      分割 字符串 str.split

      • sep : 分隔符
      • maxsplit : 分割次数,如果是 -1 则尽可能地分割(贪婪)

    Map

    每个 Mapper 对分到的文本块(chunk)进行操作

    # We will store the output of map_fn in here
    word_count_lists = []
    
    # For every line of text
    for line in lines_of_text:
        # Apply the map function (split and count words)
        # Save the result as a list in our list
        word_count_lists.append(list(map_fn(line)))
    
    # Show the result of mapping
    print(word_count_lists)
    
    import itertools
    
    # word_count_lists is a list of lists
    # Flatten the list of words to make it simpler by chaining lists together
    word_count_list_flat = list(itertools.chain.from_iterable(word_count_lists))
    
    print(word_count_list_flat)
    
    1. 生成一个 ==统计各行单词个数==的 list
    2. 对于 split 操作过生成的 字符串 list 的每个元素,用 map_fn 统计每一个行单词的个数,并储存在 list 里
    3. 导入 itertools 库,这个库都是基于迭代的基本操作
    4. itertools.chain.from_iterable() 函数 将 list 里的 各个 list 的元素拿出来组成一个新的 list

    itertools.chain.from_iterable(iterable)

    轻松快速的辗平一个列表,相当于

    def from_iterable(iterables):
        # chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
        for it in iterables:
            for element in it:
                yield element
    

    例子

    a_list = [[1, 2], [3, 4], [5, 6]]
    print(list(itertools.chain.from_iterable(a_list)))
    # Output: [1, 2, 3, 4, 5, 6]
    
    
    

    map_fn

    mapper 做的操作,tut里做的是统计每个词在每句中的词频,可以根据不同的需求更改功能。

    import re
    WORD_RE = re.compile(r"[\w']+")
    
    def map_fn(chunk):
        # Use the regex to find all words in each chunk
        for word in WORD_RE.findall(chunk):
            # Emit a result using the word as the key and number 
            yield (word.lower(), 1)
    

    用正则表达式找到每一块文本中的每个单词,返回一个生成器。生成器(generator)生成由单词和其出现次数的 list。

    正则表达式 ==[\w']+==

    [\w'] 指字母和单引号'

    匹配所有由一个或多个 [\w'] 组成的单词

    用途
    1. 在编写处理字符串的程序或网页时,经常会有查找符合某些复杂规则的字符串的需要。

    正则表达式就是用于描述这些规则的工具。换句话说,正则表达式就是记录文本规则的代码

    ​ 比如你可以编写一个正则表达式,用来查找所有以0开头,后面跟着2-3个数字,然后是一个连字号“-”, 最后是7或8位数字的字符串(像010-12345678或0376-7654321)。

    1. 类似于Control + F 但是功能强大的多

    2. 更主要的原因是,程序执行比嵌套条件判断效率高的多。

    入门
    1. Hi

      有两个字符,第一个是 h,第二个是 i

      通常正则表达式会有选项选择是否忽略大小写,默认是区分的。

      由于许多单词中也包含 hi, 例如 history , 如果我们只需要 找 hi 这个单词的话,要用\bhi\b

    2. \bhi\b

      \b 标识单词的的开头和结尾

      注意这里 识别不出 hihi , 因为 hihi 是另一个单词

    3. \bhi\b.*\blucy\b

      hi后面不远处有一个 lucy

      . 匹配除换行符以外的任意字符

      * *前边的内容可以连续重复使用任意次以使整个表达式得到匹配

    4. 常用的元字符(metacharacter)

      代码 说明
      . 匹配除换行符以外的任意字符
      \w 匹配字母或数字或下划线或汉字
      \s 匹配任意的空白符
      \d 匹配数字
      \b 匹配单词的开始或结束
      ^ 匹配字符串的开始
      $ 匹配字符串的结束
    5. 0\d\d-\d\d\d\d\d\d\d\d

      以0开头,然后是两个数字,然后是一个连字号“-”,最后是8个数字

      但是这样写如果重复的符号个数很麻烦,所以引入限定符{8}

      0\d{2}-\d{8}

      即 先是0然后 \d必须重复2次(2个数字),接着是 - ,最后是8个数字

    1. 限定符

      代码/语法 说明
      * 重复零次或更多次
      + 重复一次或更多次
      ? 重复零次或一次
      {n} 重复n次
      {n,} 重复n次或更多次
      {n,m} 重复n到m次
    2. 字符集合

      \(?0\d{2}[) -]?\d{8}

      匹配几种格式的电话号码,像(010)88886666,或022-22334455,或02912345678

    如果想匹配没有预定义元字符的字符集合(比如元音字母a,e,i,o,u), 只需要在方括号里列出它们就行了,像[aeiou]就匹配任何一个英文元音字母,[.?!]匹配标点符号(.或?或!)

    也可以指定一个字符范围

    ​ 像[0123456789]代表的含意与\d就是完全一致的:一位数字;

    ​ [a-z0-9A-Z] 等同于\w

    1. 例子

    一个网站如果要求你填写的QQ号必须为5位到12位数字时,可以使用:^\d{5,12}$

    1. 函数

      • 在使用 正则表达式 之前需要将 re 库文件导入程序

        import re
        
      • re.compile( pattern )

        将正则关系式转化成一个 正则关系 对象,供之后使用 re.match()re.search() 等函数

        r 声明后面的字符串是普通字符串

        u 声明后面的字符串以 Unicode 编码

        b 声明后面的字符串用 byte 类型(01)

      • re.fandall(pattern, string )

        返回所有 符合 正则表达式的 词组,如果不止一个就返回包含他们的 list

    生成器(generator)

    我们在用 for 循环时,往往是通过遍历 List 的各个元素实现的:

    for i in range(1000):
    

    虽然这样也能完成任务,

    但是该函数在运行中占用的内存会随着参数 max 的增大而增大,如果要控制内存占用,最好不要用 List。

    用 ==流水线== 的方式解决这个问题

    打个比方:流水线自动给料机

    image

    不用找一个特别大的容器装着原料,原料全部生产后再交给下一个机器;而是来一件原料机器就加工一件。

    for 循环不用 list 可以通过 一个函数 每次生成list里的一个 元素,再用另一个函数进行操作

    # 斐波那契数列
      
    
    def fab(max): 
      n, a, b = 0, 0, 1 
      while n < max: 
          yield b 
            # print b 
          a, b = b, a + b 
          n = n + 1 
    for n in fab(5): 
      print (n)
    
    >>>
    1 
    1 
    2 
    3 
    5
    
    • yield 的作用就是把一个函数变成一个 generator,带有 yield 的函数不再是一个普通函数。
    • Python 解释器会将其视为一个 generator,在 for 循环执行时,每次循环都会执行 fab 函数内部的代码,执行到 yield b 时,fab 函数就返回一个迭代值。
    • 下次迭代时,代码从 yield b 的下一条语句继续执行,而函数的本地变量看起来和上次中断执行前是完全一样的,于是函数继续执行,直到再次遇到 yield。
    map_fn 的解释
    import re
    WORD_RE = re.compile(r"[\w']+")
    
    def map_fn(chunk):
        # Use the regex to find all words in each chunk
        for word in WORD_RE.findall(chunk):
            # Emit a result using the word as the key and number 
            yield (word.lower(), 1)
    
    1. 导入 正则表达式库
    2. 正则表达式 匹配 所有包括一个或多个由 字母和单引号组成的单词 的 词组
    3. 定义 map_fn 函数(生成器),函数的参数为 导入的 字符串块
    4. findall函数返回所有 单词组成的 list
    5. 对于每一个 元素,生成一个 单词全小写和 数字1 的 list

    Shuffle and Sort

    派发 shuffle,将key相同的放到一起,返回一个list包含各种key-value对

    # SHUFFLE/SORT STAGE
    from collections import defaultdict
    
    # Create a dictionary where the default value is a list
    word_tuple_dict = defaultdict(list)
    
    for kv_pair in word_count_list_flat:
        # For each unique key append the (word, count) tuple to that keys list
        word_tuple_dict[kv_pair[0]].append(kv_pair)
    
    # Print it in a nice format:
    for k, v in word_tuple_dict.items():
        print(str(k) +": " + str(v))
    
    1. 导入 collections 库里的 defaultdict 类

    2. 将list 作为defaultdict类的初始化函数参数,即每个 defaultdict 的成员都是 list,并别每个成员都有 default_value

    3. 对于word_count_list_flat 的每一个元素(一个 key - number 的 list) ,将第零个元素作为 字典 dict 的 key值,第一个元素加入 value 的 list 中。

    defaultdict 类

    这个类和 传统的 dict 类 基本一致,只是改写了个别函数,可以看做是 dict 类的子类

    最重要的区别,也是为什么要用这个类的原因:

    defaultdict 类的初始化函数接受一个类型作为参数,当所访问的键不存在的时候,可以实例化一个值作为默认值:

    # 初始化函数接受一个类型作为参数
    >>> from collections import defaultdict
    >>> dd = defaultdict(list)
    >>> dd
    defaultdict(<type 'list'>, {})
    
    # 当所访问的键不存在的时候,可以实例化一个值作为默认值:
    
    >>> dd['foo']
    []
    >>> dd
    defaultdict(<type 'list'>, {'foo': []})
    >>> dd['bar'].append('quux')
    >>> dd
    defaultdict(<type 'list'>, {'foo': [], 'bar': ['quux']})
    

    这有什么意义呢?

    举个例子:

    strings = ('puppy', 'kitten', 'puppy', 'puppy',
               'weasel', 'puppy', 'kitten', 'puppy')
    counts = {}
    
    for kw in strings:
        counts[kw] += 1
    

    ​ 该例子统计strings中某个单词出现的次数,并在counts字典中作记录。单词每出现一次,在counts相对应的键所存的值数字加1。但是事实上,运行这段代码会抛出KeyError异常,出现的时机是每个单词第一次统计的时候,因为Python的dict中不存在默认值的说法,

    >>> counts = dict()
    >>> counts
    {}
    >>> counts['puppy'] += 1
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    KeyError: 'puppy'
    

    为了在执行第9行代码(将 list 的第零个元素作为 新的字典 dict 的 key值,第一个元素加入 value 的 list 中)时

    不会因为 dict 不存在该 key 值而报错。

    因此在对数据进行统计操作时,用 defaultdict 类取代 原来的 dict

    Reduce

    把同一个key的结果加在一起

    # REDUCE STAGE
    results = []
    
    for k, v in word_tuple_dict.items():
        # Get the counts from the list of k/v pairs
        vals_list = [t[1] for t in v]
        
        # Apply the reduce_fn to the word and counts pair
        # reduce_fn will yield a (key, value) tuple
        # inside a generator object which we convert to a list
        results.append(list(reduce_fn(k, vals_list)))
        
    print(results)
    
    1. 新建一个 结果 list 用来放 reduce 的结果

    2. 列表推导式( list comprehensions ),生成一个包含==该词在各句中出现次数==的 list,注意==‘is==’ 的值

    3. 将该词和上面的 list 传入 reduce_fn 函数中,得到该词以及总共出现次数的 list ,并添加到 结果 list

    为了好理解, 打印过程中各个变量的值:

    # REDUCE STAGE
    results = []
    
    print('word_tuple_dict.items\n', word_tuple_dict.items(),'\n')
    
    for k, v in word_tuple_dict.items():
    
        vals_list = [t[1] for t in v]
        
        print('vals_list: ',vals_list)
        print("List of results of reduce_fn: ", list(reduce_fn(k, vals_list)))
        
        results.append(list(reduce_fn(k, vals_list)))
        
        print('results: \n', results,'\n')
        
    print('final results: \n',results)
    

    列表推导式( list comprehensions)

    推导式

    推导式(又称解析式)是Python的一种独有特性。

    推导式是可以从一个数据序列构建另一个新的数据序列的结构体。

    共有:列表推导式、字典推导式和集合推导式

    列表推导式

    列表推导式(又称列表解析式)提供了一种简明扼要的方法来创建列表。

    结构
    1. 推导式 被 中括号括在里面 ,代表推导的是个 list
    2. 新建一个变量名,这个变量名参与之后的推导式,同时也是作为结果列在 list 的元素
    3. 建立一个for语句,然后是零个或多个for或者if语句。
    4. 在结果列表中加入新建的变量以iffor语句为上下文的表达式运行完成之后产生的元素
      • 那个表达式可以是任意的,可以在列表中放入任意类型的对象。
    例子
    multiples = [i for i in range(30) if i % 3 is 0]
    print(multiples)
    # Output: [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
    
    用处

    快速生成 List,尤其是当你需要用 For 循环来生成 list 时。使代码更加简洁。

    #实现方式 1 
    squared = []
    for x in range(10):
        squared.append(x**2)
        
    #实现方式2
    squared = [x**2 for x in range(10)]
    

    Reduce_fn

    reducer做的工作,在 tut 里是将 mapper 统计的各句的词频进行求和。可以根据不同的需求更改功能

    def reduce_fn(key, values):
        yield (key, sum(values))
    

    定义一个生成器,参数是 每一个单词,以及单词在各句中的词频 list

    output

    输出缩减之后的所有结果

    # Flatten the results to make them more readable
    results_flat = list(itertools.chain.from_iterable(results))
    
    print(results_flat)
    
    1. 再次调用 itertools.chain.from_iterable函数将 results里的元素提了出来,并打印

    安装mockr

    Tutorial 里让大家使用的是 mockr 模块使用 MapReduce 架构。

    1. 打开 终端

      Mac 环境

      control+space 打开 Spotlight, 输入 term(终端),并打开

      Windows 环境 :

      Win + R 打开运行,输入cmd 打开命令行工具

    2. 用 PiP 工具 安装 mockr 模块

      pip install mockr
      

    使用 mockr 库实现

    处理字符串(字频统计)

    import re
    from mockr import run_stream_job
    
    WORD_RE = re.compile(r"[\w']+")
    
    def map_fn(chunk):
        for word in WORD_RE.findall(chunk):
            yield (word.lower(), 1)
    
    def reduce_fn(key, values):
        yield (key, sum(values))
    
    input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"
    
    results = run_stream_job(input_str, map_fn, reduce_fn)
    
    print(results)
    
    mockr.run_stream_job(input_data, map_fn, reduce_fn)

    将输入的 字符串 分成多个 Chunk, 分别进行 map 操作和 reduce 操作

    • input_data 将要处理的字符串
    • map_fn 处理 字符串Chunk,返回(key , value)的 list
    • reduce_fn 处理 mapper产生的(key,value )返回 (key, result) list

    除了Tutorial做的MapReduce处理字符串的操作外还有

    • 处理文本
    • 处理表格(pandas)

    可以看官网的例子

    例子

    相关文章

      网友评论

          本文标题:MapReduce

          本文链接:https://www.haomeiwen.com/subject/xvhdrctx.html