The aim of this blog is to walk through a proper end-to-end example of a multi-domain Free program that combines both sequential and parallel operations. Skip to the section "Lets Write our Program" if you are already familiar with Applicative and FreeAp.
It is assumed you are familiar with the Free Monad and the benefits of functional programming. If not, please refer to my previous blog which introduces the Free Monad and has many links to much more detailed explanations.
For reference – my GitHub gist has a full working copy of the code here.
Recap on Applicatives
At a high level, the Free Monad gives us the power to express our programs as a sequence of instructions that we can interpret at a later time. The key word here is sequential.
Monads are defined as:
```scala
trait Monad[F[_]] {
  def point[A](a: A): F[A]
  def flatMap[A, B](a: F[A])(f: A => F[B]): F[B]
}
```
The part to notice here is flatMap. See how, in the function f, we need an A before we can produce an F[B]? This is what forces a monad to run sequentially.
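To make that dependency concrete, here is a small plain-Scala sketch (no scalaz) of an Option instance of the trait above. The two lookup functions are hypothetical: the second one cannot start until the first has produced its A, and if the first yields None the second never runs at all.

```scala
object MonadDemo {
  // Same shape as the Monad trait above (by-value point, for simplicity)
  trait Monad[F[_]] {
    def point[A](a: A): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  val optionMonad: Monad[Option] = new Monad[Option] {
    def point[A](a: A): Option[A] = Some(a)
    def flatMap[A, B](fa: Option[A])(f: A => Option[B]): Option[B] = fa match {
      case Some(a) => f(a) // f can only run once the A exists
      case None    => None // no A, so the next step never runs
    }
  }

  // Hypothetical lookups: the second one needs the result of the first
  def findUserId(name: String): Option[Int] = if (name == "steve") Some(1) else None
  def findAge(userId: Int): Option[Int] = if (userId == 1) Some(23) else None

  def ageOf(name: String): Option[Int] =
    optionMonad.flatMap(findUserId(name))(id => findAge(id))
}
```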
The Free Applicative, on the other hand, is modelled on Applicative Functors and as such allows us to run multiple computations independently of one another.
If we take a look at the definition of applicatives this becomes clear:
```scala
trait Applicative[F[_]] {
  def point[A](a: A): F[A]
  def ap[A, B](fa: => F[A])(f: => F[A => B]): F[B]
}
```
As you can see, the method ap is used to apply a function A => B in the context of F to a value A in the context of F.
In practice this might mean if we had the following:
```scala
scala> val f = Option((i: Int) => i.toString)
f: Option[Int => String] = Some($$Lambda$4986/1160677632@6bbe795e)

scala> val fa = Option(4)
fa: Option[Int] = Some(4)
```
We could apply the function contained in f to the value contained in fa and end up with a String with value "4".
In this case the F context is Option.
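A minimal plain-Scala sketch of an Applicative[Option] with the same ap signature (a simplified stand-in, not scalaz's own instance) reproduces this. Note that ap receives fa and f as two separate values; neither has to be computed from the other.

```scala
object ApDemo {
  trait Applicative[F[_]] {
    def point[A](a: A): F[A]
    def ap[A, B](fa: => F[A])(f: => F[A => B]): F[B]
  }

  val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def point[A](a: A): Option[A] = Some(a)
    def ap[A, B](fa: => Option[A])(f: => Option[A => B]): Option[B] =
      (fa, f) match {
        case (Some(a), Some(g)) => Some(g(a)) // both present: apply the function
        case _                  => None       // either side missing: no result
      }
  }

  val f: Option[Int => String] = Option((i: Int) => i.toString)
  val fa: Option[Int] = Option(4)
  val result: Option[String] = optionApplicative.ap(fa)(f)
}
```

Here ApDemo.result is Some("4").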
It’s not immediately clear how this lets us run things in parallel.
If you look at the function ap2 in scalaz, you begin to see how this ap function gives us the power of running many things together without them depending on one another.
```scala
trait Applicative[F[_]] {
  // point and ap omitted here
  def ap2[A, B, C](fa: => F[A], fb: => F[B])(f: F[(A, B) => C]): F[C]
}
```
Here we can see that given an F[A] and an F[B], as well as a function that can combine (A, B) into C, we can produce an F[C]. There is no mention of needing any of them to be run in any particular order.
This is implemented in terms of the ap function we saw earlier and lets us express things like:
```scala
scala> val fa = Option(4)
fa: Option[Int] = Some(4)

scala> val fb = Option(5)
fb: Option[Int] = Some(5)

scala> val f = Option((i: Int, j: Int) => (i + j).toString)
f: Option[(Int, Int) => String] = Some($$Lambda$4987/1898967014@6b28dafd)

scala> ap2(fa, fb)(f)
res0: Option[String] = Some(9)
```
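One way ap2 can be derived from ap is sketched below in plain Scala, using a simplified trait (map is itself derived from ap and point here; scalaz's actual definitions may be arranged differently): curry the combining function, apply it to fa, then apply the resulting function to fb.

```scala
object Ap2Demo {
  trait Applicative[F[_]] {
    def point[A](a: A): F[A]
    def ap[A, B](fa: => F[A])(f: => F[A => B]): F[B]

    // map derived from ap + point
    def map[A, B](fa: F[A])(g: A => B): F[B] = ap(fa)(point(g))

    // ap2 derived from ap: curry f, feed it fa, then feed the result fb
    def ap2[A, B, C](fa: => F[A], fb: => F[B])(f: F[(A, B) => C]): F[C] =
      ap(fb)(ap(fa)(map(f)(g => (a: A) => (b: B) => g(a, b))))
  }

  val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def point[A](a: A): Option[A] = Some(a)
    def ap[A, B](fa: => Option[A])(f: => Option[A => B]): Option[B] =
      (fa, f) match {
        case (Some(a), Some(g)) => Some(g(a))
        case _                  => None
      }
  }

  val fa: Option[Int] = Option(4)
  val fb: Option[Int] = Option(5)
  val f: Option[(Int, Int) => String] = Option((i: Int, j: Int) => (i + j).toString)

  val result: Option[String] = optionApplicative.ap2(fa, fb)(f)
  val missing: Option[String] = optionApplicative.ap2(fa, Option.empty[Int])(f)
}
```

Ap2Demo.result is Some("9"), and Ap2Demo.missing is None because one of the inputs is absent.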
Using the same technique, we can generalise to running any number of computations that are independent of one another.
This becomes really interesting when we execute this in parallel.
Effects such as Future and Task have this ability out of the box and are a great choice for running asynchronous tasks in parallel.
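As a rough illustration using the standard library's Future (not scalaz's Task), the two slow computations below are both started before either is awaited and are combined with zip, which plays the role of ap2. The slow helper and the delays are illustrative assumptions.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelFutureDemo {
  // A slow computation standing in for real async work; blocking lets the
  // global pool add a thread while this one sleeps
  def slow(value: Int, delayMillis: Long): Future[Int] = Future {
    blocking(Thread.sleep(delayMillis))
    value
  }

  // Returns the combined result and the wall-clock time in milliseconds
  def run(delayMillis: Long): (String, Long) = {
    val start = System.nanoTime()
    // Both futures start as soon as they are created, before either is awaited
    val fa = slow(4, delayMillis)
    val fb = slow(5, delayMillis)
    // zip + map combines two results that do not depend on each other
    val combined = fa.zip(fb).map { case (i, j) => (i + j).toString }
    val result = Await.result(combined, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }
}
```

With a 500 ms delay, ParallelFutureDemo.run(500L) should take roughly 500 ms rather than 1000 ms. Future starts running as soon as it is constructed, so the parallelism here comes from construction order; a lazy effect like Task instead leaves that decision to how it is combined.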
Quick Overview of FreeAp
FreeAp is the scalaz implementation of the Free Applicative. In terms of structure and use, it is essentially the same as the Free Monad in that you:
– Define your operations
– Lift them into the Free Applicative
– Write your program as a combination of FreeAp instructions
– Write your interpreters
– Execute your program by running your interpreters over your FreeAp instructions
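To see what these steps amount to without scalaz, here is a deliberately small free applicative written from scratch. It is a toy encoding (a Pure case, and an Ap case holding one instruction plus the rest of the structure), not scalaz's implementation; the Op/FetchScore instruction set and the score table are made up for the example. Because no instruction depends on another's result, the structure can be inspected (size) before anything runs, and foldMap leaves the combining to whichever Applicative you supply.

```scala
object FreeApDemo {
  // Natural transformation, the same idea as scalaz's ~>
  trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }

  trait Applicative[G[_]] {
    def pure[A](a: A): G[A]
    def ap[A, B](ga: G[A])(gf: G[A => B]): G[B]
  }

  // Toy free applicative: a pure value, or one instruction plus the rest of the
  // structure, which produces a function consuming that instruction's result
  sealed trait FreeAp[F[_], A] {
    def map[B](f: A => B): FreeAp[F, B]
    def map2[B, C](fb: FreeAp[F, B])(f: (A, B) => C): FreeAp[F, C]
    def foldMap[G[_]](nt: F ~> G)(implicit G: Applicative[G]): G[A]
    def size: Int // number of instructions, known without running any of them
  }

  final case class Pure[F[_], A](a: A) extends FreeAp[F, A] {
    def map[B](f: A => B): FreeAp[F, B] = Pure[F, B](f(a))
    def map2[B, C](fb: FreeAp[F, B])(f: (A, B) => C): FreeAp[F, C] = fb.map(b => f(a, b))
    def foldMap[G[_]](nt: F ~> G)(implicit G: Applicative[G]): G[A] = G.pure(a)
    def size: Int = 0
  }

  final case class Ap[F[_], X, A](fx: F[X], rest: FreeAp[F, X => A]) extends FreeAp[F, A] {
    def map[B](f: A => B): FreeAp[F, B] = Ap[F, X, B](fx, rest.map(g => g.andThen(f)))
    def map2[B, C](fb: FreeAp[F, B])(f: (A, B) => C): FreeAp[F, C] =
      Ap[F, X, C](fx, rest.map2[B, X => C](fb)((g, b) => (x: X) => f(g(x), b)))
    def foldMap[G[_]](nt: F ~> G)(implicit G: Applicative[G]): G[A] =
      G.ap(nt(fx))(rest.foldMap(nt))
    def size: Int = 1 + rest.size
  }

  def lift[F[_], A](fa: F[A]): FreeAp[F, A] = Ap[F, A, A](fa, Pure[F, A => A](a => a))

  // Hypothetical instruction set: look up a score by name
  sealed trait Op[A]
  final case class FetchScore(name: String) extends Op[Int]
  def fetchScore(name: String): FreeAp[Op, Int] = lift[Op, Int](FetchScore(name))

  implicit val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def pure[A](a: A): Option[A] = Some(a)
    def ap[A, B](ga: Option[A])(gf: Option[A => B]): Option[B] =
      for { a <- ga; g <- gf } yield g(a)
  }

  val scores: Map[String, Int] = Map("steve" -> 50, "harriet" -> 20)

  // The interpreter for a single instruction
  val interpreter: Op ~> Option = new (Op ~> Option) {
    def apply[A](fa: Op[A]): Option[A] = fa match {
      case FetchScore(name) => scores.get(name)
    }
  }

  val program: FreeAp[Op, Int] = fetchScore("steve").map2(fetchScore("harriet"))(_ + _)
  val instructionCount: Int = program.size // 2, computed before interpretation
  val result: Option[Int] = program.foldMap(interpreter)
}
```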
As previously discussed, the problem with using FreeAp on its own is that we then only get parallel computations and cannot enforce sequential ordering (which almost all programs need).
As such, we will dive into a simplified real-world example that needs to do things both in sequence and in parallel.
Defining our domain
As usual, we start by defining a domain. Let’s create a User domain and an Analytics domain.
```scala
case class User(name: String, age: Int)

sealed trait UserOperation[T]
case class CreateUser(name: String, age: Int) extends UserOperation[User]

sealed trait AnalyticsOperation[T]
case class AnalyseUser(user: User) extends AnalyticsOperation[Int]
```
Pretty standard stuff so far.
Next, let’s lift our operations into the Free Monad and Free Applicative:
```scala
import scalaz._

// A little convenience class that reduces the boilerplate of defining the
// sequential and parallel version of every operation
case class ExecStrategy[F[_], A](fa: F[A]) {
  val seq: Free[F, A] = Free.liftF(fa)
  val par: FreeAp[F, A] = FreeAp.lift(fa)
}

// Lift our operations into the Free contexts
class UserRepo[F[_]](implicit ev: Inject[UserOperation, F]) {
  def createUser(name: String, age: Int): ExecStrategy[F, User] =
    ExecStrategy[F, User](ev.inj(CreateUser(name, age)))
}

object UserRepo {
  implicit def toUserRepo[F[_]](implicit ev: Inject[UserOperation, F]): UserRepo[F] =
    new UserRepo[F]()
}

class AnalyticsRepo[F[_]](implicit ev: Inject[AnalyticsOperation, F]) {
  def analyseUser(user: User): ExecStrategy[F, Int] =
    ExecStrategy[F, Int](ev.inj(AnalyseUser(user)))
}

object AnalyticsRepo {
  implicit def toAnalyticsRepo[F[_]](implicit ev: Inject[AnalyticsOperation, F]): AnalyticsRepo[F] =
    new AnalyticsRepo[F]()
}
```
A few things to note at this point:
– The ExecStrategy is an idea I’m messing around with to avoid needing to define every operation twice in your Repos. You do not need to use it if you prefer not to.
– We use the Inject typeclass to take any operation of type UserOperation and convert it into the type F.
– Finally we provide some implicits to allow us to construct our instances à la carte.
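Conceptually, Inject is just a function that wraps an instruction from one algebra into a larger instruction set. The sketch below models this in plain Scala with a toy Coproduct backed by Either; Coproduct, Inject and Instructions here are simplified stand-ins for the scalaz types, and the repos return bare instructions rather than ExecStrategy.

```scala
object InjectDemo {
  final case class User(name: String, age: Int)

  sealed trait UserOperation[T]
  final case class CreateUser(name: String, age: Int) extends UserOperation[User]

  sealed trait AnalyticsOperation[T]
  final case class AnalyseUser(user: User) extends AnalyticsOperation[Int]

  // Toy coproduct: an instruction is either a UserOperation (Left) or an AnalyticsOperation (Right)
  final case class Coproduct[F[_], G[_], A](run: Either[F[A], G[A]])
  type Instructions[A] = Coproduct[UserOperation, AnalyticsOperation, A]

  // Toy Inject: embeds instructions of F into the larger instruction set G
  trait Inject[F[_], G[_]] { def inj[A](fa: F[A]): G[A] }

  implicit val injectUser: Inject[UserOperation, Instructions] =
    new Inject[UserOperation, Instructions] {
      def inj[A](fa: UserOperation[A]): Instructions[A] =
        Coproduct[UserOperation, AnalyticsOperation, A](Left(fa))
    }

  implicit val injectAnalytics: Inject[AnalyticsOperation, Instructions] =
    new Inject[AnalyticsOperation, Instructions] {
      def inj[A](fa: AnalyticsOperation[A]): Instructions[A] =
        Coproduct[UserOperation, AnalyticsOperation, A](Right(fa))
    }

  // Each repo is written against any F that can hold its own operations
  class UserRepo[F[_]](implicit ev: Inject[UserOperation, F]) {
    def createUser(name: String, age: Int): F[User] = ev.inj[User](CreateUser(name, age))
  }

  class AnalyticsRepo[F[_]](implicit ev: Inject[AnalyticsOperation, F]) {
    def analyseUser(user: User): F[Int] = ev.inj[Int](AnalyseUser(user))
  }

  // Both repos target the same combined instruction type
  val created: Instructions[User] = (new UserRepo[Instructions]).createUser("steve", 23)
  val analysed: Instructions[Int] = (new AnalyticsRepo[Instructions]).analyseUser(User("steve", 23))
}
```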
We now have the ability to write some programs in terms of Free and FreeAp.
Combining Free and FreeAp
There are many ways to combine these two abstractions but I will talk through a fairly powerful and general purpose approach.
```scala
// The ? type placeholder comes from the kind-projector compiler plugin
type Program[F[_], A] = Free[FreeAp[F, ?], A]
```
This effectively gives us a Free Monad where the effect type is actually a Free Applicative. In practice, this means our program is composed of a sequence of steps, where each step contains one (or more) parallel computations embedded in the Free Applicative.
Now we use this in the following way:
– Sequential operations are simply a FreeAp with a single instruction, which is then embedded in the top-level Free Monad.
– Parallel operations are also just a FreeAp, but with more than one instruction. This is then also embedded in the top-level Free Monad.
As can be seen, all operations live at the same nesting level of the Program structure, regardless of whether they’re parallel or sequential. The FreeAp container determines how many operations get executed in a single step of the Free Monad.
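The shape of Program can be modelled with two small toy types: a FreeAp, and a Free-style Program in which every Step carries a whole FreeAp batch. This is a from-scratch sketch, not scalaz's encoding, and it is not stack safe. It interprets into Id and runs each batch's instructions one after another, so it only shows how instructions are grouped into steps, not real parallelism. The execute helper and its step log are hypothetical additions that make the grouping visible.

```scala
import scala.collection.mutable.ListBuffer

object ProgramDemo {
  trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }
  type Id[A] = A

  // Toy FreeAp, interpreted directly into Id
  sealed trait FreeAp[F[_], A] {
    def map[B](f: A => B): FreeAp[F, B]
    def map2[B, C](fb: FreeAp[F, B])(f: (A, B) => C): FreeAp[F, C]
    def runAll(nt: F ~> Id): A
  }
  final case class Pure[F[_], A](a: A) extends FreeAp[F, A] {
    def map[B](f: A => B): FreeAp[F, B] = Pure[F, B](f(a))
    def map2[B, C](fb: FreeAp[F, B])(f: (A, B) => C): FreeAp[F, C] = fb.map(b => f(a, b))
    def runAll(nt: F ~> Id): A = a
  }
  final case class Ap[F[_], X, A](fx: F[X], rest: FreeAp[F, X => A]) extends FreeAp[F, A] {
    def map[B](f: A => B): FreeAp[F, B] = Ap[F, X, B](fx, rest.map(g => g.andThen(f)))
    def map2[B, C](fb: FreeAp[F, B])(f: (A, B) => C): FreeAp[F, C] =
      Ap[F, X, C](fx, rest.map2[B, X => C](fb)((g, b) => (x: X) => f(g(x), b)))
    def runAll(nt: F ~> Id): A = { val x: X = nt(fx); rest.runAll(nt)(x) }
  }

  // Toy Free monad whose instructions are whole FreeAp batches
  sealed trait Program[F[_], A] {
    def flatMap[B](f: A => Program[F, B]): Program[F, B]
    def map[B](f: A => B): Program[F, B] = flatMap(a => Done[F, B](f(a)))
    def run(nt: F ~> Id, beforeStep: () => Unit): A
  }
  final case class Done[F[_], A](a: A) extends Program[F, A] {
    def flatMap[B](f: A => Program[F, B]): Program[F, B] = f(a)
    def run(nt: F ~> Id, beforeStep: () => Unit): A = a
  }
  final case class Step[F[_], X, A](batch: FreeAp[F, X], next: X => Program[F, A]) extends Program[F, A] {
    def flatMap[B](f: A => Program[F, B]): Program[F, B] = Step[F, X, B](batch, x => next(x).flatMap(f))
    // One Free step: hand the whole batch to the interpreter, then continue
    def run(nt: F ~> Id, beforeStep: () => Unit): A = {
      beforeStep()
      next(batch.runAll(nt)).run(nt, beforeStep)
    }
  }
  // Analogue of asProgramStep for a FreeAp: the whole batch becomes one step
  def step[F[_], A](batch: FreeAp[F, A]): Program[F, A] = Step[F, A, A](batch, a => Done[F, A](a))

  final case class User(name: String, age: Int)
  sealed trait Op[A]
  final case class CreateUser(name: String, age: Int) extends Op[User]
  final case class AnalyseUser(user: User) extends Op[Int]
  def lift[A](op: Op[A]): FreeAp[Op, A] = Ap[Op, A, A](op, Pure[Op, A => A](a => a))

  // Two sequential steps, then one step holding both analyses (like program)
  val program: Program[Op, Int] = for {
    user1 <- step(lift(CreateUser("steve", 23)))
    user2 <- step(lift(CreateUser("harriet", 33)))
    sum   <- step(lift(AnalyseUser(user1)).map2(lift(AnalyseUser(user2)))(_ + _))
  } yield sum

  // One step creating both users, one step analysing both (like program2)
  val program2: Program[Op, Int] = for {
    users <- step(lift(CreateUser("steve", 23)).map2(lift(CreateUser("harriet", 33)))((u1, u2) => (u1, u2)))
    sum   <- step(lift(AnalyseUser(users._1)).map2(lift(AnalyseUser(users._2)))(_ + _))
  } yield sum

  // Runs a program, logging the step number each instruction was executed in
  def execute[A](prog: Program[Op, A]): (A, List[(Int, String)]) = {
    var currentStep = 0
    val log = ListBuffer.empty[(Int, String)]
    val interpreter = new (Op ~> Id) {
      def apply[B](fa: Op[B]): Id[B] = fa match {
        case CreateUser(name, age) =>
          log += ((currentStep, s"create $name"))
          User(name, age)
        case AnalyseUser(user) =>
          log += ((currentStep, s"analyse ${user.name}"))
          if (user.name == "steve") 50 else 20
      }
    }
    val result = prog.run(interpreter, () => currentStep += 1)
    (result, log.toList)
  }
}
```

For program, the log shows three steps with both analyses in step 3; for program2 it shows two steps, each containing two instructions.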
Now we just need to lift each of our Free or FreeAp operations into the Program structure. For a Free instruction, this simply means embedding each single instruction in its own nested FreeAp. For a FreeAp instruction, this just means lifting it into Free. Let’s see what those definitions might look like:
```scala
object ProgramHelpers {
  implicit class RichFree[F[_], A](free: Free[F, A]) {
    // Every instruction of the Free program becomes its own single-instruction step
    def asProgramStep: Program[F, A] =
      free.foldMap[Program[F, ?]](new NaturalTransformation[F, Program[F, ?]] {
        override def apply[B](fb: F[B]): Program[F, B] = liftFA(fb)
      })(Free.freeMonad[FreeAp[F, ?]])
  }

  implicit class RichFreeAp[F[_], A](freeap: FreeAp[F, A]) {
    // The whole FreeAp becomes a single step of the Program
    def asProgramStep: Program[F, A] = Free.liftF[FreeAp[F, ?], A](freeap)
  }

  def liftFA[F[_], A](fa: F[A]): Program[F, A] =
    Free.liftF[FreeAp[F, ?], A](FreeAp.lift(fa))
}
```
We define these as implicit classes to make our DSL easier to follow.
With this, we are ready to define a program using both Free and FreeAp together. Let’s start with a simple program that creates two users in sequence, then analyses them in parallel:
```scala
def program[F[_]](implicit userRepo: UserRepo[F], analyticsRepo: AnalyticsRepo[F]): Program[F, Int] = {
  for {
    user1 <- userRepo.createUser("steve", 23).seq.asProgramStep
    user2 <- userRepo.createUser("harriet", 33).seq.asProgramStep
    sumOfAnalytics <- (
      analyticsRepo.analyseUser(user1).par |@| analyticsRepo.analyseUser(user2).par
    )((a, b) => a + b).asProgramStep
  } yield sumOfAnalytics
}
```
And another example that creates users in parallel and also analyses them in parallel:
```scala
def program2[F[_]](implicit userRepo: UserRepo[F], analyticsRepo: AnalyticsRepo[F]): Program[F, Int] = {
  for {
    users <- (
      userRepo.createUser("steve", 23).par |@| userRepo.createUser("harriet", 33).par
    )((u1, u2) => (u1, u2)).asProgramStep
    // users is a plain (User, User) tuple, not a Program, so read its fields directly
    sumOfAnalytics <- (
      analyticsRepo.analyseUser(users._1).par |@| analyticsRepo.analyseUser(users._2).par
    )((a, b) => a + b).asProgramStep
  } yield sumOfAnalytics
}
```
As we can see, each operation is created either as a sequential operation (Free) or as a parallel operation (FreeAp) and is then lifted into our combined Program structure. Note that we combine our parallel steps using the applicative syntax |@| as usual.
Interpreting the Program
As with Free Monads, we need to define interpreters for each of our domains and then interpret the above program with these interpreters.
Let’s define some simple implementations with artificial delays so that we can confirm our domain is running the right steps in parallel as expected.
```scala
case class SlowUserInterpreter(delay: Long) extends (UserOperation ~> Task) {
  override def apply[A](fa: UserOperation[A]): Task[A] = fa match {
    case CreateUser(name, age) => Task {
      println(s"Creating user $name")
      Thread.sleep(delay)
      println(s"Finished creating user $name")
      User(name, age)
    }
  }
}

case class SlowAnalyticsInterpreter(delay: Long) extends (AnalyticsOperation ~> Task) {
  override def apply[A](fa: AnalyticsOperation[A]): Task[A] = fa match {
    case AnalyseUser(user) => Task {
      println(s"Analysing user $user")
      Thread.sleep(delay)
      println(s"Finished analysing user $user")
      if (user.name == "steve") 50 else 20
    }
  }
}
```
In order to interpret our various domain algebras, we need a way to label each step with its specific domain so that the correct interpreter can be used. This is typically solved with Coproducts, e.g.:
```scala
type ProgramInstructions[A] = Coproduct[UserOperation, AnalyticsOperation, A]
```
This is simply saying “for each step in our program, we either have a UserOperation or an AnalyticsOperation”. This is implemented using disjunctions in scalaz. The simplest way to visualise what is happening here is that, for each operation, we go either Left or Right based on its type. Left leads us to UserOperation and Right leads us to AnalyticsOperation. If you have more than two domain algebras, you simply nest further so that Right is just another Coproduct. Keep in mind that every new domain algebra you add will incur the cost of traversing this tree of Coproducts to find the correct interpreter. If this becomes an issue for your project, there are libraries out there, such as iota, which provide constant-time lookups for arbitrarily large lists of instruction sets.
Now we need to define the following interpreter in order to interpret our program that was defined earlier:
```scala
val programInterpreter: ProgramInstructions ~> Task = ???
```
As discussed, we need a way to find the correct interpreter for an instruction given the Coproduct. Here are a few simple helpers that let us do so:
```scala
object InterpreterHelpers {
  def combineInterpreters[F[_], G[_], H[_]](f: F ~> H, g: G ~> H): Coproduct[F, G, ?] ~> H =
    new (Coproduct[F, G, ?] ~> H) {
      override def apply[A](fa: Coproduct[F, G, A]): H[A] = fa.run match {
        case -\/(ff) => f(ff) // Left: an instruction from the first domain
        case \/-(gg) => g(gg) // Right: an instruction from the second domain
      }
    }

  implicit class RichNaturalTransformation[F[_], H[_]](val f: F ~> H) {
    def or[G[_]](g: G ~> H): Coproduct[F, G, ?] ~> H = combineInterpreters[F, G, H](f, g)
  }
}
```
This now allows us to combine interpreters using or (much like in cats).
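The dispatch itself is just a match on Left and Right. Here it is in plain Scala, with Either standing in for scalaz's disjunction and Option as the target effect. One assumption worth flagging: expressing the result as Coproduct[F, G, ?] ~> H needs a type lambda (the ? syntax from kind-projector), so this sketch returns a small dedicated CoproductInterpreter trait instead.

```scala
object OrDemo {
  trait ~>[F[_], G[_]] { def apply[A](fa: F[A]): G[A] }

  // Toy coproduct: Left holds an F instruction, Right holds a G instruction
  final case class Coproduct[F[_], G[_], A](run: Either[F[A], G[A]])

  // Stand-in for Coproduct[F, G, ?] ~> H that avoids a type lambda
  trait CoproductInterpreter[F[_], G[_], H[_]] {
    def apply[A](fa: Coproduct[F, G, A]): H[A]
  }

  implicit class RichNaturalTransformation[F[_], H[_]](val f: F ~> H) {
    def or[G[_]](g: G ~> H): CoproductInterpreter[F, G, H] = new CoproductInterpreter[F, G, H] {
      def apply[A](fa: Coproduct[F, G, A]): H[A] = fa.run match {
        case Left(ff)  => f(ff) // first domain's interpreter
        case Right(gg) => g(gg) // second domain's interpreter
      }
    }
  }

  final case class User(name: String, age: Int)
  sealed trait UserOperation[A]
  final case class CreateUser(name: String, age: Int) extends UserOperation[User]
  sealed trait AnalyticsOperation[A]
  final case class AnalyseUser(user: User) extends AnalyticsOperation[Int]

  val userInterpreter: UserOperation ~> Option = new (UserOperation ~> Option) {
    def apply[A](fa: UserOperation[A]): Option[A] = fa match {
      case CreateUser(name, age) => Some(User(name, age))
    }
  }

  val analyticsInterpreter: AnalyticsOperation ~> Option = new (AnalyticsOperation ~> Option) {
    def apply[A](fa: AnalyticsOperation[A]): Option[A] = fa match {
      case AnalyseUser(user) => Some(if (user.name == "steve") 50 else 20)
    }
  }

  val programInterpreter: CoproductInterpreter[UserOperation, AnalyticsOperation, Option] =
    userInterpreter or analyticsInterpreter

  // Sample instructions, one from each side of the coproduct
  val createSteve: Coproduct[UserOperation, AnalyticsOperation, User] =
    Coproduct[UserOperation, AnalyticsOperation, User](Left(CreateUser("steve", 23)))
  val analyseSteve: Coproduct[UserOperation, AnalyticsOperation, Int] =
    Coproduct[UserOperation, AnalyticsOperation, Int](Right(AnalyseUser(User("steve", 23))))
}
```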
Now we are almost there. The last issue we need to solve is that our program doesn’t actually require an interpreter of the form ProgramInstructions ~> Task. This is because we embedded our FreeAp in the Free instruction slot. As such, we actually need to transform a FreeAp into a Task, i.e. FreeAp[ProgramInstructions, ?] ~> Task. To do this, we define a helper class:
```scala
case class ParallelInterpreter[G[_]](f: ProgramInstructions ~> G)(implicit ev: Applicative[G])
    extends (FreeAp[ProgramInstructions, ?] ~> G) {
  // Interpret each instruction with f and combine the results using G's Applicative
  override def apply[A](fa: FreeAp[ProgramInstructions, A]): G[A] = fa.foldMap(f)
}
```
As can be seen, given our known interpreter stack (ProgramInstructions ~> Task), this produces a way to go from FreeAp[ProgramInstructions, ?] ~> Task, as required.
Now we have all the machinery we need to run our program:
```scala
import InterpreterHelpers._

val programInterpreter: ProgramInstructions ~> Task =
  SlowUserInterpreter(delay = 5000) or SlowAnalyticsInterpreter(delay = 2000)

program[ProgramInstructions]
  .foldMap(ParallelInterpreter(programInterpreter))
  .unsafePerformSync
```
This will construct an interpreter which is the Coproduct of our two individual interpreters, where creating a user takes 5 seconds and analysing a single user takes 2 seconds.
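If we recall, our program created two users in sequence, then analysed them both in parallel. As such, we expect this program to run in ~12 seconds (5 + 5 seconds for the two user creations, plus 2 seconds for the two analyses running side by side).
If you try this yourself, you will find that it actually takes closer to 14 seconds, which is what you get if the two analyses run one after the other (5 + 5 + 2 + 2).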
Wait. What gives?!
Well, if we take a close look at how this is being executed, we don’t actually run anything in parallel explicitly. We defer that to the Applicative instance of our target effect, in this case Applicative[Task]. If you look at the Applicative instance provided out of the box for Task, you will see that it’s actually a sequential implementation, as it is derived from the Monad[Task] instance, which is necessarily sequential.
Task does provide a parallel implementation, Task.taskParallelApplicativeInstance, but it is an Applicative for Task values that have been tagged as Parallel, so it must be used explicitly in place of plain Task. I find this to be a bit tedious and simply opted to define my own applicative instance for Task to keep things simple for now (thanks to pchiusano for the example).
```scala
val parallelTaskApplicative = new Applicative[Task] {
  def point[A](a: => A) = Task.now(a)
  def ap[A, B](a: => Task[A])(f: => Task[A => B]): Task[B] = apply2(f, a)(_(_))
  // Run both tasks concurrently instead of one after the other
  override def apply2[A, B, C](a: => Task[A], b: => Task[B])(f: (A, B) => C): Task[C] =
    Nondeterminism[Task].mapBoth(a, b)(f)
}
```
Now we simply run our program from before, passing in the parallel applicative for Task, and our program runs in 12 seconds as expected!
```scala
val programInterpreter: ProgramInstructions ~> Task =
  SlowUserInterpreter(delay = 5000) or SlowAnalyticsInterpreter(delay = 2000)

program[ProgramInstructions]
  .foldMap(ParallelInterpreter(programInterpreter)(parallelTaskApplicative))
  .unsafePerformSync
```
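The effect of swapping the Applicative instance can be reproduced without scalaz. In this sketch, LazyTask is a hypothetical stand-in for Task (a thunk producing a Future, so nothing runs until start is called), and the trait exposes only point and apply2. The two instances mirror the flatMap-derived default and the mapBoth-based instance above.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ApplicativeChoiceDemo {
  // Hypothetical stand-in for Task: async work that starts only when start is called
  final case class LazyTask[A](start: () => Future[A])

  // Stripped-down Applicative exposing only what this demo needs
  trait Applicative[F[_]] {
    def point[A](a: A): F[A]
    def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
  }

  // Like an instance derived from a Monad: fb starts only after fa has finished
  val sequential: Applicative[LazyTask] = new Applicative[LazyTask] {
    def point[A](a: A): LazyTask[A] = LazyTask(() => Future.successful(a))
    def apply2[A, B, C](fa: LazyTask[A], fb: LazyTask[B])(f: (A, B) => C): LazyTask[C] =
      LazyTask(() => fa.start().flatMap(a => fb.start().map(b => f(a, b))))
  }

  // Like mapBoth: both are started before waiting on either
  val parallel: Applicative[LazyTask] = new Applicative[LazyTask] {
    def point[A](a: A): LazyTask[A] = LazyTask(() => Future.successful(a))
    def apply2[A, B, C](fa: LazyTask[A], fb: LazyTask[B])(f: (A, B) => C): LazyTask[C] =
      LazyTask { () =>
        val runningA = fa.start()
        val runningB = fb.start()
        runningA.zip(runningB).map { case (a, b) => f(a, b) }
      }
  }

  def slow(value: Int, delayMillis: Long): LazyTask[Int] =
    LazyTask(() => Future { blocking(Thread.sleep(delayMillis)); value })

  // Combines two independent slow tasks with the given instance; returns (result, elapsed ms)
  def timed(instance: Applicative[LazyTask], delayMillis: Long): (Int, Long) = {
    val combined = instance.apply2(slow(50, delayMillis), slow(20, delayMillis))(_ + _)
    val start = System.nanoTime()
    val result = Await.result(combined.start(), 10.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }
}
```

Both instances produce the same result; only the elapsed time differs, roughly twice the delay for sequential and roughly one delay for parallel.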
And voila! Our program can now run both sequential AND parallel operations in a single Free Structure.
Closing Thoughts
Once the boilerplate has been set up, this is a really clean and powerful way to express programs. I personally really like expressing programs with these constructs as it helps you to focus on clean domain design due to the clear separation of domain from effects.
I’d also highly recommend taking a look at the following talks by John de Goes, as they helped me better understand the details of some of this stuff:
– Beyond Free Monads
– Move Over Free Monads: Make Way for Free Applicatives
It’s also worth noting that there is a library called Freestyle which automates away a lot of this boilerplate; however, it comes at the cost of code generation via macros, which I personally find harder to work with during development.