Apache Kafka Connections
Atlas Stream Processing supports both source and sink connections to Apache Kafka.
Add a Kafka Connection
To add a Kafka connection to your stream processing instance's connection registry:
To create a connection for a specified stream processing instance using the Atlas CLI, run the following command:
atlas streams connections create [connectionName] [options]
To learn more about the command syntax and parameters, see the Atlas CLI documentation for atlas streams connections create.
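As a sketch of this workflow, you can define the connection in a JSON file and pass it to the CLI. The file name, instance name, broker address, and credentials below are placeholders, and the `--instance`/`--file` flag names are assumptions you should verify against the Atlas CLI documentation for your version:

```shell
# Hypothetical connection config; adjust all values for your deployment.
cat > kafkaConnectionConfig.json <<'EOF'
{
  "name": "kafkaDemo",
  "type": "Kafka",
  "bootstrapServers": "broker0.example.com:9092",
  "security": { "protocol": "SASL_SSL" },
  "authentication": {
    "mechanism": "SCRAM-SHA-256",
    "username": "demoUser",
    "password": "demoPassword"
  }
}
EOF
# Requires an authenticated Atlas CLI session (flag names are assumed):
# atlas streams connections create kafkaDemo --instance demoProcessor --file kafkaConnectionConfig.json
```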
In Atlas, go to the Stream Processing page for your project.
Warning
Navigation Improvements In Progress
We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.
If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.
If it's not already displayed, select your project from the Projects menu in the navigation bar.
In the sidebar, click Stream Processing under the Services heading.
The Stream Processing page displays.
Add a new connection.
Select a Kafka connection.
Provide a Connection Name. Each connection name must be unique within a stream processing instance. This is the name used to reference the connection in Atlas Stream Processing aggregations.
Select a Network Access type. Atlas Stream Processing supports Public IP or VPC Peering connections.
To use public IP access, click the Public IP button. No further configuration is needed for this network access type.
To use VPC peering, click the VPC Peering button, then toggle Enable VPC Peering on. Atlas Stream Processing automatically selects the appropriate VPC peering connection from your configured connections. If you do not have a VPC peering connection, Configure an Atlas Network Peering Connection.
Specify an IP address for one or more bootstrap servers for your Apache Kafka system.
From the dropdown menu, select a Security Protocol Method.
Atlas Stream Processing supports SASL_PLAINTEXT or SASL_SSL. SASL_PLAINTEXT is incompatible with VPC peering; to use VPC peering, you must select the SASL_SSL method.
From the dropdown menu, select a SASL Mechanism.
Atlas Stream Processing supports:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
Provide a Username for authentication.
Provide a Password for authentication.
Click Add connection.
From the dropdown menu, select a SASL Mechanism.
Atlas Stream Processing supports:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
Click Upload to upload your Certificate Authority PEM file.
Provide a Username for authentication.
Provide a Password for authentication.
Click Add connection.
The Atlas Administration API provides an endpoint for adding a connection to a connection registry.
Important
After adding an external connection such as an Apache Kafka cluster to your connection registry, you must add Atlas IP addresses to an access list for that external connection. For more information, see Allow Access to or from the Atlas Control Plane.
Add a Kafka Private Link Connection
Atlas Stream Processing currently supports creating Private Link connections to the following:
AWS Confluent clusters
AWS MSK clusters
Microsoft Azure EventHub
Amazon Web Services Private Link Connections
To create an AWS Private Link connection to use in your Atlas Stream Processing project:
Important
You can't have more than one Private Link connection to a given Confluent cluster per Atlas project. Before you begin this procedure, call the Return All Private Link Connections endpoint. If you have an existing Private Link connection to your Confluent cluster within Atlas but not within your Confluent account, only perform those steps that configure your Confluent-side networking.
Configure your Confluent cluster.
You must configure your Confluent cluster to accept incoming connections from your Atlas project.
Important
Confluent accepts incoming connections only from AWS. To use a Confluent Private Link connection, you must host your stream processing instances on AWS.
Call the Return Account ID and VPC ID for group and region Atlas Administration API endpoint. Note the value of awsAccountId; you will need this in a later step.
In your Confluent account, navigate to the cluster you want to connect to. In your cluster networking interface, navigate to your cluster networking details.
For a Confluent dedicated cluster, provide a name of your choice. For the AWS account number, provide the value of the awsAccountId field you noted earlier.
Note
This step is not required for Confluent serverless clusters.
Request a connection to your cloud provider.
The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.
For an AWS Confluent Private Link connection, you must set the following key-value pairs:
Key | Value |
---|---|
serviceEndpointId | Your Confluent cluster's VPC Endpoint service name. |
dnsDomain | Fully qualified domain name of the bootstrap server on your Confluent cluster. |
dnsSubDomain | If your cluster doesn't use subdomains, you must set this to the empty array []. |
You can find these values in your Confluent cluster's networking details.
The following example command requests a connection to your Confluent cluster and illustrates a typical response:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "vendor": "Confluent",
    "provider": "AWS",
    "region": "us_east_1",
    "serviceEndpointId": "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9",
    "dnsDomain": "sample.us-east-1.aws.confluent.cloud",
    "dnsSubDomain": [
      "use1-az1.sample.us-east-1.aws.confluent.cloud",
      "use1-az2.sample.us-east-1.aws.confluent.cloud",
      "use1-az4.sample.us-east-1.aws.confluent.cloud"
    ]
  }'
{
  "_id": "6aa12e7ccd660d4b2380b1c1",
  "dnsDomain": "sample.us-east-1.aws.confluent.cloud.",
  "vendor": "Confluent",
  "provider": "AWS",
  "region": "us_east_1",
  "serviceEndpointId": "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"
}
After you send the request, note the value of the _id field in the response body. You will need this in a later step.
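If you script this step, you can capture the _id value directly from the saved response. This is a sketch assuming python3 is available for JSON parsing; the response file below mirrors the example response above:

```shell
# Save the response body to a file (here, the sample response from above).
cat > plcResponse.json <<'EOF'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.us-east-1.aws.confluent.cloud.","vendor":"Confluent","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}
EOF
# Extract the _id for later use as networking.access.connectionId.
CONNECTION_ID=$(python3 -c "import json; print(json.load(open('plcResponse.json'))['_id'])")
echo "$CONNECTION_ID"
```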
Provide the interface endpoint ID to Confluent.
Note
This step applies only to Confluent serverless clusters.
Call the Return All Private Link Connections endpoint. Note the value of interfaceEndpointId.
In your Confluent account, navigate to the cluster you want to connect to. In your cluster networking interface, navigate to your cluster networking details. Navigate to the access points interface and add a new access point. When Confluent prompts you for an interface endpoint, provide the value of interfaceEndpointId that you noted previously.
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
bootstrapServers | IP address of your cloud provider's Kafka bootstrap server. |
security.protocol | Must be set to SASL_SSL. |
authentication.mechanism | Must be set to PLAIN. |
authentication.password | The password associated with your Confluent API key. |
authentication.username | The username associated with your Confluent API key. |
type | Must be set to Kafka. |
networking.access.type | Must be set to PRIVATE_LINK. |
networking.access.connectionId | The _id value you noted when you requested the Private Link connection. |
Set all other values as necessary.
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "confluent_demo",
    "bootstrapServers": "slr-ntgrbn.sample.us-east-1.aws.confluent.cloud:9092",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "apiSecretDemo",
      "username": "apiUserDemo"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
Configure your AWS MSK cluster.
You must configure your AWS MSK cluster to accept incoming connections from your Atlas project.
Important
AWS MSK accepts incoming connections from AWS only. To use an AWS MSK Private Link connection, you must host your stream processing instances on AWS.
Use the Get Account Details endpoint to retrieve the AWS Principal identity. You will need this value for your AWS MSK cluster policy.
Sign in to the AWS Management Console and navigate to the AWS MSK console. Ensure that multi-VPC connectivity is enabled on the cluster to which you want to connect.
Click Properties, Security Settings, and Edit cluster policy. Provide the full ARN form of the Principal identity you retrieved earlier as the value of the Statement.Principal.AWS array, and ensure the policy takes the following form:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [ "arn:aws:iam::123456789012:root" ]
      },
      "Action": [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2"
    }
  ]
}
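You can also apply the cluster policy from the shell rather than the console. This is a sketch: the ARNs are placeholders, and the aws kafka put-cluster-policy invocation (commented out) assumes a recent AWS CLI version that supports that subcommand:

```shell
# Write the cluster policy to a file; substitute your own Principal and cluster ARNs.
cat > mskClusterPolicy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": [ "arn:aws:iam::123456789012:root" ] },
      "Action": [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2"
    }
  ]
}
EOF
# Apply it to the cluster (requires AWS credentials):
# aws kafka put-cluster-policy \
#   --cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2" \
#   --policy file://mskClusterPolicy.json
```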
Request a connection to your cloud provider.
The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.
For an AWS MSK Private Link connection, you must set the following key-value pairs:
Key | Value |
---|---|
vendor | Must be set to msk. |
provider | Must be set to AWS. |
arn | String representing the Amazon Resource Name (ARN) of your AWS MSK cluster. |
You can find the ARN in your AWS MSK cluster's networking details.
The following example command requests a connection to your AWS MSK cluster and illustrates a typical response:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "vendor": "msk",
    "provider": "AWS",
    "arn": "1235711"
  }'
{
  "_id": "6aa12e7ccd660d4b2380b1c1",
  "dnsDomain": "scram.sample.us-east-1.amazonaws.com",
  "vendor": "msk",
  "provider": "AWS",
  "region": "us_east_1",
  "serviceEndpointId": "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"
}
After you send the request, note the value of the _id field in the response body. You will need this in a later step.
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
bootstrapServers | IP address of your cloud provider's Kafka bootstrap server. |
security.protocol | Must be set to SASL_SSL. |
authentication.mechanism | Your cluster's SASL mechanism (PLAIN in the example below). |
authentication.password | The SCRAM password associated with your cluster. You must define a paired SCRAM user and password and associate it with your AWS MSK cluster using AWS Secrets Manager. |
authentication.username | The SCRAM user associated with your cluster. You must define a paired SCRAM user and password and associate it with your AWS MSK cluster using AWS Secrets Manager. |
type | Must be set to Kafka. |
networking.access.type | Must be set to PRIVATE_LINK. |
networking.access.connectionId | The _id value you noted when you requested the Private Link connection. |
Set all other values as necessary.
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "msk_demo",
    "bootstrapServers": "slr-ntgrbn.sample.us-east-1.amazonaws.com:9092",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "scramSecretDemo",
      "username": "scramUserDemo"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
Microsoft Azure Private Link Connections
To create a Microsoft Azure Private Link connection to use in your Atlas Stream Processing project:
In Atlas, go to the Network Access page for your project.
Warning
Navigation Improvements In Progress
We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.
If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.
If it's not already displayed, select your project from the Projects menu in the navigation bar.
In the sidebar, click Network Access under the Security heading.
The Network Access page displays.
Provide your Azure EventHub endpoint details.
Provide your Azure service endpoint ID.
Select your Endpoint region.
Select your Host name.
Click Next, generate endpoint ID.
You may now view your Azure EventHub private endpoint in the Network Access interface under the Atlas Stream Processing tab by clicking the View button in its row.
Request a connection to your cloud provider.
The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.
For an Azure Private Link connection, you must set the following key-value pairs:
Key | Value |
---|---|
serviceEndpointId | Your Event Hub namespace endpoint. This value must be the Azure Resource Manager (ARM) ID of the Event Hub namespace, not the ARM ID of an individual Event Hub. |
dnsDomain | Fully qualified domain name, with port number, of the bootstrap server in your Azure Event Hub namespace. This domain name conforms to the format described here. |
The following example command requests a connection to your Azure Event Hub and illustrates a typical response:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "provider": "AZURE",
    "region": "US_EAST_2",
    "serviceEndpointId": "/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample",
    "dnsDomain": "sample.servicebus.windows.net"
  }'
{
  "_id": "6aa12e7ccd660d4b2380b1c1",
  "dnsDomain": "sample.servicebus.windows.net",
  "provider": "AZURE",
  "region": "US_EAST_2",
  "serviceEndpointId": "/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample"
}
After you send the request, note the value of the _id field in the response body. You will need this in a later step.
Accept the requested connection within your cloud provider account.
For Private Link connections to Azure, navigate to your Event Hub networking page and select the Private endpoint connections tab. In the table of connections, identify your newly requested connection and approve it.
Create the Atlas-side connection.
Add a connection with the following key-value pairs:
Key | Value |
---|---|
| IP address of your cloud provider's Kafka bootstrap server. |
|
|
|
|
| Your Event Hub connection string |
|
|
|
|
|
|
|
|
Set all other values as necessary.
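A point worth illustrating: for the Event Hubs Kafka endpoint, the SASL username is the literal string $ConnectionString, and the password is the entire Event Hub connection string. This sketch builds just the authentication block; the connection string and key below are placeholders:

```shell
# Build the authentication object for an Event Hubs Kafka connection.
# "$ConnectionString" is a literal value, not a shell variable (the quoted
# heredoc prevents expansion); the SharedAccessKey is a placeholder.
cat > eventHubAuth.json <<'EOF'
{
  "mechanism": "PLAIN",
  "username": "$ConnectionString",
  "password": "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=placeholderKey="
}
EOF
# Sanity-check that the username survived as the literal string.
python3 -c "import json; a = json.load(open('eventHubAuth.json')); assert a['username'] == '\$ConnectionString'"
```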
The following example command creates an Apache Kafka connection in Atlas:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
  --digest \
  --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/vnd.atlas.2023-02-01+json' \
  --data '{
    "name": "eventhubpl33333",
    "bootstrapServers": "sample.servicebus.windows.net:9093",
    "security": { "protocol": "SASL_SSL" },
    "authentication": {
      "mechanism": "PLAIN",
      "password": "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Irlo3OoRkc27T3ZoGOlbhEOqXQRXzb12+Q2hNXm0lc=",
      "username": "$ConnectionString"
    },
    "type": "Kafka",
    "networking": {
      "access": {
        "type": "PRIVATE_LINK",
        "connectionId": "38972b0cbe9c2aa40a30a246"
      }
    }
  }'
Configuration
Each interface for creating a Kafka connection allows you to provide configuration parameters for your Kafka cluster. These configurations take the form of key-value pairs.
Atlas Stream Processing passes only supported parameters to your Kafka cluster. If you declare any parameters that aren't explicitly allowed, Atlas Stream Processing ignores them.
Interface | Configuration Mechanism |
---|---|
Atlas CLI | Provide configurations as a |
Atlas Administration API | Provide configurations as a |
Atlas UI | Provide configurations in the Configuration File field of the Add Connection page. |
Atlas Stream Processing supports the following configurations: