Using AWS IAM Authentication with the MongoDB Connector for Apache Kafka
Robert Walters, Jagadish Nallapaneni • 4 min read • Published Jul 01, 2024 • Updated Jul 01, 2024
AWS Identity and Access Management (IAM) is a widely used service that enables developers to control secure access to AWS services and resources. A key benefit of AWS IAM is its ability to make secure authenticated connections without the client having to pass sensitive passwords.
As of version 1.13, the MongoDB Connector for Apache Kafka (Kafka Connector) supports authentication via AWS IAM. AWS IAM support has been one of the most commonly requested features for the Kafka Connector, and in this tutorial, we will walk through how to use it.
To achieve support for AWS IAM, we added a way to override how the connector's MongoClient obtains its credentials: a developer provides a class that implements CustomCredentialProvider. While this provider can be used for any authentication method, this tutorial will focus on building an AWS IAM credential provider.
To illustrate this, we will walk through a simple configuration of an AWS EC2 instance installed with Apache Kafka. This instance will use the MongoDB Connector for Apache Kafka to sink data from a Kafka topic to an Atlas cluster, and authenticate using AWS IAM.
If you wish to follow along in this article, make sure you have the following:
- An AWS IAM role, “KafkaBlogAtlasRole”. This role will later be mapped as an Atlas database user.
- An AWS EC2 instance
  - Configure the EC2 instance with an AWS IAM role, “KafkaConnectBlogRole”.
  - Add a policy to KafkaConnectBlogRole that allows “AssumeRole” on the KafkaBlogAtlasRole.

Example policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KafkaAtlasBlogPolicy",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::979559056307:role/KafkaBlogAtlasRole"
    }
  ]
}
```

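- Java Development Kit version 8 or later (the example pom.xml in this tutorial sets maven.compiler.source and maven.compiler.target to 17, so adjust those values if you build with an older JDK)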
- A database user in the desired MongoDB Atlas cluster
  - Configure the database user with AWS IAM and use the AWS Role ARN for KafkaBlogAtlasRole. The Atlas database user should have read/write access to the desired collection.
At this point, we are ready to start building the custom authentication provider.
Version 1.13 of the MongoDB Connector for Apache Kafka lets you plug in a custom authentication provider. In this article, we will create a custom authentication provider in Java, compile it, and add it to the libs directory of your Kafka deployment. The provider we will create will support AWS IAM authentication.
To start, in your favorite Java editor, create a project that builds a JAR containing the following source:
SampleAssumeRoleCredential.java
```java
package com.mongodb;

import java.util.Map;
import java.util.function.Supplier;

import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.util.StringUtils;

public class SampleAssumeRoleCredential implements CustomCredentialProvider {

  public SampleAssumeRoleCredential() {}

  public MongoCredential getCustomCredential(Map<?, ?> map) {
    // Picks up the credentials of the environment, e.g. the EC2 instance role.
    AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
    // Each call to this supplier assumes the role through STS and
    // returns a fresh set of temporary credentials.
    Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
      // The region is hard-coded here; change it to match your deployment.
      AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
          .withCredentials(provider)
          .withRegion("us-east-1")
          .build();
      AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
          .withRoleArn((String) map.get("mongodbaws.auth.mechanism.roleArn"))
          .withRoleSessionName("Test_Session");
      AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
      Credentials creds = assumeRoleResult.getCredentials();
      // Add your code to fetch new credentials
      return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());
    };
    // No static user name or password: the driver asks the supplier for credentials.
    return MongoCredential.createAwsCredential(null, null)
        .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
  }

  // Rejects connector configurations that do not set the role ARN property.
  public void validate(Map<?, ?> map) {
    String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
    if (StringUtils.isNullOrEmpty(roleArn)) {
      throw new RuntimeException("Invalid value set for customProperty");
    }
  }

  // No initialization is needed for this sample.
  public void init(Map<?, ?> map) {

  }
}
```
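The supplier above calls STS every time it is invoked, while the credentials it requests stay valid for 3,600 seconds. If that turns out to be more STS traffic than you want, one option is to wrap the supplier in a small cache. The following is a minimal sketch that uses only the JDK. TemporaryCreds, CachingCredsSupplier, and CachingDemo are hypothetical names that are not part of the connector, the MongoDB driver, or the AWS SDK, and the five-minute refresh margin is an arbitrary assumption.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder for temporary credentials and their expiry time. */
final class TemporaryCreds {
    final String accessKeyId;
    final String secretAccessKey;
    final String sessionToken;
    final Instant expiresAt;

    TemporaryCreds(String accessKeyId, String secretAccessKey, String sessionToken, Instant expiresAt) {
        this.accessKeyId = accessKeyId;
        this.secretAccessKey = secretAccessKey;
        this.sessionToken = sessionToken;
        this.expiresAt = expiresAt;
    }
}

/** Reuses the credentials from the wrapped source until shortly before they expire. */
final class CachingCredsSupplier implements Supplier<TemporaryCreds> {
    private final Supplier<TemporaryCreds> source;
    private final Duration refreshMargin;
    private final Clock clock;
    private TemporaryCreds cached;

    CachingCredsSupplier(Supplier<TemporaryCreds> source, Duration refreshMargin, Clock clock) {
        this.source = source;
        this.refreshMargin = refreshMargin;
        this.clock = clock;
    }

    @Override
    public synchronized TemporaryCreds get() {
        Instant now = clock.instant();
        // Fetch when nothing is cached yet or the cached value is inside the refresh margin.
        if (cached == null || !now.isBefore(cached.expiresAt.minus(refreshMargin))) {
            cached = source.get();
        }
        return cached;
    }
}

final class CachingDemo {
    public static void main(String[] args) {
        Instant start = Instant.parse("2024-07-01T00:00:00Z");
        AtomicInteger fetches = new AtomicInteger();
        // Stand-in for the STS call: counts invocations and returns one-hour credentials.
        Supplier<TemporaryCreds> fakeSts = () -> {
            fetches.incrementAndGet();
            return new TemporaryCreds("key", "secret", "token", start.plusSeconds(3600));
        };
        CachingCredsSupplier cachedSupplier = new CachingCredsSupplier(
                fakeSts, Duration.ofMinutes(5), Clock.fixed(start, ZoneOffset.UTC));
        cachedSupplier.get();
        cachedSupplier.get();
        System.out.println(fetches.get()); // prints 1
    }
}
```

In the provider above, the wrapped source would be the lambda that calls assumeRole, with the expiry taken from the STS response (the SDK's Credentials object exposes getExpiration()).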
Example pom.xml file
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mongodb</groupId>
  <artifactId>SampleAssumeRoleCredential</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.3</version>
        <configuration>
          <!-- put your configurations here -->
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <!-- Java MongoDB Driver dependency -->
    <!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongodb-driver-sync</artifactId>
      <version>5.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk</artifactId>
      <version>1.12.723</version>
    </dependency>

    <!-- slf4j logging dependency, required for logging output from the MongoDB Java Driver -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.7.28</version>
    </dependency>

    <!-- MongoDB Kafka Connector, installed into the local Maven repository (see the note below).
         The coordinates must match the install:install-file command. The scope is "provided"
         because Kafka Connect already supplies the connector at runtime; "system" scope
         would additionally require a systemPath element. -->
    <dependency>
      <groupId>kafka-connect</groupId>
      <artifactId>kafka-connector</artifactId>
      <scope>provided</scope>
      <version>1.13.0</version>
    </dependency>
  </dependencies>

  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

</project>
```
Note: We need to reference MongoDB Connector for Apache Kafka version 1.13 or later. In this example, the connector was downloaded from the Kafka Connector GitHub repository and stored in the local Maven repository.
```shell
mvn install:install-file \
  -Dfile=/Users/robert.walters/Downloads/mongodb-kafka-connect-mongodb-1.13.0/lib/mongo-kafka-connect-1.13.0-confluent.jar \
  -DgroupId=kafka-connect \
  -DartifactId=kafka-connector \
  -Dversion=1.13.0 \
  -Dpackaging=jar
```
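Build the project (for example, with mvn package) and place the resulting JAR file in the libs directory of your Kafka Connect deployment. With the example pom.xml above, the JAR is named SampleAssumeRoleCredential-1.0-SNAPSHOT.jar. You are now ready to configure your MongoDB Connector for Apache Kafka to use this custom authentication provider.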
To support the custom provider in version 1.13, the following configuration options were added to the connector:
Configuration Parameter | Description |
---|---|
mongo.custom.auth.mechanism.enable | Determines if the connector should use the provider class; default is false |
mongo.custom.auth.mechanism.providerClass | Java class entry point for the custom provider |
mongodbaws.auth.mechanism.roleArn | Role that has IAM access to the database |
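
To make the role of these parameters more concrete, here is a minimal sketch of how a host application could use an enable flag and a class name to load a provider by reflection. This is not the connector's actual implementation: CredentialPlugin, DemoPlugin, and PluginLoader are hypothetical names invented for illustration, and only the three property names come from the table above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a provider interface with validate and init hooks. */
interface CredentialPlugin {
    void validate(Map<?, ?> config);
    void init(Map<?, ?> config);
}

/** Hypothetical provider used only to exercise the loader below. */
class DemoPlugin implements CredentialPlugin {
    boolean initialized;

    public DemoPlugin() {}

    @Override
    public void validate(Map<?, ?> config) {
        Object roleArn = config.get("mongodbaws.auth.mechanism.roleArn");
        if (roleArn == null || roleArn.toString().isEmpty()) {
            throw new IllegalArgumentException("mongodbaws.auth.mechanism.roleArn must be set");
        }
    }

    @Override
    public void init(Map<?, ?> config) {
        initialized = true;
    }
}

final class PluginLoader {
    static final String ENABLE = "mongo.custom.auth.mechanism.enable";
    static final String PROVIDER_CLASS = "mongo.custom.auth.mechanism.providerClass";

    /** Returns the configured plugin, or an empty Optional when the feature is disabled. */
    static Optional<CredentialPlugin> load(Map<String, String> config) {
        // The flag defaults to false, as described in the table.
        if (!Boolean.parseBoolean(config.getOrDefault(ENABLE, "false"))) {
            return Optional.empty();
        }
        String className = config.get(PROVIDER_CLASS);
        if (className == null || className.isEmpty()) {
            throw new IllegalArgumentException(PROVIDER_CLASS + " must be set when " + ENABLE + " is true");
        }
        try {
            // The class must be on the classpath and have a no-argument constructor.
            CredentialPlugin plugin = Class.forName(className)
                    .asSubclass(CredentialPlugin.class)
                    .getDeclaredConstructor()
                    .newInstance();
            plugin.validate(config);
            plugin.init(config);
            return Optional.of(plugin);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not load provider class " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(ENABLE, "true");
        config.put(PROVIDER_CLASS, DemoPlugin.class.getName());
        config.put("mongodbaws.auth.mechanism.roleArn", "arn:aws:iam::123456789012:role/Example");
        System.out.println(load(config).isPresent()); // prints true
    }
}
```

The sketch also shows why the sample provider declares a public no-argument constructor and why its JAR has to be on the Kafka Connect classpath: a class named only by a string in the configuration can be instantiated only if it can be found and constructed without arguments.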
In this example, the custom authentication component will use AWS IAM AssumeRole to authenticate with MongoDB Atlas. Here is an example of this configuration connecting to a MongoDB Atlas cluster.
```json
{
  "name": "mongo-tutorial-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Stocks2",
    "connection.uri": "<MongoDB Connection String>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "Stocks",
    "collection": "StockData",
    "mongo.custom.auth.mechanism.enable": "true",
    "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",
    "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"
  }
}
```
Notice that the roleArn is the ARN (Amazon Resource Name) of the AWS IAM role that is used for Atlas users, KafkaBlogAtlasRole. At runtime, since Kafka Connect is running under the AWS IAM role of the EC2 instance, KafkaConnectBlogRole, and this role has a policy that allows AssumeRole on the KafkaBlogAtlasRole, the service can connect to Atlas without storing any credentials!
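Because the sample provider's validate method only checks that roleArn is not empty, a leftover placeholder such as <AWS IAM ARN> would pass validation and fail later at runtime. A stricter check is one way to catch that earlier. The following is a rough sketch, assuming role ARNs of the form arn:aws:iam::<12-digit account ID>:role/<name>, as in the example policy earlier in this tutorial. RoleArnCheck is a hypothetical helper, and the regular expression is an approximation, not an official AWS validator.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that checks whether a string has the rough shape of an IAM role ARN. */
final class RoleArnCheck {
    // Assumed shape: arn:<partition>:iam::<12-digit account id>:role/<optional path and name>
    private static final Pattern ROLE_ARN =
            Pattern.compile("^arn:aws[a-z-]*:iam::\\d{12}:role/[\\w+=,.@/-]+$");

    static boolean looksLikeRoleArn(String value) {
        return value != null && ROLE_ARN.matcher(value).matches();
    }

    public static void main(String[] args) {
        // ARN taken from the example policy in this tutorial.
        System.out.println(looksLikeRoleArn("arn:aws:iam::979559056307:role/KafkaBlogAtlasRole")); // prints true
        // An unreplaced placeholder is rejected.
        System.out.println(looksLikeRoleArn("<AWS IAM ARN>")); // prints false
    }
}
```

A check like this could be called from validate in place of the null-or-empty test, so that a misconfigured connector is rejected when it is created rather than when it first tries to authenticate.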
You are now ready to send data to Kafka and watch it arrive in a MongoDB collection using the MongoDB Connector for Apache Kafka configured with AWS IAM authentication to MongoDB Atlas.
In this article, we discussed the requirements needed to support a custom authentication provider such as AWS IAM authentication. Version 1.13 of the Kafka Connector introduced new parameters that define which class to use within the custom provider JAR file. This JAR file includes the logic for the custom authentication.
Note that while adding AWS IAM authentication capability natively to the Kafka Connector is possible (it is just software, after all), doing so would not only add a version dependency on the AWS IAM SDK, but it would also significantly increase the size of the connector, making it impractical for all other users who do not wish to use this authentication method. For these reasons, writing the custom credential provider is the solution for integrating the Kafka Connector with authentication methods such as AWS IAM.
Read the MongoDB Connector for Apache Kafka documentation to learn more. Questions? Join us in the MongoDB Developer Community.