ScalaのFutureはExecutionContextのキューに何をいれるのか? #ヌーラボ真夏のブログリレー2024

ヌーラボでScalaばかり書いているRubyistの谷本です。このブログは、ヌーラバー真夏のブログリレー2024の7日目のブログとして更新しています。

ScalaのFutureは並列処理を実行する便利な機能で、mapやflatMapで処理をつなげることができ、さらにforを使ったシンタックスシュガーにより逐次実行と見た目が近いコードがかけます。
しかし、実際はExecutionContextのスレッドプールのキューに入れるので、処理が完了してGC対象になるまではヒープに残り続けるため、場合によっては息の長いオブジェクトを生成していることになります。
そこで、今回はどのようなタイミングでキューにたまり、どのようなオブジェクトがGC対象にならず残るのかヒープダンプを見て確認します。

動作確認環境

以下のDockerfileを作り、

FROM amazoncorretto:17.0.11

RUN curl -O https://downloads.lightbend.com/scala/2.13.14/scala-2.13.14.rpm && \
    rpm -ivh scala-2.13.14.rpm && \
    rm  scala-2.13.14.rpm

WORKDIR /work/

以下のように実行してイメージのビルドし–cpus=4を指定してコンテナを立ち上げた中でコードを実行します。

docker build -t local/scala-2.13.14 . && docker run -it --rm --cpus=4 -v $PWD:/work local/scala-2.13.14 bash

ヒープダンプの取得時には、別のターミナルからdocker execで立ち上げたコンテナに入り、jcmdでコマンドで取得します。
pidの特定が若干手間なので以下のワンライナーを用意して実行しました。

jcmd | grep scala | cut -d" " -f2 | xargs -I{} jcmd {} GC.heap_dump app.hprof

Future.applyを実行したときのExecutionContextのスレッドプールのキューを確認する

まず以下のようなコードを用意します。

import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, Future}
import java.util.concurrent.Executors

