FS2 Cribsheet - August 2021
Very quick reference into the FS2 incantations that make up their API.
Critical that you understand IO or SyncIO are basically classes holding your function - which means all the libraries can chain operations together or place them in queues and so on. Rather than the jvm calling your code, the cats effect and fs2 libraries will build up queues of operations in IO’s which they can then get the JVM to run on a thread at some point.
- IO
- Something with a side effect, ie it’s a class which holds your
function, so that it can be invoked synchronously or asynchronously with
the evenual unsafeRunSync or the unsafeRunAsync. Critically an IO has a start
api, so you can create a new fibre any time you like and schedule the work.
But try not to…the fs2 streams API is where you should be heading.
SyncIO - Just like IO but should be used for all the synchronous effects.
ie if your code should just be run and not done async then use this
IO.as or SyncIO.as - This allows you to change the content and type of an effect, and yet the original effect is still invoked. This can be very useful if you have an effect producing the wrong type of output, which you want to execute, but then use like it does something else. In fact this is how evalTap works inside. Why do you care?
// Calling processor will still invoke the IGetAString side effect. // otherwise you would have to start your own fibre, ie IO.start. def iGetAString(c:String) : IO[String] = IO{println(s"${Thread.currentThread().getName()} $c :iGetAString");"Some DB Value"} def processor():IO[Unit] = iGetAString("processor").as(()) def badProcessor():IO[Unit] = IO { println(s"${Thread.currentThread().getName()} inside badProcessor") val fibre = iGetAString("badProcessor").start // Note IO.start is possible, syncIO.start is not an API fibre.as(()) }.flatten
- IO error Handling
- Nice notes (here)[https://guillaumebogard.dev/posts/functional-error-handling/]. About half way down, he explains you should NOT throw exceptions inside IO’s, but should instead do IO.raiseError(new Exception()). Then later you use the .handleErrorWith(), or, to raise the error again but do some processing you can use onError.
Simple API’s
See the code here
Pure stream
fs2.Stream(1,2,3) Stream.emits(List(1,2,3))
Effectful stream
def Random() = SyncIO{ Math.random()*10} val l = Stream.eval( Random())
‘Running’ the stream
EvalTap
Say you have a stream of IOT house light switch state, and when they turn off the last light in the house you invoke a cartel service which returns the burglary time. IO[LocalDateTime], but your original stream is of [LightState].
So from LightState you invoke another effect returning a LocalDateTime, but you want to leave the original stream unchanged. Well EvalTap lets you evaluate a totally different effect, and then pretend that it didnt have another type or effect after all.
EvalTap to run a nested stream
When you compile and drain it down to an IO then its now perfect for EvalTap to run a finite sub stream.
Using Signals and Topics
You can share state between streams in FS2. So for instance, you could get a Kafka topic to disconnect and then restart from another Kafka topic when a command arrives.
If you want to start simpler, then look at these
Pull explained
Normally when you read a stream you only deal with this item, or chunks of items. Pull lets you write code which maintains state internall and calls itself - so the state is on ‘stack’ and immutable within each call. The clever bit is that from this function which calls itself you can emit whatever result you want whenever you want back onto the stream. And then continue to call yourself.
In fact there is an operator » which you use to invoke yourself later, which means its fs2 based call and not a stack based call. So it works for infinite streams.
Best thing to do is read the code linked above.
Revisiting fs2 stream start
Its simple to start a stream, but you could call start and get the Fibre and then cancel it, rather than use the stream takeWhile.
Example showing fibres vs compile.drain
The output is inlined in comments, again look at the thread names.
Parallel streams
Concurrent fs2 streams
An example using effectful and non effectful streams - 4 of them, all running at once using concurrently.
Example of infinite concurrent streams
Merge fs2 streams
Do not wait, take the first value from either…
This displays
1 2 1 2 2 1 1 2 2 1 1 2 2 1 .....
So it merges them, but whichever emits first gets displayed.
Interleave fs2 streams
This displays
1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 .....
FS2 chunking, impact of sleep and …
Example of chunking and sleeps
This example shows an effect being invoked which then sleeps (eg maybe it calls a db connection pool or something). You can see how each compute thread gets invoked with calls running in parallel. Look at the thread names!
FS2 Error Handling
When to throw an excemption and when to raise an error, and how the different handlers are invoked - all in the example above.
More on parallel
This is still coming, if I get time I will update it…..
Oh to code like Fabio
Sadly, I am still not fluent like Fabio.
It’s from his talk.