Very quick reference into the FS2 incantations that make up their API.

Critical that you understand IO or SyncIO are basically classes holding your function - which means all the libraries can chain operations together or place them in queues and so on. Rather than the jvm calling your code, the cats effect and fs2 libraries will build up queues of operations in IO’s which they can then get the JVM to run on a thread at some point.

Something with a side effect, ie it’s a class which holds your function, so that it can be invoked synchronously or asynchronously with the evenual unsafeRunSync or the unsafeRunAsync. Critically an IO has a start api, so you can create a new fibre any time you like and schedule the work. But try not to…the fs2 streams API is where you should be heading.

Just like IO but should be used for all the synchronous effects. ie if your code should just be run and not done async then use this or
This allows you to change the content and type of an effect, and yet the original effect is still invoked. This can be very useful if you have an effect producing the wrong type of output, which you want to execute, but then use like it does something else. In fact this is how evalTap works inside. Why do you care?
// Calling processor will still invoke the IGetAString side effect.
// otherwise you would have to start your own fibre, ie IO.start.
def iGetAString(c:String) : IO[String] = IO{println(s"${Thread.currentThread().getName()} $c :iGetAString");"Some DB Value"}
def processor():IO[Unit] =
def badProcessor():IO[Unit] =
  IO {
    println(s"${Thread.currentThread().getName()} inside badProcessor")
    val fibre = iGetAString("badProcessor").start // Note IO.start is possible, syncIO.start is not an API

IO error Handling
Nice notes (here)[]. About half way down, he explains you should NOT throw exceptions inside IO’s, but should instead do IO.raiseError(new Exception()). Then later you use the .handleErrorWith(), or, to raise the error again but do some processing you can use onError.

Simple API’s

See the code here
Pure stream


Effectful stream

def Random() = SyncIO{ Math.random()*10}
val l = Stream.eval( Random())

‘Running’ the stream

Example of emits

Example of stream from list

Example of eval

Example of Zip


Say you have a stream of IOT house light switch state, and when they turn off the last light in the house you invoke a cartel service which returns the burglary time. IO[LocalDateTime], but your original stream is of [LightState].

So from LightState you invoke another effect returning a LocalDateTime, but you want to leave the original stream unchanged. Well EvalTap lets you evaluate a totally different effect, and then pretend that it didnt have another type or effect after all.

EvalTap example

EvalTap to run a nested stream

When you compile and drain it down to an IO then its now perfect for EvalTap to run a finite sub stream.

Nested sub finite stream

Using Signals and Topics

You can share state between streams in FS2. So for instance, you could get a Kafka topic to disconnect and then restart from another Kafka topic when a command arrives.

Again, see the code

If you want to start simpler, then look at these

Pull explained

See the code with notes

Normally when you read a stream you only deal with this item, or chunks of items. Pull lets you write code which maintains state internall and calls itself - so the state is on ‘stack’ and immutable within each call. The clever bit is that from this function which calls itself you can emit whatever result you want whenever you want back onto the stream. And then continue to call yourself.

In fact there is an operator » which you use to invoke yourself later, which means its fs2 based call and not a stack based call. So it works for infinite streams.

Best thing to do is read the code linked above.

a simpler non annoted pull

Revisiting fs2 stream start

Its simple to start a stream, but you could call start and get the Fibre and then cancel it, rather than use the stream takeWhile.

Example showing fibres vs compile.drain

The output is inlined in comments, again look at the thread names.

Parallel streams

Concurrent fs2 streams

An example using effectful and non effectful streams - 4 of them, all running at once using concurrently.

Example of infinite concurrent streams

Merge fs2 streams

Do not wait, take the first value from either…

import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef

import scala.concurrent.duration.*
@main def StartingParStreams =
  val s1 = fs2.Stream.eval(IO{1}).metered(100.millis).repeatN(20)
  val s2 = fs2.Stream.eval(IO{2}).metered(100.millis).repeatN(20)

  val s3 = s1.merge(s2)
  val s4 =>{println(x); x})
  val s5 = s4.compile.drain.unsafeRunSync

This displays

1 2 1 2 2 1 1 2 2 1 1 2 2 1 .....

So it merges them, but whichever emits first gets displayed.

Interleave fs2 streams

import cats.effect.IO
import scala.concurrent.duration.*

@main def InterleaveFs2Streams = {
  val s1 = fs2.Stream.eval(IO{1}).metered(100.millis).repeatN(20)
  val s2 = fs2.Stream.eval(IO{2}).metered(100.millis).repeatN(20)

  val s3 = s1.interleave(s2)
  val s4 =>{println(x); x})
  val s5 = s4.compile.drain.unsafeRunSync

This displays

1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 .....

FS2 chunking, impact of sleep and …

Example of chunking and sleeps

This example shows an effect being invoked which then sleeps (eg maybe it calls a db connection pool or something). You can see how each compute thread gets invoked with calls running in parallel. Look at the thread names!

FS2 Error Handling

Example of error handling

When to throw an excemption and when to raise an error, and how the different handlers are invoked - all in the example above.

More on parallel

This is still coming, if I get time I will update it…..

Oh to code like Fabio

Sadly, I am still not fluent like Fabio.

Example of Fabio fs2

It’s from his talk.