HadoopはJavaで作られている。
だからHadoopに何か操作をさせたい場合には、通常、Javaで記述する必要がある。
しかしHadoopにはHadoop Streamingという仕組みがあり、早い話UNIXのStandard Stream要するに標準入出力を扱うことができる。
すなわち、UNIXの標準入出力の流儀に則ってさえいれば、お好きな言語で操作ができる。
Javaがまったく合わない私としては、Hadoop Streamはとてもありがたい。
これがなければHadoopに手を付ける気にはならなかった。
Hadoop Streamに必要なもの。
mapperとreducerを、好きな言語で書くだけ。
べつにmapperだけでもよいけど。
私はPython。
Hadoop streamにおけるmapperとreducerの概要。
mapperは何らかの入力を得て、キーと値(key, value)を出力する。
reducerはmapperからのkey, valueを受けて、keyごとにvalueを処理する。
なお、mapperの出力がreducerに渡されるとき、Hadoopがkeyごとにソートしてくれる。
この点はreducerの処理を簡単にする。詳細は後述。
試しにやってみること。
LAN向けのApacheのアクセス状況をカウントしてみる。
IPアドレスごとのアクセス回数だ。
ログは以下のようなもの。
アクセス元はすべてIPアドレスで記録されている。
1 2 3 |
192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/js/common/Editor/Iframe.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0" 192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/js/common/Editor/Textarea.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0" 192.168.100.106 - - [24/Jul/2013:22:35:26 +0900] "GET /MT/mt.js?v=5.14-ja HTTP/1.1" 304 - "http://192.168.100.5/MTcgi/mt.cgi?__mode=view&_type=entry&blog_id=2&id=330" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0" |
処理の流れ
1.mapperは、アクセスログからIPアドレスを抜き出す。
そして「<IPアドレス><タブ>1」を出力する。
これは、たとえば「192.168.1.1」から「1」回アクセスがあったよ、という意味。
2.hadoopがIPアドレスをキーにソート。
3.reducerは、IPアドレスごとに回数をカウントし、
「<IPアドレス><タブ><集計回数>」を出力する。
mapper.py
IPアドレスは、Apacheログにおいて、スペースを区切りにした第一フィールドに記載される。
だから一行ずつログを読んで、行頭のIPアドレスを抜き出し、その都度「<IPアドレス><タブ>1」を出力する。
「if “newsyslog” not in line:」は、システムメッセージ行を読み飛ばすため。
1 2 3 4 5 6 7 8 |
#!/usr/bin/env python import sys for line in sys.stdin: if "newsyslog" not in line: fields = line.strip().split() print '%s\t%s' % (fields[0],1) |
実行権限も忘れずにつける。
1 |
$ chmod a+x ./mapper.py |
実験。意図したとおり動いていますね。
1 2 3 4 5 6 7 8 |
$ cat ../input/httpd-access.log |./mapper.py 192.168.100.107 1 192.168.100.106 1 192.168.100.110 1 192.168.100.107 1 192.168.100.107 1 192.168.100.107 1 |
reducer.py
mapper.pyからの出力は、ソートされてreducer.pyに入力される。
上記のmapper.py出力例は、以下のようにソートされる。
1 2 3 4 5 6 |
192.168.100.106 1 192.168.100.107 1 192.168.100.107 1 192.168.100.107 1 192.168.100.107 1 192.168.100.110 1 |
だからreducerとしては、第一フィールドを上から読んでいって、keyが変化したら、そこまでのカウント数を出力する。
そしてそのkeyの事は、さっぱり忘れて次のkeyのカウントに移ることができる。
もしソートがなされていないならば、入力の終わりまですべてのkeyを保持しなければならない。Hadoopに感謝である。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
#!/usr/bin/env python import sys (last_key, count) = (None, 0) for line in sys.stdin: (key, val) = line.strip().split("\t") if last_key and last_key != key: print "%s\t%s" % (last_key, count) (last_key, count) = (key,int(val)) else: last_key = key count += int(val) if last_key: print "%s\t%s" % (last_key, count) |
実行権限を付ける。
1 |
$ chmod a+x reducer.py |
実験。間にsortを入れること。
問題なし。
1 2 3 4 5 6 7 |
$ cat ../input/httpd-access.log |./mapper.py |sort|./reducer.py 127.0.0.1 567 192.168.100.106 327 192.168.100.107 671 192.168.100.109 2 192.168.100.150 193 |
Hadoopで動かしてみよう。
まずカウント対象となるログをHDFSにコピーする。
1 2 3 4 5 6 7 8 9 10 |
[hadoop@isis ~]$ hadoop dfs -put apachelog apachelog [hadoop@isis ~]$ [hadoop@isis ~]$ hadoop dfs -ls Found 1 items drwxr-xr-x - hadoop supergroup 0 2013-07-28 12:57 /user/hadoop/apachelog [hadoop@isis ~]$ [hadoop@isis ~]$ hadoop dfs -ls apachelog Found 1 items -rw-r--r-- 1 hadoop supergroup 999290 2013-07-28 12:57 /user/hadoop/apachelog/httpd-access.log [hadoop@isis ~]$ |
実行。
hadoopにhadoop-streaming-1.0.0.jarを与え、input、outputのほかに、mapperとreducerも指定する。
-mapper -reducerとしてローカルファイルシステムでのパスを与える。
同時に、-fileでそれぞれのスクリプトを指定すると、スクリプトファイルをリモートのノードへ送ってくれる。
コマンドはすごく長くなる。
エスケープシーケンスを使って適宜改行し、見やすくしてタイプミスを防ぐ。
1 2 3 4 5 6 7 8 |
hadoop jar \ /usr/local/share/hadoop/contrib/streaming/hadoop-streaming-1.0.0.jar \ -input apachelog \ -output apachelog.out \ -mapper /home/hadoop/sandbox/mapper/mapper.py \ -reducer /home/hadoop/sandbox/mapper/reducer.py \ -file /home/hadoop/sandbox/mapper/mapper.py \ -file /home/hadoop/sandbox/mapper/reducer.py |
実際のログ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
[hadoop@isis ~]$ hadoop jar \ > -output apachelog.out \ > -mapper /home/hadoop/sandbox/mapper/mapper.py \ > /usr/local/share/hadoop/contrib/streaming/hadoop-streaming-1.0.0.jar \ > -file /home/hadoop/sandbox/mapper/mapper.py \ > -file /home/hadoop/sandbox/mapper/reducer.py \ > -input apachelog \ > -output apachelog.out \ > -mapper /home/hadoop/sandbox/mapper/mapper.py \ > -reducer /home/hadoop/sandbox/mapper/reducer.py \ > -file /home/hadoop/sandbox/mapper/mapper.py \ > -file /home/hadoop/sandbox/mapper/reducer.py packageJobJar: [/home/hadoop/sandbox/mapper/mapper.py, /home/hadoop/sandbox/mapper/reducer.py, /tmp/hadoop-hadoop/hadoop-unjar1190814954490199586/] [] /tmp/streamjob8960127756596123730.jar tmpDir=null 13/07/28 13:30:15 INFO mapred.FileInputFormat: Total input paths to process : 1 13/07/28 13:30:43 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local] 13/07/28 13:30:43 INFO streaming.StreamJob: Running job: job_201307281252_0001 13/07/28 13:30:43 INFO streaming.StreamJob: To kill this job, run: 13/07/28 13:30:43 INFO streaming.StreamJob: /usr/local/share/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201307281252_0001 13/07/28 13:30:43 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201307281252_0001 13/07/28 13:30:46 INFO streaming.StreamJob: map 0% reduce 0% 13/07/28 13:39:14 INFO streaming.StreamJob: map 100% reduce 0% 13/07/28 13:40:27 INFO streaming.StreamJob: map 100% reduce 33% 13/07/28 13:40:33 INFO streaming.StreamJob: map 100% reduce 67% 13/07/28 13:40:49 INFO streaming.StreamJob: map 100% reduce 100% 13/07/28 13:42:39 INFO streaming.StreamJob: Job complete: job_201307281252_0001 13/07/28 13:42:41 INFO streaming.StreamJob: Output: apachelog.out |
結果の確認
hadoop dfs -catなどで。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
[hadoop@isis ~]$ hadoop dfs -ls Found 2 items drwxr-xr-x - hadoop supergroup 0 2013-07-28 12:57 /user/hadoop/apachelog drwxr-xr-x - hadoop supergroup 0 2013-07-28 13:42 /user/hadoop/apachelog.out [hadoop@isis ~]$ [hadoop@isis ~]$ hadoop dfs -ls apachelog.out Found 3 items -rw-r--r-- 1 hadoop supergroup 0 2013-07-28 13:42 /user/hadoop/apachelog.out/_SUCCESS drwxr-xr-x - hadoop supergroup 0 2013-07-28 13:30 /user/hadoop/apachelog.out/_logs -rw-r--r-- 1 hadoop supergroup 1438 2013-07-28 13:40 /user/hadoop/apachelog.out/part-00000 [hadoop@isis ~]$ [hadoop@isis ~]$ hadoop dfs -cat apachelog.out/part-00000 127.0.0.1 567 192.168.100.106 327 192.168.100.107 671 192.168.100.109 2 192.168.100.150 193 [hadoop@isis ~]$ |
以上
No tags for this post.