Nov 2024

I am trying to build a local pipeline that receives messages streamed from my Kafka broker, processes them in Spark, and writes the results to MongoDB. I have installed all the necessary JARs, including the MongoDB Spark connector, but I am getting an error saying the data source cannot be found. The JAR versions are correct. I am running PySpark 3.5.3, Scala 2.12, and Python 3.11.6.

from pymongo import MongoClient
from pyspark.sql import SparkSession
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def create_sparkconnection():
    spark = SparkSession.builder.appName("spark_streaming") \
        .config("spark.jars",
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/mongo-spark-connector_2.12-10.4.0.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/commons-pool2-2.12.0.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-sql-kafka-0-10_2.12-3.5.3.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/kafka-clients-3.5.1.jar,"
                ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-token-provider-kafka-0-10_2.12-3.5.3.jar") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    logging.info("Spark connection successfully created")
    return spark


def mongo_connect():
    mongodb_uri = "mongodb://127.0.0.1:27017"
    client = MongoClient(mongodb_uri)
    db = client['my_database']
    collection = db['my_collection']
    print("Mongodb connected")
    return collection


def kafkaconnect(sparkconnection):
    spark_df = sparkconnection.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', '127.0.0.1:9092') \
        .option('subscribe', 'users') \
        .option('startingOffsets', 'earliest') \
        .load()
    print(spark_df)
    return spark_df


def jsonstream(kafkastream):
    json_df = kafkastream.selectExpr("CAST(value AS STRING) as json_value")
    print(json_df)
    return json_df


if __name__ == "__main__":
    sparkconnected = create_sparkconnection()
    if sparkconnected is not None:
        df = kafkaconnect(sparkconnected)
        finaldf = jsonstream(df)
        mongosession = mongo_connect()
        if mongosession is not None:
            streaming = finaldf.writeStream.outputMode("append") \
                .format("mongo") \
                .option('checkpointLocation', '/tmp/checkpoint') \
                .option('spark.mongodb.output.uri', "mongodb://127.0.0.1:27017/my_database.my_collection") \
                .start()
            streaming.awaitTermination()

Error Log:

py4j.protocol.Py4JJavaError: An error occurred while calling o51.start.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:370)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 14 more

I also tried referencing the dependency directly from Maven with spark.jars.packages instead of downloading the JARs, but the same error message appeared.
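
For reference, that attempt looked roughly like the sketch below. The Maven coordinates are my reconstruction from the JAR versions listed above (org.mongodb.spark and org.apache.spark artifacts), so treat them as assumptions rather than a verified setup:

# Rough sketch of the spark.jars.packages variant I tried; coordinates inferred
# from the JAR file names above, the rest of the pipeline was unchanged.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark_streaming") \
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
    .getOrCreate()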