Robert Walters

38 results

The Challenges and Opportunities of Processing Streaming Data

Let’s consider a fictitious bank that has a credit card offering for its customers. Transactional data might land in their database from various sources such as a REST API call from a web application or from a serverless function call made by a cash machine. Regardless of how the data was written to the database, the database performed its job and made the data available for querying by the end-user or application. The mechanics are database-specific but the end goal of all databases is the same. Once data is in a database the bank can query and obtain business value from this data. In the beginning, their architecture worked well, but over time customer usage grew and the bank found it difficult to manage the volume of transactions. The company decides to do what many customers in this scenario do and adopts an event-streaming platform like Apache Kafka to queue these event data. Kafka provides a highly scalable event streaming platform capable of managing large data volumes without putting debilitating pressure on traditional databases. With this new design, the bank could now scale supporting more customers and product offerings. Life was great until some customers started complaining about unrecognized transactions occurring on their cards. Customers were refusing to pay for these and the bank was starting to spend lots of resources figuring out how to manage these fraudulent charges. After all, by the time the data gets written into the database, and the data is batch loaded into the systems that can process the data, the user's credit card was already charged perhaps a few times over. However, hope is not lost. The bank realized that if they could query the transactional event data as it's flowing into the database they might be able to compare it with historical spending data from the user, as well as geolocation information, to make a real-time determination if the transaction was suspicious and warranted further confirmation by the customer. This ability to continuously query the stream of data is what stream processing is all about. From a developer's perspective, building applications that work with streaming data is challenging. They need to consider the following: Different serialization formats: The data that arrives in the stream may contain different serialization formats such as JSON, AVRO, Protobuf or even binary. Different schemas: Data originating from a variety of sources may contain slightly different schemas. Fields like CustomerID could be customerId from one source or CustID in another and a third could not even use the field. Late arriving data: The data itself could arrive late due to network latency issues or being completely out of order. Operational complexity: Developers need to be concerned with reacting to application state changes like failed connections to data sources and how to efficiently scale the application to meet the demands of the business. Security: In larger enterprises, the developer usually doesn’t have access to production data. This makes troubleshooting and building queries from this data difficult. Stream processing can help address these challenges and enable real-time use cases, such as fraud detection, hyper-personalization, and predictive maintenance, that are otherwise difficult or extremely costly to overcome. While many stream processing solutions exist, the flexibility of the document model and the power of the aggregation framework are naturally well suited to help developers with the challenges found with complex event data. Discover MongoDB Atlas Stream Processing Read the MongoDB Atlas Stream Processing announcement and check out Atlas Stream Processing tutorials on the MongoDB Developer Center . Request private preview access to Atlas Stream processing Request access today to participate in the private preview. New to MongoDB? Get started for free today by signing up for MongoDB Atlas .

August 30, 2023

Introducing MongoDB Connector for Apache Kafka version 1.9

Today, MongoDB released version 1.9 of the MongoDB Connector for Apache Kafka! This article highlights the key features of this new release! Pre/Post document states In MongoDB 6.0, Change Streams added the ability to retrieve the before and after state of an entire document . To enable this functionality on the collection you can set it as a parameter in the createCollection command such as: db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } ) Alternatively, for existing collections, use colMod as shown below: db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: <boolean> } } ) Once the collection is configured for pre and post images, you can set the change.stream.full.document.before.change source connector parameter to include this extra information in the change event. For example, consider this source definition: { "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "<< MONGODB CONNECTION STRING >>", "database": "test", "collection": "temperatureSensor", "change.stream.full.document.before.change":"whenavailable" } } When the following document is inserted: db.temperatureSensor.insertOne({'sensor_id':1,'value':100}) Then an update is applied: db.temperatureSensor.updateOne({'sensor_id':1},{ $set: { 'value':105}}) You can see the change stream event written to Kafka topic is as follows: { "_id": { "_data": "82636D39C8000000012B022C0100296E5A100444B0F5E386F04767814F28CB4AAE7FEE46645F69640064636D399B732DBB998FA8D67E0004" }, "operationType": "update", "clusterTime": { "$timestamp": { "t": 1668102600, "i": 1 } }, "wallTime": { "$date": 1668102600716 }, "ns": { "db": "test", "coll": "temperatureSensor" }, "documentKey": { "_id": { "$oid": "636d399b732dbb998fa8d67e" } }, "updateDescription": { "updatedFields": { "value": 105 }, "removedFields": [], "truncatedArrays": [] }, "fullDocumentBeforeChange": { "_id": { "$oid": "636d399b732dbb998fa8d67e" }, "sensor_id": 1, "value": 100 } } Note the fullDocumentBeforeChange key includes the original document before the update occurred. Starting the connector at a specific time Prior to version 1.9, when the connector starts as a source, it will open a MongoDB change stream and any new data will get processed by the source connector. To copy all the existing data in the collection first before you begin processing the new data, you specify the “ copy.existing ” property. One frequent user request is to start the connector based upon a specific timestamp versus when the connector starts. In 1.9 a new parameter called startup.mode was added to specify when to start writing data. startup.mode=latest (default) “Latest” is the default behavior and starts processing the data when the connector starts. It ignores any existing data when the connector starts. startup.mode=timestamp “timestamp” allows you to start processing at a specific point in time as defined by additional startup.mode.timestamp.* properties. For example, to start the connector from 7AM on November 21, 2022, you set the value as follows: startup.mode.timestamp.start.at.operation.time=’2022-11-21T07:00:00Z’ Supported values are an ISO-8601 format string date as shown above or as a BSON extended string format. startup.mode=copy.existing Same behavior as the existing as the configuration option, “copy.existing=true”. Note that “copy.existing” as a separate parameter is now deprecated. If you defined any granular copy.existing parameters such as copy.existing.pipeline, just prepend them with “startup.mode.copy.existing.” property name. Reporting MongoDB errors to the DLQ Kafka supports writing errors to a dead letter queue . In version 1.5 of the connector, you could write all exceptions to the DLQ through the mongo.error.tolerance=’all’ . One thing to note was that these errors were Kafka generated errors versus errors that occurred within MongoDB. Thus, if the sink connector failed to write to MongoDB due to a duplicate _id error, for example, this error wouldn’t be written to the DLQ. In 1.9, errors generated within MongoDB will be reported to the DLQ. Behavior change on inferring schema Prior to version 1.9 of the connector, if you are inferring schema and insert a MongoDB document that contains arrays with different value data types, the connector is naive and would simply set the type for the whole array to be a string. For example, consider a document that resembles: { "myfoo": [ { "key1": 1 }, { "key1": 1, "key2": "dogs" } ] } If we set output.schema.infer.value . to true on a source connector, the message in the Kafka Topic will resemble the following: … "fullDocument": { … "myfoo": [ "{\"key1\": 1}", "{\"key1\": 1, \"key2\": \"dogs\"}" ] }, … Notice the array items contain different values. In this example, key1 is a subdocument with a single value the number 1, the next item in the “myfoo” array is a subdocument with the same “key1” field and value of an integer, 1, and another field, “key 2” that has a string as a value. When this scenario occurs the connector will wrap the entire array as a string. This behavior can also apply when using different keys that contain different data type values. In version 1.9, the connector when presented with this configuration will not wrap the arrays, rather it will create the appropriate schemas for the variable arrays with different data type values. The same document when run in 1.9 will resemble: "fullDocument": { … "myfoo": [ { "key1": 1, }, { "key1": 1, "key2": "DOGS" } ] }, Note that this behavior is a breaking change and that inferring schemas when using arrays can cause performance degradation for very large arrays using different data type values. Download the latest version of the MongoDB Connector for Apache Kafka from Confluent Hub! To learn more about the connector read the MongoDB Online Documentation . Questions? Ask on the MongoDB Developer Community Connectors and Integrations forum!

