Real-Time Recommendations Using Apache Storm and Kafka in Scala
In this blog, we will walk through the implementation of a real-time recommendation system using Apache Storm and Kafka in Scala. We'll build a topology that reads user activity and inventory data from two separate Kafka topics (simulated with hard-coded events in this walkthrough), joins them, and emits real-time product recommendations. Along the way you'll see how Apache Storm processes large streams of data in real time.
Prerequisites
Before diving into the code, you’ll need:
- Apache Storm running on your system.
- A Kafka broker set up with topics for the two data streams (user activity and inventory updates); a topic-creation sketch follows this list.
- Scala and SBT installed for building the project.
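If the two topics do not exist yet, you can create them either with Kafka's command-line tools or programmatically. Below is a minimal sketch using the AdminClient from the kafka-clients library (added as a dependency in Step 1); the broker address localhost:9092 and the single-partition settings are assumptions for a local setup.

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed local broker address
    val admin = AdminClient.create(props)
    try {
      val topics = Seq(
        new NewTopic("user_activity", 1, 1.toShort),    // 1 partition, replication factor 1
        new NewTopic("inventory_updates", 1, 1.toShort)
      )
      // Block until the broker confirms creation; this fails if the topics already exist
      admin.createTopics(topics.asJava).all().get()
    } finally {
      admin.close()
    }
  }
}

Alternatively, the kafka-topics script that ships with Kafka does the same job from the command line.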
Step 1: Setting Up the Project with SBT
To manage dependencies and build the project, we'll use SBT (Scala Build Tool). Here's the build.sbt file to set up the necessary dependencies for Apache Storm, Kafka, and Jackson serialization.
name := "RealTimeRecommendation"
version := "1.0"
scalaVersion := "2.13.10"

libraryDependencies ++= Seq(
  "org.apache.storm" % "storm-core" % "2.3.0",                 // Apache Storm dependency
  "org.apache.kafka" % "kafka-clients" % "2.8.1",              // Kafka client dependency
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.3" // Jackson for serialization
)

// If LocalCluster (used in Step 5) does not resolve with your Storm version, you may also need
// the local-mode module: "org.apache.storm" % "storm-server" % "2.3.0"
Step 2: Defining Spouts to Read from Kafka Topics
A Spout in Apache Storm is responsible for emitting tuples into the topology. In our case, we’ll define two spouts: one for reading user activity data and another for inventory updates from Kafka.
User Activity Spout
This spout will simulate reading user activity events from a Kafka topic and emit the user ID, product ID, and timestamp.
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}

// The Kafka parameters are kept for illustration only; this spout simulates the events
// that a real Kafka consumer would read from the topic.
class UserActivitySpout(kafkaTopic: String, zookeeperHosts: String) extends BaseRichSpout {

  private var collector: SpoutOutputCollector = _

  override def open(conf: java.util.Map[String, Object], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
  }

  override def nextTuple(): Unit = {
    // Simulate a single user activity event; a real spout would poll the Kafka topic here
    val userId = "user123"
    val productId = "prod456"
    val timestamp = java.lang.Long.valueOf(System.currentTimeMillis()) // box so it fits Storm's Object-based Values
    collector.emit(new Values(userId, productId, timestamp))
    Thread.sleep(1000) // throttle the simulated stream so nextTuple() does not spin
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("userId", "productId", "timestamp"))
  }
}
Inventory Update Spout
Similarly, this spout will simulate reading inventory update data from a Kafka topic and emit the product ID and stock count.
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}

// As with the user activity spout, this class simulates the inventory updates a real
// Kafka consumer would read from the topic.
class InventoryUpdateSpout(kafkaTopic: String, zookeeperHosts: String) extends BaseRichSpout {

  private var collector: SpoutOutputCollector = _

  override def open(conf: java.util.Map[String, Object], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
  }

  override def nextTuple(): Unit = {
    // Simulate a single inventory update; a real spout would poll the Kafka topic here
    val productId = "prod456"
    val stockCount = java.lang.Integer.valueOf(10) // box so it fits Storm's Object-based Values
    collector.emit(new Values(productId, stockCount))
    Thread.sleep(1000) // throttle the simulated stream so nextTuple() does not spin
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("productId", "stockCount"))
  }
}
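In a real deployment you would not hand-roll these spouts: Storm ships a ready-made Kafka spout in the storm-kafka-client module (an extra dependency, e.g. "org.apache.storm" % "storm-kafka-client" % "2.3.0"). Here is a minimal sketch, assuming a local broker on localhost:9092 and an arbitrarily chosen consumer group name; by default the Kafka spout emits records with the fields "topic", "partition", "offset", "key" and "value", so the downstream bolts would then need to extract productId from the record value.

import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}

object KafkaSpoutSketch {
  // Sketch: a Kafka-backed spout for the user_activity topic (assumes a broker on localhost:9092)
  def userActivitySpout(): KafkaSpout[String, String] = {
    val spoutConfig = KafkaSpoutConfig.builder("localhost:9092", "user_activity")
      .setProp("group.id", "recommendation-topology") // consumer group name is an assumption
      .build()
    new KafkaSpout[String, String](spoutConfig)
  }
}

In the topology, builder.setSpout("userActivitySpout", KafkaSpoutSketch.userActivitySpout()) would then replace the simulated spout, with an analogous spout for inventory_updates.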
Step 3: The Join Bolt
We now need to join data from the two streams: user activity and inventory updates. The JoinBolt tells the two sources apart via the tuple's source component, caches each side by productId, and emits a recommendation once it has seen both a user activity event and an in-stock inventory update for the same product.
import org.apache.storm.topology.BasicOutputCollector
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

class JoinBolt extends BaseBasicBolt {

  // In-memory caches keyed by productId (kept simple for the demo; see the note on eviction below)
  private var userActivityCache: Map[String, Tuple] = Map()
  private var inventoryCache: Map[String, Tuple] = Map()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val productId = input.getStringByField("productId")

    // Route the tuple to the right cache based on which spout emitted it.
    // NOTE: these strings must match the component IDs used in builder.setSpout(...)
    input.getSourceComponent match {
      case "userActivitySpout" =>
        // Cache the latest user activity for this product
        userActivityCache += (productId -> input)
      case "inventoryUpdateSpout" =>
        // Cache the latest inventory update for this product
        inventoryCache += (productId -> input)
      case _ => // ignore tuples from unexpected components
    }

    // Join logic: once both a user activity and an inventory update exist, emit the recommendation
    if (userActivityCache.contains(productId) && inventoryCache.contains(productId)) {
      val userActivity = userActivityCache(productId)
      val inventoryUpdate = inventoryCache(productId)
      val userId = userActivity.getStringByField("userId")
      val stockCount = inventoryUpdate.getIntegerByField("stockCount")
      if (stockCount > 0) {
        collector.emit(new Values(userId, productId, "Recommended"))
      }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("userId", "productId", "status"))
  }
}
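One caveat with this demo-level join: the two caches only ever grow, since nothing is evicted. Below is a minimal sketch of a size-capped cache you could swap in for the plain Maps; the 10,000-entry limit and the BoundedCache name are illustrative assumptions, not part of the original design.

import scala.collection.mutable

// Hypothetical helper: holds at most maxEntries values and evicts the oldest insertion first
class BoundedCache[V](maxEntries: Int = 10000) {
  private val entries = mutable.LinkedHashMap[String, V]()

  def put(key: String, value: V): Unit = {
    entries.remove(key)               // re-inserting moves the key to the newest position
    entries.put(key, value)
    if (entries.size > maxEntries) {
      entries.remove(entries.head._1) // drop the oldest entry once over the cap
    }
  }

  def get(key: String): Option[V] = entries.get(key)
  def contains(key: String): Boolean = entries.contains(key)
}

For production use, time-based eviction (or Storm's windowed bolts) would be a better fit, since a join usually only makes sense for events that are close together in time.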
Step 4: The Recommendation Bolt
The RecommendationBolt processes the joined data and outputs product recommendations. In this simplified example, it prints the recommendations to the console.
import org.apache.storm.topology.BasicOutputCollector
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.Tuple

class RecommendationBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val userId = input.getStringByField("userId")
    val productId = input.getStringByField("productId")
    val status = input.getStringByField("status")
    // Process the recommendation (e.g. store it in a database); here we simply print it
    println(s"Recommendation: User $userId should be recommended product $productId. Status: $status")
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    // No output fields: this is the final bolt in the topology
  }
}
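Since kafka-clients is already on the classpath, a natural next step is to publish the recommendations back to Kafka instead of printing them. Here is a sketch of such a variant; the recommendations output topic and the KafkaRecommendationBolt name are assumptions for illustration, not part of the original design.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.Tuple

// Hypothetical variant that publishes each recommendation to a "recommendations" Kafka topic
class KafkaRecommendationBolt(bootstrapServers: String) extends BaseBasicBolt {

  @transient private var producer: KafkaProducer[String, String] = _

  override def prepare(topoConf: java.util.Map[String, Object], context: TopologyContext): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
  }

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val userId = input.getStringByField("userId")
    val productId = input.getStringByField("productId")
    // Key by user so all recommendations for a user land in the same partition
    producer.send(new ProducerRecord[String, String]("recommendations", userId, productId))
  }

  override def cleanup(): Unit = if (producer != null) producer.close()

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    // Terminal bolt: no downstream fields to declare
  }
}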
Step 5: Topology Definition
Now, let's tie everything together by creating the topology. We define the spouts and bolts, wire them up with stream groupings (a fields grouping on productId for the join, so both sides of a product always reach the same JoinBolt task), and run the topology in local mode.
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
import org.apache.storm.Config
import org.apache.storm.LocalCluster

object RecommendationTopology {
  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder()

    // Define the spouts (topic and connection string would only be used by a real Kafka consumer)
    builder.setSpout("userActivitySpout", new UserActivitySpout("user_activity", "localhost:2181"))
    builder.setSpout("inventoryUpdateSpout", new InventoryUpdateSpout("inventory_updates", "localhost:2181"))

    // Define the bolts: group by productId so both streams for a product reach the same JoinBolt task
    builder.setBolt("joinBolt", new JoinBolt())
      .fieldsGrouping("userActivitySpout", new Fields("productId"))
      .fieldsGrouping("inventoryUpdateSpout", new Fields("productId"))
    builder.setBolt("recommendationBolt", new RecommendationBolt())
      .shuffleGrouping("joinBolt")

    // Configure and run the topology in local mode
    val config = new Config()
    config.setDebug(true)
    val cluster = new LocalCluster()
    cluster.submitTopology("RealTimeRecommendation", config, builder.createTopology())

    // Run for a fixed time for demo purposes, then shut the local cluster down
    Thread.sleep(10000)
    cluster.close()
  }
}
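LocalCluster is only meant for development. To run on a real Storm cluster you would package the topology as a fat JAR (for example with sbt-assembly) and submit it with the storm jar command, which ultimately calls StormSubmitter. A minimal sketch, with the worker count chosen arbitrarily for illustration:

import org.apache.storm.{Config, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder

object ClusterSubmit {
  // Sketch: submit an already-built topology to a running Storm cluster
  def submit(builder: TopologyBuilder): Unit = {
    val config = new Config()
    config.setNumWorkers(2) // number of worker processes; 2 is an arbitrary choice
    StormSubmitter.submitTopology("RealTimeRecommendation", config, builder.createTopology())
  }
}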
Step 6: Running the Topology
- Start Apache Storm and ensure Kafka is running with the user_activity and inventory_updates topics created (with the simulated spouts above, Kafka is not strictly required for a local test).
- Run the topology: execute RecommendationTopology in your Scala project, for example with sbt run.
This will process the data in real time, join the streams on productId, and emit a recommendation whenever a product is in stock.
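With the simulated spouts, the console output (interleaved with Storm's own debug logging, since setDebug(true) is enabled) should contain lines like:

Recommendation: User user123 should be recommended product prod456. Status: Recommended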
Conclusion
In this blog, we demonstrated how to use Apache Storm in Scala to read data from two Kafka topics, join the data, and generate real-time product recommendations. This use case can be extended for more complex applications such as fraud detection, real-time analytics, and personalized recommendations.
By combining spouts for different data sources, bolts for processing, and Storm’s parallel processing capabilities, you can build powerful real-time data pipelines to solve a wide variety of problems.