aws memo

AWS関連の備忘録 (※本ブログの内容は個人的見解であり、所属組織及び企業の意見を代弁するものではありません。1年以上古いエントリは疑ってかかってください)

訳:EMR : Jobflowの作成とデバッグ方法

How to Create and Debug an Amazon Elastic MapReduce Job Flow : Articles & Tutorials : Amazon Web Services

http://aws.amazon.com/articles/3938

====

Understanding Elastic MapReduce Job Flows

Amazon EC2インスタンスクラスタとして稼働するAmazon EMRにおいて、ジョブフローは( job flow)ユーザが定義するタスクである。JavaのJARファイルや、Python, Ruby, Perl, C++で書かれたHadoop Stremingプログラムで実装された MapReduceアルゴリズムが各ステップとなる。ステップの集合(である job flow)はAmazon EC2クラスタのマスターノード上でシーケンシャルに実行される。

あるステップの出力が、次のステップの入力になるような場合、一般的にジョブフローは複数のステップから構成される。例:あるドキュメントにある全単語の出現回数をカウントするステップがあり、次のステップでは、最初のステップの出力を出現回数でソートするという場合。

Hadoop Distributed File System (HDFS)に保存されたファイルを使って、前ステップと後ステップの間でデータはやり取りされる。HDFSに保存されたデータは、ジョブフローが実行中の間だけ存在する。ジョブフローがシャットダウンすると、HDFSの全データは捨てられる。通常、ジョブフローの最終ステップでは、ジョブフローの結果をAmazon S3に保存する。そうすることで、将来のジョブフローがこの結果にアクセスできることになる。

MapReduceはHadoopを使ってデータを処理するアルゴリズムである。複数の方法でアルゴリズムを特定できる。

  • Hive: 高レベル言語Hiveを使ってMapReduceアルゴリズムを作成できる
  • Pig: 高レベル言語Pigを使ってMapReduceアルゴリズムを作成できる
  • Java: Custom JARを作成することでHadoop job flowを作成できる
  • Streming: Java以外の言語を使ってアルゴリズムとJobFlowを作成できる

Elastic MapReduce command line interface(CLI)を使って、複数ステップのJob Flowを作成できる。AWSコンソールは、単一ステップのjob flowのみサポートする。本稿では、CLIでのjob flowの管理方法を説明する。AWS ConsoleとElastic MapReduce APIの詳細は、 Amazon Elastic MapReduce Developer Guide  Amazon Elastic MapReduce API Reference. にある。

環境セットアップ

  1. S3にlog用のバケツを作成、URIを決定。EMR job flowからのアクセスを許可しておく
  2. Rubyインストール(1.8+、EMR CLIのため)
  3. EMR CLIインストール
  4. CLIのcredentials.jsonに、AWS credeintial, log-uriを設定

Creating and Managing Job Flows

CLIを使ってjob flowを構築すると、そのjob flowはターミネートされるまで実行する。これはデバッグに便利である。あるステップが失敗すると、シャットダウンや起動を行うこと無く、現在アクティブな job flowに別のステップを追加できる。以下のコマンドで、job flowが開始し、job flowが終了するまでリソースを消費する。

コマンドプロンプトで、以下のようにコマンドを入力する。

>$ ./elastic-mapreduce --create --alive --log-uri s3n://my-example-bucket/logs
Created job flow j-36U2JMAE73054

デフォルトでは、1台のm1.smallインスタンス上でjob flowが起動される。 小さなサンプルデータでjob flowが正しく動いた後に、より多くのインスタンスでjob flowを起動できる。

インスタンス数やインスタンスタイプも、 --num-instances --instance-type オプションで指定できる。 

--aliveオプションは、job flowの全ステップが終了しても起動し続けることを指定する。

--log-uriオプションで、job flowが出力するログファイルの出力先となるAmazon S3の場所を指定する。S3にバケツを作成していない場合は、無視される(omitted)。ステップ完了後5分間、ログファイルはS3にプッシュされない。デバッグ中は、アクティブなjob flowのマスターノードにログを出力するとよい。job flow終了後にS3からログファイルを見るには、--log-uriが必要となる。

Hiveの起動

--createコマンド時に--aliveオプションをつけると、ターミネートするまでジョブフローが存在し続ける。

>$ ./elastic-mapreduce \
--create \
--alive \
--name "Testing Hive -- $USER" \ 
--num-instances 5 \
--instance-type c1.medium \
--hive-interactive \
--log-uri s3n://my-example-bucket/logs

Created job flow j-36U2JMAE73054

