Real-Time Recommendations Using Apache Storm and Kafka in Scala

Sarath Chandra

In this blog, we will walk through the implementation of a real-time recommendation system using Apache Storm and Kafka in Scala. We'll create a topology that reads user activity and inventory data from two separate Kafka topics, joins them on product ID, and emits real-time product recommendations. The example shows how Apache Storm can be used to process large streams of data in real time.

Prerequisites

Before diving into the code, you’ll need:

  • Apache Storm running on your system.
  • Kafka setup for the data streams (user activity and inventory updates).
  • Scala and SBT installed for building the project.

Step 1: Setting Up the Project with SBT

To manage dependencies and build the project, we’ll use SBT (Scala Build Tool). Here’s the build.sbt file to set up the necessary dependencies for Apache Storm, Kafka, and Jackson serialization.

name := "RealTimeRecommendation"

version := "1.0"

scalaVersion := "2.13.10"

libraryDependencies ++= Seq(
  "org.apache.storm" % "storm-core" % "2.3.0",                  // Apache Storm dependency
  "org.apache.kafka" % "kafka-clients" % "2.8.1",                // Kafka client dependency
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.3"  // Jackson for serialization
)
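One note on the storm-core dependency: for the LocalCluster run later in this post, the default compile scope shown above is what you want. If you later package the topology and submit it to a real Storm cluster, storm-core is already on the worker classpath and is typically marked as provided, roughly like this (a sketch, not required for the local demo):

// For cluster deployment only: storm-core is supplied by the Storm workers
"org.apache.storm" % "storm-core" % "2.3.0" % "provided"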

Step 2: Defining Spouts to Read from Kafka Topics

A Spout in Apache Storm is responsible for emitting tuples into the topology. In our case, we'll define two spouts: one for user activity events and another for inventory updates. To keep the example self-contained, both spouts emit simulated data; a sketch of what a real Kafka-backed version could look like follows the first spout.

User Activity Spout

This spout will simulate reading user activity events from a Kafka topic and emit the user ID, product ID, and timestamp.

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}

class UserActivitySpout(kafkaTopic: String, zookeeperHosts: String) extends BaseRichSpout {
  private var collector: SpoutOutputCollector = _

  override def open(config: java.util.Map[String, Object], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
  }

  override def nextTuple(): Unit = {
    // Simulating a user activity event; a real spout would read from the Kafka topic here
    val userActivity: Map[String, AnyRef] = Map(
      "userId" -> "user123",
      "productId" -> "prod456",
      "timestamp" -> Long.box(System.currentTimeMillis())
    )
    // Emit on a named stream so downstream bolts can tell the two sources apart
    collector.emit("userActivityStream", new Values(userActivity("userId"), userActivity("productId"), userActivity("timestamp")))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declareStream("userActivityStream", new Fields("userId", "productId", "timestamp"))
  }
}
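The spout above only simulates events. As a rough sketch of what a Kafka-backed version could look like using the kafka-clients dependency from build.sbt, you might poll a KafkaConsumer inside nextTuple. The bootstrap server address, the consumer group id, and the assumption that messages arrive as comma-separated "userId,productId" strings are all illustrative and not part of the original setup:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}

class KafkaUserActivitySpout(kafkaTopic: String, bootstrapServers: String) extends BaseRichSpout {
  private var collector: SpoutOutputCollector = _
  private var consumer: KafkaConsumer[String, String] = _

  override def open(config: java.util.Map[String, Object], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", "user-activity-spout") // illustrative group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList(kafkaTopic))
  }

  override def nextTuple(): Unit = {
    // Poll Kafka briefly and emit one tuple per record
    consumer.poll(Duration.ofMillis(100)).forEach { record =>
      // Assumes the message value looks like "userId,productId"
      val parts = record.value().split(",")
      if (parts.length == 2) {
        collector.emit("userActivityStream", new Values(parts(0), parts(1), Long.box(record.timestamp())))
      }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declareStream("userActivityStream", new Fields("userId", "productId", "timestamp"))
  }
}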

Inventory Update Spout

Similarly, this spout will simulate reading inventory update data from a Kafka topic and emit the product ID and stock count.

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}

class InventoryUpdateSpout(kafkaTopic: String, zookeeperHosts: String) extends BaseRichSpout {
  private var collector: SpoutOutputCollector = _

  override def open(config: java.util.Map[String, Object], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
  }

  override def nextTuple(): Unit = {
    // Simulating an inventory update; a real spout would read from the Kafka topic here
    val inventoryUpdate: Map[String, AnyRef] = Map(
      "productId" -> "prod456",
      "stockCount" -> Int.box(10),
      "lastUpdated" -> Long.box(System.currentTimeMillis())
    )
    // Emit on a named stream so downstream bolts can tell the two sources apart
    collector.emit("inventoryUpdateStream", new Values(inventoryUpdate("productId"), inventoryUpdate("stockCount")))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declareStream("inventoryUpdateStream", new Fields("productId", "stockCount"))
  }
}

Step 3: The Join Bolt

We now need to join data from the two streams: user activity and inventory updates. The JoinBolt will combine the data based on productId and emit recommendations when a product is available.

import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.tuple.{Fields, Tuple, Values}

class JoinBolt extends BaseBasicBolt {
  private var userActivityCache: Map[String, Tuple] = Map()
  private var inventoryCache: Map[String, Tuple] = Map()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val streamId = input.getSourceStreamId
    val productId = input.getStringByField("productId")

    streamId match {
      case "userActivityStream" =>
        // Cache the latest user activity tuple by productId
        userActivityCache += (productId -> input)
      case "inventoryUpdateStream" =>
        // Cache the latest inventory update tuple by productId
        inventoryCache += (productId -> input)
      case _ => // Ignore tuples from any other stream
    }

    // Join logic: if both a user activity and an inventory update exist for this
    // product and it is in stock, emit a recommendation
    if (userActivityCache.contains(productId) && inventoryCache.contains(productId)) {
      val userActivity = userActivityCache(productId)
      val inventoryUpdate = inventoryCache(productId)

      val userId = userActivity.getStringByField("userId")
      val stockCount = inventoryUpdate.getIntegerByField("stockCount")

      if (stockCount > 0) {
        collector.emit(new Values(userId, productId, "Recommended"))
      }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("userId", "productId", "status"))
  }
}
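One caveat with this join: the two caches grow without bound because entries are never removed. If recommendations only make sense for recent activity, one minimal option is to store an insertion timestamp alongside each tuple and drop entries older than some TTL. The sketch below is an illustration of that idea; the 10-minute window is an assumption, not something from the original post:

import org.apache.storm.tuple.Tuple

// Sketch: a tiny TTL cache that could replace the plain Maps inside JoinBolt
class TtlTupleCache(ttlMillis: Long = 10 * 60 * 1000L) {
  private var entries: Map[String, (Tuple, Long)] = Map()

  def put(key: String, tuple: Tuple): Unit = {
    evictExpired()
    entries += (key -> (tuple, System.currentTimeMillis()))
  }

  def get(key: String): Option[Tuple] = {
    evictExpired()
    entries.get(key).map(_._1)
  }

  private def evictExpired(): Unit = {
    val cutoff = System.currentTimeMillis() - ttlMillis
    entries = entries.filter { case (_, (_, insertedAt)) => insertedAt >= cutoff }
  }
}

Inside execute you would then call userActivityCache.put(productId, input) and userActivityCache.get(productId) instead of updating the Maps directly.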

Step 4: The Recommendation Bolt

The RecommendationBolt processes the joined data and outputs product recommendations. In this simplified example, it prints the recommendations to the console.

import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.tuple.Tuple

class RecommendationBolt extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val userId = input.getStringByField("userId")
    val productId = input.getStringByField("productId")
    val status = input.getStringByField("status")

    // Process the recommendation (e.g., store it in a database or print it)
    println(s"Recommendation: User $userId should be recommended product $productId. Status: $status")
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    // No output fields: this is the final bolt in the topology
  }
}
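Printing to the console is fine for a demo, but in practice you would usually push each recommendation somewhere downstream. As one possibility, here is a sketch of publishing recommendations back to Kafka with the kafka-clients producer; the recommendations topic name and the localhost:9092 broker address are assumptions made for illustration:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.Tuple

class KafkaRecommendationBolt(bootstrapServers: String = "localhost:9092") extends BaseBasicBolt {
  @transient private var producer: KafkaProducer[String, String] = _

  override def prepare(topoConf: java.util.Map[String, Object], context: TopologyContext): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
  }

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val userId = input.getStringByField("userId")
    val productId = input.getStringByField("productId")
    // Publish the recommendation keyed by user; "recommendations" is an assumed topic name
    producer.send(new ProducerRecord[String, String]("recommendations", userId, s"$userId:$productId"))
  }

  override def cleanup(): Unit = if (producer != null) producer.close()

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}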

Step 5: Topology Definition

Now, let's tie everything together by creating the topology. We register the spouts and bolts, wire the streams together, and run everything on a LocalCluster for testing.

import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.{Config, LocalCluster}

object RecommendationTopology {
  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder()

    // Define spouts
    builder.setSpout("userActivitySpout", new UserActivitySpout("user_activity", "localhost:2181"))
    builder.setSpout("inventoryUpdateSpout", new InventoryUpdateSpout("inventory_updates", "localhost:2181"))

    // Define bolts
    builder.setBolt("joinBolt", new JoinBolt())
      .shuffleGrouping("userActivitySpout", "userActivityStream")
      .shuffleGrouping("inventoryUpdateSpout", "inventoryUpdateStream")

    builder.setBolt("recommendationBolt", new RecommendationBolt())
      .shuffleGrouping("joinBolt")

    // Configure and run the topology locally
    val config = new Config()
    config.setDebug(true)
    val cluster = new LocalCluster()
    cluster.submitTopology("RealTimeRecommendation", config, builder.createTopology())

    // Run for a fixed time for demo purposes
    Thread.sleep(10000)
    cluster.shutdown()
  }
}
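One thing to watch: shuffleGrouping distributes tuples randomly, so if you raise the parallelism of the JoinBolt above one executor, user-activity and inventory tuples for the same productId can land on different tasks and never meet in the caches. A fieldsGrouping on productId keeps matching tuples together; the parallelism hint of 2 below is just an example:

import org.apache.storm.tuple.Fields

// Route both streams by productId so matching tuples reach the same JoinBolt task
builder.setBolt("joinBolt", new JoinBolt(), 2)
  .fieldsGrouping("userActivitySpout", "userActivityStream", new Fields("productId"))
  .fieldsGrouping("inventoryUpdateSpout", "inventoryUpdateStream", new Fields("productId"))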

Step 6: Running the Topology

  1. Start Apache Storm and ensure Kafka is running with the topics user_activity and inventory_updates created.
  2. Run the topology: execute the RecommendationTopology object from your Scala project (for example, with sbt run).

This will process the data in real-time, join the streams based on productId, and emit recommendations when a product is in stock.
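If you swap in Kafka-backed spouts like the sketch in Step 2, you can feed the pipeline with a small test producer. The snippet below sends one matching pair of events so the join fires; the "userId,productId" and "productId,stockCount" message formats and the broker address are assumptions for illustration:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// A tiny test producer: sends one matching pair of events to the two topics
object TestEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // "userId,productId" for user activity, "productId,stockCount" for inventory
    producer.send(new ProducerRecord("user_activity", "user123", "user123,prod456"))
    producer.send(new ProducerRecord("inventory_updates", "prod456", "prod456,10"))

    producer.flush()
    producer.close()
  }
}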

Conclusion

In this blog, we demonstrated how to use Apache Storm in Scala to read data from two Kafka topics, join the data, and generate real-time product recommendations. This use case can be extended for more complex applications such as fraud detection, real-time analytics, and personalized recommendations.

By combining spouts for different data sources, bolts for processing, and Storm’s parallel processing capabilities, you can build powerful real-time data pipelines to solve a wide variety of problems.
