A very quick reference to the FS2 incantations that make up its API.

It is critical to understand that IO and SyncIO are basically classes holding your function, which means the libraries can chain operations together, place them in queues and so on. Rather than the JVM calling your code directly, the Cats Effect and FS2 libraries build up queues of operations inside IOs, which they can then get the JVM to run on a thread at some point.
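To make the "class holding your function" idea concrete, here is a minimal sketch. It assumes Cats Effect 3 (hence the runtime import); the object name and the counter are made up for illustration.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: Cats Effect 3 default runtime

object IoIsAValue {
  // Mutable counter, only here so we can see when the body really runs.
  var calls = 0

  // Constructing the IO wraps the function; nothing is executed yet.
  val bump: IO[Int] = IO { calls += 1; calls }

  // Chaining just builds a bigger description out of smaller ones.
  val twice: IO[Int] = bump.flatMap(_ => bump)

  // Only here does the library actually run the wrapped functions.
  def run(): Int = twice.unsafeRunSync()

  def main(args: Array[String]): Unit = {
    println(calls) // 0: nothing has run so far
    println(run()) // 2: bump ran twice
  }
}
```

Note that reusing `bump` runs the side effect again each time, because the IO is a description and not a cached result.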

IO
Something with a side effect, i.e. it’s a class which holds your function, so that it can be invoked synchronously or asynchronously with the eventual unsafeRunSync or unsafeRunAsync. Critically, an IO has a start API, so you can create a new fibre any time you like and schedule the work. But try not to… the fs2 streams API is where you should be heading.
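A rough sketch of the start API, assuming Cats Effect 3, where the started fibre exposes `joinWithNever` (in Cats Effect 2 the equivalent call was `join`). The object name is hypothetical.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: Cats Effect 3 default runtime

object StartSketch {
  val program: IO[Int] =
    for {
      fibre  <- IO(21 * 2).start    // schedule the work on a new fibre
      result <- fibre.joinWithNever // wait for its result (never completes if the fibre is cancelled)
    } yield result

  def run(): Int = program.unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(run()) // 42
}
```

This is about as much manual fibre handling as you should need; anything bigger is usually easier with streams.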

SyncIO
Just like IO, but meant for purely synchronous effects, i.e. if your code should just be run on the calling thread and never done async, then use this.
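A small sketch of SyncIO in use. The object and value names are invented for the example; the point is that no runtime or thread pool import is needed to run it.

```scala
import cats.effect.SyncIO

object SyncIoSketch {
  // Wraps a plain synchronous computation; nothing runs yet.
  val readConfig: SyncIO[String] = SyncIO { "some config value" }

  // unsafeRunSync executes on the calling thread and returns the result directly.
  def run(): String = readConfig.map(_.toUpperCase).unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(run()) // SOME CONFIG VALUE
}
```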

IO.as or SyncIO.as
This lets you change the value and type that an effect produces, while the original effect is still invoked. It is very useful when an effect you want executed produces the wrong type of output, and you then want to use it as if it produced something else. In fact this is how evalTap works inside. Why do you care?
import cats.effect.IO

// Calling processor will still invoke the iGetAString side effect; only its result is replaced with ().
// Otherwise you would have to start your own fibre, i.e. IO.start, as badProcessor does.
def iGetAString(c: String): IO[String] = IO {
  println(s"${Thread.currentThread().getName()} $c :iGetAString")
  "Some DB Value"
}
def processor(): IO[Unit] =
  iGetAString("processor").as(())
def badProcessor(): IO[Unit] =
  IO {
    println(s"${Thread.currentThread().getName()} inside badProcessor")
    val fibre = iGetAString("badProcessor").start // Note IO.start is possible, SyncIO.start is not an API
    fibre.as(())
  }.flatten
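To illustrate the evalTap remark, here is a sketch that writes the same thing two ways. It assumes Cats Effect 3 and fs2 3.x; the `byHand` version is my reading of the claim rather than a copy of the fs2 source, and `log` is a made-up helper.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: Cats Effect 3 default runtime
import fs2.Stream

object EvalTapSketch {
  // Hypothetical side effect that returns the "wrong" type (Unit).
  def log(i: Int): IO[Unit] = IO(println(s"saw $i"))

  // evalTap runs the effect for each element but keeps the element itself.
  val tapped: Stream[IO, Int] = Stream(1, 2, 3).covary[IO].evalTap(i => log(i))

  // The same behaviour by hand: run the effect, then swap its result for the element with .as.
  val byHand: Stream[IO, Int] = Stream(1, 2, 3).covary[IO].evalMap(i => log(i).as(i))

  def run(): (List[Int], List[Int]) =
    (tapped.compile.toList.unsafeRunSync(), byHand.compile.toList.unsafeRunSync())

  def main(args: Array[String]): Unit =
    println(run()) // both lists are List(1, 2, 3), and "saw n" is printed for every element
}
```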


IO error handling
Nice notes [here](https://guillaumebogard.dev/posts/functional-error-handling/). About halfway down, he explains that you should NOT throw exceptions inside IOs, but should instead use IO.raiseError(new Exception()). Later on you recover with .handleErrorWith(), or, if you want to do some processing and still let the error propagate, you can use onError.
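A minimal sketch of those three calls together, assuming Cats Effect 3. The `lookup` function and its messages are invented for the example and are not taken from the linked post.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: Cats Effect 3 default runtime

object ErrorHandlingSketch {
  // Hypothetical lookup: fails through raiseError instead of throwing.
  def lookup(id: Int): IO[String] =
    if (id > 0) IO.pure(s"user-$id")
    else IO.raiseError(new IllegalArgumentException(s"bad id: $id"))

  // onError does some processing, then the same error carries on.
  val logged: IO[String] =
    lookup(-1).onError { case e => IO(println(s"failed: ${e.getMessage}")) }

  // handleErrorWith swaps the failure for a fallback effect.
  val recovered: IO[String] =
    logged.handleErrorWith(_ => IO.pure("anonymous"))

  def run(): String = recovered.unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(run()) // prints "failed: bad id: -1" and then "anonymous"
}
```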

Simple APIs

See the code here
Pure stream

fs2.Stream(1,2,3)
Stream.emits(List(1,2,3))
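A quick sketch of what you can do with these without any effect type at all. Because a pure stream has no side effects, it can be turned straight into a List. The object and value names are illustrative only.

```scala
import fs2.Stream

object PureStreamSketch {
  val a = Stream(1, 2, 3)             // varargs constructor
  val b = Stream.emits(List(1, 2, 3)) // built from an existing collection

  // Pure streams need no IO and no runtime: toList evaluates them directly.
  def run(): List[Int] = (a ++ b).map(_ * 2).toList

  def main(args: Array[String]): Unit =
    println(run()) // List(2, 4, 6, 2, 4, 6)
}
```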

Effectful stream

def Random() = SyncIO{ Math.random()*10}
val l = Stream.eval( Random())
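A slightly fuller sketch of the same idea, assuming fs2 3.x, where a stream over SyncIO can be compiled because SyncIO has a Sync instance. `random` is the function above renamed to lower case, and `repeatEval` is added to show the effect being evaluated more than once.

```scala
import cats.effect.SyncIO
import fs2.Stream

object EffectfulStreamSketch {
  def random(): SyncIO[Double] = SyncIO(Math.random() * 10)

  // eval lifts one effect into a stream that emits exactly one element.
  val one: Stream[SyncIO, Double] = Stream.eval(random())

  // repeatEval re-runs the effect for every element requested.
  val three: Stream[SyncIO, Double] = Stream.repeatEval(random()).take(3)

  // Nothing is evaluated until the stream is compiled and the resulting SyncIO is run.
  def run(): (List[Double], List[Double]) =
    (one.compile.toList.unsafeRunSync(), three.compile.toList.unsafeRunSync())

  def main(args: Array[String]): Unit =
    println(run())
}
```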

‘Running’ the stream