Failed to find the data source: mongo

I am trying to create a local pipeline that receives messages streamed from my kafka broker and process it in spark before writing to MongoDB. I have installed all the necessary JARs and included the mongo spark connector as well but I am receiving an error that the data source is not found. The JAR versions are correct. I am running pyspark 3.5.3, Scala 2.12, python 3.11.6.

from pymongo import MongoClient
from pyspark.sql import SparkSession
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def create_sparkconnection():
    spark = None
    spark = SparkSession.builder.appName("spark_streaming")\
                                .config("spark.jars",
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/mongo-spark-connector_2.12-10.4.0.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/commons-pool2-2.12.0.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-sql-kafka-0-10_2.12-3.5.3.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/kafka-clients-3.5.1.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-token-provider-kafka-0-10_2.12-3.5.3.jar")\
                                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
                                .config('spark.mongodb.output.uri',"mongodb://127.0.0.1:27017/my_database.my_collection")\
                                .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")
    logging.info("Spark connection successfully created")

    return spark

def mongo_connect():
    mongodb_uri = "mongodb://127.0.0.1:27017"
    client = MongoClient(mongodb_uri)
    db = client['my_database']
    collection = db['my_collection']

    print("Mongodb connected")
    return collection

def kafkaconnect(sparkconnection):
    spark_df = None
    spark_df = sparkconnection.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers','127.0.0.1:9092') \
                .option('subscribe','users') \
                .option('startingOffsets', 'earliest') \
                .load()
    
    print (spark_df)
    return spark_df

def jsonstream(kafkastream):
    json_df = kafkastream.selectExpr("CAST(value AS STRING) as json_value")

    print(json_df)
    return json_df




if __name__ == "__main__":
    sparkconnected = create_sparkconnection()
    
    if sparkconnected is not None:
        df = kafkaconnect(sparkconnected)
        finaldf = jsonstream(df)
        mongosession = mongo_connect()

        if mongo_connect is not None:
            streaming = finaldf.writeStream.outputMode("append") \
                                .format("mongo") \
                                .option('checkpointLocation','/tmp/checkpoint') \
                                .option('spark.mongodb.output.uri',"mongodb://127.0.0.1:27017/my_database.my_collection") \
                                .start()
            
            streaming.awaitTermination()

Error Log:

py4j.protocol.Py4JJavaError: An error occurred while calling o51.start.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
        at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:370)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
        ... 14 more

I also tried using referencing the dependency directly from maven using spark.jars.packages instead of downloading it but the same error messaged appeared.