Using Alpakka Kafka Plugin to Connect to a Kafka Broker in Scala

Akka Streams

What is Alpakka?

Alpakka Kafka Connector

import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import spray.json._

val done =
  // Read from CSV file as our source
  FileIO.fromPath(file)
    // Separate each line by standard line terminator to be processed
    // individually by flow
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .drop(1) // First line is CSV headers, so we drop it
    .map { line =>
      // Convert line to JSON string and send it to Kafka broker
      val cols = line.utf8String.split(",").map(_.trim)
      val insuranceItem = InsuranceItem(cols(0).toInt,
        cols(2), cols(3).toDouble, cols(15),
        Point(cols(13).toDouble, cols(14).toDouble))
      println("Sending item to kafka broker: " + insuranceItem)
      new ProducerRecord[String, String](topic,
        insuranceItem.policyID.toString,
        insuranceItem.toJson.compactPrint)
    }
    // Enable error logging in console
    .log("error logging")
    // Send stream to Kafka producer sink
    .runWith(Producer.plainSink(producerSettings))
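
The stream above indexes straight into the split columns, so a short or malformed row would throw inside the map stage. As a minimal sketch of one way to isolate that step, the row parsing can live in a plain function that returns an Option. The definitions of InsuranceItem and Point are not shown in the snippet, so the case classes below are hypothetical stand-ins: only policyID and the column indices come from the code above, and every other field name, as well as the InsuranceCsv object and parseLine, are assumptions made for illustration.

```scala
import scala.util.Try

// Hypothetical stand-ins for the article's model types. Only `policyID` and
// the column positions are taken from the stream code; the other field names
// are assumptions, since the original definitions are not shown.
final case class Point(x: Double, y: Double)

final case class InsuranceItem(
    policyID: Int,
    region: String,
    amount: Double,
    kind: String,
    location: Point
)

object InsuranceCsv {

  // Parse one CSV data row using the same column indices as the stream stage
  // (0, 2, 3, 15 and 13/14). Returns None when the row is too short or a
  // numeric column cannot be parsed, instead of throwing.
  def parseLine(line: String): Option[InsuranceItem] = {
    // limit = -1 keeps trailing empty columns instead of discarding them
    val cols = line.split(",", -1).map(_.trim)
    Try {
      InsuranceItem(
        cols(0).toInt,
        cols(2),
        cols(3).toDouble,
        cols(15),
        Point(cols(13).toDouble, cols(14).toDouble)
      )
    }.toOption
  }
}
```

With a helper like this, the map stage could be swapped for something along the lines of `.mapConcat(line => InsuranceCsv.parseLine(line.utf8String).toList)`, which would skip bad rows rather than fail the stream. Whether skipping or failing is the right behaviour depends on the data, so treat this as one option rather than the article's approach.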
Visualizing data using Mapbox

Conclusion

Hi, I’m a C++ and Scala ninja (and a new Rustacean 🦀). I started programming back in 2011 and I love free and open-source software.

Hesam Gholami
