
Go to MongoDB Using Kafka Connectors - Ultimate Agent Guide

Pavel Duchovny • 7 min read • Published Feb 08, 2022 • Updated Sep 17, 2024
Kafka • Connectors • Go
Go is a modern, statically typed language that compiles to native code while offering some of the feel and convenience of dynamic languages. It is fairly simple to install and use, and it produces readable, robust code for many application use cases.
One of those use cases is building agents that report to a centralized data platform via streaming. A widely accepted approach is to publish the agent data to a distributed messaging platform such as Kafka. The Kafka topics can then propagate the data to many different destinations, such as a MongoDB Atlas cluster.
Writing the agent in Go allows us to use the same code base across operating systems, and Go's good integration with JSON data and packages such as the MongoDB Go driver and the Confluent Kafka Go client makes it a compelling candidate for the presented use case.
This article will demonstrate how file size data on a host is monitored by a cross-platform agent written in Go and streamed through a Kafka cluster to MongoDB Atlas, using a Confluent-hosted sink connector. MongoDB Atlas stores the data in a time series collection, and MongoDB Charts provides a convenient way to present the gathered data to the user.
Architecture Overview

Preparing the Golang project, Kafka cluster, and MongoDB Atlas

Configuring a Go project

Our agent is going to be written in Go. Therefore, you will need to install the Go toolchain on your host.
Once this step is done, we will create a Go module to begin our project in our working directory:
go mod init example/main
Now we will need to add the Confluent Kafka dependency to our Golang project:
go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Configuring a Kafka cluster

Creating a Confluent Kafka cluster is done via the Confluent UI. Start by creating a basic Kafka cluster in Confluent Cloud. Once it is ready, create a topic to be used in the Kafka cluster. I created one named “files.”
Generate an API key and API secret to interact with this Kafka cluster. For the simplicity of this tutorial, I selected the “Global Access” API key. For production, it is recommended to grant the minimum permissions necessary to the API key used. Keep the generated keys for later use.
Obtain the Kafka cluster connection string via Cluster Overview > Cluster Settings > Identification > Bootstrap server for future use. Basic clusters are open to the internet; in production, you will need to amend the access list via advanced cluster ACLs so that only your specific hosts can connect to your cluster.
Important: The Confluent connector requires that the Kafka cluster and the Atlas cluster are deployed in the same region.
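If you prefer the terminal, the topic and API key can also be created with the Confluent CLI. The snippet below is only a rough sketch and not part of the original walkthrough; the cluster ID (lkc-xxxxx) is a placeholder and flag names may vary between CLI versions, so consult confluent --help before running it.

# Create the "files" topic on your Kafka cluster (placeholder cluster ID)
confluent kafka topic create files --cluster lkc-xxxxx
# Generate an API key and secret scoped to that cluster
confluent api-key create --resource lkc-xxxxx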

Configuring Atlas project and cluster

Create a project and cluster, or use an existing Atlas cluster in your project.
Atlas Cluster
Since we are using a time series collection, the cluster must run MongoDB version 5.0 or later. Prepare your Atlas cluster for a Confluent sink connection: in your project's access list, add a database user and the relevant IP addresses of your connector. The access list IPs should be those associated with the Atlas sink connector, which we will configure in a following section. Finally, get hold of the Atlas connection string and the main cluster DNS. For more information about securing the connection and obtaining the relevant IPs from your Confluent connector, please read the following article: MongoDB Atlas Sink Connector for Confluent Cloud.
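Before configuring the connector, it can be worth verifying that your host can actually reach the Atlas cluster with the connection string you just collected. The following is a minimal, optional sanity check using the official MongoDB Go driver (go.mongodb.org/mongo-driver); <ATLAS-CONNECTION-STRING> is a placeholder for your own connection string.

package main

import (
    "context"
    "fmt"
    "time"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Connect with the Atlas connection string collected above.
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("<ATLAS-CONNECTION-STRING>"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    // Ping the cluster to confirm connectivity and access list settings.
    if err := client.Ping(ctx, nil); err != nil {
        panic(err)
    }
    fmt.Println("Connected to Atlas")
}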

Adding agent main logic

Now that we have our Kafka and Atlas clusters created and prepared, we can initialize our agent code by building a small main file that will monitor my ./files directory and capture the file names and sizes. I've added a file called test.txt with some data in it to bring it to ~200MB.
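If you want to follow along and need a similarly sized file, one hypothetical way to create it on Linux is with dd (the path and size here are arbitrary examples):

mkdir -p files
dd if=/dev/zero of=files/test.txt bs=1M count=190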
Let's create a file named main.go and write a small piece of logic that runs in a constant loop with a one-minute sleep, walking through the files in the files folder:
package main

import (
    "fmt"
    "encoding/json"
    "time"
    "os"
    "path/filepath"
)

// Message describes a single file size sample.
type Message struct {
    Name string
    Size float64
    Time int64
}

func samplePath(startPath string) error {
    err := filepath.Walk(startPath,
        func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }

            bytes := info.Size()
            kilobytes := bytes / 1024
            megabytes := float64(kilobytes / 1024) // integer division, then convert to float64
            gigabytes := megabytes / 1024

            // Sample time in milliseconds since the epoch.
            now := time.Now().Unix() * 1000

            m := Message{info.Name(), gigabytes, now}
            value, err := json.Marshal(m)
            if err != nil {
                panic(fmt.Sprintf("Failed to parse JSON: %s", err))
            }

            fmt.Printf("value: %v\n", string(value))
            return nil
        })
    if err != nil {
        return err
    }
    return nil
}

func main() {
    for {
        err := samplePath("./files")
        if err != nil {
            panic(fmt.Sprintf("Failed to run sample : %s", err))
        }
        time.Sleep(time.Minute)
    }
}
The above code imports helper modules to traverse the directory and builds a JSON document out of each file found.
Since the data needs to be marked with the time of the sample, it is a great fit for time series data and therefore should eventually be stored in a time series collection on Atlas. If you want to learn more about time series collections and data, please read our article, MongoDB Time Series Data.
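The sink connector we configure later can create the target collection for us, but if you would like to create or inspect the time series collection yourself, a minimal sketch with the MongoDB Go driver could look like the following. It assumes the hostMonitor database and files collection used later in this tutorial, and <ATLAS-CONNECTION-STRING> is again a placeholder. Note that the time field of a time series collection must hold BSON dates; the sink connector can convert the agent's numeric Time value on insert via its time series settings.

package main

import (
    "context"
    "time"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    client, err := mongo.Connect(ctx, options.Client().ApplyURI("<ATLAS-CONNECTION-STRING>"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    // Create a time series collection keyed on the sample time, with the
    // file name as metadata, matching the documents our agent produces.
    tsOpts := options.TimeSeries().SetTimeField("Time").SetMetaField("Name")
    err = client.Database("hostMonitor").CreateCollection(ctx, "files",
        options.CreateCollection().SetTimeSeriesOptions(tsOpts))
    if err != nil {
        panic(err)
    }
}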
We can test this agent by running the following command:
go run main.go
The agent will produce JSON documents similar to the following format:
value: {"Name":"files","Size":0,"Time":1643881924000}
value: {"Name":"test.txt","Size":0.185546875,"Time":1643881924000}

Creating a Confluent MongoDB connector for Kafka

Now we are going to create a Kafka Sink connector to write the data coming into the “files” topic to our Atlas Cluster’s time series collection.
Confluent Cloud offers MongoDB's Kafka connector as a hosted solution integrated with its Kafka clusters. Follow these steps to initiate a connector deployment.
The following are the inputs provided to the connector:
Connector Setup 1
Connector Setup 2
Connector Setup 3
Once you set it up, following the guide, you will eventually see a similar launch summary page:
Connector Final Summary
After provisioning, every document published to the files topic will be pushed to a time series collection hostMonitor.files, where the time field is Time and the metadata field is Name.
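For reference, the same setup expressed as connector configuration looks roughly like the sketch below. This is illustrative only: it is based on the MongoDB Kafka sink connector's time series properties and the fully managed Confluent Cloud connector class (MongoDbAtlasSink), so exact property names may differ from what the Confluent UI or API currently expects, and all credentials and host values are placeholders.

{
  "connector.class": "MongoDbAtlasSink",
  "name": "files-atlas-sink",
  "kafka.api.key": "<ACCESS-KEY>",
  "kafka.api.secret": "<SECRET-KEY>",
  "topics": "files",
  "input.data.format": "JSON",
  "connection.host": "<ATLAS-CLUSTER-DNS>",
  "connection.user": "<ATLAS-USER>",
  "connection.password": "<ATLAS-PASSWORD>",
  "database": "hostMonitor",
  "collection": "files",
  "timeseries.timefield": "Time",
  "timeseries.timefield.auto.convert": "true",
  "timeseries.metafield": "Name",
  "tasks.max": "1"
}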

Pushing data to Kafka

Now let's edit the main.go file to use a Kafka client and push each file measurement into the “files” topic.
Add the client library as an imported module:
import (
    "fmt"
    "encoding/json"
    "time"
    "os"
    "path/filepath"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)
Add the Confluent Cloud credentials and cluster DNS information. Replace <CONFLUENT-SERVER>:<CONFLUENT-PORT> with the bootstrap server found on the Kafka cluster details page, and <ACCESS-KEY>, <SECRET-KEY> with the values generated for the Kafka cluster:
const (
    bootstrapServers = "<CONFLUENT-SERVER>:<CONFLUENT-PORT>"
    ccloudAPIKey     = "<ACCESS-KEY>"
    ccloudAPISecret  = "<SECRET-KEY>"
)
The following code will initiate the producer and produce a message out of the marshaled JSON document:
topic := "files"
// Produce a new record to the topic...
producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": bootstrapServers,
    "sasl.mechanisms":   "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.username":     ccloudAPIKey,
    "sasl.password":     ccloudAPISecret})

if err != nil {
    panic(fmt.Sprintf("Failed to create producer: %s", err))
}

producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic,
        Partition: kafka.PartitionAny},
    Value: []byte(value)}, nil)

// Wait for delivery report
e := <-producer.Events()

message := e.(*kafka.Message)
if message.TopicPartition.Error != nil {
    fmt.Printf("failed to deliver message: %v\n",
        message.TopicPartition)
} else {
    fmt.Printf("delivered to topic %s [%d] at offset %v\n",
        *message.TopicPartition.Topic,
        message.TopicPartition.Partition,
        message.TopicPartition.Offset)
}

producer.Close()
The entire main.go file will look as follows:
package main

import (
    "fmt"
    "encoding/json"
    "time"
    "os"
    "path/filepath"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// Message describes a single file size sample.
type Message struct {
    Name string
    Size float64
    Time int64
}

const (
    bootstrapServers = "<SERVER-DNS>:<SERVER-PORT>"
    ccloudAPIKey     = "<ACCESS-KEY>"
    ccloudAPISecret  = "<SECRET-KEY>"
)

func samplePath(startPath string) error {
    err := filepath.Walk(startPath,
        func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            fmt.Println(path, info.Size())

            bytes := info.Size()
            kilobytes := bytes / 1024
            megabytes := float64(kilobytes / 1024) // integer division, then convert to float64
            gigabytes := megabytes / 1024

            // Sample time in milliseconds since the epoch.
            now := time.Now().Unix() * 1000

            m := Message{info.Name(), gigabytes, now}
            value, err := json.Marshal(m)
            if err != nil {
                panic(fmt.Sprintf("Failed to parse JSON: %s", err))
            }

            fmt.Printf("value: %v\n", string(value))

            topic := "files"
            // Produce a new record to the topic...
            producer, err := kafka.NewProducer(&kafka.ConfigMap{
                "bootstrap.servers": bootstrapServers,
                "sasl.mechanisms":   "PLAIN",
                "security.protocol": "SASL_SSL",
                "sasl.username":     ccloudAPIKey,
                "sasl.password":     ccloudAPISecret})

            if err != nil {
                panic(fmt.Sprintf("Failed to create producer: %s", err))
            }

            producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic,
                    Partition: kafka.PartitionAny},
                Value: []byte(value)}, nil)

            // Wait for delivery report
            e := <-producer.Events()

            message := e.(*kafka.Message)
            if message.TopicPartition.Error != nil {
                fmt.Printf("failed to deliver message: %v\n",
                    message.TopicPartition)
            } else {
                fmt.Printf("delivered to topic %s [%d] at offset %v\n",
                    *message.TopicPartition.Topic,
                    message.TopicPartition.Partition,
                    message.TopicPartition.Offset)
            }

            producer.Close()

            return nil
        })
    if err != nil {
        return err
    }
    return nil
}

func main() {
    for {
        err := samplePath("./files")
        if err != nil {
            panic(fmt.Sprintf("Failed to run sample : %s", err))
        }
        time.Sleep(time.Minute)
    }
}
Now, when we run the agent while the Confluent Atlas sink connector is fully provisioned, we will see messages produced into the hostMonitor.files time series collection:
Atlas Data
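To double-check the pipeline end to end, you can also query the collection directly. Here is a small optional sketch with the MongoDB Go driver that prints the five most recent samples for test.txt; <ATLAS-CONNECTION-STRING> is once more a placeholder.

package main

import (
    "context"
    "fmt"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    client, err := mongo.Connect(ctx, options.Client().ApplyURI("<ATLAS-CONNECTION-STRING>"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    coll := client.Database("hostMonitor").Collection("files")

    // Fetch the latest five samples for test.txt, newest first.
    findOpts := options.Find().SetSort(bson.D{{Key: "Time", Value: -1}}).SetLimit(5)
    cursor, err := coll.Find(ctx, bson.D{{Key: "Name", Value: "test.txt"}}, findOpts)
    if err != nil {
        panic(err)
    }
    defer cursor.Close(ctx)

    for cursor.Next(ctx) {
        fmt.Println(cursor.Current)
    }
}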

Analyzing the data using MongoDB Charts

To put our data to use, we can create some beautiful charts on top of the time series data. For a line chart, we configure the X axis to use the Time field, the Y axis to use the Size field, and the series to use the Name field. The resulting graph shows a colored line per file, representing the evolution of that file's size over time.
Line Graph Chart
We now have an agent and a fully functioning Charts dashboard to analyze growing-file trends. This architecture leaves plenty of room for extensibility: the Go agent can take on further functionality, more subscribers can consume the monitored data and act upon it, and MongoDB Atlas and Charts can be used by various applications and embedded into different platforms.
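If you later want to reuse the same slicing outside of Charts, it can be expressed as an aggregation. As a rough sketch, reusing the coll handle from the previous snippet, the pipeline below returns the largest recorded size per file name:

// Group the samples by file name and keep the largest observed size (in GB).
pipeline := mongo.Pipeline{
    {{Key: "$group", Value: bson.D{
        {Key: "_id", Value: "$Name"},
        {Key: "maxSizeGB", Value: bson.D{{Key: "$max", Value: "$Size"}}},
    }}},
}

cursor, err := coll.Aggregate(ctx, pipeline)
if err != nil {
    panic(err)
}
defer cursor.Close(ctx)

for cursor.Next(ctx) {
    fmt.Println(cursor.Current)
}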

Wrap Up

Building Go applications is simple, yet it brings big benefits in terms of performance, cross-platform code, and a large number of supported libraries and clients. Adding MongoDB Atlas via a Confluent Cloud Kafka service makes the implementation a robust and extensible stack, streaming the data and efficiently storing and presenting it to the end user via Charts.
In this tutorial, we have covered all the basics you need to know in order to start using Go, Kafka, and MongoDB Atlas in your next streaming projects.
Try MongoDB Atlas and Go today!