January 12, 2023

Introducing MongoDB Spark Connector Version 10.1

Today, MongoDB released version 10.1 of the MongoDB Spark Connector. In this post, we highlight key features of this new release. Microbatch streaming support The MongoDB Spark connection version 10 introduced support for Apache Structured Spark Streaming. In this initial release, continuous mode streaming was the only mode supported. In this 10.1 update, microbatch mode is now supported, enabling you to stream writes to destinations that currently do not support continuous mode streams, such as Amazon S3 storage. Increased control of write behavior When the Spark Connector issues a write, the default behavior is for an upsert to occur. This can cause problems in some scenarios in which you may not want an upsert, such as with time series collections. There is a new configuration parameter, upsertDocument , that, when set to false, will only issue insert statements on write. solar.write.format("mongodb").mode("append").option("database", "sensors").option("collection", "panels").option("upsertDocument", "false").save() In the above code snippet we are writing to the "panels" time series collection by setting the upsertDocument to false. Alternatively, you can set operationType to the value, “insert”. Setting this option will ignore any upsertDocument option set. Support for BSON types The data types supported in BSON are not exactly the same as those supported in a Spark dataframe. For example, Spark doesn't support ObjectId as a type specifically. To mitigate these scenarios where you need to leverage different BSON types, you can now set the new configuration values : spark.mongodb.read.outputExtendedJson=<true/false> spark.mongodb.write.convertJson=<true/false> This will enable you to effectively leverage BSON datatypes within your Spark application. Call to action Version 10.1 of the MongoDB Spark Connector continues to enhance the streaming capabilities with support for microbatch processing. This version also adds more granular support for writing to MongoDB supporting use cases like time series collections. For those users wanting to upgrade from the 3.x version but could not because of lack of BSON data type support, the 10.1 version now provides an option for using BSON data types. To learn more about the MongoDB Spark Connector check out the online documentation . You can download the latest version of the MongoDB Spark Connector from the maven repository .

January 4, 2023

MongoDB Connector for Apache Kafka 1.8 Available Now

MongoDB has released version 1.8 of the MongoDB Connector for Apache Kafka with new monitoring and debugging capabilities. In this article, we’ll highlight key features of this release. JMX monitoring The MongoDB Connector works with Apache Kafka Connect to provide a way for users to easily move data between MongoDB and the Apache Kafka. The MongoDB connector is written in Java and now implements Java Management Extensions (JMX) interfaces that allow you to access metrics reporting. These metrics will make troubleshooting and performance tuning easier. JMX technology, which is part of the Java platform, provides a simple, standard way for applications to provide metrics reporting with many third-party tools available to consume and present the data. For those who might not be familiar with JMX monitoring , let’s look at a few key concepts. An MBean is a managed Java object, which represents a particular component that is being measured or controlled. Each component can have one or more MBean attributes. The MongoDB Connector for Apache Kafka publishes MBeans under the “com.mongodb.kafka.connector” domain. Many open source tools are available to monitor JMX metrics, such as the console-based JmxTerm or the more feature-complete monitoring and alerting tools like Prometheus . JConsole is also available as part of the Java Development Kit (JDK). Note: Regardless of your client tool, MBeans for the connector are only available when there are active source or sink configurations defined on the connector. Visualizing metrics Figure 1: Source task JMX metrics from JConsole. Figure 1 shows some of the metrics exposed by the source connector using JConsole. In this example, a sink task was created and by default is called “sink-task-0”. The applicable metrics are shown in the JConsole MBeans panel. A complete list of both source and sink metrics will be available in the MongoDB Kafka Connector online documentation shortly after the release of 1.8. MongoDB Atlas is a great platform to store, analyze, and visualize monitoring metrics produced by JMX. If you’d like to try visualizing JMX metrics in MongoDB Atlas generated by the connector, check out jmx2mongo . This tool continuously writes JMX metrics to a MongoDB time series collection. Once the data is in MongoDB Atlas, you can easily create charts from the data like the following: Figure 2: MongoDB Atlas Chart showing successful batch writes vs writes greater than 100ms. Figure 2 shows the number of successful batch writes performed by a MongoDB sink task and the number of those batch writes that took longer than 100ms to execute. There are many other monitoring use cases available; check out the latest MongoDB Kafka Connector documentation for more information. Extended debugging Over the years, the connector team collected requests from users to enhance error messages or provide additional debug information for troubleshooting. In 1.8, you will notice additional log messages and more descriptive errors. For example, before 1.8, if you set the copy.existing parameter, you may get the log message: “Shutting down executors.” This message is not clear. To address this lack of clarity, the message now reads: “Finished copying existing data from the collection(s).” These debugging improvements in combination with the new JMX metrics will make it easier for you to gain insight into the connector and help troubleshoot issues you may encounter. If you have ideas for additional metrics or scenarios where additional debugging messages would be helpful, please let us know by filing a JIRA ticket . For more information on the latest release, check out the MongoDB Kafka Connector documentation . To download the connector, go to the MongoDB Connector repository in GitHub or download from the Confluent Hub .

September 19, 2022

Introducing the Newest Version of the MongoDB Spark Connector

MongoDB has just released an all-new version of our Spark Connector. This article discusses the background behind the MongoDB Spark Connector and some of the key features of the new release. Why a new version? The current version of the MongoDB Spark Connector was written in 2016 and is based on Version 1 of the Spark Data Source API. This API is still supported, but Databricks has released an updated version of the API, making it easier for data sources like MongoDB to work within the Spark ecosystem. By using Version 2 of the MongoDB Spark Connector, you’ll immediately benefit from capabilities such as tighter integration with Spark Structured Streaming. MongoDB will continue to support Version 1 until Databricks deprecates its Datasource API, but no new features will be implemented, and upgrades to the Connector will include only bug fixes and support for the current version. Which version should I use? The new Spark Connector (Version 10.0) is not intended to be a direct replacement for applications that use the current MongoDB Spark Connector. Note that the new connector uses a different namespace, “com.mongodb.spark.sql.connector.MongoTableProvider”, versus the original Spark Connector, which uses “com.mongodb.spark.DefaultSource”. Having a different namespace makes it possible to use both versions of the Connector within the same Spark application. This is helpful in unit testing your application with the new Connector and making the transition on your timeline. Also note a change with versioning of the MongoDB Spark Connector. The current version of the existing MongoDB Spark Connector is 3.0. Up until now, as MongoDB released versions of the connector, the number was aligned with the version of Spark that was supported—i.e., Version 2.4 of the MongoDB Spark Connector works with Spark 2.4. Going forward, this will not be the case. MongoDB's documentation will make clear which versions of Spark the Connector supports and provide the appropriate information. Structured Streaming to MongoDB Apache Spark comes with a stream processing engine called Structured Streaming, which is based on Spark's SQL engine and DataFrame APIs. Spark Structured Streaming treats each incoming stream of data as a microbatch, continually appending each microbatch to the target dataset. This makes it easy to convert existing Spark batch jobs into streaming jobs. Structured Streaming provides maximum throughput via the same distributed capabilities that have made Spark such a popular platform. In the following example, we’ll show you how to stream data to MongoDB using Structured Stream. Consider a CSV file that contains natural gas prices. The following PySpark code will read the CSV file into a stream, compute a moving average, and stream the results into MongoDB. from pyspark.sql.types import StructType, DateType, StringType, TimestampType, DoubleType from pyspark.sql import functions as F from pyspark.sql.window import Window from pyspark.sql.functions import lit, count sc.setLogLevel('DEBUG') readSchema = ( StructType() .add('Type', StringType()) .add('Date', TimestampType()) .add('Price', DoubleType()) ) ds = (spark .readStream.format("csv") .option("header", "true") .schema(readSchema) .load("daily*.csv")) slidingWindows = (ds .withWatermark("Date", "1 minute") .groupBy(ds.Type, F.window(ds.Date, "7 day")) .avg() .orderBy(ds.Type,'window')) dsw = ( slidingWindows .writeStream .format("mongodb") .queryName("7DaySlidingWindow") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option('spark.mongodb.connection.uri', 'MONGODB CONNECTION HERE') .option('spark.mongodb.database', 'Pricing') .option('spark.mongodb.collection', 'NaturalGas') .outputMode("complete")) query = dsw.start() query.processAllAvailable() query.stop() For more information and examples on the new MongoDB Spark Connector V10.0, check out our documentation . Ask questions and give feedback on the MongoDB Community Forum. The Connector is open sourced; feel free to contribute at GitHub .

