Apache Kafka Connections

Atlas Stream Processing supports both source and sink connections to Apache Kafka.

To add a Kafka connection to your stream processing instance's connection registry:

To create a connection for a specified stream processing instance using the Atlas CLI, run the following command:

atlas streams connections create [connectionName] [options]

To learn more about the command syntax and parameters, see the Atlas CLI documentation for atlas streams connections create.
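For example, here is a minimal sketch of a CLI invocation. The connection name, instance name, and configuration file are placeholders, and the --instance flag is an assumption; only the --file flag is described in the configuration table at the end of this page.

# Hypothetical example: create a Kafka connection named "kafkaDemo" on
# the stream processing instance "spDemo", reading connection settings
# from a local JSON file.
atlas streams connections create kafkaDemo \
  --instance spDemo \
  --file kafkaConnConfig.json

A kafkaConnConfig.json mirroring the Atlas Administration API payloads shown later on this page might look like:

{
  "type": "Kafka",
  "bootstrapServers": "broker0.example.com:9092",
  "security": { "protocol": "SASL_SSL" },
  "authentication": {
    "mechanism": "SCRAM-SHA-512",
    "username": "demoUser",
    "password": "demoPassword"
  }
}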

To add the connection in the Atlas UI:

Step 1

Warning

Navigation Improvements In Progress

We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.

  1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

  2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

  3. In the sidebar, click Stream Processing under the Services heading.

    The Stream Processing page displays.

Step 2
  1. Locate the overview panel of the stream processing instance you want to modify and click Configure.

  2. Select the Connection Registry tab.

Step 3

  Click + Add connection.

Step 4
  1. Select a Kafka connection.

  2. Provide a Connection Name. Each connection name must be unique within a stream processing instance. This is the name used to reference the connection in Atlas Stream Processing aggregations.

  3. Select a Network Access type. Atlas Stream Processing supports Public IP or VPC Peering connections.

    For Public IP, click the Public IP button. No further configuration is needed for this network access type.

    For VPC Peering:

    1. Click the VPC Peering button.

    2. Toggle Enable VPC Peering on. Atlas Stream Processing automatically selects the appropriate VPC peering connection from your configured connections.

      If you do not have a VPC peering connection, see Configure an Atlas Network Peering Connection.

  4. Specify an IP address for one or more bootstrap servers for your Apache Kafka system.

  5. From the dropdown menu, select a Security Protocol Method.

    Atlas Stream Processing supports SASL_PLAINTEXT or SASL_SSL.

    SASL_PLAINTEXT is incompatible with VPC peering. To use VPC peering, you must select the SASL_SSL method.

    For SASL_PLAINTEXT:

    1. From the dropdown menu, select a SASL Mechanism.

      Atlas Stream Processing supports:

      • PLAIN

      • SCRAM-SHA-256

      • SCRAM-SHA-512

    2. Provide a Username for authentication.

    3. Provide a Password for authentication.

    4. Click Add connection.

    For SASL_SSL:

    1. From the dropdown menu, select a SASL Mechanism.

      Atlas Stream Processing supports:

      • PLAIN

      • SCRAM-SHA-256

      • SCRAM-SHA-512

    2. Click Upload to upload your Certificate Authority PEM file.

    3. Provide a Username for authentication.

    4. Provide a Password for authentication.

    5. Click Add connection.

The Atlas Administration API provides an endpoint for adding a connection to a connection registry.

Add a Connection to the Connection Registry

Important

After adding an external connection such as an Apache Kafka cluster to your connection registry, you must add Atlas IP addresses to an access list for that external connection. For more information, see Allow Access to or from the Atlas Control Plane.
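You can retrieve the current list of control plane IP addresses programmatically. A minimal sketch, assuming the Atlas Administration API's Return All Control Plane IP Addresses endpoint (verify the versioned Accept header against the current API documentation):

curl --location 'https://cloud.mongodb.com/api/atlas/v2/unauth/controlPlaneIPAddresses' \
--header 'Accept: application/vnd.atlas.2023-11-15+json'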

Atlas Stream Processing currently supports creating AWS Private Link connections to the following:

  • AWS Confluent clusters

  • AWS MSK clusters

  • Microsoft Azure EventHub

To create an AWS Private Link connection to use in your Atlas Stream Processing project:

Important

You can't have more than one Private Link connection to a given Confluent cluster per Atlas project. Before you begin this procedure, call the Return All Private Link Connections endpoint. If you already have a Private Link connection to your Confluent cluster within Atlas but not within your Confluent account, perform only the steps that configure your Confluent-side networking.
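A minimal sketch of that check, assuming the list endpoint is a GET on the same privateLinkConnections path used throughout this page (the group ID and credentials are the same placeholders used in the examples below):

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Accept: application/vnd.atlas.2023-02-01+json'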

Step 1

You must configure your Confluent cluster to accept incoming connections from your Atlas project.

Important

Confluent accepts incoming connections only from AWS. To use a Confluent Private Link connection, you must host your stream processing instances on AWS.

  1. Call the Return Account ID and VPC ID for group and region Atlas Administration API endpoint. Note the value of awsAccountId; you will need it in a later step.

  2. In your Confluent account, navigate to the cluster you want to connect to, then open its networking details.

  3. Add Private Link Access.

For a Confluent dedicated cluster, provide a name of your choice. For the AWS account number, provide the value of the awsAccountId field you noted earlier.

Note

This step is not required for Confluent serverless clusters.

Step 2

The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.

Create One Private Link

For an AWS Confluent Private Link connection, you must set the following key-value pairs:

  • serviceEndpointId: Your Confluent cluster's VPC Endpoint service name.

  • dnsDomain: Fully qualified domain name of the bootstrap server on your Confluent cluster.

  • dnsSubDomain: If your cluster doesn't use subdomains, you must set this to the empty array []. If your cluster uses subdomains, set this to an array containing one fully qualified subdomain name for each of your cluster's subdomains.

You can find these values in your Confluent cluster's networking details.

The following example command requests a connection to your Confluent cluster and illustrates a typical response:

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Content-Type: application/json' \
--header 'Accept: application/vnd.atlas.2023-02-01+json' \
--data '{
  "vendor": "Confluent",
  "provider": "AWS",
  "region": "us_east_1",
  "serviceEndpointId": "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9",
  "dnsDomain": "sample.us-east-1.aws.confluent.cloud",
  "dnsSubDomain": [
    "use1-az1.sample.us-east-1.aws.confluent.cloud",
    "use1-az2.sample.us-east-1.aws.confluent.cloud",
    "use1-az4.sample.us-east-1.aws.confluent.cloud"
  ]
}'

{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.us-east-1.aws.confluent.cloud.","vendor":"Confluent","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}

After you send the request, note the value of the _id field in the response body. You will need this in a later step.

Step 3

Note

This step applies only to Confluent serverless clusters.

Call the Return All Private Link Connections endpoint. Note the value of interfaceEndpointId.

In your Confluent account, navigate to the cluster you want to connect to, then open its networking details. Navigate to the access points interface and add a new access point. When Confluent prompts you for an interface endpoint, provide the interfaceEndpointId value that you noted previously.

Step 4

Add a connection with the following key-value pairs:

  • bootstrapServers: IP address of your cloud provider's Kafka bootstrap server.

  • security.protocol: "SASL_SSL"

  • authentication.mechanism: "PLAIN"

  • authentication.password: The password associated with your Confluent API key.

  • authentication.username: The username associated with your Confluent API key.

  • type: "Kafka"

  • networking.access.type: "PRIVATE_LINK"

  • networking.access.connectionId: The _id value from your Private Link request response.

Set all other values as necessary.

The following example command creates an Apache Kafka connection in Atlas:

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Content-Type: application/json' \
--header 'Accept: application/vnd.atlas.2023-02-01+json' \
--data '{
"name": "confluent_demo",
"bootstrapServers": "slr-ntgrbn.sample.us-east-1.aws.confluent.cloud:9092",
"security": {
"protocol": "SASL_SSL"
},
"authentication": {
"mechanism": "PLAIN",
"password": "apiSecretDemo",
"username": "apiUserDemo"
},
"type": "Kafka",
"networking": {
"access": {
"type": "PRIVATE_LINK",
"connectionId": "38972b0cbe9c2aa40a30a246"
}
}
}'
To create an AWS Private Link connection to your AWS MSK cluster:

Step 1

You must configure your AWS MSK cluster to accept incoming connections from your Atlas project.

Important

AWS MSK accepts incoming connections from AWS only. To use an AWS MSK Private Link connection, you must host your stream processing instances on AWS.

  1. Use the Get Account Details endpoint to retrieve the AWS Principal identity. You will need this value for your AWS MSK cluster policy.

  2. Sign in to the AWS Management Console and navigate to the AWS MSK console. Ensure that multi-VPC connectivity is enabled on the cluster to which you want to connect.

  3. Click Properties, Security Settings, and Edit cluster policy.

    Provide the full ARN form of the Principal identity you retrieved earlier as the value of Statement.Principal.AWS[] and ensure the policy takes the following form:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": [
              "arn:aws:iam::123456789012:root"
            ]
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeCluster",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2"
        }
      ]
    }
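If multi-VPC connectivity is not yet enabled on the cluster, you can also enable it from the AWS CLI. The following is a sketch only; verify the update-connectivity call and its flags against the current AWS CLI documentation. The cluster ARN matches the policy example above, and the --current-version value is a placeholder for the cluster metadata version returned by aws kafka describe-cluster-v2.

# Hypothetical sketch: turn on multi-VPC connectivity with SASL/SCRAM
# client authentication for the example cluster.
aws kafka update-connectivity \
  --cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2" \
  --current-version "K3AEGXETSR30VB" \
  --connectivity-info '{"vpcConnectivity":{"clientAuthentication":{"sasl":{"scram":{"enabled":true}}}}}'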
Step 2

The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.

Create One Private Link

For an AWS MSK Private Link connection, you must set the following key-value pairs:

  • vendor: Must be set to "msk".

  • provider: Must be set to "aws".

  • arn: String representing the Amazon Resource Name (ARN) of your AWS MSK cluster.

You can find the ARN in your AWS MSK cluster's networking details.

The following example command requests a connection to your AWS MSK cluster and illustrates a typical response:

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Content-Type: application/json' \
--header 'Accept: application/vnd.atlas.2023-02-01+json' \
--data '{
  "vendor": "msk",
  "provider": "AWS",
  "arn": "1235711"
}'

{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"scram.sample.us-east-1.amazonaws.com","vendor":"msk","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}

After you send the request, note the value of the _id field in the response body. You will need this in a later step.

Step 3

Add a connection with the following key-value pairs:

  • bootstrapServers: IP address of your cloud provider's Kafka bootstrap server.

  • security.protocol: "SASL_SSL"

  • authentication.mechanism: "PLAIN"

  • authentication.password: The SCRAM password associated with your cluster. You must define a paired SCRAM user and password and associate it with your AWS MSK cluster using AWS Secrets Manager.

  • authentication.username: The SCRAM user associated with your cluster. You must define a paired SCRAM user and password and associate it with your AWS MSK cluster using AWS Secrets Manager.

  • type: "Kafka"

  • networking.access.type: "PRIVATE_LINK"

  • networking.access.connectionId: The _id value from your Private Link request response.

Set all other values as necessary.

The following example command creates an Apache Kafka connection in Atlas:

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Content-Type: application/json' \
--header 'Accept: application/vnd.atlas.2023-02-01+json' \
--data '{
"name": "msk_demo",
"bootstrapServers": "slr-ntgrbn.sample.us-east-1.amazonaws.com:9092",
"security": {
"protocol": "SASL_SSL"
},
"authentication": {
"mechanism": "PLAIN",
"password": "scramSecretDemo",
"username": "scramUserDemo"
},
"type": "Kafka",
"networking": {
"access": {
"type": "PRIVATE_LINK",
"connectionId": "38972b0cbe9c2aa40a30a246"
}
}
}'
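The SCRAM credentials in this example must be stored in AWS Secrets Manager and associated with your MSK cluster. A minimal sketch, assuming the standard AWS CLI commands; note that MSK requires the secret name to begin with AmazonMSK_ and the secret to be encrypted with a customer-managed KMS key (the key ID and resulting secret ARN below are placeholders):

# Hypothetical sketch: store a SCRAM user/password pair, then attach
# it to the MSK cluster from the earlier policy example.
aws secretsmanager create-secret \
  --name AmazonMSK_scramUserDemo \
  --kms-key-id "arn:aws:kms:us-east-1:123456789012:key/abcd1234-ef56-7890-abcd-1234567890ab" \
  --secret-string '{"username":"scramUserDemo","password":"scramSecretDemo"}'

aws kafka batch-associate-scram-secret \
  --cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2" \
  --secret-arn-list "arn:aws:secretsmanager:us-east-1:123456789012:secret:AmazonMSK_scramUserDemo-AbCdEf"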

To create a Microsoft Azure Private Link connection to use in your Atlas Stream Processing project:

Step 1

Warning

Navigation Improvements In Progress

We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.

  1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

  2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

  3. In the sidebar, click Network Access under the Security heading.

    The Network Access page displays.

Step 2
  1. Select the Private Endpoint tab.

  2. Select the Atlas Stream Processing tab.

    If you have not created an Atlas Stream Processing private endpoint previously, click Create endpoint. If you have, click Add ASP Endpoint.

Step 3
  1. Set Cloud Provider to Azure.

  2. Set Vendor to EventHub.

  3. Click Next, enter service details.

Step 4
  1. Provide your Azure service endpoint ID.

  2. Select your Endpoint region.

  3. Select your Host name.

  4. Click Next, generate endpoint ID.

Step 5
  1. Log in to your Azure account.

  2. Go to your EventHub namespace.

  3. Click Settings, Networking, Private endpoint connections.

  4. Select and approve your incoming request.

    The request ID value appears in the Private endpoint column.

You can now view your Azure EventHub private endpoint in the Network Access interface: under the Atlas Stream Processing tab, click the View button in its row.

Alternatively, to create the connection with the Atlas Administration API:

Step 1

The Atlas Administration API provides an endpoint for requesting a Private Link connection configured for Atlas Stream Processing.

Create One Private Link

For an Azure Private Link connection, you must set the following key-value pairs:

  • serviceEndpointId: Your EventHub namespace endpoint. This value must be the Azure Resource Manager (ARM) ID of the Event Hub namespace, not the ARM ID of an individual Event Hub.

  • dnsDomain: Fully qualified domain name, with port number, of the bootstrap server in your Azure Event Hub namespace, for example sample.servicebus.windows.net:9093.

The following example command requests a connection to your Azure Event Hub and illustrates a typical response:

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Content-Type: application/json' \
--header 'Accept: application/vnd.atlas.2023-02-01+json' \
--data '{
  "provider": "AZURE",
  "region": "US_EAST_2",
  "serviceEndpointId": "/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample",
  "dnsDomain": "sample.servicebus.windows.net"
}'

{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.servicebus.windows.net","provider":"AZURE","region":"US_EAST_2","serviceEndpointId":"/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample"}

After you send the request, note the value of the _id field in the response body. You will need this in a later step.

Step 2

Accept the requested connection within your cloud provider account.

For Private Link connections to Azure, navigate to your Event Hub networking page and select the Private endpoint connections tab. In the table of connections, identify your newly requested connection and approve it.
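If you prefer the Azure CLI to the portal, the approval can be scripted. This is a sketch under stated assumptions: the resource group and namespace names are placeholders, the pending connection's name must be read from the list output, and you should verify the az network private-endpoint-connection syntax against current Azure documentation.

# Hypothetical sketch: list pending private endpoint connections on the
# Event Hubs namespace, then approve the one requested by Atlas.
az network private-endpoint-connection list \
  --resource-group sampleRG \
  --resource-name sample \
  --type Microsoft.EventHub/namespaces

az network private-endpoint-connection approve \
  --resource-group sampleRG \
  --resource-name sample \
  --name <pendingConnectionName> \
  --type Microsoft.EventHub/namespaces \
  --description "Approved for Atlas Stream Processing"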

Step 3

Add a connection with the following key-value pairs:

  • bootstrapServers: IP address of your cloud provider's Kafka bootstrap server.

  • security.protocol: "SASL_SSL"

  • authentication.mechanism: "PLAIN"

  • authentication.password: Your Event Hub connection string.

  • authentication.username: "$ConnectionString"

  • type: "Kafka"

  • networking.access.type: "PRIVATE_LINK"

  • networking.access.connectionId: The _id value from your Private Link request response.

Set all other values as necessary.

The following example command creates an Apache Kafka connection in Atlas:

curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \
--digest \
--user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \
--header 'Content-Type: application/json' \
--header 'Accept: application/vnd.atlas.2023-02-01+json' \
--data '{
  "name": "eventhubpl33333",
  "bootstrapServers": "sample.servicebus.windows.net:9093",
  "security": { "protocol": "SASL_SSL" },
  "authentication": {
    "mechanism": "PLAIN",
    "password": "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Irlo3OoRkc27T3ZoGOlbhEOqXQRXzb12+Q2hNXm0lc=",
    "username": "$ConnectionString"
  },
  "type": "Kafka",
  "networking": {
    "access": { "type": "PRIVATE_LINK", "connectionId": "38972b0cbe9c2aa40a30a246" }
  }
}'
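The connection string used as authentication.password above can be retrieved with the Azure CLI. A minimal sketch, assuming the namespace's default RootManageSharedAccessKey authorization rule (the resource group and namespace names are placeholders):

az eventhubs namespace authorization-rule keys list \
  --resource-group sampleRG \
  --namespace-name sample \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString \
  --output tsv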

Each interface for creating a Kafka connection allows you to provide configuration parameters for your Kafka cluster as key-value pairs. Atlas Stream Processing passes only supported parameters to your Kafka cluster; if you declare any parameters that aren't explicitly allowed, Atlas Stream Processing ignores them. Each interface accepts these configurations as follows:

  • Atlas CLI: Provide configurations as a .json file specified with the --file flag.

  • Atlas Administration API: Provide configurations as a .json object specified in the config field.

  • Atlas UI: Provide configurations in the Configuration File field of the Add Connection page.
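For example, here is a hedged sketch of such a configuration as it would appear in an Atlas Administration API request body; the group.id key is illustrative, so confirm it appears in the supported list below before relying on it:

{
  "name": "confluent_demo",
  "type": "Kafka",
  "config": {
    "group.id": "atlasStreamProcessingDemoGroup"
  }
}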

Atlas Stream Processing supports the following configurations: