この章ではRDDの概要について説明します。 最初はRDD($2-1)Sparkで利用するプログラムインタフェース($2-2)について説明します。それからメモリの詳細について説明し($2-3)、最後にSparkのモデルの限界($2-4)について議論します。
RDDは読み込み専用のレコードを分割した集まりです。RDDはオペレーションを通して安定したストレージか他のRDDが作成されます。私たちはこのオペレーションについて他のRDDオペレーションと差別化するためにtransformationsとよびます。例えばmap,filter,joinなどがそれにあたります。
RDDは常に実データを持つ必要はありません。その代わりRDDはストレージのデータからたどれるための情報を持つようにします。これはパワフルな特徴で失敗したあと、再構成できないRDDはプログラムから参照できません。
最終的にユーザは2つの方法でRDDを扱えるようになります。ユーザはメモリストレージ等を利用してデータを再利用することができます。また、各レコードのキーをベースにしてどのようにマシン間で要素を分割するか指定することができます。これは今後結合されるRDDを同じマシンにおいておくなどの最適化に利用できます。
SparkはDyadLINQやFlumeJavaのような言語APIを提供しており、それぞれのデータセットはオブジェクトとして表現され、オブジェクトにインボークされたメソッドを利用してtransformaionsを呼び出します。
プログラマはストレージ上のデータを一つないしそれ以上のtransformationsオペレータを利用します。(mapとかfilterとか)それからRDD上のデータを処理してアプリケーションに値を返したりデータをエクスポートしたりする。アクションの例としては、count(データセットにある件数をカウントする)、collect(値一覧を返す)、save(データセットをストレージに保存する)。DyadLINQのようにSparkは遅延評価を行うのでパイプライン処理ができます。
さらに、プログラマは再利用したいデータを保持するためのメソッドを呼び出すことができます。Sparkはデフォルトでメモリ上にRDDにデータを保持しようとしますが、RAMに乗り切らない場合はディスクに吐き出します。その他にもSparkをディスクだけ利用したり、レプリケーションできるように指定できます。要はユーザはRDDのデータセットをディスクに配置するかメモリに持つか優先度を指定することができます。
WebサービスでHDFS上にあるテラバイトレベルのログファイルからエラー行をみて原因を突き止めるケースを考えてみます。Sparkを利用すると、マシンノード間のメモリにデータをロードしてそれに対して対話式にクエリを発行します。彼女は最初に以下のようなコードを書きます。
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
1行目はHDFSからRDDにデータを呼び出し(テキストの行の集まりとして)2行目でRDDからフィルタリングします。3行目でメモリにある結果をそれぞれのノードをまたがって共有します。ちなみにfilterはScalaのclosureを実現するための文法です。ここでポイントなのはクラスタ上で何もしていないことです。しかし、ユーザはメッセージの数を数えるようなRDDのメソッドをよびだすことが可能です。
errors.count()
ユーザはさらに以下のように結果を利用してRDDのtransformaitionを利用することができます。
// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()
最初のアクションはerrorsを含んでいますが、Sparkはこれをメモリ上にもっているため高速に計算することができます。ここで利用されているRDDにあるlinesはメモリ上には読み込まれません。これはメモリを有効利用するには大変有効な手段です。
最終的にどのように耐障害性を実現したか、イラストを図1にまとめました。このクエリでは、最初にerrorsから始めます。これはlinesをフィルターした結果であり、そこからcollectを呼び出す前にmapとfilterの処理を行います。Sparkスケジューラは後者の2つのtransformationsをパイプラインで実行計画し、ノードにキャッシュされている各errorsデータについて計算命令を行います。さらに仮にデータをなくしたとしてもlinesから一致したデータだけ再度filterを行い、データを再構築します。
視点 | RDDs | 分散メモリシステム |
---|---|---|
読み込み | 荒いもしくはきめ細かい設定 | きめ細かい読み込み設定 |
書き込み | 荒い | きめ細かい読み込み設定 |
一貫性 | データは不変 | 実行時 |
復旧 | きめ細かくかつ最低限の処理で復旧 | チェックポイントが必要でプログラムのロールバックも必要 |
Straggler mitigation | バックアップタスクを利用して可能 | 難しい |
仕事する場所 | データローカリティをベースにして自動で判断 | 実行時に判断 |
RAMがないときの振る舞い | データフローシステムのように振る舞う | Swapするので遅延が起きる |
RDDが優れた分散メモリ環境であることを理解するために、分散共有メモリシステムとの比較をテーブルにまとめました。分散共有メモリシステムでは、アプリケーションの任意の読み書きはグローバルアドレスを利用しています。ちなみに、この話は分散共有メモリシステムだけではなく状態を共有するようなPiccoroは分散データベースについても同じことがいえます。分散共有メモリシステムはとても優れた考え方ですが一般的なクラスタを利用した実装であったり耐障害性の確保は難しくなります。
主なRDDと分散共有システムのちがいですが、RDDは荒い書き込みしかサポートしていないことです。分散共有メモリシステムは各メモリ場所に読み書きすることが可能です。RDDはアプリケーションに一斉書き込みしかサポートしないことを意味しますが、それ以上に効果的な耐障害性を手に入れることができます。RDDは特にチェックポイントのデメリットを受けることなく元データから復旧することが可能です。そうすることでRDDは一部のデータに不備があっても全体のプログラムをロールバック無しに実行することが可能になります。
二つ目の利点としてRDDはデータ不変のため、MapReduce内の遅いタスクをコピーして実行することで遅延を和らげることが可能です。タスク自体をコピーと分散共有メモリシステムではコピーしたタスクが同一メモリにアクセスしてそれぞれを更新することが難しいです。
結局、RDDは分散共有メモリシステムに比べると2つのメリットを提供します。一つはRDD上のバルクオペレーションでデータロケーションによってタスクをスケジューリングすることでパフォーマンスを向上させます。そして二点目はRAMが十分でなくなったときのマイグレーション方式です。透過的にディスクを利用することでパフォーマンスの低下を極力軽減します。
RDDはすべてのデータセットに対して同一の処理をするようなバッチアプリケーションには向いています。それらのケースでは効果的にデータを取り扱うことができます。逆にいうと状態を管理しながら同期して処理を進めていくような処理には向いていません。そういうのはログを更新しながら、チェックポイントをもってすすめていくPiccoroのようなシステムがフィットするでしょう。私たちのゴールは分析バッチや同期を必要としない処理について効果的なプログラミングモデルを提供することなのです。
SparkはRDDの概念を実装したインタフェースを動的関数型言語であるScalaで提供しています。Scalaを選んだのは簡単で効率的な言語だからです。しかし、RDDが関数型の特徴を必要とするということではありません。Sparkを利用するためには図2で書いているワーカーに接続したdriver programを書く必要があります。 driverでは一つないしそれ以上のRDDを定義してそれに帯するアクションを記述します。driver上のSparkコードはRDDの履歴を記録します。workerは常駐プロセスとして存在し、オペレーションの結果をRDD上のメモリに所持する。
2.2.1のログマイニングアプリケーションを例にすると、 ユーザはRDDsに対してmapオペレーションを関数型言語のようなリテラル群で記述している。Scalaは各々のメソッド/引数部をJavaオブジェクトして表現し、これらのオブジェクトはシリアライズされてクラスタ内の異なるノードにネットワーク越しに配信され、別ノードでロードされる。Scalaはまた構文の中で記述されたいかなる変数もJavaオブジェクトとして保持する。 例えば、以下の記述はrddの要素に対して各々5を加算するというオペレーションを示す。
var x = 5;
rdd.map(_ + x)
RDD自体はElement型の動的オブジェクトです。例えばRDD[Int]はInt型のRDDです。しかしほとんどの例ではScalaがサポートするインタフェースを省略することになるでしょう。しかしながら私たちが考えるRDDのコンセプトは簡単で、リフレクションを利用してScalaのクロージャーオブジェクトから扱えるようにするだけです。私たちは5-2のシェルでお話するようにより簡単にScalaからSparkを扱えるように仕様と考えています。それでも、Scalaコンパイラは修正する必要はありません。
テーブル2でSparkが利用できるtransformationsとアクションの一覧を提供しています。それぞれのオペレーション名とカギ括弧内にパラメータを記述できるようになっています。transformationsを再び呼び出すと遅延評価によって新しいRDDが定義され、アクションは値を返すかなにか別のストレージに書き出すようにします。注意してほしいのはjoinのようないくつかのオペレーションはkey-valueのようなペアでしか利用できないことがあります。かつ、APIの名前は極力Scalaや他の関数言語に合わせて記述しています。例えば、mapはone-to-oneのマッピングでflatMapで一つ以上のアウトプットに対応しています。(MapReduceのMapに近いです)
それらのアクションに追加して、ユーザはRDDにpersistで訪ねることができます。さらにPartitionを順序付けしてPartition Classで表現することが可能です。groupByKey, reduceByKey, sortはそれぞれRDD内での分割を行います。
2-1でみせたマイニングアプリケーションの他、ロジスティック回帰やページランクのサンプルもあります。後ほどどのようにパフォーマンス向上するかについては言及します。
たくさんの機械学習アルゴリズムがイテレーティブな計算を必要としています。それらのためにもメモリでデータを保持しつつより早く動作させないといけません。例として、論理回帰によってデータ群から2値変数に対して回帰分析を行う例を示す。(スパムか、そうでないか、の判定などに用いられる)
このアルゴリズムは勾配降下法を使用している。 wというランダム値から開始し、イテレーションを回す。 関数に対してwを適用した結果が最小となる値に収斂するまで実行する。
val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}
上記の処理はまずテキストファイルの各行をPointのオブジェクトに変換し、RDD化する処理を行っている。 その後RDDに対してmap/reduceを繰り返し実行し、勾配をwに対して加算する。(イテレーション回数実行する) pointsをメモリ上に維持することでディスクに出力した場合と比較して20倍以上の高速化を行っている。
より複雑なデータ共有が発生するパターンとして、ページランクを確認する。 このアルゴリズムはあるページが他のページからリンクされている数を基に「ランク」を算出するもの。 各イテレーションにおいて、各ドキュメントは¥frac r {n}のページランク値をリンク先のページに与える。 ここでの『r』はリンク元ページのランクで、『n』はリンク先のページ数となる。
その後ランクを¥frac ¥alpha {N}+(1-¥alpha)¥Sigma c ¥small i の式を用いて更新する。 ここでの合計値は自分のページに対してリンクを行ったページの合計値で、Nは全ページ数となる。
Sparkでページランクを実現した場合は以下になる。
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) => links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y).mapValues(sum => a/N + (1-a)*sum)
}
各イテレーションにおいて、ranksデータセットを前回のイテレーション結果であるranksと静的情報であるlinksを用いて更新している。
このグラフで興味深いところは系統グラフがイテレーション数に応じて増大すること。 したがって、多くのイテレーションを実行するジョブにおいては、障害発生時の復旧時間を短縮するために いくつかのバージョンのranksをレプリケーションする必要があるかもしれない。
このようなケースにおいて、プログラマはpersist関数をRELIABLEフラグを設定して呼ぶことができる。 但し、linksデータセットはファイルからのmap節から効率よく再生成されるためレプリケーションする必要はない。
linksデータセットは一般的にranksデータセットよりも大きくなる。 なぜなら、各ページは多数のリンク先を保持するが、ranksデータセットはページ数のサイズに収まるからである。 そのため、linksデータセットの復元に系列グラフを使用することで使用するメモリ量を抑えることができる。
また、RDDのパーティショニングを制御することでPageRankアルゴリズム実行における通信量を最適化することができる。 links RDDのパーティショニングを指定した場合(例:linklists をURL のハッシュでパーティショニング)、 ranks RDDも同じ方式でパーティショニングすることによってlinks とranksのjoin関数においてノード間の通信を行わずに実行できる。 (同じリンクに対するランクとlinklistは同じノード上に存在するため)
加えて、ページをグルーピングするカスタムパーティショナーを自作することもできる。 (例:ドメイン名でリンクを分割する場合) これらの最適化は以下のようにpartitionBy関数を実行することで実施可能。
links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist() 上記の呼び出しから処理を開始した場合、linksとranksのjoin関数から生成されるcontirbutionsは 自動的に同一マシン上に保持しているURL同士で行われ、新たなrankが生成される。 その際にrankにも同一のパーティショニングが適用される。
この種のイテレーション間の一貫性はPregelのようなフレームワーク上で行われていた最適化策のうちの一つ。 RDDsではユーザが指定することでこの最適化を達成できる。
RDDの一つの目標は幅広いtransformationsに対して追跡可能にすることです。アイデアとしてはテーブル2のようなリッチなtransformationsを提供しかつ、それらを組み合わせても追跡可能になるという点である。私たちはそれらの実現方法としてシンプルなグラフベースのRDDの表現方法を提案した。私たちは個別のスケジューラに手をいれることなく実現することで大変シンプルな設計を実現しました。簡単にいうとRDDには5つの情報を付与しました。
- 一貫したデータパーティション
- 親RDDへの依存
- 親RDDをベースとした計算関数
- 分割スキーマとデータ配置場所に関するメタデータ
例えば、HDFSのファイルをRDDとして表現すると、ブロックファイルの場所とそれがどこにあるか分かっていることになる。RDD上でのMapの結果は同一の分割場所におかれるが、その要素を計算するときは親データにたいしてmap関数を呼び出している。私たちはそのときのインタフェースをテーブル3にまとめました。このインタフェースに関する面白い質問はどのようにしてRDD間の依存関係を表現するかということです。私たちはこれに関して2つの依存関係さえ解決すれば十分に対応できることがわかりました。一つは親RDDが子RDDに最高1つまでしか参照されないという狭い依存、もう一つは広い依存で親RDDが複数の子RDDに依存されるパターンです。例えばmapは狭い依存でjoinは広い依存です。サンプルを図4に示します。
この定義は2つの理由で便利です。最初に狭い依存はクラスタノードに対して全て親のパーティションで実行できるためパイプラインを実行可能という意味を示します。例えば、基本的なmapとfilterであればそれは全て同一のクラスタノードで処理できる。逆に広い依存の場合MapReduceの場合クラスタ間でのシャッフルが必要になる。
2つ目の理由は狭い依存の場合ノード障害からの復旧で障害があったノードだけ復旧すればよく並列実行が可能になる。広い依存の場合RDDの親全体が複数の分割ノードからきている可能性があり、再実行が難しい。
RDDのtransformationsの実装はほとんど20行未満で記述できる。それはスケジューラに依存することなく書くことができる。今から代表的な実装を見ていく。
サンプルではHDFSファイルを利用している。ファイルのそれぞれのブロックはPartitionオブジェクトとして扱うことができ、オフセットも取得できる。prefferedLocationはHDFS上のブロック場所をかえし、iteratorがそのブロックを読みにいく。
mapをよぶとどんなRDDでもmappedRDDオブジェクトがかえってくる。このオブジェクトは親オブジェクトからmapメソッドを通して生成されるが親RDDと同じ分割場所に配置される。
2つのRDDを結合するとそれらの親のRDDを結合したRDDが返される。 それぞれの子RDDはそれぞれの親RDDに一致した狭い依存を通して計算される。
samplingはmapに似ていますが子のRDDは親RDDからランダムにデータ取得するためのシード値が設定されています。
二つのRDDをjoinした場合以下のケースが考えられる。
- 同一パーティションにある。
- 同一パーティションかる同じ範囲/ハッシュにあるパーティション
- 両方違うパーティション
- 片方同じパーティション、もう一個違うパーティション