A short introduction to writing Kafka stream processing applications with the ZIO Kafka library.
ZIO Kafka is a lean and expressive library for interacting with Kafka through a ZIO Streams-based interface.
At Ziverge, we're big fans of Apache Kafka for creating a messaging fabric between microservices. From replicating data using changelog topics, to distributing work to multiple consumers, Kafka is a robust and scalable data store for processing loads of data.
In this post, we'd like to introduce you to ZIO Kafka - a lean library that provides an elegant interface to the Kafka APIs based on ZIO and ZIO Streams. After a quick introduction, we'll show how you can easily create a consumer process that reads records from Kafka, chunks them up and writes them to files.
We're going to use some imports throughout this post, so for brevity's sake, we'll list them here:
import zio._, zio.stream._, zio.duration._
import blocking.Blocking, clock.Clock
import zio.kafka.consumer._, zio.kafka.producer._, zio.kafka.serde._
import org.apache.kafka.clients.consumer.{ KafkaConsumer, ConsumerRecords, ConsumerRecord }
Interfacing applications with Kafka using the standard consumer and producer libraries is not an easy task, particularly if we're looking for asynchronous, non-blocking processing that integrates well with ZIO. Consuming Kafka topics is an effectively infinite computation; at any point in time, more data could be written to those topics. Therefore, long-running applications that consume these topics must be structured as infinite loops.
For example, assuming we've constructed a Kafka consumer, here's how the consumption loop would look like:
val consumer: KafkaConsumer[String, String] = ???
def processRecords(records: ConsumerRecords[String, String]): Task[Unit] = ???
val consumerFiber: ZIO[Blocking, Nothing, Fiber[Throwable, Nothing]] =
(for {
data <- blocking.effectBlocking(consumer.poll(50.millis.asJava))
_ <- processRecords(data)
_ <- blocking.effectBlocking(consumer.commitSync())
} yield ()).forever.fork
While great as an initial approach, there are two major downsides here. First, all of the processing steps are done synchronously: we will not commit our offsets before processing the current batch of records is done, and we will not fetch additional records before we're done committing the offsets. There's a strict happens-before relation between each of the steps, and a complete lack of pipelining. That means that if processRecords is slow, we will see delays in polling the consumer.
Second, the infinite loop structure scales pretty badly for control flow that requires handling more than one batch of data - for example, aggregating batches of records up to a timeout, and once the timeout expires, only then performing the processing and committing. We can try to apply that to the above snippet to see how painful it is.
Let's factor out the poll and commit steps:
def runConsumerLoop(
consumer: KafkaConsumer[String, String],
f: ConsumerRecords[String, String] => Task[Unit]) =
for {
data <- blocking.effectBlocking(consumer.poll(50.millis.asJava))
_ <- f(data)
_ <- blocking.effectBlocking(consumer.commitSync())
} yield ()
val consumer: KafkaConsumer[String, String] = ???
def processRecords(records: ConsumerRecords[String, String]): Task[Unit] = ???
val consumerFiber: ZIO[Blocking, Nothing, Fiber[Throwable, Nothing]] =
runConsumerLoop(consumer, processRecords).forever.fork
With the structure of runConsumerLoop, it is clear that there's no good way for the f function to operate on more than one batch of records at a time. For that to happen, f must store the buffered batches in a mutable reference somewhere. Another problem immediately pops up with that, though: even though f buffered some batches for later processing, runConsumerLoop would still commit those batches!
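To make the problem concrete, here's a minimal sketch of what such buffering could look like; the bufferedProcess helper and the threshold of ten batches are made up for illustration. Notice that runConsumerLoop commits the offsets of every polled batch as soon as bufferedProcess returns, including batches that were merely buffered:
// Hypothetical sketch: buffer polled batches in a Ref and only process them
// once ten batches have accumulated.
def bufferedProcess(
  buffer: Ref[List[ConsumerRecords[String, String]]]
)(records: ConsumerRecords[String, String]): Task[Unit] =
  for {
    buffered <- buffer.updateAndGet(records :: _)
    _        <- ZIO.when(buffered.size >= 10)(
                  ZIO.foreach(buffered.reverse)(processRecords) *> buffer.set(Nil)
                )
  } yield ()
// The loop still calls commitSync() after every poll, so offsets are committed
// for records that are sitting in the buffer and haven't been processed yet.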
As an alternative approach to structuring our code, we could use streams of records: ZStream[Any, Throwable, ConsumerRecord[String, String]]. This way, we're inverting the control; the processing function can consume that stream and apply whatever strategy makes sense for processing those records:
val recordStream: ZStream[Consumer with Clock with Blocking,
Throwable,
CommittableRecord[String, String]] =
Consumer.plainStream(Serde.string, Serde.string).flattenChunks
def processRecords(records: List[CommittableRecord[String, String]]): Task[Unit] = ???
val processingFiber =
recordStream
.groupedWithin(1000, 30.seconds)
.mapM { batch =>
processRecords(batch) *>
batch.map(_.offset)
.foldLeft(OffsetBatch.empty)(_ merge _)
.commit
}
.runDrain
.fork
I think you'd all agree that this declarative style of code is much more attractive than the previous loop. Beyond aesthetics, note that we only needed one line to aggregate the incoming records into batches of 1000 (with up to 30 seconds of waiting). That's the biggest win, by far!
Let's see how we set up an application to use ZIO Kafka. The current version of the library is built against the 1.0.0-RC18-2 version of ZIO. Add the library to your sbt build like so:
libraryDependencies += "dev.zio" %% "zio-kafka" % "0.8.0"
First, we'll create the Kafka Consumer as a ZLayer, and provide it to our consumer stream:
val consumer = Consumer.make(ConsumerSettings(List("localhost:9092")))
// consumer: ZLayer[Clock with Blocking, Throwable, Consumer] = zio.ZLayer@357cdb00
The individual operations on the consumer and producer can be accessed through accessor methods on the zio.kafka.consumer.Consumer and zio.kafka.producer.Producer objects. For example, this is how we'd create a stream that subscribes to a topic and consumes it:
val dataTopicRecords =
Consumer.subscribeAnd(Subscription.topics("data-topic"))
.plainStream(Serde.string, Serde.string)
.flattenChunks
// dataTopicRecords: ZStream[Clock with Blocking with Consumer, Throwable, CommittableRecord[String, String]] = zio.stream.ZStream@1e4d93f7
The type signature tells us that this stream depends on a Kafka consumer that deals with strings. We can add some effects to the stream by printing every consumed value and committing the offsets:
val printerStream = dataTopicRecords
.mapM { committableRecord =>
console.putStrLn(committableRecord.record.value)
.as(committableRecord.offset)
}
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
// printerStream: ZStream[Clock with Blocking with Consumer with console.package.Console, Throwable, Unit] = zio.stream.ZStream@3d1b43d8
At this point, we can provide the Kafka Consumer layer to our stream and convert it to a ZManaged fiber. This ZManaged value, when run, will execute the stream in a fiber and interrupt the fiber when the ZManaged is released:
val managedStreamFiber = printerStream
.provideCustomLayer(consumer)
.foreachManaged(_ => ZIO.unit).fork
// managedStreamFiber: ZManaged[ZEnv, Nothing, Fiber.Runtime[Throwable, Unit]] = zio.ZManaged@9f52eb7
The provideCustomLayer function is available on most ZIO data types (ZIO, ZManaged, ZStream, etc.) and is extremely convenient when the environment requirements that are left after providing our layer are a subset of the standard ZEnv environment. In our case, the requirements that are left are Blocking and Clock, so we can use this handy form.
It is highly recommended to compose the main modules of your application as ZManaged values (or, equivalently, ZLayer values if you'd like to inject them as dependencies into other parts of your application). ZManaged values can be freely composed with other values using the flatMap, <*> and ZManaged.collectAll combinators, just like ZIO, with the added benefit of maintaining correct finalizer ordering.
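As a quick illustration of that last point, here's a minimal sketch using a made-up resource helper whose acquisition and release just print to the console; composing ZManaged values in a for-comprehension releases them in reverse order of acquisition:
// Sketch: a hypothetical resource that only prints on acquire and release.
def resource(name: String): ZManaged[console.Console, Nothing, String] =
  ZManaged.make(console.putStrLn(s"acquiring $name").as(name))(_ =>
    console.putStrLn(s"releasing $name"))

// "b" is acquired second, so it is released first when the composed value is used.
val composed: ZManaged[console.Console, Nothing, (String, String)] =
  for {
    a <- resource("a")
    b <- resource("b")
  } yield (a, b)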
For our application's entrypoint, we'll execute two of these streams, each one consuming a different topic. First, here's a function that does everything we've discussed so far:
def createConsumerStream(topic: String) =
Consumer.subscribeAnd(Subscription.topics(topic))
.plainStream(Serde.string, Serde.string)
.flattenChunks
.mapM { committableRecord =>
console.putStrLn(committableRecord.record.value)
.as(committableRecord.offset)
}
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.provideCustomLayer(Consumer.make(ConsumerSettings(List("localhost:9092"))))
.foreachManaged(_ => ZIO.unit)
.fork
The application is constructed as a ZManaged value that executes two streams, and yields a ZIO value that can be used to join all of the streams. We're doing this to make sure that our main fiber exits if one of the background fibers exits or fails. We can execute that ZIO value inside ZManaged#use, which we immediately call:
val app =
(for {
first <- createConsumerStream("first")
second <- createConsumerStream("second")
} yield ZIO.raceAll(first.join, List(second.join)))
.use(identity)
// app: ZIO[ZEnv, Throwable, Unit] = zio.ZIO$CheckInterrupt@46c2189e
Ok! That's a good skeleton for our application. We can now proceed to writing the actual logic which batches records and writes them to files.
For our example, we'll write a consumer that groups incoming records into batches of 16kB or 4096 records, while waiting for up to 30 seconds for those conditions to be fulfilled.
We will package this functionality up in a ZIO module that provides a stream transformer function - that's a function of the form ZStream[R, E, A] => ZStream[R, E, B]. Our module will also keep a running counter of the number of files that have been written; that way the application can expose some runtime metrics.
To start, we will specify our interface and accessors:
type RecordChunking = Has[RecordChunking.Service]
object RecordChunking {
trait Service {
def writeRecords[R](
stream: ZStream[R, Throwable, CommittableRecord[String, String]]):
ZStream[R, Throwable, OffsetBatch]
}
def writeRecords[R](
stream: ZStream[R, Throwable, CommittableRecord[String, String]]):
ZStream[R with RecordChunking, Throwable, OffsetBatch] =
ZStream.accessStream(_.get[Service].writeRecords(stream))
}
The ZStream.accessStream function is a handy utility for accessing functions on the environment that return streams. It is most commonly used when defining accessors for your modules.
Next, let's write the implementation for the module:
import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
class Live(filePrefix: String, writtenFiles: Ref[Int]) extends RecordChunking.Service {
val batchRecords = ZSink.foldWeighted(List[String]())(
(rec: CommittableRecord[String, String]) => rec.value.length,
16384) { (acc, el) =>
el.record.value :: acc
}.map(_.reverse.mkString("\n").getBytes(StandardCharsets.UTF_8))
val batchOffsets = ZSink.foldLeft(OffsetBatch.empty) {
(acc, rec: CommittableRecord[String, String]) =>
acc.merge(rec.offset)
}
def writeRecords[R](
stream: ZStream[R, Throwable, CommittableRecord[String, String]]):
ZStream[R, Throwable, OffsetBatch] =
stream
.aggregate(batchOffsets zipPar batchRecords)
.mapM { case (offsets, data) =>
for {
fileIndex <- writtenFiles.updateAndGet(_ + 1)
_ <- Task {
Files.write(
Paths.get(filePrefix, s"chunk-$fileIndex"),
data
)
}
} yield offsets
}
}
Whew, a lot to unpack here! Let's walk through it step by step. First, we have the definition for the ZSink named batchRecords. A complete tutorial of ZSink is beyond the scope of this article, but a good intuition is that sinks are composable aggregators of values. This sink implements a weighted fold: it's like a normal fold, but it tracks the cost of the aggregated value using a cost function defined on the elements. This sink will aggregate records into a list for as long as the list holds less than 16kB of records.
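To build some intuition outside of the Kafka context, here's a minimal sketch of the same combinator applied to a stream of plain strings; the values and the 5-character budget are made up for illustration:
// Sketch: batch strings into lists, cutting a batch once the accumulated
// length reaches the 5-character budget.
val weightedBatches =
  ZStream("a", "bb", "ccc", "dddd")
    .aggregate(
      ZSink.foldWeighted(List[String]())((s: String) => s.length, 5) {
        (acc, el) => acc :+ el
      }
    )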
We're also applying ZSink#map on the sink: once the sink reaches 16kB of records and yields its result (the list of records), we're applying a function to that result. In this case, we're reversing the list, concatenating the strings and extracting the UTF-8 bytes of the data.
Next, we have the batchOffsets sink. This one is identical to a left fold on a list; we're aggregating the individual offsets on the records to a batch of offsets that can be committed at once.
We're using both of these sinks in the call to ZStream#aggregate. They are combined using the ZSink#zipPar function: this function creates a new sink that feeds the incoming elements into both sinks, ending as soon as one of the sinks signals completion. The resulting sink will yield a tuple of the two results. The ZStream#aggregate function will repeatedly apply the composite sink to the stream, resulting in a stream that consists of the aggregated results.
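As a standalone illustration of the same pattern, again with made-up values, here's a composite sink that pairs a running sum with weight-limited batches of the same integers; since aggregate restarts the sink for every batch, each emitted tuple describes a single batch:
// Sketch: feed the same elements to two sinks and pair their per-batch results.
val runningSum = ZSink.foldLeft(0L)((acc: Long, n: Int) => acc + n)
val smallBatches = ZSink.foldWeighted(List[Int]())((n: Int) => n, 10) {
  (acc, el) => acc :+ el
}

// Each element of the resulting stream is a (sum, batch) tuple for one batch.
val sumsWithBatches =
  ZStream(1, 2, 3, 4, 5, 6, 7).aggregate(runningSum zipPar smallBatches)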
Finally, we're writing the chunks of data to files using ZStream#mapM and some standard JDK functions.
To use our module's live implementation, we want it to be available as a ZLayer. Here's how we can create a constructor for that (usually you'd want this available on the RecordChunking object as def live):
def liveRecordChunking(filePrefix: String) =
ZLayer.fromEffect(Ref.make(0).map(new Live(filePrefix, _)))
Ok! All done with the module. To use it, we can redefine the consumer stream we created earlier as such:
def createRecordChunkingStream(topic: String) =
Consumer.subscribeAnd(Subscription.topics(topic))
.plainStream(Serde.string, Serde.string)
.flattenChunks
.via(RecordChunking.writeRecords(_))
.mapM(_.commit)
.provideSomeLayer[Clock with Blocking with RecordChunking](
Consumer.make(ConsumerSettings(List("localhost:9092")))
)
.foreachManaged(_ => ZIO.unit)
.fork
Note how we're now using the ZStream#via function, a convenience function for applying a function to the stream, and the ZStream#provideSomeLayer function, which allows us to provide some of the layers required by our stream. We need to aid the compiler's type inference by explicitly specifying which layers are left after providing.
Last up is our main entrypoint:
val recordChunkingApp =
(for {
    first <- createRecordChunkingStream("first")
    second <- createRecordChunkingStream("second")
} yield ZIO.raceAll(first.join, List(second.join)))
.use(identity)
.provideCustomLayer(liveRecordChunking("/tmp/data"))
// recordChunkingApp: ZIO[ZEnv, Throwable, Unit] = zio.ZIO$CheckInterrupt@3462e99a
We deferred providing the record chunking layer up to the top-most level of our application so it is shared between the streams.
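If you'd like to run it, here's a minimal sketch of how recordChunkingApp might be wired into a zio.App entrypoint (assuming the RC-era run signature, which returns an Int exit code; the Main object name is made up):
// Sketch: executing recordChunkingApp as an application entrypoint.
object Main extends zio.App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    recordChunkingApp.fold(_ => 1, _ => 0)
}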
That's it! The complete example is available at https://github.com/zivergetech/zio-kafka-example-app. It also includes a docker-compose.yaml file that you can use to set up a local Kafka broker. The README.md contains instructions on how to start the broker, write some data to the topics and run the application.
Enjoy, and join us on the ZIO Discord (https://discord.gg/2ccFBr4) if you have any follow-up questions!
This blog post was type-checked using mdoc against ZIO 1.0.0-RC18-2 and ZIO Kafka 0.8.0.