MapReduce — модель распределённых вычислений, используемая для параллельных вычислений над очень большими, несколько петабайт, наборами данных в компьютерных кластерах. Преимущество MapReduce заключается в том, что он позволяет распределенно производить операции предварительной обработки и свертки. Операции предварительной обработки работают независимо друг от друга и могут производиться параллельно (хотя на практике это ограничено источником входных данных и/или количеством используемых процессоров). Аналогично, множество рабочих узлов могут осуществлять свертку — для этого необходимо только чтобы все результаты предварительной обработки с одним конкретным значением ключа обрабатывались одним рабочим узлом в один момент времени. Хотя этот процесс может быть менее эффективным по сравнению с более последовательными алгоритмами, MapReduce может быть применен к большим объёмам данных, которые могут обрабатываться большим количеством серверов. Так, MapReduce может быть использован для сортировки петабайта данных, что займет всего лишь несколько часов. Параллелизм также дает некоторые возможности восстановления после частичных сбоев серверов: если в рабочем узле, производящем операцию предварительной обработки или свертки, возникает сбой, то его работа может быть передана другому рабочему узлу (при условии, что входные данные для проводимой операции доступны).
Создаем тестовую директорию в файловой системе hdfs:
$ su - hdfs $ hdfs dfs -mkdir /user/mytest $ hdfs dfs -ls /user/ Found 6 items drwxr-xr-x - hdfs supergroup 0 2016-12-27 14:43 /user/hadoop drwxr-xr-x - hdfs supergroup 0 2016-12-27 12:39 /user/hdfs drwxrwxrwx - mapred hadoop 0 2016-12-26 14:19 /user/history drwxrwxr-t - hive hive 0 2016-12-27 10:26 /user/hive drwxrwxr-x - hue hue 0 2016-12-27 10:27 /user/hue drwxr-xr-x - hdfs supergroup 0 2016-12-27 12:23 /user/mytest |
Создаем текстовый файл любого содержания и переносим его в HDFS:
$ hdfs dfs -copyFromLocal /tmp/datatest /user/mytest/ $ hdfs dfs -cat /user/mytest/datatest This guide describes how to quickly install Cloudera software and create initial deployments for proof of concept (POC) or development. It describes how to download and use the QuickStart virtual machines, which provide everything you need to start a basic installation. It also shows you how to create a new installation of Cloudera Manager 5, CDH 5, and managed services on a cluster of four hosts. QuickStart installations should be used for demonstrations and POC applications only and are not recommended for production. |
Создадим простое MapReduce-приложение на языке Python для подсчета слов.
Map:
$ cat > /tmp/map.py #!/usr/bin/env python import sys # input comes from STDIN (standard input) for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\t%s' % (word, 1) |
Reduce:
$ cat > /tmp/reduce.py #!/usr/bin/env python from operator import itemgetter import sys current_word = None current_count = 0 word = None # input comes from STDIN for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\t', 1) # convert count (currently a string) to int try: count = int(count) except ValueError: # count was not a number, so silently # ignore/discard this line continue # this IF-switch only works because Hadoop sorts map output # by key (here: word) before it is passed to the reducer if current_word == word: current_count += count else: if current_word: # write result to STDOUT print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count) |
Вызываем MapReduce приложение в среде Hadoop:
$ hadoop jar /opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/jars/hadoop-streaming-2.6.0-cdh5.9.0.jar -files /tmp/map.py,/tmp/reduce.py -mapper map.py -reducer reduce.py -input /user/mytest/datatest -output /user/mytest/outputtest packageJobJar: [] [/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/jars/hadoop-streaming-2.6.0-cdh5.9.0.jar] /tmp/streamjob6732053312263900298.jar tmpDir=null 17/01/10 12:03:06 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm122 17/01/10 12:03:07 INFO mapred.FileInputFormat: Total input paths to process : 1 17/01/10 12:03:07 INFO mapreduce.JobSubmitter: number of splits:2 17/01/10 12:03:07 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1484035485729_0001 17/01/10 12:03:08 INFO impl.YarnClientImpl: Submitted application application_1484035485729_0001 17/01/10 12:03:08 INFO mapreduce.Job: The url to track the job: http://master2.hadoop.stage.int.nic.ru:8088/proxy/application_1484035485729_0001/ 17/01/10 12:03:08 INFO mapreduce.Job: Running job: job_1484035485729_0001 17/01/10 12:03:16 INFO mapreduce.Job: Job job_1484035485729_0001 running in uber mode : false 17/01/10 12:03:16 INFO mapreduce.Job: map 0% reduce 0% 17/01/10 12:03:25 INFO mapreduce.Job: map 100% reduce 0% 17/01/10 12:03:34 INFO mapreduce.Job: map 100% reduce 100% 17/01/10 12:03:35 INFO mapreduce.Job: Job job_1484035485729_0001 completed successfully 17/01/10 12:03:35 INFO mapreduce.Job: Counters: 50 File System Counters FILE: Number of bytes read=616 FILE: Number of bytes written=393679 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=973 HDFS: Number of bytes written=550 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=1 Rack-local map tasks=1 Total time spent by all maps in occupied slots (ms)=13480 Total time spent by all reduces in occupied slots (ms)=5332 Total time spent by all map tasks (ms)=13480 Total time spent by all reduce tasks (ms)=5332 Total vcore-seconds taken by all map tasks=13480 Total vcore-seconds taken by all reduce tasks=5332 Total megabyte-seconds taken by all map tasks=13803520 Total megabyte-seconds taken by all reduce tasks=5459968 Map-Reduce Framework Map input records=1 Map output records=83 Map output bytes=692 Map output materialized bytes=628 Input split bytes=184 Combine input records=0 Combine output records=0 Reduce input groups=61 Reduce shuffle bytes=628 Reduce input records=83 Reduce output records=61 Spilled Records=166 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=151 CPU time spent (ms)=3110 Physical memory (bytes) snapshot=1091223552 Virtual memory (bytes) snapshot=4757819392 Total committed heap usage (bytes)=813170688 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=789 File Output Format Counters Bytes Written=550 17/01/10 12:03:35 INFO streaming.StreamJob: Output directory: /user/mytest/outputtest |
Т.е. в скрипт map.py отправляется на вход текстовый файл, а на вход reduce.py отправляется результат работы скрипта map.py.
Ответ в директории /user/mytest/outputtest.