Explore Developer Center's New Chatbot! MongoDB AI Chatbot can be accessed at the top of your navigation to answer all your MongoDB questions.

Join us at AWS re:Invent 2024! Learn how to use MongoDB for AI use cases.
MongoDB Developer
Atlas
plus
Sign in to follow topics
MongoDB Developer Centerchevron-right
Developer Topicschevron-right
Productschevron-right
Atlaschevron-right

Get Started with Atlas Stream Processing: Creating Your First Stream Processor

Robert Walters4 min read • Published Aug 13, 2024 • Updated Aug 13, 2024
AtlasStream Processing
Facebook Icontwitter iconlinkedin icon
Rate this tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Atlas Stream Processing is now generally available. Learn more about it here.
If you're not already familiar, Atlas Stream Processing enables processing high-velocity streams of complex data using the same data model and Query API that's used in MongoDB Atlas databases. Streaming data is increasingly critical to building responsive, event-driven experiences for your customers. Stream processing is a fundamental building block powering these applications, by helping to tame the firehouse of data coming from many sources, by finding important events in a stream, or by combining data in motion with data in rest.
In this tutorial, we will create a stream processor that uses sample data included in Atlas Stream Processing. By the end of the tutorial, you will have an operational Stream Processing Instance (SPI) configured with a stream processor. This environment can be used for further experimentation and Atlas Stream Processing tutorials in the future.

Tutorial Prerequisites

This is what you'll need to follow along:
  • An Atlas user with atlasAdmin permission. For the purposes of this tutorial, we'll have the user "tutorialuser".
  • MongoDB shell (Mongosh) version 2.0+

Create the Stream Processing Instance

Let's first create a Stream Processing Instance (SPI). Think of an SPI as a logical grouping of one or more stream processors. When created, the SPI has a connection string similar to a typical MongoDB Atlas cluster.
Under the Services tab in the Atlas Project click, "Stream Processing". Then click the "Create Instance" button.
Stream Processing Instance button
This will launch the Create Instance dialog.
Create Stream Processing Instance dialog in the Atlas UI
Enter your desired cloud provider and region, and then click "Create". You will receive a confirmation dialog upon successful creation.

Configure the connection registry

The connection registry stores connection information to the external data sources you wish to use within a stream processor. In this example, we will use a sample data generator that is available without any extra configuration, but typically you would connect to either Kafka or an Atlas database as a source.
To manage the connection registry, click on "Configure" to navigate to the configuration screen.
The Stream Processing page in Atlas will be empty until the first stream processor is created
Once on the configuration screen, click on the "Connection Registry" tab. Configuration screen with Atlas Stream Processing page in Atlas
Next, click on the "Add Connection" button. This will launch the Add Connection dialog. Add Connection screen within the Stream Processing page in Atlas
From here, you can add connections to Kafka, other Atlas clusters within the project, or a sample stream. In this tutorial, we will use the Sample Stream connection. Click on "Sample Stream" and select "sample_stream_solar" from the list of available sample streams. Then, click "Add Connection".
The new "sample_stream_solar" will show up in the list of connections. A list of your connection names and types is visible on the Stream Processing page in Atlas

Connect to the Stream Processing Instance (SPI)