March 31, 2022

MongoDB Connector for Apache Kafka 1.7 Available Now

Today, MongoDB has released version 1.7 of the MongoDB Connector for Apache Kafka! This article highlights some of the key features of this new release! MongoDB errors to the Dead Letter Queue Apache Kafka version 2.6 added support for handling errant records. The MongoDB Kafka Connector for Apache Kafka automatically sends messages that it cannot process to the dead letter queue. This includes messages that fail during conversion but up until this release did not include errors that were generated within MongoDB. For example, consider the scenario where we have a topic, “Sales.OrderStaging” This topic includes messages that contain an ‘order-id’ field. The application needs to insert a new document into MongoDB and use that order-id as the primary key or ‘_id’ of the document. If there happens to be a duplicate order-id entered on the kafka topic, the kafka message should be routed to a dead letter queue topic and the mongodb connector should continue to process other orders. the following sink configuration highlights the configuration parameters that support this scenario: "errors.tolerance":"all", "mongo.errors.tolerance":"all", "mongo.errors.log.enable":"true", "errors.log.include.messages":"true", "errors.deadletterqueue.topic.name":"orders.deadletterqueue", "errors.deadletterqueue.context.headers.enable":"true", "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy", "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.overwrite.existing":"true", "document.id.strategy.partial.value.projection.type": "AllowList", "document.id.strategy.partial.value.projection.list": "order-id" For example consider a kafka message with an order-id=5 and another message with the same order-id of 5. The sink connector will try to insert that second message with the same _id and there will be a MongoDB Error generated as expected. The kafka topic message which caused the error will be written to the orders.deadletterqueue topic. Once on the dead letter queue, you can inspect the errant records, update them, and resubmit them for processing. Setting the errors.deadletterqueue.context.headers.enable to true will add metadata to the DLQ message. This extra information may help with any automatic processing of errors in the queue. In addition to the DLQ, you can set errors.log.enable and error.log.include.messages configuration to write errors in the kafka connect log. Here is an example error from our scenario above: com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=11000, message=E11000 duplicate key error collection: Sales.Orders index: id dup key: { _id: { order-id: 5 } }, details={} Bulk write improvements Today the connector sink process works with a bulk insert in an ordered fashion. For example, consider these 10 documents in a bulk operation: [1,2,3,4,5,6,7,8,9,10] If document number 5 failed, perhaps due to a duplicate _id error, the MongoDB driver would return this error back to the connector and the rest of the documents would not be written to MongoDB e.g. only [1,2,3,4] is written in MongoDB in the above example. While this might be acceptable for some use cases, for other scenarios with large batch sizes this can make reprocessing messages cumbersome. In Kafka 1.7, we introduced a new parameter bulk.write.ordered that by default is set to true which is the behavior as it exists today with the Kafka connector. Setting to false and running the above scenario will result in an end state of [1,2,3,4,6,7,8,9,10] written to MongoDB with 5 being written to the topic defined in the dead letter queue. Note that the actual order of the documents may be different since we specified false to bulk.write.ordered. For more information on error handling including information on the format of the DLQ headers check out the MongoDB Kafka Connector documentation . To setup a dead letter queue, check out the Creating a dead letter queue section within the Confluent Kafka Connect documentation. Changed retry logic Currently the Kafka Connector manages retries of the writes to MongoDB using the max.num.retries and retries.defer.timeout configuration properties . This feature was originally intended to address challenges such as network connection issues. Since that time the MongoDB drivers have implemented native capabilities that handle retry logic: The Kafka Connector uses the MongoDB Java driver and has retries enabled by default so there are no changes or extra configuration you need to do to enable retry in the Kafka Connector. Note: If you set retryWrites to false in the connection.uri configuration property, then retries are disabled for the sink connector. If you would like to leverage the drivers native retry capability, simply remove the “retryWrites”' parameter from the connection.uri. Allow disk use when copying The copy.existing.allow.disk.use configuration copies existing data from the source. It uses an aggregation pipeline that filters change stream events coming from the MongoDB source. In certain situations this pipeline can use up large amounts of memory. This flag enabled by default allows the copy existing aggregation to use temporary disk storage if required by the query. Note the default is true, but set to false if the process running MongoDB doesn't have the permissions for disk access. For more information see the ‘allowDiskuse’ option in the aggregate() documentation .

February 17, 2022

Real-time Applications Made Simple with MongoDB and Redpanda

MongoDB has a long history of advocating for simplicity and focusing on making developers more agile and productive. MongoDB first disrupted the database market with the document model, storing data records as BSON (binary representation of JSON documents). This approach to working with data enables developers to easily store and query their data as they use it naturally within their applications. As your data changes, you simply add an attribute to your documents and move on to the next ticket. There is no need to waste time altering tables and constraints when the needs of your application change. MongoDB is always on the lookout for more ways to make life easier for developers, such as addressing the challenges of working with streaming data. With streaming data, it may take armies of highly skilled operational personnel to build and maintain a production-ready platform (like Apache Kafka). Developers then have to integrate their applications with these streaming data platforms resulting in complex application architectures. It’s exciting to see technologies like Redpanda seeking to improve developer productivity for working with streaming data. For those unfamiliar with Redpanda, it is a Kafka API compatible streaming platform that works with the entire Kafka ecosystem, such as Kafka-Connect and popular Kafka drivers : librdkafka , kafka-python , and the Apache Kafka Java Client . Redpanda is written in C++ and leverages the RAFT protocol, which makes Apache ZooKeeper irrelevant. Also, its thread-per-core architecture and JVM-free implementation enable performance improvements over other data streaming platforms. On a side note, MongoDB also implements a protocol similar to RAFT for its replica set cluster primary and secondary elections and management. Both MongoDB and Redpanda share a common goal of simplicity and making complex tasks trivial for the developer. So we decided to show you how to pull together a simple streaming application using both technologies. The example application (found in this GitHub repository ) considers the scenario where stock ticker data is written to a Redpanda and consumed by MongoDB. Once you have the example running, a “stock generator” creates a list of 10 fictitious companies and starts writing ticker data to a Redpanda topic. Kafka Connect service listens for data coming into this topic and “sinks” the data to the MongoDB cluster. Once landed in MongoDB, the application issues an aggregation query to determine the moving averages of the stock securities and updates the UI. MongoDB consumes the ticker data and calculates the average stock price trends using the aggregation framework . Once you have downloaded the repository, a docker-compose script includes a Node server, Redpanda deployment, Kafka Connect service, and a MongoDB instance. The Kafka Connect image includes the Dockerfile-MongoConnect file to install the MongoDB Connector for Apache Kafka . The Dockerfile-Nodesvr is included in the nodesvr image and it copies the web app code & installs the necessary files via NPM. There is a run.sh script file that will launch the docker-compose script to launch the containers. To start the demo, simply run this script file via sh run.sh and upon success, you will see a list of the servers and their ports: The following services are running: MongoDB server on port 27017 Redpanda on 8082 (Redpanda proxy on 8083) Kafka Connect on 8083 Node Server on 4000 is hosting the API and homepage Status of kafka connectors: sh status.sh To tear down the environment and stop these services: docker-compose down -v Once started, navigate to localhost:4000 in a browser and click the “Start” button. After a few seconds, you will see the sample stock data from 10 fictitious companies with the moving average price. Get started with MongoDB and Redpanda This example showcases the simplicity of moving data through the Redpanda streaming platform and into MongoDB for processing. Check out these resources to learn more: Introduction to Redpanda MongoDB + Redpanda Example Application GitHub repository Learn more about the MongoDB Connector for Apache Kafka Ask questions on the MongoDB Developer Community forums Sign up for MongoDB Atlas to get your free tier cluster

