BlogtechnicalAn Introduction to ZIO Kafka

An Introduction to ZIO Kafka

A short introduction to writing Kafka stream processing applications with the ZIO Kafka library.

19 April 2020# scala# functional programming# kafka# stream processing

ZIO Kafka is a lean and expressive library for interacting with Kafka through a ZIO Streams-based interface.

At Ziverge, we're big fans of Apache Kafka for creating a messaging fabric between microservices. From replicating data using changelog topics, to distributing work to multiple consumers, Kafka is a robust and scalable data store for processing loads of data.

In this post, we'd like to introduce you to ZIO Kafka - a lean library that provides an elegant interface to the Kafka APIs based on ZIO and ZIO Streams. After a quick introduction, we'll show how you can easily create a consumer process that reads records from Kafka, chunks them up and writes them to files.

We're going to use some imports throughout this file, so for brevity's sake, we'll list them here:

import zio._, zio.stream._, zio.duration._
import blocking.Blocking, clock.Clock
import zio.kafka.consumer._, zio.kafka.producer._, zio.kafka.serde._

import org.apache.kafka.clients.consumer.{ KafkaConsumer, ConsumerRecords, ConsumerRecord }

Why ZIO Kafka?

Interfacing applications with Kafka using the standard consumer and producer libraries is not an easy task, particularly if we're looking for asynchronous, non-blocking processing that integrates well with ZIO. Consuming Kafka topics is an effectively infinite computation; at any point in time, more data could be written to those topics. Therefore, long-running applications that consume these topics must be structured as infinite loops.

For example, assuming we've constructed a Kafka consumer, here's how the consumption loop would look like:

val consumer: KafkaConsumer[String, String] = ???

def processRecords(records: ConsumerRecords[String, String]): Task[Unit] = ???

val consumerFiber: ZIO[Blocking, Nothing, Fiber[Throwable, Nothing]] =
  (for {
    data <- blocking.effectBlocking(consumer.poll(50.millis.asJava))
    _    <- processRecords(data)
    _    <- blocking.effectBlocking(consumer.commitSync())
  } yield ()).forever.fork

While great as an initial approach, there are two major downsides to here. First, all of the processing steps are done synchronously. We will not commit our offsets before processing the current batch of records is done, and we will not fetch additional records before we're done committing the offsets. There's a strict happens-before relation between each of the steps here, and complete lack of pipelining. That means that if processRecords is slow, we will see delays in polling the consumer.

Second, the infinite loop structure scales pretty badly for control flow that requires handling more than one batch of data - for example, aggregating batches of records up to a timeout, and once the timeout expires, only then performing the processing and committing. We can try to apply that to the above snippet to see how painful it is.

Let's factor out the poll and commit steps:

def runConsumerLoop(
  consumer: KafkaConsumer[String, String],
  f: ConsumerRecords[String, String] => Task[Unit]) =
  for {
    data <- blocking.effectBlocking(consumer.poll(50.millis.asJava))
    _    <- f(data)
    _    <- blocking.effectBlocking(consumer.commitSync())
  } yield ()

val consumer: KafkaConsumer[String, String] = ???

def processRecords(records: ConsumerRecords[String, String]): Task[Unit] = ???

val consumerFiber: ZIO[Blocking, Nothing, Fiber[Throwable, Nothing]] = 
  runConsumerLoop(consumer, processRecords).forever.fork

With the structure of runConsumerLoop, it is clear that there's no good way for the f function to operate on more than one batch of records at a time. For that to happen, f must store the buffered batches in a mutable reference somewhere. Another problem immediately pops up with that though: even though f buffered some batches for later processing, runConsumerLoop would still commit those batches!

As an alternative approach to structuring our code, we could use streams of records: ZStream[Any, Throwable, ConsumerRecord[String, String]]. This way, we're inverting the control; the processing function can consume that stream and apply whatever strategy makes sense for processing those records:

val recordStream: ZStream[Consumer with Clock with Blocking,
                          Throwable,
                          CommittableRecord[String, String]] = 
  Consumer.plainStream(Serde.string, Serde.string).flattenChunks

def processRecords(records: List[CommittableRecord[String, String]]): Task[Unit] = ???

val processingFiber = 
  recordStream
    .groupedWithin(1000, 30.seconds)
    .mapM { batch => 
      processRecords(batch) *>
        batch.map(_.offset)
          .foldLeft(OffsetBatch.empty)(_ merge _) 
          .commit
    }
    .runDrain
    .fork

I think you'd all agree that this declarative style of code is much more attractive compared to the previous loop. Beyond aesthetics, note that we only needed one line to aggregate the incoming records into batches of 1000 (with up to 30 seconds of waiting). That's the biggest win, by far!

ZIO Kafka 101

Let's see how we setup an application to use ZIO Kafka. The current version of the library is built against the 1.0.0-RC18-2 version of ZIO. Add the library to your sbt build like so:

libraryDependencies += "dev.zio" %% "zio-kafka" % "0.7.0"