object Example01 {
  def mainLogic(implicit executionContext: ExecutionContext): Seq[Future[Unit]] = {
    (1 to 6).map { id =>
      val taskName = s"Task $id"
      Future {
        println(s"$taskName is sleeping...")
        Thread.sleep(10 * 1000)
        println(s"$taskName woke up")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val executorService = Executors.newFixedThreadPool(3)
    val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(executorService)

    val fs = mainLogic(executionContext)
    fs.foreach(Await.result(_, Duration.Inf)) // mainLogicで作成したFutureの完了を待つ

    executorService.shutdown()
  }
}

3個のスレッドを持つスレッドプールからExecutionContextを作り、6個のFutureを実行するコードになっています。
scalaコマンドに上のプログラムを指定して実行すると以下のような出力になります。

bash-4.2# scala Example01.scala
Task 1 is sleeping...
Task 2 is sleeping...
Task 3 is sleeping...
Task 2 woke up
Task 4 is sleeping...
Task 3 woke up
Task 1 woke up
Task 5 is sleeping...
Task 6 is sleeping...
Task 4 woke up
Task 5 woke up
Task 6 woke up

3個のスレッドで実行するので「Task 4 is sleeping…」という4個目のFutureの実行はすぐには始まらず、「Task 2 woke up」という2個目のFutureが完了してから始まっていることがわかります。
「Task 3 is sleeping…」と「Task 2 woke up」の間のタイミングで取得したヒープダンプを見てみます。6個のFutureを作ったのに3個しか実行していない状態なので残りの3個の処理がキューにあるはずです。
以下のコマンドを実行すると今回のプログラムのExecutionContextの実際のクラス名がscala.concurrent.impl.ExecutionContextImpl$$anon$4であることがわかるので、そのオブジェクトからの参照をたどります。

bash-4.2# scala -e "println(scala.concurrent.ExecutionContext.fromExecutorService(java.util.concurrent.Executors.newFixedThreadPool(3)).getClass)"
class scala.concurrent.impl.ExecutionContextImpl$$anon$4

Eclipse Memory Analyzer(MAT)を使ってヒープダンプを見るとscala.concurrent.impl.ExecutionContextImpl$$anon$4の中にThreadPoolExecutorがあり、そのworkQueueに3個のscala.concurrent.impl.Promise$Transformationがあります。これが処理を表すオブジェクトです。

scala.concurrent.impl.Promise$Transformationの各変数の役割は以下です。

  • _ecは実行に必要なExecutionContextで、最初のExecutionContextと同じIDであることからもわかるように循環参照しています。
  • _funはFuture内部で作られたLambdaですがそれほど大した意味のあるモノではなく、参照しているFuture.applyに渡したLambdaを呼び出しているだけだと考えてよいです。
  • valueは色々な値を持てるのですが、この場合は後続で実行するFutureを持っています。今回の場合はscala.concurrent.impl.Promise.Noopという何もしないことを表すオブジェクトです。
  • _argはmapなどを使った時に必要になるのですが、このコードではmapなどを使っていないのでUnitを表す定数が入っています。

scala.concurrent.imple.Promise$TransformationのRetained Heapが112Bになっていることから考えると、Futureが完了時にGC対象になるのは自身(32B)と_fun(80B)であることがわかります。

これを見ると_fun > arg$1 > arg$1のLambdaで使うオブジェクト(コードではtaskNameという変数)が考慮から漏れて大きなオブジェクトを作ってしまうとヒープを圧迫するという事故が起こりそうです。

Future#mapとFuture#flatMapは新しく処理をExecutionContextのスレッドプールのキューに入れなおす

次にFuture#mapやFuture#flatMapに指定した処理がスレッドやヒープ内ではどのようになっているか見てみます。
まずは上のコードのmainLogicを以下のようなFuture#mapを使ったコードに変えて実行します。

def mainLogic(implicit executionContext: ExecutionContext): Seq[Future[Unit]] = {
  (1 to 9).map { id =>
    val taskName = s"Task $id"
    Future {
      println(s"$taskName is sleeping...")
      Thread.sleep(10 * 1000)
      println(s"$taskName work up")
      s"some result of Task ${id}"
    }.map { _ =>
      println(s"$taskName is sleeping again...")
      Thread.sleep(10 * 1000)
      println(s"$taskName woke up again")
    }
  }
}
bash-4.2# scala Example02.scala
Task 1 is sleeping...
Task 2 is sleeping...
Task 3 is sleeping...
Task 1 work up
Task 4 is sleeping...
Task 2 work up
Task 5 is sleeping...
Task 3 work up
Task 6 is sleeping...
Task 4 work up
Task 5 work up
Task 7 is sleeping...
Task 6 work up
Task 9 is sleeping...
Task 8 is sleeping...
Task 7 work up
Task 9 work up
Task 1 is sleeping again...
Task 2 is sleeping again...
Task 8 work up
Task 3 is sleeping again...
Task 2 woke up again
Task 3 woke up again
Task 1 woke up again
Task 4 is sleeping again...
Task 5 is sleeping again...
Task 6 is sleeping again...
Task 4 woke up again
Task 6 woke up again
Task 7 is sleeping again...
Task 9 is sleeping again...
Task 5 woke up again
Task 8 is sleeping again...
Task 7 woke up again
Task 8 woke up again
Task 9 woke up again

 

「Task 6 is sleeping…」と「Task 4 work up」の間のタイミングで取得したヒープダンプを見てみます。先ほどと同じようにスレッドプールのworkQueueに6個のscala.concurrent.impl.Promise$Transformationが入っています。

末尾の3個の詳細を見てみるとLambdaのarg$1が「Task 1~3」を参照していることとscala.concurrent.impl.Promise$Transformationの_argが「some result of Task 1~3」を参照していることからmapに渡された処理がキューの末尾に追加されたことがわかります。

scala.concurrent.impl.Promise$Transformationの各変数で前回との違いは以下の2点でした。

  • Retained Heapが208BになっているオブジェクトのvalueだけはAwait.resultの影響によりNoopとの間にscala.concurrent.impl.Promise$Transformationが一つ増えています。
  • _argは最初にFuture.applyに渡された処理の結果を表すオブジェクトが入っています。つまり、mapに渡された処理の前段の処理の結果です。

次に、Future#flatMapを使った以下のコードも実行します。

def mainLogic(implicit executionContext: ExecutionContext): Seq[Future[Unit]] = {
  (1 to 9).map { id =>
    val taskName = s"Task $id"
    Future {
      println(s"$taskName is sleeping...")
      Thread.sleep(10 * 1000)
      println(s"$taskName work up")
      s"some result of Task ${id}"
    }.flatMap { _ =>
      val f = Future {
        println(s"$taskName is sleeping again...")
        Thread.sleep(10 * 1000)
        println(s"$taskName woke up again")
      }
      println(s"2nd Future of $taskName is created")
      Thread.sleep(10 * 1000)
      f
    }
  }
}
bash-4.2# scala Example03.scala
Task 1 is sleeping...
Task 2 is sleeping...
Task 3 is sleeping...
Task 1 work up
Task 4 is sleeping...
Task 2 work up
Task 5 is sleeping...
Task 3 work up
Task 6 is sleeping...
Task 4 work up
Task 5 work up
Task 8 is sleeping...
Task 7 is sleeping...
Task 6 work up
Task 9 is sleeping...
Task 8 work up
Task 9 work up
Task 7 work up
2nd Future of Task 3 is created
2nd Future of Task 1 is created
2nd Future of Task 2 is created
2nd Future of Task 4 is created
2nd Future of Task 5 is created
2nd Future of Task 6 is created
2nd Future of Task 8 is created
2nd Future of Task 9 is created
2nd Future of Task 7 is created
Task 3 is sleeping again...
Task 1 is sleeping again...
Task 2 is sleeping again...
Task 3 woke up again
Task 2 woke up again
Task 1 woke up again
Task 5 is sleeping again...
Task 4 is sleeping again...
Task 6 is sleeping again...
Task 5 woke up again
Task 6 woke up again
Task 4 woke up again
Task 9 is sleeping again...
Task 8 is sleeping again...
Task 7 is sleeping again...
Task 9 woke up again
Task 7 woke up again
Task 8 woke up again

「Task 6 is sleeping…」と「Task 4 work up」の間のタイミングで取得したヒープダンプを見てみます。先ほどと同様に6個のscala.concurrent.impl.Promise$Transformationが入っています。
また、それぞれが参照している値からFuture#mapを使った時と同様にFuture#flatMapに渡された処理が末尾に追加されていることもわかります。

scala.concurrent.impl.Promise$Transformationの各変数でFuture#mapと異なる点は、_funのShallow Heapが8B増えた点でした。Future#mapに渡したLambdaと比べてコード量が少し増えていることに起因しています。

scala.concurrent.imple.Promise$TransformationのRetained Heapの変化から考えると、Future#mapとFuture#flatMapともに最初にFuture.applyに渡された処理の結果を持つ_argがFuture完了時のGC対象として増えています。この結果を考慮すると前段の処理で大きなオブジェクトを作るとヒープを圧迫する可能性がありそうです。特にforを使っていると逐次実行と似ているので気づきにくいです。

処理の実行順序はExecutionContextの実装に依存していてスレッドプールを持つ必要さえない

mapやflatMapを呼び出すたびにキューの後ろに追加していては大量のFutureが作られる場面(例えばWebアプリケーションへの大量アクセス)が発生した時に、オブジェクトの生存期間が想定外に伸びてしまう可能性があります。
そういった問題の解決方法の1つに、ForkJoinPoolというスレッドプールを使ったExecutionContextを使うという手があります。デフォルトで用意されているscala.concurrent.ExecutionContext.globalはForkJoinPoolを使うExecutionContextなので、以下のように変更すると、実行順序が変わるので試してみます。

import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, Future}

object Example04 {
  def mainLogic(implicit executionContext: ExecutionContext): Seq[Future[Unit]] = {
    (1 to 9).map { id =>
      val taskName = s"Task $id"
      Future {
        println(s"$taskName is sleeping...")
        Thread.sleep(10 * 1000)
        println(s"$taskName work up")
        s"some result of Task ${id}"
      }.flatMap { _ =>
        val f = Future {
          println(s"$taskName is sleeping again...")
          Thread.sleep(10 * 1000)
          println(s"$taskName woke up again")
        }
        println(s"2nd Future of $taskName is created")
        Thread.sleep(10 * 1000)
        f
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = mainLogic(scala.concurrent.ExecutionContext.global)
    fs.foreach(Await.result(_, Duration.Inf)) // mainLogicで作成したFutureの完了を待つ
  }
}
bash-4.2# scala Example04.scala
Task 1 is sleeping...
Task 2 is sleeping...
Task 3 is sleeping...
Task 4 is sleeping...
Task 1 work up
Task 2 work up
2nd Future of Task 1 is created
2nd Future of Task 2 is created
Task 3 work up
2nd Future of Task 3 is created
Task 4 work up
2nd Future of Task 4 is created
Task 1 is sleeping again...
Task 2 is sleeping again...
Task 3 is sleeping again...
Task 4 is sleeping again...
Task 1 woke up again
Task 2 woke up again
Task 3 woke up again
Task 5 is sleeping...
Task 6 is sleeping...
Task 4 woke up again
Task 7 is sleeping...
Task 8 is sleeping...
Task 5 work up
2nd Future of Task 5 is created
Task 6 work up
Task 7 work up
2nd Future of Task 6 is created
2nd Future of Task 7 is created
Task 8 work up
2nd Future of Task 8 is created
Task 5 is sleeping again...
Task 7 is sleeping again...
Task 6 is sleeping again...
Task 8 is sleeping again...
Task 5 woke up again
Task 7 woke up again
Task 9 is sleeping...
Task 6 woke up again
Task 8 woke up again
Task 9 work up
2nd Future of Task 9 is created
Task 9 is sleeping again...
Task 9 woke up again

Task 1~4がスレッドを占有しているように見える結果が得られました。
ForkJoinPoolはワークスティーリングというスケジューリング戦略を採用したスレッドプールです。ちなみにGolangのgoroutineもこの戦略を採用しているそうです。簡単に説明するとForkJoinPoolはスレッドプール全体のキューとスレッド単体のキューがあり、ForkJoinPoolのスレッドがキューに処理を入れるときは自身のキューに入れるという戦略を取っています。そして自身のキューが空の時は別のキューからタスクを取り出し実行します。(ただし、ForkJoinPoolはそれ以外にも特徴があるので使用するときは詳細を理解した方がよいです。)
「2nd Future of Task 3 is created」と「Task 1 is sleeping again…」の間のタイミングで取得したヒープダンプを見てみます。ownerが存在する4個のWorkQueueが各スレッドが持つキューで、一つだけownerが存在しないWorkQueueがスレッドプール全体のキューです。各スレッドのキューにはflatMapで作成されたFutureの処理が入っていて、スレッドプール全体のキューにはTask5~9の最初のFutureの処理が入っています。

これまで説明に使用したExecutionContextはスレッドプールを持ちますが、実はスレッドプールを使わず呼び出しスレッドをそのまま使うExecutionContextも作れます。(実用性は低いですが)

import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, Future}
import java.util.concurrent.Executors

object Example05 {
  def mainLogic(implicit executionContext: ExecutionContext): Seq[Future[Unit]] = {
    (1 to 3).map { id =>
      val taskName = s"Task $id"
      Future {
        println(s"$taskName is sleeping...")
        Thread.sleep(10 * 1000)
        println(s"$taskName work up")
        s"some result of Task ${id}"
      }.flatMap { _ =>
        val f = Future {
          println(s"$taskName is sleeping again...")
          Thread.sleep(10 * 1000)
          println(s"$taskName woke up again")
        }
        println(s"2nd Future of $taskName is created")
        Thread.sleep(10 * 1000)
        f
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ec = new ExecutionContext {
      override def execute(runnable: Runnable): Unit = runnable.run()

      override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
    }
    val fs = mainLogic(ec)
    fs.foreach(Await.result(_, Duration.Inf)) // mainLogicで作成したFutureの完了を待つ
  }
}
bash-4.2# scala Example05.scala
Task 1 is sleeping...
Task 1 work up
Task 1 is sleeping again...
Task 1 woke up again
2nd Future of Task 1 is created
Task 2 is sleeping...
Task 2 work up
Task 2 is sleeping again...
Task 2 woke up again
2nd Future of Task 2 is created
Task 3 is sleeping...
Task 3 work up
Task 3 is sleeping again...
Task 3 woke up again
2nd Future of Task 3 is created

「Task 1 is sleeping again…」「Task 1 woke up again」の間のタイミングでスレッドダンプを取得するとmainメソッドから呼ばれてそのまま同じスレッドで実行していることがわかります。

"main" #1 prio=5 os_prio=0 cpu=1364.56ms elapsed=14.33s tid=0x00007fdfa4026030 nid=0x4f waiting on condition  [0x00007fdfacfa0000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(java.base@17.0.11/Native Method)
        at Main$.$anonfun$mainLogic$4(Example05.scala:18)
        at Main$$$Lambda$783/0x00007fdf303c0e98.apply$mcV$sp(Unknown Source)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:687)
        at scala.concurrent.Future$$$Lambda$781/0x00007fdf303984d8.apply(Unknown Source)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
        at Main$$anon$1.execute(Example05.scala:30)
        at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
        at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
        at scala.concurrent.impl.Promise$DefaultPromise.dispatchOrAddCallbacks(Promise.scala:312)
        at scala.concurrent.impl.Promise$DefaultPromise.map(Promise.scala:182)
        at scala.concurrent.Future$.apply(Future.scala:687)
        at Main$.$anonfun$mainLogic$3(Example05.scala:16)
        at Main$$$Lambda$782/0x00007fdf303c0ac8.apply(Unknown Source)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:470)
        at Main$$anon$1.execute(Example05.scala:30)
        at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
        at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
        at scala.concurrent.impl.Promise$DefaultPromise.dispatchOrAddCallbacks(Promise.scala:312)
        at scala.concurrent.impl.Promise$DefaultPromise.flatMap(Promise.scala:176)
        at Main$.$anonfun$mainLogic$1(Example05.scala:15)
        at Main$.$anonfun$mainLogic$1$adapted(Example05.scala:8)
        at Main$$$Lambda$779/0x00007fdf303c0438.apply(Unknown Source)
        at scala.collection.immutable.Range.map(Range.scala:59)
        at Main$.mainLogic(Example05.scala:8)
        at Main$.main(Example05.scala:34)
        at Main.main(Example05.scala)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.11/Native Method)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.11/NativeMethodAccessorImpl.java:77)
        (以下略)

ExecutionContextを変更するとトレードオフがあるので総合的に判断が必要ですが、未検討の場合は一度検討してみることをお勧めします。

開発メンバー募集中

より良いチームワークを生み出す

チームの創造力を高めるコラボレーションツール

製品をみる