Stream Avro Records into Kafka using Avro4s and Akka Streams Kafka

Continuing our quest to learn Akka Streams, we'll stream some Avro records into a Kafka Topic and then read them as well

Posted by Abhishek Srivastava on October 2, 2017

Continuing our quest to learn Akka Streams, we’ll take our same old countrycapital.csv and then we’ll convert each line into an AVRO Record and then write into a Kafka topic using Akka streams.

Why Avro. Avro is binary data serialization format. It is more concise than Json and is very interoperable. Almost all popular languages have Avro bindings and therfore can easily and efficiently read the data from Kafka topic.

Without any further ado. Here is our build.sbt

name := "KafkaAvro"
version := "1.0"
scalaVersion := "2.12.3"
libraryDependencies ++= Seq(
   "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.0",
   "com.typesafe.akka" %% "akka-stream" % "2.5.6",
   "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.13",
   "com.lightbend.akka" %% "akka-stream-alpakka-file" % "0.13",
   "com.typesafe.akka" %% "akka-stream-kafka" % "0.17"
)

Now let’s get some fundamentals out of the way. Basically we need to parse our CSV and then convert it into a stream of CountryCapital objects. I won’t explain the code here because I already explained this here

  implicit val actorSystem = ActorSystem()
  implicit val actorMaterializer = ActorMaterializer()
  val resource = getClass.getResource("/countrycapital.csv")
  val path = Paths.get(resource.toURI)
  val source = FileIO.fromPath(path)
  val flow1 = CsvParsing.lineScanner()
  val flow2 = Flow[List[ByteString]].map(list => list.map(_.utf8String))
  val flow3 = Flow[List[String]].map(list => CountryCapital(list(0), list(1)))

So now let us convert our CountryCapital stream of objects into Avro Records

  val flow4 = Flow[CountryCapital].map { cc =>
    val baos = new ByteArrayOutputStream()
    val output = AvroOutputStream.binary[CountryCapital](baos)
    output.write(cc)
    output.close()
    val result = baos.toByteArray
    baos.close()
    result
  }

In the code above i used Avro4s Serializer to convert the CountryCapital object into a strem of bytes containing the Avro Record.

Now that we have the Stream of Bytes, its easy to write it into a Kafka topic

val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new ByteArraySerializer)
      .withBootstrapServers("abhisheks-mini:9093")
  val flow5 = Flow[Array[Byte]].map{array =>
    new ProducerRecord[Array[Byte], Array[Byte]]("test", array)
  }
  val sink = Producer.plainSink(producerSettings)

Here we are using the Akka Streams Kafka library to create a akka stream Sink which acts as a Kafka Producer. We need to convert our stream of Array of bytes into a Producer Record. the producer record also contains the name of our topic which is “test” in my case.

Usually Kafka runs on port 9092. I am running it on 9093 in order to avoid port conflict with ElasticSearch (which I also run on the same server).

Now that we have our Sink. All we need to do is to build our Runnable Graph and execute it

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder =>
  s =>
    import GraphDSL.Implicits._
    source ~> flow1 ~> flow2 ~> flow3  ~> flow4 ~> flow5 ~> s.in
    ClosedShape
})
val future = graph.run()
future.onComplete { _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

Reading Avro records from Kafka topic is also very straigtforward. The code is exactly the same as above. The only difference is that instead of a Producer, we are creating a Consumer

   val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer(), new ByteArrayDeserializer)
      .withBootstrapServers("abhisheks-mini:9093")
      .withGroupId("abhi")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   val source = Consumer.committableSource(consumerSettings, Subscriptions.topics("test"))
   val flow1 = Flow[ConsumerMessage.CommittableMessage[Array[Byte], Array[Byte]]].map{ msg => msg.record.value()}
   val flow2 = Flow[Array[Byte]].map{ array =>
      val bais = new ByteArrayInputStream(array)
      val input = AvroInputStream.binary[CountryCapital](bais)
      input.iterator.toSeq.head
   }
   val sink = Sink.foreach[CountryCapital](println)
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit builder =>
      s =>
         import GraphDSL.Implicits._
         source ~> flow1 ~> flow2 ~> s.in
         ClosedShape
   })
   val future = graph.run()
   future.onComplete { _ =>
      actorSystem.terminate()
   }

Once the items are read from the kafka topic, they are read as Array of bytes. We use the Avro4s Deserializer this time, to convert array of bytes into CountryCapital object.

The whole example can be found at my github.