Event-based apps with Go and EventBridge

June 26, 2022

In this post I will build a small event-driven system with Go. It will be a serverless app with an API Gateway in front. I will use AWS EventBridge as the message bus that serves as glue between our components, and the app will be deployed using Serverless Stack.

Event-based system

Overview

Our overall goal is quite simple: we want a system in which some components create events (producers) and other components receive those events (consumers). To tie the producers and consumers together we need a message bus. We will use AWS EventBridge for this purpose.

In this tutorial, we will build an order-taking app. On one side, orders are placed through a public API endpoint, POST /order. The handler for that endpoint is a Lambda function that “creates” orders by publishing them to EventBridge. EventBridge, in turn, is configured to route messages to our consumers, two Lambda functions: CreateInvoice and LogEvents. Both receive order.created events and log them to standard out, just to showcase the broadcasting of events.
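Before involving AWS, it may help to see the producer/bus/consumer idea in plain Go. The following is only a toy, in-memory sketch of the broadcast described above. The Bus, Event and Consumer types are made up for illustration and have nothing to do with the EventBridge API.

```go
package main

import "fmt"

// Event is a hypothetical, stripped-down stand-in for an EventBridge event.
type Event struct {
	DetailType string
	Detail     string
}

// Consumer is anything that can receive an event.
type Consumer func(Event)

// Bus is a toy in-memory message bus that maps a detail type to its consumers.
type Bus struct {
	rules map[string][]Consumer
}

// NewBus returns an empty bus.
func NewBus() *Bus {
	return &Bus{rules: make(map[string][]Consumer)}
}

// Subscribe registers a consumer for one detail type.
func (b *Bus) Subscribe(detailType string, c Consumer) {
	b.rules[detailType] = append(b.rules[detailType], c)
}

// Publish delivers the event to every consumer registered for its detail
// type and returns how many consumers received it.
func (b *Bus) Publish(e Event) int {
	consumers := b.rules[e.DetailType]
	for _, c := range consumers {
		c(e)
	}
	return len(consumers)
}

func main() {
	bus := NewBus()
	bus.Subscribe("order.created", func(e Event) { fmt.Println("LogEvents:", e.Detail) })
	bus.Subscribe("order.created", func(e Event) { fmt.Println("CreateInvoice:", e.Detail) })

	// The producer does not know who is listening; it only publishes.
	n := bus.Publish(Event{DetailType: "order.created", Detail: `{"amount":999}`})
	fmt.Println("delivered to", n, "consumers")
}
```

The point of the sketch is the decoupling: the producer only knows the bus, and consumers can be added without touching the producer.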

Target architecture

Our app is deployed to AWS using my new favorite tool: Serverless Stack (SST). It has great CDK constructs for quick development, and gives us hot reload of the Lambda functions. Great!

Project set up

First we need to bootstrap our project. The SST CLI will do most of the heavy lifting. Since most of our application code is Go, I use the starters/go-starter template.

npx create-sst@latest --template=starters/go-starter eventbus
cd eventbus

Our directory now looks as follows.

> tree -a -L 2
.
├── .env
├── .gitignore
├── .vscode
│   └── launch.json
├── package.json
├── services
│   ├── functions
│   │  └── lambda.go
│   ├── go.mod
│   └── go.sum
├── sst.json
├── stacks
│   ├── MyStack.ts
│   └── index.ts
├── tsconfig.json
└── vitest.config.ts

In this tutorial, we are interested in two parts: the Go functions under services/functions and the stack definition in stacks/MyStack.ts.

We also install Dave Collins’s package go-spew. It is a Go package to dump things to the logs. I often use it to explore what data is passed around in a new service, in this case AWS EventBridge.

cd services
go get github.com/davecgh/go-spew/spew
cd ..

With everything in place, we can deploy our initial stack from the project root:

npm install
npm start

Output:

...

 ✅  magnus-eventbus-MyStack


Stack magnus-eventbus-MyStack
  Status: deployed
  Outputs:
    ApiEndpoint: https://c5fk9hc8kk.execute-api.us-east-1.amazonaws.com


==========================
 Starting Live Lambda Dev
==========================

SST Console: https://console.sst.dev/eventbus/magnus/local
Debug session started. Listening for requests...

Consuming events

Now we can start building! First, we will get the event consumption to work. Our setup will look like this:

  1. A way of producing messages
  2. An event bus with routing rules
  3. A Lambda function to receive the events

Goal: produce events from the terminal, through EventBridge and consume in our Lambda function

First up, let’s create a serverless function that listens to events from the event bus. SST makes this easy for us! All we need to do is replace the initial stack in ./stacks/MyStack.ts with the following:

import {EventBus, StackContext} from "@serverless-stack/resources";

export function Stack({stack}: StackContext) {
    // 1. Create event bus
    const bus = new EventBus(stack, "EventBus", {})

    // 2. Set up event routing rules
    bus.addRules(stack, {
        "order_created": {
            pattern: {
                detailType: ["order.created"],
            },
            targets: {
                function: "functions/log_events.go"
            }
        }
    })

    // 3. Output the event bus name
    stack.addOutputs({
        BusName: bus.eventBusName
    });
}

Let’s break this down.

  1. We create our event bus using EventBus from @serverless-stack/resources. This is a higher-level CDK construct that simplifies our setup. It creates a new custom EventBridge bus.
  2. Next, we add routing rules for events. This rule matches any event whose detailType is "order.created" and forwards it to all our targets. For now, we only have a single target, the function at functions/log_events.go.
  3. Finally, we output the name of the created event bus.
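To make the routing rule in step 2 a bit more concrete, here is a rough sketch of how I understand the matching to behave: the values listed for one field are alternatives, and every field present in the pattern has to match. The Pattern type and the order.cancelled detail type are assumptions for illustration. Real EventBridge patterns support far more (matching on the detail payload, prefixes, numeric ranges and so on).

```go
package main

import "fmt"

// Event holds the two envelope fields this sketch matches on.
type Event struct {
	Source     string
	DetailType string
}

// Pattern is a simplified, hypothetical event pattern.
// An empty slice means "no constraint on this field".
type Pattern struct {
	Source     []string
	DetailType []string
}

// contains reports whether want is empty (no constraint) or includes got.
func contains(want []string, got string) bool {
	if len(want) == 0 {
		return true
	}
	for _, w := range want {
		if w == got {
			return true
		}
	}
	return false
}

// Matches treats the values within one field as alternatives (OR)
// and requires every constrained field to match (AND).
func (p Pattern) Matches(e Event) bool {
	return contains(p.Source, e.Source) && contains(p.DetailType, e.DetailType)
}

func main() {
	rule := Pattern{DetailType: []string{"order.created"}}

	fmt.Println(rule.Matches(Event{Source: "terminal", DetailType: "order.created"}))
	fmt.Println(rule.Matches(Event{Source: "terminal", DetailType: "order.cancelled"}))
}
```

Because our rule only constrains the detail type, it does not matter which source publishes the event. That is what lets us test from the terminal first and from a Lambda function later.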

We haven’t actually created our function yet. Create a new file called functions/log_events.go.

package main

import (
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/davecgh/go-spew/spew"
)

func LogHandler(request events.CloudWatchEvent) error {
	fmt.Printf("received event of type %q\n", request.DetailType)
	spew.Dump(request)
	return nil
}

func main() {
	lambda.Start(LogHandler)
}

This function receives an event of type CloudWatchEvent (this is the event format sent over EventBridge) and logs it to standard out using go-spew that we installed earlier. We will see it in action.

Testing the event consumer

Before we run our tests, make sure that the SST CLI is still running. If not, start it with npm start in the root folder. Next, we create a file called order_created.json with a single event payload.

[
  {
    "Source": "terminal",
    "Detail": "{ \"amount\": 999, \"line_items\": [] }",
    "EventBusName": "magnus-eventbus-EventBus",
    "DetailType": "order.created"
  }
]

This is the minimal payload (I believe). Full definition here.

aws --region us-east-1 events put-events --entries file://order_created.json

You should see something like this to indicate that the publishing was successful:

{
    "FailedEntryCount": 0,
    "Entries": [
        {
            "EventId": "4d2d7ae8-57e4-5e17-97be-dcd98c22014b"
        }
    ]
}
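One thing worth knowing is that put-events can succeed as a call while individual entries are rejected, which is why the response carries FailedEntryCount. The result entries come back in the same order as the entries we sent. Below is a small sketch, using only the standard library, that parses a response like the one above and reports the failed entries. The FailedEntries helper is my own naming, not part of any AWS tooling.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PutEventsResult mirrors the JSON printed by `aws events put-events`.
type PutEventsResult struct {
	FailedEntryCount int           `json:"FailedEntryCount"`
	Entries          []ResultEntry `json:"Entries"`
}

// ResultEntry carries an EventId on success, or an error code and message
// when the entry was rejected.
type ResultEntry struct {
	EventID      string `json:"EventId"`
	ErrorCode    string `json:"ErrorCode"`
	ErrorMessage string `json:"ErrorMessage"`
}

// FailedEntries parses a put-events response and returns the indexes of
// the entries that were rejected.
func FailedEntries(raw []byte) ([]int, error) {
	var res PutEventsResult
	if err := json.Unmarshal(raw, &res); err != nil {
		return nil, err
	}
	var failed []int
	for i, e := range res.Entries {
		if e.ErrorCode != "" {
			failed = append(failed, i)
		}
	}
	return failed, nil
}

func main() {
	raw := []byte(`{
		"FailedEntryCount": 0,
		"Entries": [{"EventId": "4d2d7ae8-57e4-5e17-97be-dcd98c22014b"}]
	}`)

	failed, err := FailedEntries(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println("failed entries:", len(failed))
}
```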

If you open the terminal with the SST CLI, you should see that our function functions/log_events.go was invoked, and the whole payload was logged!

9e3818c0-5f28-449c-a4e5-f94fd2d24cd3 REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedfu-O4LYRS80WS4K [functions/log_events.go] invoked

received event of type "order.created"
(events.CloudWatchEvent) {
 Version: (string) (len=1) "0",
 ID: (string) (len=36) "4d2d7ae8-57e4-5e17-97be-dcd98c22014b",
 DetailType: (string) (len=13) "order.created",
 Source: (string) (len=8) "terminal",
 AccountID: (string) (len=12) "601855032707",
 Time: (time.Time) 2022-06-26 13:10:29 +0000 UTC,
 Region: (string) (len=9) "us-east-1",
 Resources: ([]string) {
 },
 Detail: (json.RawMessage) (len=30 cap=32) {
  00000000  7b 22 61 6d 6f 75 6e 74  22 3a 39 39 39 2c 22 6c  |{"amount":999,"l|
  00000010  69 6e 65 5f 69 74 65 6d  73 22 3a 5b 5d 7d        |ine_items":[]}|
 }
}

As you can see, this is mostly the same event as the one we sent using aws events put-events. The most interesting fields here are DetailType, Source and Detail, which carry the values we set in our payload.
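A detail that is easy to miss: in the put-events entry, Detail is a JSON document encoded as a string, but by the time the event reaches a target it is a regular JSON object nested in the envelope. The sketch below decodes such an envelope using only the standard library. The Envelope struct is my own cut-down mirror of the fields that events.CloudWatchEvent exposes, and the sample JSON is reconstructed from the dump above rather than captured from a real invocation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Envelope is a cut-down mirror of the event envelope delivered to a target.
type Envelope struct {
	ID         string          `json:"id"`
	DetailType string          `json:"detail-type"`
	Source     string          `json:"source"`
	Time       time.Time       `json:"time"`
	Detail     json.RawMessage `json:"detail"`
}

// Order is the payload we put into the Detail field.
type Order struct {
	Amount    int64    `json:"amount"`
	LineItems []string `json:"line_items"`
}

// DecodeOrder unpacks the envelope first and then the order inside Detail.
func DecodeOrder(raw []byte) (Envelope, Order, error) {
	var env Envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return Envelope{}, Order{}, err
	}
	var order Order
	if err := json.Unmarshal(env.Detail, &order); err != nil {
		return Envelope{}, Order{}, err
	}
	return env, order, nil
}

func main() {
	raw := []byte(`{
		"version": "0",
		"id": "4d2d7ae8-57e4-5e17-97be-dcd98c22014b",
		"detail-type": "order.created",
		"source": "terminal",
		"time": "2022-06-26T13:10:29Z",
		"region": "us-east-1",
		"resources": [],
		"detail": {"amount": 999, "line_items": []}
	}`)

	env, order, err := DecodeOrder(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(env.DetailType, env.Source, order.Amount)
}
```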

Consuming events is working! 🎉

Producing events

Now we can work on producing events. We will work on the part on the left here.

consume

Once again, SST makes this easy.

  1. We import Api from @serverless-stack/resources.
  2. Our route is defined in the props under routes, and we point it to a new function defined in the file create_order.go.
  3. We add permission to publish to the event bus, and define the environment variable EVENT_BUS_NAME, needed when publishing events.
-import {EventBus, StackContext} from "@serverless-stack/resources";
+import {EventBus, Api, StackContext} from "@serverless-stack/resources";

 export function Stack({stack}: StackContext) {
    const bus = new EventBus(stack, "EventBus", {})

    bus.addRules(stack, {
        "order_created": {
            pattern: {
                detailType: ["order.created"],
            },
            targets: {
                function: "functions/log_events.go"
            }
        }
    })

+    const api = new Api(stack, "api", {
+        routes: {
+            "POST /order": "functions/create_order.go",
+        },
+        defaults: {
+            function: {
+                permissions: [bus],
+                environment: {
+                    EVENT_BUS_NAME: bus.eventBusName
+                }
+            },
+
+        },
+    });
+
     stack.addOutputs({
+        ApiEndpoint: api.url,
         BusName: bus.eventBusName
     });
 }

Next we define our new function. Create a new file called functions/create_order.go.

package main

import (
	"context"
	"encoding/json"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchevents"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchevents/types"
	"os"
)

var busName = os.Getenv("EVENT_BUS_NAME")

type CreateOrderRequest struct {
	Amount    int64    `json:"amount"`
	LineItems []string `json:"line_items"`
}

func CreateOrderHandler(request events.APIGatewayProxyRequest) error {
	// 1. Unmarshal request
	var req CreateOrderRequest
	if err := json.Unmarshal([]byte(request.Body), &req); err != nil {
		return err
	}

	// 2. Set up EventBridge client
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		return err
	}
	client := cloudwatchevents.NewFromConfig(cfg)

	// 3. Marshal order into JSON again
	b, err := json.Marshal(req)
	if err != nil {
		return err
	}

	// 4. Publish events to EventBridge
	_, err = client.PutEvents(context.Background(),
		&cloudwatchevents.PutEventsInput{
			Entries: []types.PutEventsRequestEntry{
				{
					EventBusName: aws.String(busName),
					Source:       aws.String("create_order_fn"),

					DetailType: aws.String("order.created"),
					Detail:     aws.String(string(b)),
				},
			},
		})
	if err != nil {
		return err
	}
	return nil
}

func main() {
	lambda.Start(CreateOrderHandler)
}

Here we do the following:

  1. Unmarshal the request body from the API gateway into CreateOrderRequest.
  2. Create an EventBridge client
  3. Marshal the JSON we want to send in our event
  4. Send our event to EventBridge
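Steps 1 and 3 may look redundant, since we decode the body only to encode it again. The round trip through CreateOrderRequest does have an effect though: unknown fields are dropped and the JSON is normalized before it ends up on the bus. Here is a small standalone sketch of that behaviour. The Normalize helper and the coupon field are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CreateOrderRequest is the same shape as in create_order.go.
type CreateOrderRequest struct {
	Amount    int64    `json:"amount"`
	LineItems []string `json:"line_items"`
}

// Normalize decodes a request body into CreateOrderRequest and encodes it
// again, which is what steps 1 and 3 of the handler amount to.
func Normalize(body string) (string, error) {
	var req CreateOrderRequest
	if err := json.Unmarshal([]byte(body), &req); err != nil {
		return "", err
	}
	b, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// "coupon" is a hypothetical field that the struct does not know about.
	out, err := Normalize(`{ "amount": 999, "line_items": ["food"], "coupon": "FREE" }`)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Note that a request without line_items is published with "line_items": null, since a nil slice marshals to null. If consumers should never see that, the handler would have to validate the request before publishing.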

Note:

We could probably get away with creating a global client and re-use that between requests. During testing, I got an authentication error due to stale credentials. This might only be a test environment problem, but for now we will create the client on every request.

Once again, let SST deploy all updates.

Testing the event producer

Now we are ready to test our new event producer. There are at least two ways of doing this.

POST https://4ty44qcuya.execute-api.us-east-1.amazonaws.com/order
Content-Type: application/json

{
  "amount": 999,
  "line_items": ["food", "shelter"]
}

Going back to the terminal, we see the result:

# 1. create_order invoked via the API gateway
e31bc6c6-fe85-4e92-b30d-050bf33f0498 REQUEST magnus-eventbus-Stack-apiLambdaPOSTorderCF2979AB-jZgSzQgXMeRR [functions/create_order.go] invoked by API POST /order
e31bc6c6-fe85-4e92-b30d-050bf33f0498 RESPONSE null

# 2. log_events.go invoked (by event from event bus)
ea5ecb52-90f0-416a-aa69-1645ab6b965e REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedfu-O4LYRS80WS4K [functions/log_events.go] invoked

# 3. Log invocation
received event of type "order.created"
(events.CloudWatchEvent) {
 Version: (string) (len=1) "0",
 ID: (string) (len=36) "c3a5c355-f1b0-b642-a957-48dec75c8f8d",
 DetailType: (string) (len=13) "order.created",
 Source: (string) (len=15) "create_order_fn",
 AccountID: (string) (len=12) "601855032707",
 Time: (time.Time) 2022-06-28 20:46:55 +0000 UTC,
 Region: (string) (len=9) "us-east-1",
 Resources: ([]string) {
 },
 Detail: (json.RawMessage) (len=46 cap=48) {
  00000000  7b 22 61 6d 6f 75 6e 74  22 3a 39 39 39 2c 22 6c  |{"amount":999,"l|
  00000010  69 6e 65 5f 69 74 65 6d  73 22 3a 5b 22 66 6f 6f  |ine_items":["foo|
  00000020  64 22 2c 22 73 68 65 6c  74 65 72 22 5d 7d        |d","shelter"]}|
 }
}
ea5ecb52-90f0-416a-aa69-1645ab6b965e RESPONSE null

Not only did we test event producing, but we ended up consuming the events as well! 🍾

Adding a new consumer

Now, imagine the following scenario. The accounting department contacts us:

We are happy that we have started receiving a lot of orders in our system, but where are the corresponding invoices?

We have forgotten our second event consumer, CreateInvoice 😱.

Luckily for us, one of the benefits of event-based systems with a message bus is that it is (or should be) easy to add new consumers to the system as new requirements come up. First, we create our new function as functions/create_invoice.go:

package main

import (
	"encoding/json"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type OrderCreatedEvent struct {
	Amount    int64    `json:"amount"`
	LineItems []string `json:"line_items"`
}

func CreateInvoiceHandler(request events.CloudWatchEvent) error {
	var event OrderCreatedEvent
	if err := json.Unmarshal(request.Detail, &event); err != nil {
		return err
	}

	fmt.Printf("received event of type %q\n", request.DetailType)
	fmt.Println("creating invoice for event", event)
	return nil
}

func main() {
	lambda.Start(CreateInvoiceHandler)
}

Then we add it as a target for our stack:

    bus.addRules(stack, {
        "order_created": {
            pattern: {
                detailType: ["order.created"],
            },
            targets: {
-                function: "functions/log_events.go"
+                function: "functions/log_events.go",
+                invoiceFunction: "functions/create_invoice.go",
            }
        }
    })

Now all we need to do is create a new order:

# 1. create_order invoked via the API gateway
60647f24-3e36-438c-964e-e6644e472f84 REQUEST magnus-eventbus-Stack-apiLambdaPOSTorderCF2979AB-jZgSzQgXMeRR [functions/create.go] invoked by API POST /order
60647f24-3e36-438c-964e-e6644e472f84 RESPONSE null

# 2. create_invoice.go invoked (by event from event bus)
5b939ded-8715-4ad4-90e4-775d783893b4 REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedin-awtbjGrQeKcw [functions/create_invoice.go] invoked

# 3. log_events.go invoked (by event from event bus)
525f7e61-e03a-4a31-a58b-4d9e9593bc91 REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedfu-O4LYRS80WS4K [functions/log_events.go] invoked
received event of type "order.created"
...
525f7e61-e03a-4a31-a58b-4d9e9593bc91 RESPONSE null

# 4. Our invoice
received event of type "order.created"
creating invoice for event {999 [food shelter]}
5b939ded-8715-4ad4-90e4-775d783893b4 RESPONSE null

If we forward this log to the accounting department, they will surely be happy!

Conclusion

We have built an event-based system using Go and AWS services. SST helps us quickly set up the infrastructure, and simplifies things like IAM permissions and propagating environment variables.

There are a few improvements we could make to our system. One would be to put SQS queues between EventBridge and our event-consuming Lambda functions. This would make retries possible if the functions are unavailable for some reason. Another improvement could be to let our event consumers actually do something useful, like storing data in a DynamoDB table or sending e-mail notifications using SNS. But that’s for another time.

I hope you enjoyed this article. Feel free to reach out to me at @wahlstra.