October 26, 2021

Data Movement from Oracle to MongoDB Made Easy with Apache Kafka

Change Data Capture features have existed for many years in the database world. CDC makes it possible to listen to changes to the database like inserting, updating and deleting data and have these events be sent to other database systems in various scenarios like ETL, replications and database migrations. By leveraging the Apache Kafka, the Confluent Oracle CDC Connector and the MongoDB Connector for Apache Kafka, you can easily stream database changes from Oracle to MongoDB. In this post we will pass data from Oracle to MongoDB providing a step by step configuration for you to easily re-use, tweak and explore the functionality. At a high level, we will configure the above references image in a self-contained docker compose environment that consists of the following: Oracle Database MongoDB Apache Kafka Confluent KSQL These containers will be run all within a local network bridged so you can play around with them from your local Mac or PC. Check out the GitHub repository to download the complete example. Preparing the Oracle Docker image If you have an existing Oracle database, remove the section “database” from the docker-compose file. If you do not already have an Oracle database, you can pull the Oracle Database Enterprise Edition from Docker Hub . You will need to accept the Oracle terms and conditions and then login into your docker account via docker login then docker pull store/oracle/database-enterprise:12.2.0.1-slim to download the image locally. Launching the docker environment The docker-compose file will launch the following: Apache Kafka including Zookeeper, REST API, Schema Registry, KSQL Apache Kafka Connect MongoDB Connector for Apache Kafka Confluent Oracle CDC Connector Oracle Database Enterprise The complete sample code is available from a GitHub repository . To launch the environment, make sure you have your Oracle environment ready and then git clone the repo and build the following: docker-compose up -d --build Once the compose file finishes you will need to configure your Oracle environment to be used by the Confluent CDC Connector. Step 1: Connect to your Oracle instance If you are running Oracle within the docker environment, you can use docker exec as follows: docker exec -it oracle bash -c "source /home/oracle/.bashrc; sqlplus /nolog " connect / as sysdba Step 2: Configure Oracle for CDC Connector First, check if the database is in archive log mode. select log_mode from v$database; If the mode is not “ARCHIVELOG”, perform the following: SHUTDOWN IMMEDIATE; STARTUP MOUNT; ALTER DATABASE ARCHIVELOG; ALTER DATABASE OPEN; Verify the archive mode: select log_mode from v$database The LOG_MODE should now be, “ARCHIVELOG”. Next, enable supplemental logging for all columns ALTER SESSION SET CONTAINER=cdb$root; ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; The following should be run on the Oracle CDB: CREATE ROLE C##CDC_PRIVS; GRANT CREATE SESSION, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY TO C##CDC_PRIVS; GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##CDC_PRIVS; GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##CDC_PRIVS; GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##CDC_PRIVS; GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##CDC_PRIVS; CREATE USER C##myuser IDENTIFIED BY password CONTAINER=ALL; GRANT C##CDC_PRIVS TO C##myuser CONTAINER=ALL; ALTER USER C##myuser QUOTA UNLIMITED ON sysaux; ALTER USER C##myuser SET CONTAINER_DATA = (CDB$ROOT, ORCLPDB1) CONTAINER=CURRENT; ALTER SESSION SET CONTAINER=CDB$ROOT; GRANT CREATE SESSION, ALTER SESSION, SET CONTAINER, LOGMINING, EXECUTE_CATALOG_ROLE TO C##myuser CONTAINER=ALL; GRANT SELECT ON GV_$DATABASE TO C##myuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##myuser CONTAINER=ALL; GRANT SELECT ON GV_$ARCHIVED_LOG TO C##myuser CONTAINER=ALL; GRANT CONNECT TO C##myuser CONTAINER=ALL; GRANT CREATE TABLE TO C##myuser CONTAINER=ALL; GRANT CREATE SEQUENCE TO C##myuser CONTAINER=ALL; GRANT CREATE TRIGGER TO C##myuser CONTAINER=ALL; ALTER SESSION SET CONTAINER=cdb$root; ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; GRANT FLASHBACK ANY TABLE TO C##myuser; GRANT FLASHBACK ANY TABLE TO C##myuser container=all; Next, create some objects CREATE TABLE C##MYUSER.emp ( i INTEGER GENERATED BY DEFAULT AS IDENTITY, name VARCHAR2(100), lastname VARCHAR2(100), PRIMARY KEY (i) ) tablespace sysaux; insert into C##MYUSER.emp (name, lastname) values ('Bob', 'Perez'); insert into C##MYUSER.emp (name, lastname) values ('Jane','Revuelta'); insert into C##MYUSER.emp (name, lastname) values ('Mary','Kristmas'); insert into C##MYUSER.emp (name, lastname) values ('Alice','Cambio'); commit; Step 3: Create Kafka Topic Open a new terminal/shell and connect to your kafka server as follows: docker exec -it broker /bin/bash When connected create the kafka topic : kafka-topics --create --topic SimpleOracleCDC-ORCLCDB-redo-log \ --bootstrap-server broker:9092 --replication-factor 1 \ --partitions 1 --config cleanup.policy=delete \ --config retention.ms=120960000 Step 4: Configure the Oracle CDC Connector The oracle-cdc-source.json file in the repository contains the configuration of Confluent Oracle CDC connector. To configure simply execute: curl -X POST -H "Content-Type: application/json" -d @oracle-cdc-source.json http://localhost:8083/connectors Step 5: Setup kSQL data flows within Kafka As Oracle CRUD events arrive in the Kafka topic, we will use KSQL to stream these events into a new topic for consumption by the MongoDB Connector for Apache Kafka. docker exec -it ksql-server bin/bash ksql http://127.0.0.1:8088 Enter the following commands: CREATE STREAM CDCORACLE (I DECIMAL(20,0), NAME varchar, LASTNAME varchar, op_type VARCHAR) WITH ( kafka_topic='ORCLCDB-EMP', PARTITIONS=1, REPLICAS=1, value_format='AVRO'); CREATE STREAM WRITEOP AS SELECT CAST(I AS BIGINT) as "_id", NAME , LASTNAME , OP_TYPE from CDCORACLE WHERE OP_TYPE!='D' EMIT CHANGES; CREATE STREAM DELETEOP AS SELECT CAST(I AS BIGINT) as "_id", NAME , LASTNAME , OP_TYPE from CDCORACLE WHERE OP_TYPE='D' EMIT CHANGES; To verify the steams were created: SHOW STREAMS; This command will show the following: Stream Name | Kafka Topic | Format ------------------------------------ CDCORACLE | ORCLCDB-EMP | AVRO DELETEOP | DELETEOP | AVRO WRITEOP | WRITEOP | AVRO ------------------------------------ Step 6: Configure MongoDB Sink The following is the configuration for the MongoDB Connector for Apache Kafka: { "name": "Oracle", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "WRITEOP", "connection.uri": "mongodb://mongo1", "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy", "database": "kafka", "collection": "oracle", "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.overwrite.existing": "true", "document.id.strategy.partial.value.projection.type": "allowlist", "document.id.strategy.partial.value.projection.list": "_id", "errors.log.include.messages": true, "errors.deadletterqueue.context.headers.enable": true, "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://schema-registry:8081", "key.converter":"org.apache.kafka.connect.storage.StringConverter" } } In this example, this sink process consumes records from the WRITEOP topic and saves the data to MongoDB. The write model, UpdateOneBusinessKeyTimestampStrategy, performs an upsert operation using the filter defined on PartialValueStrategy property which in this example is the "_id" field. For your convenience, this configuration script is written in the mongodb-sink.json file in the repository. To configure execute: curl -X POST -H "Content-Type: application/json" -d @mongodb-sink.json http://localhost:8083/connectors Delete events are written in the DELETEOP topic and are sinked to MongoDB with the following sink configuration: { "name": "Oracle-Delete", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "DELETEOP", "connection.uri": "mongodb://mongo1”, "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy", "database": "kafka", "collection": "oracle", "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.overwrite.existing": "true", "document.id.strategy.partial.value.projection.type": "allowlist", "document.id.strategy.partial.value.projection.list": "_id", "errors.log.include.messages": true, "errors.deadletterqueue.context.headers.enable": true, "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://schema-registry:8081" } } curl -X POST -H "Content-Type: application/json" -d @mongodb-sink-delete.json http://localhost:8083/connectors This sink process uses the DeleteOneBusinessKeyStrategy writemdoel strategy . In this configuration, the sink reads from the DELETEOP topic and deletes documents in MongoDB based upon the filter defined on PartialValueStrategy property. In this example that filter is the “_id” field. Step 7: Write data to Oracle Now that your environment is setup and configured, return to the Oracle database and insert the following data: insert into C##MYUSER.emp (name, lastname) values ('Juan','Soto'); insert into C##MYUSER.emp (name, lastname) values ('Robert','Walters'); insert into C##MYUSER.emp (name, lastname) values ('Ruben','Trigo'); commit; Next, notice the data as it arrived in MongoDB by accessing the MongoDB shell. docker exec -it mongo1 /bin/mongo The inserted data will now be available in MongoDB. If we update the data in Oracle e.g. UPDATE C##MYUSER.emp SET name=’Rob’ WHERE name=’Robert’; COMMIT;\ The document will be updated in MongoDB as: { "_id" : NumberLong(11), "LASTNAME" : "Walters", "NAME" : "Rob", "OP_TYPE" : "U", "_insertedTS" : ISODate("2021-07-27T10:25:08.867Z"), "_modifiedTS" : ISODate("2021-07-27T10:25:08.867Z") } If we delete the data in Oracle e.g. DELETE FROM C##MYUSER.emp WHERE name=’Rob’; COMMIT;. The documents with name=’Rob’ will no longer be in MongoDB. Note that it may take a few seconds for the propagation from Oracle to MongoDB. Many possibilities In this post we performed a basic setup of moving data from Oracle to MongoDB via Apache Kafka and the Confluent Oracle CDC Connector and MongoDB Connector for Apache Kafka. While this example is fairly simple, you can add more complex transformations using KSQL and integrate other data sources within your Kafka environment making a production ready ETL or streaming environment with best of breed solutions. The docker scripts and images used on this blog have been tested against Docker running on an Intel-based Macs, the Oracle image might not work with the Apple M1 Chipset. Resources How to Get Started with MongoDB Atlas and Confluent Cloud Announcing the MongoDB Atlas Sink and Source Connectors in Confluent Cloud Making your Life Easier with MongoDB and Kafka Streaming Time-Series Data Using Apache Kafka and MongoDB