Now that we have both created the SPI and configured the connection in the connection registry, we can create a stream processor. First, we need to connect to the SPI that we created previously. This can be done using the MongoDB Shell (mongosh).
To obtain the connection string to the SPI, return to the main Stream Processing page by clicking on the "Stream Processing" menu under the Services tab.
Navigate to Stream Processing within the Services pane in Atlas
Next, locate the "Tutorial" SPI we just created and click on the "Connect" button. This will present a connection dialog similar to what is found when connecting to MongoDB Atlas clusters.
In the Atlas UI, each Stream Processing Instance is visible for users to connect and configure
For connecting, we'll need to add a connection IP address and create a database user, if we haven't already.
The connection dialog for a Stream Processing Instance steps through setting up connection security, choosing a connection method, and finally, connecting to the SPI
Then we'll choose our connection method. If you do not already have mongosh installed, install it using the instructions provided in the dialog.
Stream processor connection screen in Atlas
Once mongosh is installed, copy the connection string from the "I have the MongoDB Shell installed" view and run it in your terminal.
1Command Terminal > mongosh <<CONNECTION STRING>> --tls --authenticationDatabase admin --username tutorialuser
2
3Enter password: *******************
4
5Current Mongosh Log ID: 64e9e3bf025581952de31587
6Connecting to: mongodb://*****
7Using MongoDB: 6.2.0
8Using Mongosh: 2.0.0
9
10For mongosh info see: https://mongodb.prakticum-team.ru/proxy/docs.mongodb.com/mongodb-shell/
11
12AtlasStreamProcessing>
To confirm your sample_stream_solar is added as a connection, issue sp.listConnections(). Our connection to sample_stream_solar is shown as expected.
1AtlasStreamProcessing> sp.listConnections()
2{
3 ok: 1,
4 connections: [
5 {
6 name: 'sample_stream_solar',
7 type: 'inmemory',
8 createdAt: ISODate("2023-08-26T18:42:48.357Z")
9 }
10 ]
11}

Create a stream processor

If you are reading through this post as a prerequisite to another tutorial, you can return to that tutorial now to continue.
In this section, we will wrap up by creating a simple stream processor to process the sample_stream_solar source that we have used throughout this tutorial. This sample_stream_solar source represents the observed energy production of different devices (unique solar panels). Stream processing could be helpful in measuring characteristics such as panel efficiency or when replacement is required for a device that is no longer producing energy at all.
First, let's define a $source stage to describe where Atlas Stream Processing will read the stream data from.
1var solarstream={$source:{"connectionName": "sample_stream_solar"}}
Now we will issue .process to view the contents of the stream in the console. sp.process([solarstream])
.process lets us sample our source data and quickly test the stages of a stream processor to ensure that it is set up as intended. A sample of this data is as follows:
1{
2 device_id: 'device_2',
3 group_id: 3,
4 timestamp: '2023-08-27T13:51:53.375+00:00',
5 max_watts: 250,
6 event_type: 0,
7 obs: {
8 watts: 168,
9 temp: 15
10 },
11 _ts: ISODate("2023-08-27T13:51:53.375Z"),
12 _stream_meta: {
13 sourceType: 'sampleData',
14 timestamp: ISODate("2023-08-27T13:51:53.375Z")
15 }
16}

Wrapping up

In this tutorial, we started by introducing Atlas Stream Processing and why stream processing is a building block for powering modern applications. We then walked through the basics of creating a stream processor – we created a Stream Processing Instance, configured a source in our connection registry using sample solar data (included in Atlas Stream Processing), connected to a Stream Processing Instance, and finally tested our first stream processor using .process. You are now ready to explore Atlas Stream Processing and create your own stream processors, adding advanced functionality like windowing and validation.
If you enjoyed this tutorial and would like to learn more check out the MongoDB Atlas Stream Processing announcement blog post. For more on stream processors in Atlas Stream Processing, visit our documentation.

Learn more about MongoDB Atlas Stream Processing

For more on managing stream processors in Atlas Stream Processing, visit our documentation.
Log in today to get started. Atlas Stream Processing is now available to all developers in Atlas. Give it a try today!

Facebook Icontwitter iconlinkedin icon
Rate this tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Related
Tutorial

Sip, Swig, and Search With Playwright, OpenAI, and MongoDB Atlas Search


Oct 01, 2024 | 12 min read
Tutorial

How to Migrate PostgreSQL to MongoDB With Confluent Kafka


Aug 30, 2024 | 10 min read
Tutorial

Build a CRUD API With MongoDB, Typescript, Express, Prisma, and Zod


Sep 04, 2024 | 10 min read
Tutorial

How to Develop a Web App With Netlify Serverless Functions and MongoDB


Aug 30, 2024 | 6 min read
Table of Contents