Get Started with Atlas Stream Processing: Creating Your First Stream Processor
Rate this tutorial
If you're not already familiar, Atlas Stream Processing enables processing high-velocity streams of complex data using the same data model and Query API that's used in MongoDB Atlas databases. Streaming data is increasingly critical to building responsive, event-driven experiences for your customers. Stream processing is a fundamental building block powering these applications, by helping to tame the firehouse of data coming from many sources, by finding important events in a stream, or by combining data in motion with data in rest.
In this tutorial, we will create a stream processor that uses sample data included in Atlas Stream Processing. By the end of the tutorial, you will have an operational Stream Processing Instance (SPI) configured with a stream processor. This environment can be used for further experimentation and Atlas Stream Processing tutorials in the future.
This is what you'll need to follow along:
- An Atlas user with atlasAdmin permission. For the purposes of this tutorial, we'll have the user "tutorialuser".
- MongoDB shell (Mongosh) version 2.0+
Let's first create a Stream Processing Instance (SPI). Think of an SPI as a logical grouping of one or more stream processors. When created, the SPI has a connection string similar to a typical MongoDB Atlas cluster.
Under the Services tab in the Atlas Project click, "Stream Processing". Then click the "Create Instance" button.
This will launch the Create Instance dialog.
Enter your desired cloud provider and region, and then click "Create". You will receive a confirmation dialog upon successful creation.
The connection registry stores connection information to the external data sources you wish to use within a stream processor. In this example, we will use a sample data generator that is available without any extra configuration, but typically you would connect to either Kafka or an Atlas database as a source.
To manage the connection registry, click on "Configure" to navigate to the configuration screen.
Once on the configuration screen, click on the "Connection Registry" tab.
Next, click on the "Add Connection" button. This will launch the Add Connection dialog.
From here, you can add connections to Kafka, other Atlas clusters within the project, or a sample stream. In this tutorial, we will use the Sample Stream connection. Click on "Sample Stream" and select "sample_stream_solar" from the list of available sample streams. Then, click "Add Connection".
The new "sample_stream_solar" will show up in the list of connections.
Now that we have both created the SPI and configured the connection in the connection registry, we can create a stream processor. First, we need to connect to the SPI that we created previously. This can be done using the MongoDB Shell (mongosh).
To obtain the connection string to the SPI, return to the main Stream Processing page by clicking on the "Stream Processing" menu under the Services tab.
Next, locate the "Tutorial" SPI we just created and click on the "Connect" button. This will present a connection dialog similar to what is found when connecting to MongoDB Atlas clusters.
For connecting, we'll need to add a connection IP address and create a database user, if we haven't already.
Then we'll choose our connection method. If you do not already have mongosh installed, install it using the instructions provided in the dialog.
Once mongosh is installed, copy the connection string from the "I have the MongoDB Shell installed" view and run it in your terminal.
1 Command Terminal > mongosh <<CONNECTION STRING>> --tls --authenticationDatabase admin --username tutorialuser 2 3 Enter password: ******************* 4 5 Current Mongosh Log ID: 64e9e3bf025581952de31587 6 Connecting to: mongodb://***** 7 Using MongoDB: 6.2.0 8 Using Mongosh: 2.0.0 9 10 For mongosh info see: https://mongodb.prakticum-team.ru/proxy/docs.mongodb.com/mongodb-shell/ 11 12 AtlasStreamProcessing>
To confirm your sample_stream_solar is added as a connection, issue
sp.listConnections()
. Our connection to sample_stream_solar is shown as expected.1 AtlasStreamProcessing> sp.listConnections() 2 { 3 ok: 1, 4 connections: [ 5 { 6 name: 'sample_stream_solar', 7 type: 'inmemory', 8 createdAt: ISODate("2023-08-26T18:42:48.357Z") 9 } 10 ] 11 }
If you are reading through this post as a prerequisite to another tutorial, you can return to that tutorial now to continue.
In this section, we will wrap up by creating a simple stream processor to process the sample_stream_solar source that we have used throughout this tutorial. This sample_stream_solar source represents the observed energy production of different devices (unique solar panels). Stream processing could be helpful in measuring characteristics such as panel efficiency or when replacement is required for a device that is no longer producing energy at all.
First, let's define a $source stage to describe where Atlas Stream Processing will read the stream data from.
1 var solarstream={$source:{"connectionName": "sample_stream_solar"}}
Now we will issue .process to view the contents of the stream in the console.
sp.process([solarstream])
.process lets us sample our source data and quickly test the stages of a stream processor to ensure that it is set up as intended. A sample of this data is as follows:
1 { 2 device_id: 'device_2', 3 group_id: 3, 4 timestamp: '2023-08-27T13:51:53.375+00:00', 5 max_watts: 250, 6 event_type: 0, 7 obs: { 8 watts: 168, 9 temp: 15 10 }, 11 _ts: ISODate("2023-08-27T13:51:53.375Z"), 12 _stream_meta: { 13 sourceType: 'sampleData', 14 timestamp: ISODate("2023-08-27T13:51:53.375Z") 15 } 16 }
In this tutorial, we started by introducing Atlas Stream Processing and why stream processing is a building block for powering modern applications. We then walked through the basics of creating a stream processor – we created a Stream Processing Instance, configured a source in our connection registry using sample solar data (included in Atlas Stream Processing), connected to a Stream Processing Instance, and finally tested our first stream processor using .process. You are now ready to explore Atlas Stream Processing and create your own stream processors, adding advanced functionality like windowing and validation.
If you enjoyed this tutorial and would like to learn more check out the MongoDB Atlas Stream Processing announcement blog post. For more on stream processors in Atlas Stream Processing, visit our documentation.
Log in today to get started. Atlas Stream Processing is now available to all developers in Atlas. Give it a try today!