August 17, 2021

Streaming Time-Series Data Using Apache Kafka and MongoDB

There is one thing the world agrees on and it is the concept of time. Many applications are heavily time-based. Consider solar field power generation, stock trading, and health monitoring. These are just a few of the plethora of applications that produce and use data that contains a critical time component. In general, time-series data applications are heavy on inserts, rarely perform updates and are even more unlikely to delete the data. These applications generate a tremendous amount of data and need a robust data platform to effectively manage and query data. With MongoDB, you can easily: Pre-aggregate data using the MongoDB Query language and window functions Optimally store large amounts of time-series data with MongoDB time-series collections Archive data to cost effective storage using MongoDB Atlas Online Archive Apache Kafka is often used as an ingestion point for data due to its scalability. Through the use of the MongoDB Connector for Apache Kafka and the Apache Kafka Connect service, it is easy to transfer data between Kafka topics and MongoDB clusters. Starting in the 1.6 release of the MongoDB Connector for Apache Kafka, you can configure kafka topic data to be written directly into a time-series collection in MongoDB. This configuration happens in the sink. Configuring time series collections in the sink With MongoDB, applications do not need to create the database and collection before they start writing data. These objects are created automatically upon first arrival of data into MongoDB. However, a time-series collection type needs to be created first before you start writing data. To make it easy to ingest time-series data into MongoDB from Kafka, these collection options are exposed as sink parameters and the time-series collection is created by the connector if it doesn’t already exist . Some of the new parameters are defined as follows: timeseries.timefield Name of the top level field used for time. timeseries.expire.after.seconds This optional field determines the amount of time the data will be in MongoDB before being automatically deleted. Omitting this field means data will not be deleted automatically. If you are familiar with TTL indexes in MongoDB, setting this field provides a similar behavior. timeseries.timefield.auto.convert This optional field tells the connector to convert the data in the field into a BSON Date format. Supported formats include integer, long, and string. For a complete list of the new time-seris parameters check out the MongoDB Sink connector online documentation . When data is stored in time-series collections, MongoDB optimizes the storage and bucketization of your data behind the scenes. This saves a tremendous amount of storage space compared to the typical one document per data point data structure in regular collections. You can also explore the many new time and window functionalities within the MongoDB Query Language. For example, consider this sample document structure: { tx_time: 2021-06-30T15:47:31.000Z, _id: '60dc921372f0f39e2cd6cba5', company_name: 'SILKY CORNERSTONE LLC', price: 94.0999984741211, company_symbol: 'SCL' } You can use the new $setWindowFields pipeline to define the window of documents to perform an operation on then perform rankings, cumulative totals, and other analytics of complex time series data. For example, using the data generated in the tutorial, let’s determine the rolling average to the data as follows: db.StockDataTS.aggregate( [ { $match: {company_symbol: 'SCL'} }, { $setWindowFields: { partitionBy: '$company_name', sortBy: { 'tx_time': 1 }, output: { averagePrice: { $avg: "$price", window: { documents: [ "unbounded", "current" ] } } } } } ]) A sample of the result set is as follows: { tx_time: 2021-06-30T15:47:45.000Z, _id: '60dc922172f0f39e2cd6cbeb', company_name: 'SILKY CORNERSTONE LLC', price: 94.06999969482422, company_symbol: 'SCL', averagePrice: 94.1346669514974 }, { tx_time: 2021-06-30T15:47:47.000Z, _id: '60dc922372f0f39e2cd6cbf0', company_name: 'SILKY CORNERSTONE LLC', price: 94.1500015258789, company_symbol: 'SCL', averagePrice: 94.13562536239624 }, { tx_time: 2021-06-30T15:47:48.000Z, _id: '60dc922472f0f39e2cd6cbf5', company_name: 'SILKY CORNERSTONE LLC', price: 94.0999984741211, company_symbol: 'SCL', averagePrice: 94.13352966308594 } Notice the additional “averagePrice” field is now populated with a rolling average. For more information on time-series collection in MongoDB check out the online documentation . Migrating existing collections To convert an existing MongoDB collection to a time-series collection you can use the MongoDB Connector for Apache Kafka. Simply configure the source connection to your existing collection and configure the sink connector to write to a MongoDB time series collection by using the “timeseries.timefield” parameter. You can configure the source connector to copy existing data by setting the “copy.existing” parameter to true. This will create insert events for all existing documents in the source. Any documents that were inserted during the copying process will be inserted once the copying process has finished. While not always possible, it is recommended to pause writes to the source data while the copy process is running. To see when it finishes, you can view the logs for the message, “Finished copying existing data from the collection(s).”. For example, consider a source document that has this structure: { company_symbol: (STRING), company_name: (STRING), price: (DECIMAL), tx_time: (STRING) } For the initial release of MongoDB Time series collections, the field that represents the time is required to be stored as a Date. In our example, we are using a string to showcase the ability for the connector to automatically convert from a string to a Date. If you chose to perform the conversion outside of the connector you could use a Single Message Transform in Kafka Connect to convert the string into a Date at the Sink. However, certain SMTs like Timestampconverter require schemas to be defined for the data in the Kafka topic in order to work. This may add some complexity to the configuration. Instead of using an SMT you can automatically convert into Dates using the new timeseries.timefield.auto.convert, and timeseries.timefield.auto.convert.date.format options. Here is a sample source configuration that will copy all the existing data from the StockData collection then continue to push data changes to the stockdata.Stocks.StockData topic: {"name": "mongo-source-stockdata", "config": { "tasks.max":"1", "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "publish.full.document.only": true, "connection.uri":(MONGODB SOURCE CONNECTION STRING), "topic.prefix":"stockdata", "database":"Stocks", "collection":"StockData", "copy.existing":"true" }} This is a sample configuration for the sink to write the data from the stockdata.Stocks.StockData topic to a MongoDB time series collection: {"name": "mongo-sink-stockdata", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "tasks.max":"1", "topics":"stockdata.Stocks.StockData", "connection.uri":(MONGODB SINK CONNECTION STRING), "database":"Stocks", "collection":"StockDataMigrate", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "timeseries.timefield":"tx_time", "timeseries.timefield.auto.convert":"true", "timeseries.timefield.auto.convert.date.format":"yyyy-MM-dd'T'HH:mm:ss'Z'" }} In this sink example, the connector will convert the data in the “tx_time” field into a Date and parse it expecting the string format yyyy-MM-ddTHH:mm:ssZ (e.g. '2021-07-06T12:25:45Z') Note that in the initial version of time-series collections, only insert into a time-series collection is supported. Updating or deleting documents on the source will not propagate to the destination. Also, you can not use the MongoDB CDC Handler in this scenario because the handler uses ReplaceOne which is a type of update command. These are limitations of the initial release of time-series in MongoDB and may be irrelevant by the time you read this post. Check the online documentation for the latest information. The MongoDB Connector for Apache Kafka version 1.6 is available to download from GitHub . Look for it on the Confluent Hub later this week!

July 13, 2021

Exploring Data with MongoDB Atlas, Databricks, and Google Cloud

>> Announcement: Some features mentioned below will be deprecated on Sep. 30, 2025. Learn more . MongoDB Atlas supports Google Cloud (GC), enabling you to easily spin up managed MongoDB clusters within GC in minutes. We’re excited to share that Databricks recently launched Databricks on GC, giving customers the freedom to move and analyze their data within GC and MongoDB Atlas. With the latest update to Databricks, it’s now easier to get started with a cloud-first approach on GC that leverages MongoDB Atlas with its flexible data model designed for modern applications and Databricks for more advanced analytics use cases. The following tutorial illustrates how to use MongoDB Atlas on GC and Databricks. We’ll use sample sales data in MongoDB Atlas and calculate the rolling average using Databricks on GC. This tutorial covers the following: How to read data from MongoDB Atlas on GC into Spark How to run the MongoDB Connector for Spark as a library in Databricks How to use the PySpark libraries to perform rolling averages of sales data How to write these averages back to MongoDB so they are accessible to applications Create Databricks Workspace To provision a new Databricks workspace, you will need to have a GC project already created. If you do not already have a Databricks cluster deployed on GC, follow the online documentation to create one. Note: It is important to follow the documentation, because there are a few key settings you will need to make in your GC project, such as enabling container.googleapis.com, storage.googleapis.com, and deploymentmanager.googleapis.com services and adjusting certain Google Cloud quotas before creating your Databricks cluster. In this example we have already created the Google Cloud project mongodb-supplysales and are ready to go to the Google Marketplace and add Databricks to our project. Within your Google project, click on “Marketplace” and enter “Databricks” in the search box. Click on the resulting tile and follow the instructions. Once your Databricks cluster is created, navigate to the Databricks cluster with the URL provided. Here you can create a new workspace. Once you’ve created your workspace, you will be able to launch it from the URL provided: Logging into your workspace brings up the following welcome screen: In this article, we will create a notebook to read data from MongoDB and use the PySpark libraries to perform the rolling average calculation. We can create our Databricks cluster by selecting the “+ Create Cluster” button from the Clusters menu. Note: For the purposes of this walkthrough we chose only one worker and preemptible instances; in a production environment you would want to include more workers and autoscaling. Before we create our cluster, we have the option under Advanced Options to provide Spark configuration variables. One of the common settings for Spark config is to define spark.mongodb.output.uri and spark.mongodb.input.uri . First we need to create the MongoDB Atlas cluster so we have a connection string to enter for these values. At this point, open a new browser tab and navigate to MongoDB Atlas. Prepare a MongoDB Atlas Instance Once in the MongoDB Atlas portal, you will need to do the following before you can use Atlas with Databricks: Create your MongoDB Atlas cluster Define user credentials for use in the Spark connector Define network access Add sample data (optional for this article) Create Your MongoDB Atlas Cluster If you already have a MongoDB Atlas account, log in and create a new Atlas cluster. If you do not have an account, you can set up a free cluster at the following URL: https://mongodb.prakticum-team.ru/cloud . Once your account is set up, you can create a new Atlas cluster by using the “+ New Cluster” dialog. MongoDB provides a free tier for Google Cloud. Once you provide a cluster name and click on “create,” Atlas will take approximately five to seven minutes to create your Atlas cluster. Define Database Access By default there are no users created in an Atlas cluster. To create an identity for our Spark cluster to connect to MongoDB Atlas, launch the “Add New Database User” dialog from the Database Access menu item. Notice that there are three options for authentication to MongoDB Atlas: Password, Certificate, and AWS IAM authentication. Select “Password,” and enter a username and password. Atlas provides granular access control: For example, you could restrict this user account to work only with a specific Atlas cluster or define the account as temporary and have Atlas expire within a specific time period. Defining Network Access MongoDB Atlas does not allow any connection from the internet by default. You need to include MongoDB Atlas as part of a VPC peering or AWS PrivateLink configuration. If you do not have that set up with your cloud provider, you need to specify from which IP addresses Atlas can accept incoming connections. You can do this via the “Add IP Address” dialog in the Network Access menu. In this article, we will add “0.0.0.0,” allowing access from anywhere, because we don’t know specifically which IP our Databricks cluster will be running on. MongoDB Atlas can also make this IP access list temporary, which is great for situations where you need to allow access from anywhere. Add Sample Data Now that we have added our user account and allowed network access to our Atlas cluster, we need to add some sample data. Atlas provides several sample collections that are accessible from the menu item on the cluster. In this example, we will use the sales collection within the sample_supplies database. Update Spark Configuration with Atlas Connection String Copy the MongoDB Atlas connection string by clicking on the Connect button and selecting “Connect your application.” Copy the contents of the connection string and note the placeholders for username and password . You will have to change those to your own credentials. Return to your Databricks workspace. Under Advanced Options in your Databricks workspace, paste the connection string for both the spark.mongodb.output.uri and spark.mongodb.input.uri variables. Note that you will need to update the credentials in the MongoDB Atlas connection string with those you defined previously. For simplicity in your PySpark code, change the default database in the connection string from MyFirstDatabase to sample_supplies. (This is optional, because you can always define the database name via Spark configuration options at runtime.) Start the Databricks Cluster Now that your Spark config is set, start the cluster. Note: If the cluster fails to start, check the event log and view the JSON tab. This is an example error message you will receive if you forgot to increase the SSD storage quota: Add MongoDB Spark Connector Once the cluster is up and running, click on “Install New” from the Libraries menu. Here we have a variety of ways to create a library, including uploading a JAR file or downloading the Spark connector from Maven. In this example, we will use Maven and specify org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 as the coordinates. Click on “Install” to add our MongoDB Spark Connector library to the cluster. Note: If you get the error message “Maven libraries are only supported on Databricks Runtime version 7.3 LTS, and versions >= 8.1,” you can download the MongoDB Spark Connector JAR file from https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/ and then upload it to Databricks by using the Upload menu option. Create a New Notebook Click on the Databricks home icon from the menu and select “Create a blank notebook.” Attach this new notebook to the cluster you created in the previous step. Because we defined our MongoDB connection string as part of the Spark conf cluster configuration, your notebook already has the MongoDB Atlas connection context. In the first cell, paste the following: from pyspark.sql import SparkSession pipeline="[{'$match': { 'items.name':'printer paper' }}, {'$unwind': { path: '$items' }}, {'$addFields': { totalSale: { \ '$multiply': [ '$items.price', '$items.quantity' ] } }}, {'$project': { saleDate:1,totalSale:1,_id:0 }}]" salesDF = spark.read.format("mongo").option("collection","sales").option("pipeline", pipeline).option("partitioner", "MongoSinglePartitioner").load() Run the cell to make sure you can connect the Atlas cluster. Note: If you get an error such as “MongoTimeoutException,” make sure your MongoDB Atlas cluster has the appropriate network access configured. The notebook gave us a schema view of what the data looks like. Although we could have continued to transform the data in the Mongo pipeline before it reached Spark, let’s use PySpark to transform it. Create a new cell and enter the following: from pyspark.sql.window import Window from pyspark.sql import functions as F salesAgg=salesDF.withColumn('saleDate', F.col('saleDate').cast('date')).groupBy("saleDate").sum("totalSale").orderBy("saleDate") w = Window.orderBy('saleDate').rowsBetween(-7, 0) df = salesAgg.withColumn('rolling_average', F.avg('sum(totalSale)').over(w)) df.show(truncate=False) Once the code is executed, the notebook will display our new dataframe with the rolling averages column: It is this cell where we will provide some additional transformation of the data such as grouping the data by saleDate and provide a summation of the totalSale per day. Once the data is in our desired format, we define a window of time as the past seven entries and then add a column to our data frame that is a rolling average of the total sales data. Once we have performed our analytics, we can write the data back to MongoDB for additional reporting, analytics, or archiving. In this scenario, we are writing the data back to a new collection called sales-averages: df.write.format("mongo").option("collection","sales-averages").save() You can see the data by using the Collections tab within the MongoDB Atlas cluster UI. WIth the data in MongoDB Atlas, you can now leverage many of the services available, including Atlas Online Archive, Atlas Search, and Atlas Data Lake. Summary The integration between MongoDB Atlas, Google Cloud, and Databricks enables you to gain deep insights into your data and gives you freedom to move and analyze data as your needs evolve. Check out the resources below for more information: Getting started with MongoDB Atlas MongoDB Spark Connector MongoDB Atlas on Google Cloud

May 11, 2021

How to Get Started with MongoDB Atlas and Confluent Cloud

Every year more and more applications are leveraging the public cloud and reaping the benefits of elastic scale and rapid provisioning. Forward-thinking companies such as MongoDB and Confluent have embraced this trend, building cloud-based solutions such as MongoDB Atlas and Confluent Cloud that work across all three major cloud providers. Companies across many industries have been leveraging Confluent and MongoDB to drive their businesses forward for years. From insurance providers gaining a customer-360 view for a personalized experience to global retail chains optimizing logistics with a real-time supply chain application, the connected technologies have made it easier to build applications with event-driven data requirements. The latest iteration of this technology partnership simplifies getting started with a cloud-first approach, ultimately improving developer’s productivity when building modern cloud-based applications with data in motion. Today, the MongoDB Atlas source and sink connectors are generally available within Confluent Cloud. With Confluent’s cloud-native service for Apache Kafka® and these fully managed connectors, setup of your MongoDB Atlas integration is simple. There is no need to install Kafka Connect or the MongoDB Connector for Apache Kafka, or to worry about scaling your deployment. All the infrastructure provisioning and management is taken care of for you, enabling you to focus on what brings you the most value — developing and releasing your applications rapidly. Let’s walk through a simple example of taking data from a MongoDB cluster in Virginia and writing it into a MongoDB cluster in Ireland. We will use a python application to write fictitious data into our source cluster. Step 1: Set up Confluent Cloud First, if you’ve not done so already, sign up for a free trial of Confluent Cloud . You can then use the Quick Start for Apache Kafka using Confluent Cloud tutorial to create a new Kafka cluster. Once the cluster is created, you need to enable egress IPs and copy the list of IP addresses. This list of IPs will be used as an IP Allow list in MongoDB Atlas. To locate this list, select “Custer Settings” and then the “Networking” tab. Keep this tab open for future reference: you will need to copy these IP addresses into the Atlas cluster in Step 2. Step 2: Set Up the Source MongoDB Atlas Cluster For a detailed guide on creating your own MongoDB Atlas cluster, see the Getting Started with Atlas tutorial. For the purposes of this article, we have created an M10 MongoDB Atlas cluster using the AWS cloud in the us-east-1 (Virginia) data center to be used as the source, and an M10 MongoDB Atlas cluster using the AWS cloud in the eu-west-1 (Ireland) data center to be used as the sink. Once your clusters are created, you will need to configure two settings in order to make a connection: database access and network access. Network Access You have two options for allowing secure network access from Confluent Cloud to MongoDB Atlas: You can use AWS PrivateLink, or you can secure the connection by allowing only specific IP connections from Confluent Cloud to your Atlas cluster. In this article, we cover securing via IPs. For information on setting up using PrivateLink, read the article Using the Fully Managed MongoDB Atlas Connector in a Secure Environment . To accept external connections in MongoDB Atlas via specific IP addresses, launch the “IP Access List” entry dialog under the Network Access menu. Here you add all the IP addresses that were listed in Confluent Cloud from Step 1. Once all the egress IPs from Confluent Cloud are added, you can configure the user account that will be used to connect from Confluent Cloud to MongoDB Atlas. Configure user authentication in the Database Access menu. Database Access You can authenticate to MongoDB Atlas using username/password, certificates, or AWS identity and access management (IAM) authentication methods. To create a username and password that will be used for connection from Confluent Cloud, select the “+ Add new Database User” option from the Database Access menu. Provide a username and password and make a note of this credential, because you will need it in Step 3 and Step 4 when you configure the MongoDB Atlas source and sink connectors in Confluent Cloud. Note: In this article we are creating one credential and using it for both the MongoDB Atlas source and MongoDB sink connectors. This is because both of the clusters used in this article are from the same Atlas project. Now that the Atlas cluster is created, the Confluent Cloud egress IPs are added to the MongoDB Atlas Allow list, and the database access credentials are defined, you are ready to configure the MongoDB Atlas source and MongoDB Atlas sink connectors in Confluent Cloud. Step 3: Configure the Atlas Source Now that you have two clusters up and running, you can configure the MongoDB Atlas connectors in Confluent Cloud. To do this, select “Connectors” from the menu, and type “MongoDB Atlas” in the Filters textbox. Note: When configuring MongoDB Atlas source And MongoDB Atlas sink, you will need the connection host name of your Atlas clusters. You can obtain this host name from the MongoDB connection string. An easy way to do this is by clicking on the "Connect" button for your cluster. This will launch the Connect dialog. You can choose any of the Connect options. For purposes of illustration, if you click on “Connect using MongoDB Compass.” you will see the following: The highlighted part in the above figure is the connection hostname you will use when configuring the source and sink connectors in Confluent Cloud. Configuring the MongoDB Atlas Source Connector Selecting “MongoDbAtlasSource” from the list of Confluent Cloud connectors presents you with several configuration options. The “Kafka Cluster credentials” choice is an API-based authentication that the connector will use for authentication with the Kafka broker. You can generate a new API key and secret by using the hyperlink. Recall that the connection host is obtained from the MongoDB connection string. Details on how to find this are described at the beginning of this section. The “Copy existing data” choice tells the connector upon initial startup to copy all the existing data in the source collection into the desired topic. Any changes to the data that occur during the copy process are applied once the copy is completed. By default, messages from the MongoDB source are sent to the Kafka topic as strings. The connector supports outputting messages in formats such as JSON and AVRO. Recall that the MongoDB source connector reads change stream data as events. Change stream event metadata is wrapped in the message sent to the Kafka topic. If you want just the message contents, you can set the “Publish full document only” output message to true. Note: For source connectors, the number of tasks will always be “1”: otherwise you will run the risk of duplicate data being written to the topic, because multiple workers would effectively be reading from the same change stream event stream. To scale the source, you could create multiple source connectors and define a pipeline that looks at only a portion of the collection. Currently this capability for defining a pipeline is not yet available in Confluent Cloud. Step 4: Generate Test Data At this point, you could run your python data generator application and start inserting data into the Stocks.StockData collection at your source. This will cause the connector to automatically create the topic “demo.Stocks.StockData.” To use the generator, git-clone the stockgenmongo folder in the above-referenced repository and launch the data generation as follows: python stockgen.py -c "< >" Where the MongoDB connection URL is the full connection string obtained from the Atlas source cluster. An example connection string is as follows: mongodb+srv://kafkauser:kafkapassword123@democluster.lkyil.mongodb.net Note: You might need to pip-install pymongo and dnspython first. If you do not wish to use this data generator, you will need to create the Kafka topic first before configuring the MongoDB Atlas sink. You can do this by using the Add a Topic dialog in the Topics tab of the Confluent Cloud administration portal. Step 5: Configuring the MongoDB Atlas Sink Selecting “MongoDB Atlas Sink” from the list of Confluent Cloud connectors will present you with several configuration options. After you pick the topic to source data from Kafka, you will be presented with additional configuration options. Because you chose to write your data in the source by using JSON, you need to select “JSON” in the input message format. The Kafka API key is an API key and secret used for connector authentication with Confluent Cloud. Recall that you obtain the connection host from the MongoDB connection string. Details on how to find this are described previously at the beginning of Step 3. The “Connection details” section allows you to define behavior such as creating a new document for every topic message or updating an existing document based upon a value in the message. These behaviors are known as document ID and write model strategies. For more information, check out the MongoDB Connector for Apache Kafka sink documentation . If order of the data in the sink collection is not important, you could spin up multiple tasks to gain an increase in write performance. Step 6: Verify Your Data Arrived at the Sink You can verify the data has arrived at the sink via the Atlas web interface. Navigate to the collection data via the Collections button. Now that your data is in Atlas, you can leverage many of the Atlas platform capabilities such as Atlas Search, Atlas Online Archive for easy data movement to low-cost storage, and MongoDB Charts for point-and-click data visualization. Here is a chart created in about one minute using the data generated from the sink cluster. Summary Apache Kafka and MongoDB help power many strategic business use cases, such as modernizing legacy monolithic systems, single views, batch processing, and event-driven architectures, to name a few. Today, Confluent and MongoDB Cloud and MongoDB Atlas provide fully managed solutions that enable you to focus on the business problem you are trying to solve versus spinning your tires in infrastructure configuration and maintenance. Register for our joint webinar to learn more!

May 6, 2021

MongoDB Connector for Apache Kafka 1.5 Available Now

Today, MongoDB has released version 1.5 of the MongoDB Connector for Apache Kafka! This article highlights some of the key features of this new release in addition to continuing to improve the overall quality & stability of the Connector . DeleteOne write model strategy When messages arrive on Kafka topics, the MongoDB Sink Connector reads them and by default will upsert them into the MongoDB cluster specified in the sink configuration. However, what if you didn’t want to always upsert them? This is where write strategies come in and provide you with the flexibility to define what you want to do with the document. While the concept of write strategies is not new to the connector, in this release there is a new write strategy available called DeleteOneBusinessKeyStrategy . This is useful for when a topic contains records identifying data that should be removed from a collection in the MongoDB sink. Consider the following: You run an online store selling fashionable face masks. As part of your architecture, the website sends orders to a Kafka topic, “web-orders” which upon message arrival kicks off a series of actions such as sending an email confirmation, and inserting the order details into an “Orders” collection in a MongoDB cluster. A sample Orders document: { _id: ObjectId("6053684f2fe69a6ad3fed028"), 'customer-id': 123, 'order-id': 100, order: { lineitem: 1, SKU: 'FACE1', quantity: 1 } } This process works great, however, when a customer cancels an order, we need to have another business process to update our inventory, send the cancellation, email and remove the order from our MongoDB sink. In this scenario a cancellation message is sent to another Kafka topic, “canceled-orders”. For messages in this topic, we don’t just want to upsert this into a collection, we want to read the message from the topic and use a field within the document to identify the documents to delete in the sink. For this example, let’s use the order-id key field and define a sink connector using the DeleteOneBusinessKeyStrategy as follows: "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics":"FaceMaskWeb.OrderCancel", "connection.uri":"mongodb://mdb1", "database":"FaceMaskWeb", "collection":"Orders", "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy", "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.partial.value.projection.type": "AllowList", "document.id.strategy.partial.value.projection.list": "order-id", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":false, "document.id.strategy.overwrite.existing": true Now when messages arrive in the “FakeMaskWeb.OrderCancel” topic, the “order-id” field is used to delete documents in the Orders collection. For example, using the sample document above, if we put this value into the OrderCancel topic { “order-id”: 100 } It would cause the document in the Orders collection with order-id and value 100 to be deleted. For a complete list of write model strategies check out the MongoDB Kafka Connector Sink documentation . Qlik Replicate Qlik Replicate is recognized as an industry leader in data replication and ingestion. With this new release of the Connector, you can now replicate and stream heterogeneous data from data sources like Oracle, MySQL, PostGres and others to MongoDB via Kafka and the Qlik Replicate CDC handler . To configure the MongoDB Connector for Apache Kafka to consume Qlik Replicate CDC events, use “com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler” as the value for the change data capture handler configuration parameter. The handler supports, insert, refresh, read, update and delete events. Errant Record Reporting Kafka Connect, the service which manages connectors that integrate with a Kafka deployment, has the ability to write records to a dead letter queue (DLQ) topic if those records could not be serialized or deserialized. Starting with Apache Kafka version 2.6, there was added support for error reporting within the sink connectors. This gives sink connectors the ability to send individual records to the DLQ if the connector deems the records to be invalid or problematic. For example, if you are projecting fields in the sink that do not exist in the kafka message or if your sink is expecting a JSON document and the message arrives in a different format. In these cases an error is written to the DLQ versus failing the connector. Various Improvements As with every release of the connector, we are constantly improving the quality and functionality. This release is no different. You’ll also see pipeline errors now showing up in the connect logs, and the sink connector can now be configured to write to the dead letter queue! Next Steps Download the latest MongoDB Connector for Apache Kafka 1.5 from the Confluent Hub ! Read the MongoDB Connector for Apache Kafka documentation . Questions/Need help with the connector? Ask the Community . Have a feature request? Provide Feedback or a file a JIRA .

April 7, 2021