Tuesday, July 22, 2014

Akka Actors Blocking on Futures

Target audience: Intermediate
Estimated reading time: 6'

This is a brief introduction to distributed computing using blocking Scala/Akka futures.


Table of contents
Follow me on LinkedIn

Overview

Akka is actor based and message-driven framework for building concurrent and reliable applications. As such Akka supports futures for which the requester never block waiting for a response. A message or request is sent for a execution and the expect response (success or failure/exception) is delivered asynchronously in the future. The code snippets in our two examples omits condition test or supporting methods for clarity sake. The code compiles and executes with Akka 2.2.4 and 2.3.6

Note: For the sake of readability of the implementation of algorithms, all non-essential code such as error checking, comments, exception, validation of class and method arguments, scoping qualifiers or import is omitted 

Concurrent processing

One simple and common usage of Akka/Scala futures is to have some computation performed concurrently without going through the pain of creating actors.
Not every computation requires a sequential execution for which the input of one task depends on the output of another tasks. Let's consider the case of the evaluation of a model or function against a predefined time series or dataset.

The first step is to create a controller to manage the concurrent tasks, FuturesController (line 3). The controller takes the input time series xt (line 4) and a list of model candidates, xs as function of time Function1[Double] (line 5). The time series uses a single variable (dimension 1), so the models are simply defined as a one variable function (x: Double) => f(x).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
case class Start()
 
final class FuturesController(
  xt: Array[Double],
  xs: List[Double => Double])
 (implicit timeout: Timeout) extends Actor {
   
  override def receive = {
    case s: Start => {
      val futures = createFutures
      val results = futures.map(
         Await.result(_, timeout.duration)
      )

      sender ! results.zipWithIndex.minBy( _._1 )._2
    }
    case _ => logger.error("Message not recognized")
  }
 
  def createFutures: List[Future[Double]]
}

The event handler receive (line 8) for the message Start creates as many future as needed (equal to the number of models to evaluate) (line 10). The controller/actor blocks by invoking Await until each of the future tasks completes (line 12). Finally, the controller returns the result of the computation (line 15), in this case the fittest of the models xs. The handler logs a simple error message in case a message other than Start is received (line 17).

The futures are created through the method createFutures. The implementation of createFutures consists of computing the least squared error for each model relative to the input dataset using a map transformation and a fold aggregator (lines 4 - 7).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def createFutures: List[Future[Double]] =
  
  xs.map(f => Future[Double] { 
    val yt = Range(0, xt.size).map( f(_) )
    val r = (xt, yt).zipped./:(0.0)((sum, z) => {
       val diff = z._1 - z._2
       sum + diff*diff
    })

    Math.sqrt(r)
  })
}

Evaluation

Let's consider an input data set generated with the following model
              y = m(m-1) + r // r [0. 1]
where r is a random value between 0 and 1, representing noise.



val TIME_SERIES_LEN = 200
val xt = Array.tabulate(TIME_SERIES_LEN)(
    m => m*(m - 1.0) + Random.nextDouble
)

The following code snippet lists all key packages to be imported for most common applications using Akka actors and futures (lines 1 to 6).

The driver program instantiates the Actor system (line 12), creates the controller actor master using the Akka actor factory actorOf (lines 13, 14). It sends a ask request (line 18) and returns the best model (line 22) if the controller completes it tasks within a predefined timeout (line 10). The code print the stack trace in case of failure (line 23).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import akka.actor.{Actor, Props, ActorSystem}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import akka.pattern.ask
 

private val duration = Duration(2000, "millis")
implicit val timeout = new Timeout(duration)
   
val actorSystem = ActorSystem("system")
val master = actorSystem.actorOf(
  Props(new FuturesController(xt, approx)), "Function_eval"
)
 
Try {
  val future = master ? Start()
  Await.result(future, timeout.duration)
} 
match {
  case Success(n) => logger.info(s"Best fit is $n")
  case Failure(e) => e.printStackTrace
}

actorSystem.shutdown


It is critical that the application shuts down the the Akka system before it exits (line 26).

References