I am trying to create a local pipeline that receives messages streamed from my Kafka broker, processes them in Spark, and writes the results to MongoDB. I have installed all the necessary JARs, including the MongoDB Spark Connector, but I am getting an error saying that the data source cannot be found. The JAR versions are correct. I am running PySpark 3.5.3, Scala 2.12, and Python 3.11.6. Here is my code:
from pymongo import MongoClient
from pyspark.sql import SparkSession
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def create_sparkconnection():
    spark = SparkSession.builder.appName("spark_streaming") \
        .config("spark.jars",
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/mongo-spark-connector_2.12-10.4.0.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/commons-pool2-2.12.0.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-sql-kafka-0-10_2.12-3.5.3.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/kafka-clients-3.5.1.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-token-provider-kafka-0-10_2.12-3.5.3.jar") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    logging.info("Spark connection successfully created")
    return spark

def mongo_connect():
    mongodb_uri = "mongodb://127.0.0.1:27017"
    client = MongoClient(mongodb_uri)
    db = client['my_database']
    collection = db['my_collection']
    print("Mongodb connected")
    return collection

def kafkaconnect(sparkconnection):
    spark_df = sparkconnection.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', '127.0.0.1:9092') \
        .option('subscribe', 'users') \
        .option('startingOffsets', 'earliest') \
        .load()
    print(spark_df)
    return spark_df

def jsonstream(kafkastream):
    json_df = kafkastream.selectExpr("CAST(value AS STRING) as json_value")
    print(json_df)
    return json_df
if __name__ == "__main__":
    sparkconnected = create_sparkconnection()
    if sparkconnected is not None:
        df = kafkaconnect(sparkconnected)
        finaldf = jsonstream(df)
        mongosession = mongo_connect()
        if mongosession is not None:
            # This .start() call is where the DATA_SOURCE_NOT_FOUND error below is raised
            streaming = finaldf.writeStream.outputMode("append") \
                .format("mongo") \
                .option('checkpointLocation', '/tmp/checkpoint') \
                .option('spark.mongodb.output.uri', "mongodb://127.0.0.1:27017/my_database.my_collection") \
                .start()
            streaming.awaitTermination()
Error Log:
py4j.protocol.Py4JJavaError: An error occurred while calling o51.start.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:370)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
... 14 more
I also tried referencing the dependency directly from Maven via spark.jars.packages instead of downloading the JARs, but the same error message appeared.
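Roughly what that attempt looked like (a sketch; the Maven coordinates are the ones I believe match the versions above):

# Alternative attempt: resolve the connectors from Maven at startup
# instead of pointing spark.jars at local files
spark = SparkSession.builder.appName("spark_streaming") \
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
    .getOrCreate()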