実行すると、job flow IDが返ってくる。実行中のjob flow一覧を表示するには以下のコマンドを実行する。

>$ ./elastic-mapreduce --list --active

job flowがwaiting状態になった後、Hadoopユーザでマスタノードにsshログインして、Hiveを実行する

>$ ./elastic-mapreduce --jobflow j-36U2JMAE73054 --ssh --key-pair-file ~/.ssh/xxx
hadoop@domU-12-12-12-12:~$ hive
Hive>

Pigの起動

>$ ./elastic-mapreduce \
--create \
--alive \
--name "Testing Pig -- $USER" \ 
--num-instances 5 \
--instance-type c1.medium \
--pig-interactive \
--log-uri s3n://my-example-bucket/logs

Created job flow j-ABABABABABAB

job flowがwaiting状態になった後、Hadoopユーザでマスタノードにsshログインして、Pigを実行する

>$ ./elastic-mapreduce --jobflow j-ABABABABABAB --ssh --key-pair-file ~/.ssh/xxx
hadoop@domU-12-12-12-12:~$ pig
Pig>

 Streamingの起動

job flowを実行するには、JSONファイルで記述する必要がある。以下のコード例は、streaming job flowを実行するための streaming_jobflow.jsonファイルである。Streaming job flowはMapReduce実行可能ファイル(Java以外で書かれたもの)を使う。

このjob flowではPythonスクリプトを使って、あるファイル内の単語を数える。サンプルのjob flowコードを使う前に、 MY_LOG_BUCKETとMY_BUCKET変数を実際のS3のバケツ名に、MY_KEY_NAMEを EC2の key pair名に置換し、streaming_jobflow.jsonとして保存する。

 

{
  "Name": "Wordcount Using Python Example", 
  "LogUri": "[MY_LOG_BUCKET]/log", 
  "Instances": 
  { 
   "SlaveInstanceType": "m1.small", 
   "MasterInstanceType": "m1.small", 
   "InstanceCount": "1", 
   "Ec2KeyName": "[MY_KEY_NAME]", 
   "KeepJobFlowAliveWhenNoSteps": "false" 
  }, 

"Steps": 
 [ 
 
   { 
      "Name": "Streaming Job flow Step", 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": 
      { 
         "Jar": "/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar", 
         "Args": 
         [ 
            "-input", "elasticmapreduce-external/demo/wordcount/input", 
            "-output", "[MY_BUCKET]/demo/output", 
            "-mapper", "elasticmapreduce-external/demo/wordcount/wordSplitter.py", 
            "-reducer", "aggregate" 
         ] 
      } 
    } 
  ] 
}

このjobflowでは、InstanceCountでどのタイプのハードウェアを何台使うか、LogUriでログの出力先を、KeepJobFlowAliveWhenNoStepsで全ステップが完了しても起動し続ける、ActionOnFailuerでエラー時に何をするか?を指定している。ステップ群は0個以上の要素からなる配列として定義可能。EMRは、複数ステップを記述順に実行する。

注:

ゼロステップは、何もデータを処理しないことを意味する。デバッグ時にはクラスタのセットアップだけ行うようにゼロステップを定義しておいて、その後AddJobFlowStepsを使ってステップを追加することができる。

注:

InstanceValueが1の場合、1台のインスタンスがmasterとslaveとして動く。1より大きい場合、1台masterノードで残りがslaveノードになる。

入力データとしてHDFSを使う場合、パス記述にスラッシュ3個(///)を使う。例: hdfs:///aws-hadoop/MyCompany/sampleInput/

Reducerの実行形式 aggregateはHadoop提供のAggregateライブラリに含まれている。Aggregateライブラリは、多くの基本的なaggregationレデューサを含んでいる。例えば、sum、max、min。

Management ConsoleのS3を使って、 --output パラメータで指定したバケツを作成し、処理したいデータをアップロードしておく。

RunJobflowの例: wordSplitter.py

#!/usr/bin/python 
import sys 
def main(argv): line = sys.stdin.readline() try: 
while line: line = line.rstrip() words = line.split() for word in words: 
print "LongValueSum:" + word + "\t" + "1" 
line = sys.stdin.readline() except "end of file": return None 
if __name__ == "__main__": main(sys.argv) 

Stremingジョブフローの実行

  1.  MapReduce実行形式( wordSplitter.py)をS3にアップロードする
  2. 以下のコマンドでjob flowを実行する
$ ./elasticmapreduce-client.rb RunJobFlow streaming_jobflow.json {"JobFlowId":"j-138L1TOL8PIJT"} 

job flowの最大生存期間は2週間である。