I have used Alpakka and Kafka Streams, and our http4s plus Kafka Streams approach hit a brick wall, so it is time to properly explore what fs2 is, or discard it.

All details below are cribbed for my own reference, but maybe you will find them useful.

Let's see some Scala

First off, the Scala 3 fs2 artifacts up to 3.0.6 (the latest) are all broken. So unless you download the zip, stick with the 2.13 versions.

build.sbt

import sbt.Keys.libraryDependencies

val scala3Version = "3.0.0"

lazy val root = project
  .in(file("."))
  .settings(
    name := "scala3-fs2",
    version := "0.1.0",

    scalaVersion := scala3Version,

    // https://mvnrepository.com/artifact/co.fs2/fs2-core
    libraryDependencies += "co.fs2" % "fs2-core_2.13" % "3.0.6",
    // optional I/O library
    libraryDependencies += "co.fs2" % "fs2-io_2.13" % "3.0.6",
    // optional reactive streams interop
    libraryDependencies += "co.fs2" % "fs2-reactive-streams_2.13" % "3.0.6",
  )

Emit

So, no side effects, ie no IO, just a Stream[Pure, A].

You can lift this stream into a Stream[IO, A] using the .covary[IO] operation.

See the fs2 main docs for examples, it's pretty easy.
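A minimal sketch of the idea (the values and names are my own, not from the docs):

import cats.effect.IO
import fs2.{Pure, Stream}

val pure: Stream[Pure, Int] = Stream(1, 2, 3) // no effects, just emits
val lifted: Stream[IO, Int] = pure.covary[IO] // same values, now effectful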

Streams and Topics

There is example code below, written for Scala 3 and Cats Effect 3 in July 2021.

  • General examples of fs2 streams
  • fs2 Stream starting and stopping
  • Using fs2 topics and signals to coordinate
  • fs2 Pulls explained - maybe

Say you have two Kafka consumers: one is a command channel and one is a reader. The command channel can ask the main consumer to disconnect and reconnect at another offset.

So: start an fs2 Topic, and when the command channel gets the reset message, republish it on the fs2 Topic.

In the main consumer, create a signal and two streams: one listens to the fs2 Topic and one to the Kafka topic. When the fs2 Topic gets the reset, it can signal the main consumer to terminate.

Wrap this second set - ie the fs2 Topic consumer and the Kafka consumer - in another stream which repeats.

So, the flow is now:

  1. Get the Kafka command reset message.
  2. Publish it to the fs2 Topic.
  3. Receive the fs2 Topic reset and signal the main business-message consumer to terminate.
  4. Repeat the fs2 Topic and business-message consumers.
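
Here is a minimal sketch of that coordination, assuming fs2 3.x and Cats Effect 3; the timed streams are stand-ins for the real Kafka consumers:

import cats.effect.{IO, IOApp}
import fs2.Stream
import fs2.concurrent.{SignallingRef, Topic}
import scala.concurrent.duration._

object ResetFlow extends IOApp.Simple {
  val run: IO[Unit] =
    Stream.eval(Topic[IO, String]).flatMap { resetTopic =>
      Stream.eval(SignallingRef[IO, Boolean](false)).flatMap { halt =>
        // stand-in for the main kafka business-message consumer
        val business = Stream.awakeEvery[IO](200.millis)
          .evalMap(t => IO.println(s"business message at $t"))
          .interruptWhen(halt)
        // listens to the fs2 topic; a reset trips the signal
        val control = resetTopic.subscribe(10).evalMap(_ => halt.set(true))
        // stand-in for the kafka command-channel consumer
        val command = Stream.sleep[IO](1.second) ++
          Stream.eval(resetTopic.publish1("reset").void)
        business.concurrently(control).concurrently(command)
      }
    }.repeatN(3).compile.drain // the wrapping stream which repeats
}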

So, how to code it in full - look here: https://github.com/PendaRed/scala3-fs2/tree/main/src/main/scala/com/jgibbons/fs2/c

Sorry, it's a dummy Kafka, to simplify the code…

What is fs2 Pull good for?

Streams are pure and emit values, or they are effectful (ie they have an IO effect type) and also emit values.

If you want to aggregate a sequence of values from the stream and then continue, Pull is your friend. You need to know the right API to turn a stream into a Pull; inside the Pull you can pull more chunks from the stream until you have what you want, then convert the Pull back to a stream so it can continue.

eg collect all the characters until you have a full line, or collect up to 5 items from the stream. This idea of working on multiple elements at once is called ‘stateful’ in the docs. Streams provide many APIs that do much of the above anyway, but the implementations all use Pulls - click into them and you will see.
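
For instance, two built-in stateful operations cover exactly those cases (the sample values are mine):

import fs2.Stream

// one Chunk per line, via the built-in split
val lines = Stream('h', 'i', '\n', 'y', 'o', '\n').split(_ == '\n')
// groups of up to 5 elements, via the built-in chunkN
val fives = Stream.range(0, 12).chunkN(5)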

The Pull description is hard to grasp - the docs talk about it being monadic in the result (or sometimes the resource, because the docs are confused). ie if you call stream.pull.uncons you get a Pull carrying a result, on which you can then call map, flatMap etc - ie it is monadic in the result. eg

val g: Pull[IO, INothing, Option[(Chunk[Int], Stream[IO, Int])]] = Stream.eval(IO{1}).repeatN(20).pull.uncons

Look at the Pull API for functions which can take n elements, echo, etc.

Once you have your Pull, you want to do stuff before getting back into the stream (maybe). The crucial APIs are:

  • Pull.output() - this can be given a chunk and gives you a Pull with output rather than a result. This is critical, as you can then call .stream on it to get back to streaming.
  • pull.stream - you need a Pull with output, which you can create with Pull.output(). This converts the Pull back to a stream.
  • stream.through - passes the stream into the given pipe and gets a new stream back.
  • A Pipe is simply a function that takes a stream of one type and returns a stream of another type.
  • pull >> - this is a lazy append: once the first Pull completes, run the second. Good for appending the terminator Pull.done, but even better for appending a call to a helper function which will get run at some point - critically, not using stack based recursion.
  • Pull.pure(None) >> somethingElse - this emits nothing, but lets you use the >> operator, so you can have a function call after >> which fs2 will defer, avoiding stack based recursion.
  • evalTap - magically takes the stream element, runs any SyncIO or IO against it, and leaves the stream itself untouched! The trick is that IO{yyy}.as(xxx) returns an IO{xxx}, so you can call one effectful function and then pretend it returned something else. Or, as the documentation says, keep the effect but replace the return type - which is how evalTap does its magic (see the sketch after this list).
  • compile - gets a projection of the stream which you can drain into the IO[Unit] needed by IOApp, or convert toList etc. It isn't needed on a pure stream with no side effects.
  • compile.drain again - say you have a stream, for instance the Kafka assigned partitions on a rebalance, and you want to do an evalTap(consumer => someFn(consumer)), where someFn uses the assigned-partitions stream, but you just want that stream to run and give you an IO[Unit] - that is when compile.drain is great. Putting it more simply: if you use evalTap and want to run a stream inside it, create the stream and call compile.drain, which gives an IO[Unit] that evalTap will then re-type to an IO of the stream's element.
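
A tiny evalTap sketch (my own example values):

import cats.effect.IO
import fs2.Stream

// the println effect runs once per element; the elements flow on
// untouched, because the IO[Unit] is re-typed with .as(element)
val logged: Stream[IO, Int] =
  Stream(1, 2, 3).covary[IO].evalTap(i => IO.println(s"saw $i"))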

So let's put it together. The stream code, let's call it streamA:

streamA.through(aPipe).compile.drain

The pipe code - made concrete here as a take-the-first-5 example:

import cats.effect.IO
import fs2.{Pipe, Pull, Stream}

def aPipe: Pipe[IO, Int, Int] = {
  def myRecursiveHelper(remaining: Long, s: Stream[IO, Int]): Pull[IO, Int, Unit] =
    // from the input stream, convert to a pull where we can flatMap the
    // result - uncons waits for a chunk to have been emitted, ie the
    // chunk is the result in the pull
    s.pull.uncons.flatMap {
      case Some((chunk, rest)) if chunk.size < remaining =>
        // not enough yet: output the chunk and recurse - >> defers the
        // call, so this is not stack based recursion
        Pull.output(chunk) >> myRecursiveHelper(remaining - chunk.size, rest)
      case Some((chunk, _)) =>
        // enough: output what is needed and terminate
        Pull.output(chunk.take(remaining.toInt)) >> Pull.done
      // (an alternative branch could use Pull.pure(()) >> helper(...)
      //  to remove an element from the stream without emitting it)
      case None => Pull.done
    }
  inStream => myRecursiveHelper(5, inStream).stream
}

The critical magic is that calling Pull.output(foo) gives you a Pull which you can then call .stream on. ie a Pull can carry an effect, output and a result, and only a Pull with output (and a Unit result) can go back to a stream.

Gotcha with evalTap and infinite streams

Do not evalTap a nested stream which doesn't end. If you do, the evalTap will never complete, ie that stream will sit there emitting. This bit me with the fs2-kafka consumer.assignmentStream, which is a stream that will emit on every revoke/assign - ie it doesn't just emit once, but stays active. We played with calling .start to create a new fibre, but obviously the correct thing is to run it concurrently with the main message consumer stream, as in the sketch below.
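
A minimal sketch of the fix, with timed stand-ins for the two streams:

import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._

// never-ending stand-in for the assignment stream
val assignments: Stream[IO, Unit] =
  Stream.awakeEvery[IO](1.second).evalMap(_ => IO.println("rebalance"))
// finite stand-in for the main message consumer
val business: Stream[IO, Unit] =
  Stream.awakeEvery[IO](250.millis).take(10).evalMap(_ => IO.println("msg"))

// assignments runs in the background and is cancelled when business ends
val app: Stream[IO, Unit] = business.concurrently(assignments)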

Debate

Nothing about the fs2 API is obvious. If you look at the documentation and think it's all impenetrable, you are not alone.

So rather than try to understand it, you just have to remember the magic incantations above.

Pull! Got it?

If you want to work on multiple elements in a way not supported by Stream out of the box, use a Pull.

When you are done, use {Pull.output() >> helpFn()}.stream to get back to the stream and continue.

Actually, there is too much to say on Pull, so look at the code example here: fs2 Pulls explained - maybe

fs2 notes from YouTube

Fabio Labella has some great YouTube talks - so watch them. Some of his quotes:

Streams are great for things that will not fit into memory.

A stream is a lazy emission.

IOs are your words, streams are your sentences.

FS2 is about dealing with the emitted items and causing effects. Each effect is an IO, and this is put onto a fibre. A fibre has a stack of things to execute, and it simply asks the execution context or scheduled execution context (ie thread pool) to run the effect, then yields every so often so other fibres can get some work done.

Zip will merge two streams, but it will wait for something to be emitted from each.

FS2 is all about effectful streams, ie things happen which cause other things to happen.

In Cats Effect, an effect is called an IO.

IO is a monad, so in Scala 2 you can abstract over it as F[_] in your type classes, or you could just use IO directly.

It supports resource safety - ie auto-closing resources - and also concurrency constructs such as semaphores, Ref, queues etc.

Simple pure streams have no side effects. You can lift one into IO using .covary[IO], ie lift it into an effectful stream.

Compile will take a Stream[IO, A] and change it into a single IO[Unit]. You can then call .unsafeRunSync() on it (but really, extending IOApp.Simple is the way to go).

Compile will compile to a single effect, so you could call .toList after compile instead of .drain.
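
For example (the stream here is my own stand-in):

import cats.effect.IO
import fs2.Stream

val s: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
val asUnit: IO[Unit] = s.compile.drain        // run purely for the effects
val asList: IO[List[Int]] = s.compile.toList  // run and collect the output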

IO[Unit] (eg after compile) describes running the stream of IOs and dealing with all the effects.

This is a good description of the components: https://devon-miller.gitbook.io/test_private_book/sstream_model

But the description of pull is basically mad.