In my previous blog post["My First Go Microservice using MongoDB and Docker Multi-Stage Builds"][9], I created a Go microservice sample which exposes a REST http endpoint and saves the data received from an HTTP POST to a MongoDB database.
In this example, I decoupled the saving of data to MongoDB and created another microservice to handle this. I also added Kafka to serve as the messaging layer so the microservices can work on its own concerns asynchronously.
> In case you have time to watch, I recorded a walkthrough of this blog post in the[video below][1]:)
Here is the high-level architecture of this simple asynchronous processing example wtih 2 microservices.
Microservice 1- is a REST microservice which receives data from a /POST http call to it. After receiving the request, it retrieves the data from the http request and saves it to Kafka. After saving, it responds to the caller with the same data sent via /POST
Microservice 2- is a microservice which subscribes to a topic in Kafka where Microservice 1 saves the data. Once a message is consumed by the microservice, it then saves the data to MongoDB.
Before you proceed, we need a few things to be able to run these microservices:
1. [Download Kafka][2]- I used version kafka_2.11-1.1.0
2. Install[librdkafka][3]- Unfortunately, this library should be present in the target system
3. Install the[Kafka Go Client by Confluent][4]
4. Run MongoDB. You can check my[previous blog post][5]about this where I used a MongoDB docker image.
Let's get rolling!
Start Kafka first, you need Zookeeper running before you run the Kafka server. Here's how
Then run Kafka - I am using port 9092 to connect to Kafka. If you need to change the port, just configure it in config/server.properties. If you are just a beginner like me, I suggest to just use default ports for now.
Here is the code ofMicroservice 2. What is important in this code is the consumption from Kafka, the saving part I already discussed in my previous blog post. Here are the important parts of the code which consumes the data from Kafka.
[kafka-to-mongo/kafka-mongo-sample.go][11]
```
func main() {
//Create MongoDB session
session := initialiseMongo()
mongoStore.session = session
receiveFromKafka()
}
func receiveFromKafka() {
fmt.Println("Start receiving from Kafka")
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "group-id-1",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"jobs-topic1"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
job := string(msg.Value)
saveJobToMongo(job)
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
break
}
}
c.Close()
}
func saveJobToMongo(jobString string) {
fmt.Println("Save to MongoDB")
col := mongoStore.session.DB(database).C(collection)
//Save data into Job struct
var _job Job
b := []byte(jobString)
err := json.Unmarshal(b, &_job)
if err != nil {
panic(err)
}
//Insert job into MongoDB
errMongo := col.Insert(_job)
if errMongo != nil {
panic(errMongo)
}
fmt.Printf("Saved to MongoDB : %s", jobString)
}
```
Let's get down to the demo, run Microservice 1\. Make sure Kafka is running.
Since we are not running Microservice 2 yet, the data saved by Microservice 1 will just be in Kafka. Let's consume it and save to MongoDB by running Microservice 2.
```
$ go run kafka-mongo-sample.go
```
Now you'll see that Microservice 2 consumes the data and saves it to MongoDB
Shameless plug! If you like this blog post, please follow me in Twitter[@donvito][6]. I tweet about Docker, Kubernetes, GoLang, Cloud, DevOps, Agile and Startups. Would love to connect in[GitHub][7]and[LinkedIn][8]