Using Alpakka Kafka Plugin to Connect to a Kafka Broker in Scala

Hesam Gholami
4 min read · May 13, 2019

Kafka is a well-known stream-processing platform that is used in large-scale applications to handle high volumes of incoming data.

Using it from Scala makes it easy to create the producers and consumers in our microservice-structured system.

But one of the problems we can face is a fast producer paired with a slow consumer, which makes it tricky to manage the state of our application and keep it truly reactive and stream-oriented.

Our consumer app may go down or become error-prone under the high amount of data we push through it.

Kafka by its nature is stream-oriented, but our producers and consumers may not be so.

Akka Streams

To address this problem we can make our producers and consumers stream-oriented, so that we can be truly reactive and control the back-pressure of our applications.

One of the nicest solutions in Scala is Akka Streams. This library lets us manage different kinds of streams and takes care of several classes of problems transparently for us.

Combining the power of streams in our services with the power of Kafka gives us a significant performance benefit and, beyond that, makes our source code easy to understand and meaningful, while handling several problems transparently.

So we can reach our target of being Reactive.
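As a minimal sketch (not taken from the project itself), this is roughly what an Akka Streams pipeline looks like: the slow sink only demands elements as fast as it can handle them, so back-pressure propagates upstream automatically and memory usage stays bounded.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("streams-demo")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// A fast source feeding a slow sink: the sink only pulls new elements
// when it is ready for them, so the source never overwhelms it.
Source(1 to 1000000)
  .map(_ * 2)
  .runWith(Sink.foreach { n =>
    Thread.sleep(1) // simulate a slow consumer
    println(n)
  })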

But with Alpakka we can go further!

What is Alpakka?

We saw that we can integrate Akka Streams into our producers and consumers to reach our reactive goal.

But to get to that target more easily, there is a library built on top of Akka Streams called Alpakka.

This library has several extensions for connecting to different sources like Apache Kafka, Apache Cassandra, Apache Solr, AWS S3, Couchbase, MongoDB and lots of other tools.

So by using Alpakka, the library takes care of handling the data flow between our application and Kafka (or whatever other tool we are using).

Alpakka Kafka Connector

By using the Alpakka Kafka connector, we can connect to our Kafka broker, manage the flow of data coming into our application, and have the back-pressure handled for us, making our application truly reactive.

Note that we can use this in our producer apps and consumer apps as it can handle both directions using streams.
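To follow along, the connector needs to be on the classpath. Here is a hedged sketch of the sbt setup; the version numbers are illustrative, so check the Alpakka Kafka documentation for the current ones.

// build.sbt
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream"       % "2.5.23",
  "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.4"
)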

Let’s see an example of using Alpakka Kafka connector in a sample producer application:

val done =
  // Read from the CSV file as our source
  FileIO.fromPath(file)
    // Separate each line by the standard line terminator so it can be
    // processed individually by the flow
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    // The first line is the CSV header, so we drop it
    .drop(1)
    // Convert each line to a JSON string and send it to the Kafka broker
    .map { line =>
      val cols = line.utf8String.split(",").map(_.trim)
      val insuranceItem = InsuranceItem(cols(0).toInt, cols(2), cols(3).toDouble, cols(15),
        Point(cols(13).toDouble, cols(14).toDouble))
      println("Sending item to kafka broker: " + insuranceItem)
      new ProducerRecord[String, String](topic,
        insuranceItem.policyID.toString,
        insuranceItem.toJson.compactPrint)
    }
    // Enable error logging in the console
    .log("error logging")
    // Send the stream to the Kafka producer sink
    .runWith(Producer.plainSink(producerSettings))

This code is part of one of my GitHub projects, which I will mention below.

The above code opens a .CSV file using FileIO, which is itself a stream, so we can open files that are hundreds of gigabytes in size without worrying about overflowing the system's memory.

Then, with the via method, we plug in a Flow: a reusable processing stage that reshapes our data as it streams through (the materializer is what eventually runs the whole blueprint). For example, here we split the incoming bytes into lines using the standard line-ending character.
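In fact, Framing.delimiter used above is exactly such a Flow from ByteString chunks to ByteString lines. A small sketch, with the parameter names spelled out for clarity:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

// A stand-alone framing stage: re-chunks an incoming byte stream into lines.
val lineFraming: Flow[ByteString, ByteString, NotUsed] =
  Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)

// It can then be composed into any ByteString stream, e.g.:
// FileIO.fromPath(file).via(lineFraming).map(_.utf8String)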

Then we are creating a JSON string and sending it to Kafka using ProducerRecord and producerSettings.
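The producerSettings value is not shown in the snippet; a minimal sketch of how it is typically built follows, where the broker address "localhost:9092" is just an assumption for a local setup.

import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

// String keys and String values (the JSON payload); "system" is the
// application's ActorSystem, and the bootstrap servers should point
// at your own Kafka broker.
val producerSettings: ProducerSettings[String, String] =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")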

Here you can see there is a stream between our .CSV file and the Kafka producer that repeats the same job for every line.

This snippet is from an example that I put on GitHub. You can find it here:

This project has a producer that reads data from a .CSV file and sends it to a Kafka topic, and several consumers that take the data and apply their own logic to it.

For example, there is a consumer app that saves this data to MongoDB, again using the Alpakka MongoDB connector to do so.
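A simplified sketch of what such a consumer could look like; the topic, group id, database and collection names are placeholders rather than the project's actual values, and the exact MongoSink and Document parsing signatures can vary between library versions.

import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import org.apache.kafka.common.serialization.StringDeserializer
import org.mongodb.scala.{Document, MongoClient}

// "system" is the application's ActorSystem.
val consumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("insurance-consumers")

val collection = MongoClient().getDatabase("insurance").getCollection("items")

// The Kafka value is already a JSON string, so it is parsed into a BSON
// Document and inserted into MongoDB; back-pressure flows from the database
// sink back to the Kafka source.
Consumer
  .plainSource(consumerSettings, Subscriptions.topics("insurance-topic"))
  .map(record => Document(record.value))
  .runWith(MongoSink.insertOne(collection))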

And there is another application that takes the data from the database and converts it to .geojson format, so it can be visualized on a map like the following:

Visualizing data using Mapbox

Conclusion

All of these applications use Alpakka to handle the flow of data, and underneath they use Akka Streams.

I hope you enjoyed this article. Please consider contributing to my GitHub project if you think it could be helpful for you.
