Spark Streaming
Spark Streaming allows on-the-fly analysis of live data streams with MongoDB. See the Apache documentation for a detailed description of Spark Streaming functionality.
This tutorial uses the Spark Shell. For more information about starting the Spark Shell and configuring it for use with MongoDB, see Getting Started.
This tutorial demonstrates how to use Spark Streaming to analyze input data from a TCP port. It uses Netcat, a lightweight network utility, to send text inputs to a local port, then uses Scala to determine how many times each word occurs in each line and write the results to a MongoDB collection.
Start Netcat from the command line. The -l flag tells Netcat to listen for an incoming connection, and -k keeps it listening after a client disconnects:
nc -lk 9999
Start the Spark Shell at another terminal prompt.
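For example, you might launch the shell with the connector package and the output collection preconfigured. In this sketch the connector version and the test.wordcounts namespace are placeholders; adjust them for your Spark and MongoDB deployment:

./bin/spark-shell --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.wordcounts" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0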
In the shell, import the MongoDB Connector and Spark Streaming classes:

import com.mongodb.spark.sql._
import org.apache.spark.streaming._
Create a new StreamingContext object and assign it to ssc. sc is a SparkContext object that is automatically created when you start the Spark Shell. The second argument specifies how often to check for new input data.
val ssc = new StreamingContext(sc, Seconds(1))
Use the socketTextStream method to create a connection to Netcat on port 9999:
val lines = ssc.socketTextStream("localhost", 9999)
Determine how many times each word occurs in each line:
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
Create a data structure to hold the results:
case class WordCount(word: String, count: Int)
Use a foreachRDD loop to collect the results and write them to the MongoDB collection specified in the Spark Connector configuration. The append mode causes data to be appended to the collection, whereas overwrite mode replaces the existing data.
wordCounts.foreachRDD({ rdd =>
  import spark.implicits._
  // Convert each (word, count) pair into a WordCount row, then write
  // the resulting DataFrame to the configured MongoDB collection.
  val wordCountsDF = rdd.map({ case (word: String, count: Int) =>
    WordCount(word, count)
  }).toDF()
  wordCountsDF.write.mode("append").mongo()
})
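If you prefer an explicit call over the implicit mongo() helper, the connector's MongoSpark object also accepts a DataFrameWriter; a minimal sketch, assuming the 2.x connector API:

import com.mongodb.spark.MongoSpark
// Equivalent to wordCountsDF.write.mode("append").mongo():
MongoSpark.save(wordCountsDF.write.mode("append"))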
Start listening:
ssc.start()
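The shell prompt returns immediately while the stream runs in the background. In a standalone application, rather than the interactive shell, you would typically block the driver after starting the context:

ssc.start()
// Keep the driver alive until the streaming context is stopped:
ssc.awaitTermination()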
To give your program something to listen to, go back to the terminal prompt where you started Netcat and start typing.
hello world cats cats dogs dogs dogs
In your MongoDB collection you'll find something similar to the following:
{ "_id" : ObjectId("588a539927c22bd43214131f"), "word" : "hello", "count" : 1 } { "_id" : ObjectId("588a539927c22bd432141320"), "word" : "world", "count" : 1 } { "_id" : ObjectId("588a53b227c22bd432141322"), "word" : "cats", "count" : 2 } { "_id" : ObjectId("588a53b227c22bd432141323"), "word" : "dogs", "count" : 3 }
To end your Netcat process, use ctrl-c. To end your Spark Shell session, use System.exit(0).