Hadoopの実行結果として、いくつかのファイルに分けて出力したい場合がある。
そんなときに使用できるのが、新APIのmapreduceパッケージにあるMultipleOutputsクラスだ。
下のように、Job設定の場面で、必要な分だけOutputFormatクラスを追加しておく。
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, IntWritable.class);
そしてReducerの方で、"text"を指定して、データを書き込んでやればよい。
デフォルトで作られるpart-r-xxxxxを削除するときは、同じくJob設定で、
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
としておけば、バイト0なので作成されなくなる。
さて、ここまではいいのだが、
この状態でMapReduceを実行してみると、標準出力に出てくるログが気になる。
私だけかもしれないが、「REDUCE_OUTPUT_RECORDS:0」となっていた。
ちゃんと出ている場合はこれ以降を読まなくて良いです。。。。
もちろん、実際には、ちゃんとファイルが作成されており、データも書かれている。
しかし、0件となっている。
どうやら、カウンタが連動していないためだった。
もしかしたら、つくりの問題なのかもしれない。ファイル名を変えたりしているからか!?
ちなみに、MultipleOutputs自体のカウンタはsetCounterEnabledにTrueを設定することで変更できる。
Trueにすると、各ファイルの件数をカウントしてくれる。
だけど、標準のカウンタには連動していない(ように思う)。
ところで、カウンタ、カウンタと言っているけれど、
Hadoopでは思ったより多くのカウンタが使われている。
一部しか書いていないが、以下のようなものがある。
◆FileSystemCounter
・FILE_BYTES_READ
・FILE_BYTES_WRITTEN
◆JobInProgress$Counter
・TOTAL_LAUNCHED_MAPS
・TOTAL_LAUNCHED_REDUCES
◆Task$Counter
・MAP_INPUT_RECORDS
・MAP_OUTPUT_RECORDS
・REDUCE_INPUT_RECORDS
・REDUCE_OUTPUT_RECORDS
こんな感じで書くと標準出力に一覧が出てくるので確認できる。
for(String s : job.getCounters().getGroupNames()){
System.out.println(s);
for(Counter c : job.getCounters().getGroup(s)){
System.out.println(c.getName()+ ":" + c.getValue());
}
}
MultipleOutputsを設定していると一番最後に、
自分で追加したカウンタが表示されることになる。
ソースを深くまで読んでいないので、どこでどうカウンタが加算されているのかは分からない。
ただ、使い方としては、たとえばReducerの書き込むところで、
context.getCounter(Task.Counter.class.getName(), "REDUCE_OUTPUT_RECORDS").increment(1);
のように書くと、REDUCE_OUTPUT_RECORDSのカウンタが1増える。
上記コードで全ファイル分のカウントは可能だ。
実際に、REDUCE_OUTPUT_RECORDSは、0から実際の数字に変更された。
ここからは試していないが、
上の状態だと無駄なカウント処理をしていることになる。
既に各ファイルのカウントは出来ているわけで、最後に全ファイルのカウントを加算したものを、
REDUCE_OUTPUT_RECORDSに設定してやれば良いのではないかと考える。
もしくは、上記で良いとして、MultipleOutputsのカウントをやめるかだ。
Reducerの後処理であるcleanupメソッドでやれば実現できそうな気もする。