Go to MongoDB Using Kafka Connectors - Ultimate Agent Guide
Go is a modern language that compiles to typed, native code while retaining some of the feel and convenience of dynamic languages. It is fairly simple to install and use, and it yields readable, robust code for many application use cases.
One of those use cases is building agents that report to a centralized data platform via streaming. A widely accepted approach is to stream the agent data through a distributed message queue like Kafka. Kafka topics can then propagate the data to many different destinations, such as a MongoDB Atlas cluster.
Writing the agent in Go lets us use the same code base across operating systems, and Go's good JSON support, together with packages such as the MongoDB Go driver and the Confluent Kafka Go client, makes it a compelling fit for the presented use case.
This article demonstrates how a cross-platform agent written in Go can monitor file size data on a host and stream it through a Kafka cluster into MongoDB Atlas, using a Confluent-hosted sink connector. MongoDB Atlas stores the data in a time series collection, and MongoDB Charts provides a convenient way to present the gathered data to the user.
Our agent is written in Go, so you will need to install the Go toolchain on your host.
Once this step is done, we will create a Go module to begin our project in our working directory:
```bash
go mod init example/main
```
Now we will need to add the Confluent Kafka dependency to our Golang project:
```bash
go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
```
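Note that although the dependency is fetched through the gopkg.in redirect above, the package is imported in code as github.com/confluentinc/confluent-kafka-go/kafka, as we will see later. Depending on your Go version, fetching the module directly by its canonical path should work as well (an equivalent alternative, not required by the tutorial):

```bash
go get -u github.com/confluentinc/confluent-kafka-go/kafka
```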
Creating a Confluent Kafka Cluster is done via the Confluent UI. Start by creating a basic Kafka cluster in the Confluent Cloud. Once ready, create a topic to be used in the Kafka cluster. I created one named “files.”
Generate an API key and API secret to interact with this Kafka cluster. For the simplicity of this tutorial, I have selected a “Global Access” API key. For production, it is recommended to grant the API key the minimum permissions possible. Keep the generated keys handy for later use.
Obtain the Kafka cluster connection string via Cluster Overview > Cluster Settings > Identification > Bootstrap server for later use. Basic clusters are open to the internet; in production, you will need to restrict the access list so that only your specific hosts can connect to your cluster, via advanced cluster ACLs.
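The bootstrap server value is a hostname and port; for a Confluent Cloud cluster it typically looks something like the following (illustrative only — use the value shown in your own cluster settings):

```
pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
```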
Important: The Confluent connector requires that the Kafka cluster and the Atlas cluster are deployed in the same region.
Create a project and cluster or use an existing Atlas cluster in your project.
Since we are using a time series collection, the cluster must run MongoDB 5.0 or later. Prepare your Atlas cluster for the Confluent sink connection: create a database user and add the relevant connector IP addresses to your project’s access list. These access list entries correspond to the Atlas Sink Connector, which we will configure in a following section. Finally, note the Atlas connection string and the main cluster DNS. For more information about securing the connection and obtaining the relevant IPs for your Confluent connector, please read the following article: MongoDB Atlas Sink Connector for Confluent Cloud.
Now that we have our Kafka cluster and Atlas cluster created and prepared, we can initialize our agent code by building a small main file that monitors my ./files directory and captures the file names and sizes. I’ve added a file called test.txt with some data in it to bring it to ~200MB. Let’s create a file named main.go and write a small piece of logic that loops forever, sleeping one minute between iterations, and walks through the files in the files folder:
```go
package main

import (
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
    "time"
)

type Message struct {
    Name string
    Size float64
    Time int64
}

func samplePath(startPath string) error {
    err := filepath.Walk(startPath,
        func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }

            var bytes int64
            bytes = info.Size()

            var kilobytes int64
            kilobytes = (bytes / 1024)

            var megabytes float64
            megabytes = (float64)(kilobytes / 1024) // cast to type float64

            var gigabytes float64
            gigabytes = (megabytes / 1024)

            now := time.Now().Unix() * 1000

            m := Message{info.Name(), gigabytes, now}
            value, err := json.Marshal(m)
            if err != nil {
                panic(fmt.Sprintf("Failed to parse JSON: %s", err))
            }

            fmt.Printf("value: %v\n", string(value))
            return nil
        })
    if err != nil {
        return err
    }
    return nil
}

func main() {
    for {
        err := samplePath("./files")
        if err != nil {
            panic(fmt.Sprintf("Failed to run sample : %s", err))
        }
        time.Sleep(time.Minute)
    }
}
```
The above code imports helper packages to traverse the directory tree and builds a JSON document for each file it finds.
Since each sample needs to be marked with the time it was taken, the data is a great fit for time series storage and should therefore eventually land in a time series collection on Atlas. If you want to learn more about time series collections and data, please read our article, MongoDB Time Series Data.
We can test this agent by running the following command:
```bash
go run main.go
```
The agent will produce JSON documents similar to the following format:
```
value: {"Name":"files","Size":0,"Time":1643881924000}
value: {"Name":"test.txt","Size":0.185546875,"Time":1643881924000}
```
Now we are going to create a Kafka Sink connector to write the data coming into the “files” topic to our Atlas Cluster’s time series collection.
Confluent Cloud has a very popular integration running MongoDB’s Kafka connector as a hosted solution integrated with their Kafka clusters. Follow these steps to initiate a connector deployment.
The following are the inputs provided to the connector:
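These inputs are entered through the Confluent UI, but expressed as a connector configuration they correspond roughly to the sketch below. Treat this as a hedged illustration: the placeholder values are yours to fill in, and the exact property names of the managed MongoDbAtlasSink connector may vary between Confluent Cloud releases, so verify them against the connector documentation. The key points are the files topic as the source, JSON input, the hostMonitor.files target namespace, and the time series mapping of Time and Name:

```json
{
  "connector.class": "MongoDbAtlasSink",
  "name": "files-atlas-sink",
  "kafka.api.key": "<ACCESS-KEY>",
  "kafka.api.secret": "<SECRET-KEY>",
  "topics": "files",
  "input.data.format": "JSON",
  "connection.host": "<ATLAS-CLUSTER-DNS>",
  "connection.user": "<ATLAS-USER>",
  "connection.password": "<ATLAS-PASSWORD>",
  "database": "hostMonitor",
  "collection": "files",
  "timeseries.timefield": "Time",
  "timeseries.timefield.auto.convert": "true",
  "timeseries.metafield": "Name",
  "tasks.max": "1"
}
```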
Once you set it up following the guide, you will eventually reach a launch summary page confirming your configuration.
After provisioning, every document produced to the files topic will be pushed to the hostMonitor.files time series collection, where the time field is Time and the metadata field is Name.
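The sink connector creates this time series collection for you if it does not already exist. For reference, an equivalent collection could be created directly with the MongoDB Go driver along these lines (an illustrative sketch only; the connection string is a placeholder):

```go
package main

import (
    "context"
    "time"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // <ATLAS-URI> is a placeholder for your Atlas connection string.
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("<ATLAS-URI>"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    // Create hostMonitor.files as a time series collection keyed on Time,
    // with Name as the metadata field -- matching the sink connector mapping.
    tsOpts := options.TimeSeries().SetTimeField("Time").SetMetaField("Name")
    err = client.Database("hostMonitor").CreateCollection(ctx, "files",
        options.CreateCollection().SetTimeSeriesOptions(tsOpts))
    if err != nil {
        panic(err)
    }
}
```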
Now let’s edit the main.go file to use a Kafka client and push each file measurement to the “files” topic. Add the client library as an imported module:
```go
import (
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)
```
Add the Confluent Cloud credentials and cluster DNS information. Replace <CONFLUENT-SERVER>:<CONFLUENT-PORT> with the bootstrap server found on the Kafka cluster details page, and <ACCESS-KEY> and <SECRET-KEY> with the API key and secret generated for the Kafka cluster:

```go
const (
    bootstrapServers = "<CONFLUENT-SERVER>:<CONFLUENT-PORT>"
    ccloudAPIKey     = "<ACCESS-KEY>"
    ccloudAPISecret  = "<SECRET-KEY>"
)
```
The following code will initialize the producer and produce a message out of the marshaled JSON document:
```go
topic := "files"
// Produce a new record to the topic...
producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": bootstrapServers,
    "sasl.mechanisms":   "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.username":     ccloudAPIKey,
    "sasl.password":     ccloudAPISecret})

if err != nil {
    panic(fmt.Sprintf("Failed to create producer: %s", err))
}

producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic,
        Partition: kafka.PartitionAny},
    Value: []byte(value)}, nil)

// Wait for delivery report
e := <-producer.Events()

message := e.(*kafka.Message)
if message.TopicPartition.Error != nil {
    fmt.Printf("failed to deliver message: %v\n",
        message.TopicPartition)
} else {
    fmt.Printf("delivered to topic %s [%d] at offset %v\n",
        *message.TopicPartition.Topic,
        message.TopicPartition.Partition,
        message.TopicPartition.Offset)
}

producer.Close()
```
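A note on the design: for simplicity, this code creates a new producer and blocks on a single delivery report for every file sampled. In a longer-running or higher-volume agent, you would typically create one producer up front, reuse it for all messages, and drain any in-flight deliveries before shutting down, for example (an illustrative fragment, not part of the tutorial’s code):

```go
// Wait up to 15 seconds for outstanding deliveries, then close the shared producer.
producer.Flush(15 * 1000)
producer.Close()
```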
The entire main.go file will look as follows:

```go
package main

import (
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

type Message struct {
    Name string
    Size float64
    Time int64
}

const (
    bootstrapServers = "<CONFLUENT-SERVER>:<CONFLUENT-PORT>"
    ccloudAPIKey     = "<ACCESS-KEY>"
    ccloudAPISecret  = "<SECRET-KEY>"
)

func samplePath(startPath string) error {
    err := filepath.Walk(startPath,
        func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            fmt.Println(path, info.Size())

            var bytes int64
            bytes = info.Size()

            var kilobytes int64
            kilobytes = (bytes / 1024)

            var megabytes float64
            megabytes = (float64)(kilobytes / 1024) // cast to type float64

            var gigabytes float64
            gigabytes = (megabytes / 1024)

            now := time.Now().Unix() * 1000

            m := Message{info.Name(), gigabytes, now}
            value, err := json.Marshal(m)
            if err != nil {
                panic(fmt.Sprintf("Failed to parse JSON: %s", err))
            }

            fmt.Printf("value: %v\n", string(value))

            topic := "files"
            // Produce a new record to the topic...
            producer, err := kafka.NewProducer(&kafka.ConfigMap{
                "bootstrap.servers": bootstrapServers,
                "sasl.mechanisms":   "PLAIN",
                "security.protocol": "SASL_SSL",
                "sasl.username":     ccloudAPIKey,
                "sasl.password":     ccloudAPISecret})

            if err != nil {
                panic(fmt.Sprintf("Failed to create producer: %s", err))
            }

            producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic,
                    Partition: kafka.PartitionAny},
                Value: []byte(value)}, nil)

            // Wait for delivery report
            e := <-producer.Events()

            message := e.(*kafka.Message)
            if message.TopicPartition.Error != nil {
                fmt.Printf("failed to deliver message: %v\n",
                    message.TopicPartition)
            } else {
                fmt.Printf("delivered to topic %s [%d] at offset %v\n",
                    *message.TopicPartition.Topic,
                    message.TopicPartition.Partition,
                    message.TopicPartition.Offset)
            }

            producer.Close()

            return nil
        })
    if err != nil {
        return err
    }
    return nil
}

func main() {
    for {
        err := samplePath("./files")
        if err != nil {
            panic(fmt.Sprintf("Failed to run sample : %s", err))
        }
        time.Sleep(time.Minute)
    }
}
```
Now, when we run the agent while the Confluent Atlas sink connector is fully provisioned, we will see messages produced into the hostMonitor.files time series collection.
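To verify that data is flowing end to end, you can query the collection directly. A minimal check with the MongoDB Go driver might look like the following (an illustrative sketch; it assumes your Atlas connection string is exported in a hypothetical ATLAS_URI environment variable):

```go
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    client, err := mongo.Connect(ctx, options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    // Fetch the five most recent measurements from the time series collection.
    coll := client.Database("hostMonitor").Collection("files")
    cursor, err := coll.Find(ctx, bson.D{},
        options.Find().SetSort(bson.D{{Key: "Time", Value: -1}}).SetLimit(5))
    if err != nil {
        panic(err)
    }

    var docs []bson.M
    if err := cursor.All(ctx, &docs); err != nil {
        panic(err)
    }
    for _, doc := range docs {
        fmt.Println(doc)
    }
}
```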
To put our data to use, we can create some beautiful charts on top of the time series data. For a line chart, we configure the X axis to use the Time field, the Y axis to use the Size field, and the series to use the Name field. In the resulting graph, each colored line represents the evolution of one file’s size over time.
Now we have an agent and a fully functioning Charts dashboard to analyze file-growth trends. This architecture leaves plenty of room for extensibility: the Go agent can take on further functionality, more subscribers can consume the monitored data and act upon it, and MongoDB Atlas and Charts can be used by various applications and embedded in different platforms.
Building Go applications is simple, yet it brings big benefits in performance, cross-platform code, and a large ecosystem of supported libraries and clients. Adding MongoDB Atlas via a Confluent Cloud Kafka service makes the implementation a robust and extensible stack that streams data, stores it efficiently, and presents it to the end user via Charts.
In this tutorial, we have covered all the basics you need to know in order to start using Go, Kafka, and MongoDB Atlas in your next streaming projects.