MapReduce初探

MapReduce 是一种编程模型,用于处理大规模的数据。用户主要通过指定一个 map 函数和一个 reduce 函数来完成数据的处理。看到 map/reduce 很容易就联想到函数式编程,而实际上论文中也提到确实受到 Lisp 和其它函数式编程语言的启发。MapReduce的灵感来源于函数式语言(比如Lisp)中的内置函数map和reduce。函数式语言也算是阳春白雪了,离我们普通开发者总是很远。简单来说,在函数式语言里,map表示对一个列表(List)中的每个元素做计算,reduce表示对一个列表中的每个元素做迭代计算。
以 Python 为例,map/reduce 的用法如下:

from functools import reduce
from operator import add
ls = map(lambda x: len(x), ["ana", "bob", "catty", "dogge"])
# print(list(ls))
# => [3, 3, 5, 5]
reduce(add, ls)
# => 16

MapReduce 的优势在于对大规模数据进行切分(split),并在分布式集群上分别运行 map/reduce 并行加工,而用户只需要针对数据处理逻辑编写简单的 map/reduce 函数,MapReduce 则负责保证分布式运行和容错机制。

MapReduce的简单流程是在处理数据时,首先生成一个 job 将输入文件切分成独立的块(chunk),切块的大小是根据配置设定的。然后每个独立的文件块交给 map task 并行加工,得到一组 列表,MapReduce 再将 map 输出的结果按 k1 进行重新组合,再将结果传递给 reduce task,最后 reduce 计算得出结果。

Map作业是每一个worker 节点都对本地的数据进行map函数处理,然后将输出写入到一个临时文件中;
Reduce作业是每一个worker 节点并行处理每组的输出数据。

Map(k1,v1) → list(k2,v2)
Shuffle->整合数据,根据相同的key,进行整合。list(k2,v2)->k2, list (v2)
Reduce(k2, list (v2)) → list(v3)

更简单的说法就是MapReduce把一个任务分割成多个独立的子任务,子任务的分发由map实现,结果的合并由reduce实现。

针对于MapReduce平台,用户只需要写好map和reduce函数,其余内部的分配,调度不需要关心。这既加快了处理时间,又简化了代码编写,非常方便。
MapReduce最经典的案例就是Hadoop。首先在多态主机上部署Hadoop集群。通过Hadoop实现MapReduce。后续要深入学习。

因为我是一个Pythoner,因此不管什么都会研究一下在Python中的应用。

Hadoop虽然是由Java编写,但也提供了python的接口。Python可以利用Hadoop平台实现MapReduce功能。这里暂时不介绍,后续再学习。首先说一下用纯Python实现MapReduce。

用纯Python编写,并行用多进程实现,在单机中运行。

    #-*-coding:utf-8 -*-


import multiprocessing
import collections



class MapReduce(object):

    def __init__(self,mapper,reducer):
        self.mapper = mapper
        self.reducer = reducer
        self.pool = multiprocessing.Pool()

    def partition(self,mapped_value):
        result = []
        for item in mapped_value:
            result.extend(item)
        partition_data = collections.defaultdict(list)
        for key, value in result:
            partition_data[key].append(value)
        return partition_data.items()


    def __call__(self,inputs):
        mapped_result = self.pool.map(self.mapper,inputs,chunksize=1)
        mapped_value = self.partition(mapped_result)
        reduced_value = self.pool.map(self.reducer,mapped_value)
        return reduced_value





def mapper(logfile):
    mapped_value = []
    with file(logfile,'r') as f:
        for line in f.readlines():
            #print line
            line = line.split()
            #print line
            item = ()
            try:
                item = (line[0],1)
            except Exception,e:
                print str(e)
            mapped_value.append(item)
    return mapped_value



def reducer(item):
    cookie,occurances = item
    return (cookie,sum(occurances))




if __name__ == "__main__":
    mapreduce = MapReduce(mapper,reducer)
    import os
    logfile1 = os.environ.get("SPIDERPATH") + '/logs/spiderInfo-2017-03-08.log'
    logfile2 = os.environ.get("SPIDERPATH") + '/logs/spiderInfo-2017-03-09.log'
    logfile3 = os.environ.get("SPIDERPATH") + '/logs/spiderInfo-2017-03-10.log'
    result = mapreduce([logfile1,logfile2,logfile3])
    print result

这是参照网上写的一段代码,整体逻辑是遵循了MapReduce的思想,分了3步骤。

并行运算采用多进程模块的pool实现。

Map函数用来对原始数据进行提取,返回(k1,v1)的一个列表。 shuffle过程对数据进行整合,返回(k1,list(v1))的列表。 Reduce函数,将(k1,list(v1))变成list(v2)。

其实map和reduce的灵感就来自函数式编程,这类可以把Python的函数式编程拿出来。

map(lambda x:x+2,[1,3,4,5])   对列表的元素分别进行运算。
reduce(lambdax,y:x+y,[1,2,3,4])   对列表进行迭代运算。

使用Python写Hadoop平台的MapReduce,要利用Hadoop提供的API,最重要的是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。

我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。

因为,我这还没搭建Hadoop集群,就把官网的例子拿出来吧。该例子是用来计算单词出现的次数。

Mapper.py


#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

mapper提取标准输入的数据,返回一个元祖,每个单词可能出现不止一次,但是每一个单词出现都按照1次计算,然后让Reduce来计算单词出现的次数。

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

注意:上面的这个reducer,统计次数逻辑是建立在Hadoop已经预先对map的输出列表进行了排序。

参考: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

--------EOF---------
微信分享/微信扫码阅读