2015年8月13日木曜日

[Scala] FutureとスレッドプールとAkka

僕は、実は最近までFutureについてよく分かってなかったんです。単に別のスレッドで処理をして値を返すものという認識ぐらいしかありませんでした。Future.applymapflatMapもあまり気にせず使っていました。コンパイル時にExecutionContextがないって怒られたら、コンパイラの指定通りにimport scala.concurrent.ExecutionContext.Implicits.globalをソースコードに盲目的に追加するということもやっちゃってました。とりあえず、コンパイラは通るしプログラムも動くし、いいっしょ!っていう。でも、よく分からないまま使っていて、問題が起こった時に困るのは自分です。もしかしたら、周りの人にも迷惑がかかるかもしれません。反省。
ということで、今回は、Future周りのことについて調べたことをまとめます。

ExecutionContextってなに?
暗黙的にimport scala.concurrent.ExecutionContext.Implicits.globalを記述し、Futureを使っていたんですが、Future.applyにもmapにもflatMapにもExecutionContextを渡さないといけないんです。scala.concurrent.ExecutionContext.Implicits.globalってのは、Scalaがデフォルトで用意しているimplicit宣言されているExecutionContextなんですね。だからこれをimportすると暗黙的にFuture.applyにもmapにもflatMapにも渡されることになります。では、このExecutionContextってなんなのよ?っていう話です。これは、簡単に言うと、スレッドプールです。つまり、Futureで処理されることは、Futureを生成するときに渡されたスレッドプールが管理しているスレッド内で実行されるんです。

scala.concurrent.ExecutionContext.Implicits.globalってなに?
scala.concurrent.ExecutionContext.Implicits.globalってのは、Scalaがデフォルトで用意しているスレッドプールというのは、先に述べましたが、それは一体どんなスレッドプールなのか?
とりあえず、知っておくべきことは、このスレッドプールが管理しているスレッドの数は、プロセッサ(コア)の数と同じだということです。例えば、4つしかコアがなくてFutureを4つ作った場合は、スレッドプールのスレッドを全て専有しちゃうので、別にFutureを作っても、その処理は動かないんです。なので、Future内で時間のかかる処理とか、Thread.sleepとかしないほうがいいんです。時間のかからない処理がいっぱいあるようなプログラムだったら、あまり問題になることはないんじゃないでしょうか?

ExecutorServiceを使う
もちろん、他のスレッドプールを使うこともできます。デフォルト以外で簡単なものとしては、JavaExecutorsを使ってスレッドプールを生成するのが簡単だと思います。例えば、固定数のスレッドを持ったスレッドプールを作る場合には、次のようにします。
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
JavaExecutorServiceは、比較的に時間のかかる処理などに適しているようなので、状況に応じて使いわけるとよいでしょう。

Akkaのスレッドプールを使う
Actorのフレームワークを提供しているAkkaのスレッドプールを使用することもできます。Akkaは、設定ファイルでスレッドプールの設定をすることができるので、設定が容易です。ここでは、比較的に時間のかからない処理をいっぱい行うのに適しているFork/Joinフレームワークを使ったスレッドプールの設定を行います。
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
# sample-akka.conf
akka {
  actor {
    # デフォルトのスレッドプールの設定。
    default-dispatcher {
       # スレッドプールの仕組みの設定。
       # デフォルトでは、"fork-join-executor"なので、ここの設定を省略してもよい。
       executor = "fork-join-executor"

       # "fork-join-executor"の設定。
       # executorが"fork-join-executor"の場合に、この設定が使用される。
       fork-join-executor {
         # 最小のスレッド数
         parallelism-min = 8
         
         # プロセッサ(コア)数の何倍のスレッドを生成するか
         # parallelism-minからparallelism-maxまでの範囲を出ることはない。
         parallelism-factor = 10.0
         
         # 最大スレッド数
         parallelism-max = 64
       }
    }
  }
}
次に、この設定を使ったExecutorContextを取得します。
import com.typesafe.config.{ ConfigFactory }
import akka.actor.{ ActorSystem }
val conf = "sample-akka.conf"
val url = this.getClass.getResource( conf )
val system = ActorSystem( "ActorSystem"ConfigFactory.parseURL( url ) )
implicit val executionContext = system.dispatcher