First, we'll create the Kafka Consumer as a ZLayer, and provide it to our consumer stream:

val consumer = Consumer.make(ConsumerSettings(List("localhost:9092")))
// consumer: ZLayer[Clock with Blocking, Throwable, Consumer] = zio.ZLayer@357cdb00

The individual operations on the consumer and producer can be accessed through accessor methods on the zio.kafka.consumer.Consumer and zio.kafka.producer.Producer objects. For example, this is how we'd create a stream that subscribes to a topic and consumes it:

val dataTopicRecords = 
  Consumer.subscribeAnd(Subscription.topics("data-topic"))
    .plainStream(Serde.string, Serde.string)
    .flattenChunks
// dataTopicRecords: ZStream[Clock with Blocking with Consumer, Throwable, CommittableRecord[String, String]] = zio.stream.ZStream@1e4d93f7

The type signature tells us that this stream depends on a Kafka consumer that deals with strings. We can add some effects to the stream by printing every consumed value and committing the offsets:

val printerStream = dataTopicRecords
  .mapM { committableRecord =>
    console.putStrLn(committableRecord.record.value)
      .as(committableRecord.offset)
  }
  .aggregateAsync(Consumer.offsetBatches)
  .mapM(_.commit)
// printerStream: ZStream[Clock with Blocking with Consumer with console.package.Console, Throwable, Unit] = zio.stream.ZStream@3d1b43d8

At this point, we can provide the Kafka Consumer layer to our stream and convert it to a ZManaged fiber. This ZManaged value, when run, will execute the stream in a fiber and interrupt the fiber when the ZManaged is released:

val managedStreamFiber = printerStream
  .provideCustomLayer(consumer)
  .foreachManaged(_ => ZIO.unit).fork
// managedStreamFiber: ZManaged[ZEnv, Nothing, Fiber.Runtime[Throwable, Unit]] = zio.ZManaged@9f52eb7

The provideCustomLayer function is available on most ZIO data types (ZIO, ZManaged, ZStream, etc.) and is extremely convenient when the environment requirements that are left after providing our layer are a subset of the standard ZEnv environment. In our case, the requirements that are left are Blocking and Clock, so we can use this handy form.

It is highly recommended to compose the main modules of your application as ZManaged values (or, equivalently, ZLayer values if you'd like to inject them as dependencies to other parts of your application). ZManaged values can be freely composed with other values with the flatMap, <*>, ZManaged.collectAll combinators just like ZIO, with the added benefit of maintaining correct finalizer ordering.

For our application's entrypoint, we'll execute two of these streams, each one consuming a different topic. First, here's a function that does everything we've discussed so far:

def createConsumerStream(topic: String) = 
  Consumer.subscribeAnd(Subscription.topics(topic))
    .plainStream(Serde.string, Serde.string)
    .flattenChunks
    .mapM { committableRecord =>
      console.putStrLn(committableRecord.record.value)
        .as(committableRecord.offset)
    }
    .aggregateAsync(Consumer.offsetBatches)
    .mapM(_.commit)
    .provideCustomLayer(Consumer.make(ConsumerSettings(List("localhost:9092"))))
    .foreachManaged(_ => ZIO.unit)
    .fork

The application is constructed as a ZManaged value that executes two streams, and yields a ZIO value that can be used to join all of the streams. We're doing this to make sure that our main fiber exits if one of the background fibers exits or fails. We can execute that ZIO value inside ZManaged#use, which we immediately call:

val app = 
  (for {
    first  <- createConsumerStream("first")
    second <- createConsumerStream("second")
  } yield ZIO.raceAll(first.join, List(second.join)))
    .use(identity)
// app: ZIO[ZEnv, Throwable, Unit] = zio.ZIO$CheckInterrupt@46c2189e

Ok! That's a good skeleton for our application. We can now proceed to writing the actual logic which batches records and writes them to files.

Writing our ETL process

For our example, we'll write a consumer that groups incoming records into batches of 16kB or 4096 records, while waiting for up to 30 seconds for those conditions to be fulfilled.

We will package this functionality up in a ZIO module that provides a stream transformer function - that's a function of the form ZStream[R, E, A] => ZStream[R, E, B]. Our module will also keep a running counter of the number of files that have been written; that way the application can expose some runtime metrics.

To start, we will specify our interface and accessors:

type RecordChunking = Has[RecordChunking.Service]

object RecordChunking {
  trait Service {
    def writeRecords[R](
      stream: ZStream[R, Throwable, CommittableRecord[String, String]]): 
      ZStream[R, Throwable, OffsetBatch]
  }

  def writeRecords[R](
    stream: ZStream[R, Throwable, CommittableRecord[String, String]]): 
    ZStream[R with RecordChunking, Throwable, OffsetBatch] = 
    ZStream.accessStream(_.get[Service].writeRecords(stream))
}

The ZStream.accessStream function is a handy utility for accessing functions on the environment that return streams. It is most commonly used when defining accessors for your modules.

Next, let's write the implementation for the module:

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }

class Live(filePrefix: String, writtenFiles: Ref[Int]) extends RecordChunking.Service {
  val batchRecords = ZSink.foldWeighted(List[String]())(
    (rec: CommittableRecord[String, String]) => rec.value.length,
      16384) { (acc, el) =>
        el.record.value :: acc
      }.map(_.reverse.mkString("\n").getBytes(StandardCharsets.UTF_8))

  val batchOffsets = ZSink.foldLeft(OffsetBatch.empty) { 
    (acc, rec: CommittableRecord[String, String]) => 
      acc.merge(rec.offset) 
  }

  def writeRecords[R](
    stream: ZStream[R, Throwable, CommittableRecord[String, String]]): 
    ZStream[R, Throwable, OffsetBatch] = 
    stream
      .aggregate(batchOffsets zipPar batchRecords)
      .mapM { case (offsets, data) =>
        for {
          fileIndex <- writtenFiles.updateAndGet(_ + 1)
          _ <- Task {
                 Files.write(
                   Paths.get(filePrefix, s"chunk-$fileIndex"),
                   data
                 )
               }
        } yield offsets
      }
}

Whew, a lot to unpack here! Let's walk through it step by step. First, we have the definition for the ZSink named batchRecords. A complete tutorial of ZSink is beyond the scope of this article, but a good intuition for sinks are composable aggregators of values. This sink implements a weighted fold: it's like a normal fold, but it tracks the cost of the aggregated value using a cost function defined on the elements. This sink will aggregate records into a list for as long as the list has less than 16kB of records.

We're also applying ZSink#map on the sink: once the sink reaches 16kB of records and yields its result (the list of records), we're applying a function to that result. In this case, we're reversing the list, concatenating the string and extracting the UTF-8 bytes of the data.

Next, we have the batchOffsets sink. This one is identical to a left fold on a list; we're aggregating the individual offsets on the records to a batch of offsets that can be committed at once.

We're using both of these sinks in the call to ZStream#aggregate. They are combined using the ZSink#zipPar function: this function creates a new sink that feeds the incoming elements into both sinks, ending as soon as one of the sinks signals completion. The resulting sink will yield a tuple of the two results. The ZStream#aggregate function will repeatedly apply the composite sink to the stream, resulting in a stream that consists of the aggregated results.

Finally, we're writing the chunks of data to files using ZStream#mapM and some standard JDK functions.

To use our module's live implementation, we want it to be available as a ZLayer. Here's how we can create a constructor for that (usually you'd want this available on the RecordChunking object as def live):

def liveRecordChunking(filePrefix: String) = 
  ZLayer.fromEffect(Ref.make(0).map(new Live(filePrefix, _)))

Ok! All done with the module. To use it, we can redefine the consumer stream we created earlier as such:

def createRecordChunkingStream(topic: String) = 
  Consumer.subscribeAnd(Subscription.topics(topic))
    .plainStream(Serde.string, Serde.string)
    .flattenChunks
    .via(RecordChunking.writeRecords(_))
    .mapM(_.commit)
    .provideSomeLayer[Clock with Blocking with RecordChunking](
      Consumer.make(ConsumerSettings(List("localhost:9092")))
    )
    .foreachManaged(_ => ZIO.unit)
    .fork

Note how we're now using the ZStream#via function, a convenience function for applying a function to the stream; and the ZStream#provideSomeLayer function, which allows us to provide some of the layers required by our stream. We need to aid the compiler's type inference by explicitly specifying which layers are left after providing.

Last up is our main entrypoint:

val recordChunkingApp = 
  (for {
    first  <- createConsumerStream("first")
    second <- createConsumerStream("second")
  } yield ZIO.raceAll(first.join, List(second.join)))
    .use(identity)
    .provideCustomLayer(liveRecordChunking("/tmp/data"))
// recordChunkingApp: ZIO[ZEnv, Throwable, Unit] = zio.ZIO$CheckInterrupt@3462e99a

We deferred providing the record chunking layer up to the top-most level of our application so it is shared between the streams.

That's it! The complete example is available at https://github.com/zivergetech/zio-kafka-example-app. It also includes a docker-compose.yaml file that you can use to setup a local Kafka broker. The README.md contains some instructions on how to start the broker, write some data to the topics and run the application.

Enjoy, and join us on the ZIO Discord (https://discord.gg/2ccFBr4) if you have any follow-up questions!

This blog post was type-checked using mdoc against ZIO 1.0.0-RC18-2 and ZIO Kafka 0.8.0.

Related Posts

14 July 2020

African Scala Development Program

African Scala Development Program We are thrilled to announce that Ziverge Inc. has launched an African Scala Development Program, in…

See More

Functional Design

Functional Design by John A. De Goes Although functional programming theory is useful, most day-to-day functional programming does not…

See More
19 April 2020

Functional Scala 2020

Functional Scala 2020 Functional Scala 2020 returns for another year of great talks, familiar and fresh faces, and positive energy around…

See More
Subscribe to our newsletter