In this post I will build a small event-driven system with Go. It will be serverless app with an API Gateway in front. I will use AWS EventBridge as message bus that serves as glue between our components and the app will be deployed using Serverless Stack .
Overview
Our overall goal is quite simple. We want to build a simple system where we can have things create events (producers) and other things receive those events (consumers). To tie the producers and consumers together we need a message bus. We will use AWS EventBridge for this purpose.
In this tutorial, we will build an order taking app. On the one side, orders are placed through a public API endpoint POST /orders
. The handler for that endpoint is a Lambda function that “creates” order by publishing them to EventBridge. It in turn is configured to route messages to our consumers, two Lambda functions. CreateInvoice and LogEvents will receive order.created
events and log them to standard out, just to showcase the broadcasting of events.
Target architecture
Our app is deployed to AWS using my new favorite tool: Serverless Stack (SST). It has great CDK constructs for quick development, and gives us hot reload of the Lambda functions. Great!
Project set up
First we need to bootstrap our project. The SST CLI will do most of the heavy lifting. Since most of our application code is Go, I use the starters/go-starter template.
npx create-sst@latest --template=starters/go-starter eventbus
cd eventbus
Our directory now looks as follows.
> tree -a -L 2
.
├── .env
├── .gitignore
├── .vscode
│ └── launch.json
├── package.json
├── services
│ ├── functions
│ │ └── lambda.go
│ ├── go.mod
│ └── go.sum
├── sst.json
├── stacks
│ ├── MyStack.ts
│ └── index.ts
├── tsconfig.json
└── vitest.config.ts
In this tutorial, we are interested in two parts
./stack/MyStack.ts
- This is where our SST/CDK stack is defined. It will define the infrastructure for API Gateway, EventBridge and our Lambda functions../services/functions/
- Here we define our Lambda functions. The go-starter template comes with a single function calledlambda.go
.
We also install Dave Cheney’s package go-spew. It is a Go package to dump things to the logs. I often use it to explore what data is passed around in a new service, in this case AWS EventBridge.
cd eventbus/services
go get github.com/davecgh/go-spew/spew
With everything in place, we can deploy our initial stack
npm install
npm start
Output:
...
✅ magnus-eventbus-MyStack
Stack magnus-eventbus-MyStack
Status: deployed
Outputs:
ApiEndpoint: https://c5fk9hc8kk.execute-api.us-east-1.amazonaws.com
==========================
Starting Live Lambda Dev
==========================
SST Console: https://console.sst.dev/eventbus/magnus/local
Debug session started. Listening for requests...
Consuming events
Now we can start building! First out we will get the event consuming to work. Our setup will look like this:
- A way of producing messages
- An event bus with routing rules
- A Lambda function to receive the events
Goal: produce events from the terminal, through EventBridge and consume in our Lambda function
First up, let’s create a serverless function that listens to events from the event bus. SST makes this easy for us! All we need to do is to replace the initial stack in ./stack/MyStack.ts
with the following:
import {EventBus, StackContext} from "@serverless-stack/resources";
export function Stack({stack}: StackContext) {
// 1. Create event bus
const bus = new EventBus(stack, "EventBus", {})
// 2. Add set up event routing rules
bus.addRules(stack, {
"order_created": {
pattern: {
detailType: ["order.created"],
},
targets: {
function: "functions/log_events.go"
}
}
})
// 3. Output the event bus name
stack.addOutputs({
BusName: bus.eventBusName
});
}
Let’s break this down.
- We create our event bus using
EventBus
from@serverless-stack/resources
. This is a higher order CDK construct that simplifies our set up. This create a new custom EventBridge bus. - Next, we add routing rules for events. This rule send any messages that have the
detailType
=="order.created"
and forwards them to all ourtargets
. For now, we only have a single function atfunctions/log_events.go
- Finally, we output the name of the created event.
We haven’t actually created our function yet. Create a new file called functions/log_events.go
.
package main
import (
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/davecgh/go-spew/spew"
)
func LogHandler(request events.CloudWatchEvent) error {
fmt.Printf("received event of type %q\n", request.DetailType)
spew.Dump(request)
return nil
}
func main() {
lambda.Start(LogHandler)
}
This function receives an event of type CloudWatchEvent (this is the event format sent over EventBridge) and logs it to standard out using go-spew
that we installed earlier. We will see it in action.
Testing the event consumer
Before we run our tests, make sure that the SST CLI is still running. If not, start it with npm start
in the root folder. Next, we create a file with a single event payload in a file called order_created.json
.
[
{
"Source": "terminal",
"Detail": "{ \"amount\": 999, \"line_items\": [] }",
"EventBusName": "magnus-eventbus-EventBus",
"DetailType": "order.created"
}
]
This is the minimal payload (I believe). Full definition here.
aws --region us-east-1 events put-events --entries file://order_created.json
You should see something like this to indicate that the publishing was successful:
{
"FailedEntryCount": 0,
"Entries": [
{
"EventId": "4d2d7ae8-57e4-5e17-97be-dcd98c22014b"
}
]
}
If you open the terminal with the SST CLI, you should see our the function functions/log_events.go
was invoked, and the whole payload was logged!
9e3818c0-5f28-449c-a4e5-f94fd2d24cd3 REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedfu-O4LYRS80WS4K [functions/log_events.go] invoked
received event of type "order.created"
(events.CloudWatchEvent) {
Version: (string) (len=1) "0",
ID: (string) (len=36) "4d2d7ae8-57e4-5e17-97be-dcd98c22014b",
DetailType: (string) (len=13) "order.created",
Source: (string) (len=8) "terminal",
AccountID: (string) (len=12) "601855032707",
Time: (time.Time) 2022-06-26 13:10:29 +0000 UTC,
Region: (string) (len=9) "us-east-1",
Resources: ([]string) {
},
Detail: (json.RawMessage) (len=30 cap=32) {
00000000 7b 22 61 6d 6f 75 6e 74 22 3a 39 39 39 2c 22 6c |{"amount":999,"l|
00000010 69 6e 65 5f 69 74 65 6d 73 22 3a 5b 5d 7d |ine_items":[]}|
}
}
As you can see, this is mostly the same event as we sent from using aws events put-events
. The most interesting fields here are
DetailType
: order.created, this is the field that we use for routing.Detail
contains our payload. In Go, reprepsented asjson.RawMessage/[]byte
.
Consuming events is working! 🎉
Producing events
Now we can work on our event producing. We will work on the part of the left here.
Once again, SST makes this easy.
- We import
Api
from@serverless-stack/resources
. - Our route is defined in the props under
routers
, we point it to a new function defined in the filecreate_order.go
. - We add permission to publish to the event bus, and define the environment variable EVENT_BUS_NAME, needed when publishing events.
-import {EventBus, StackContext} from "@serverless-stack/resources";
+import {EventBus, Api, StackContext} from "@serverless-stack/resources";
export function Stack({stack}: StackContext) {
const bus = new EventBus(stack, "EventBus", {})
bus.addRules(stack, {
"order_created": {
pattern: {
detailType: ["order.created"],
},
targets: {
function: "functions/log_events.go"
}
}
})
+ const api = new Api(stack, "api", {
+ routes: {
+ "POST /order": "functions/create_order.go",
+ },
+ defaults: {
+ function: {
+ permissions: [bus],
+ environment: {
+ EVENT_BUS_NAME: bus.eventBusName
+ }
+ },
+
+ },
+ });
+
stack.addOutputs({
+ ApiEndpoint: api.url,
BusName: bus.eventBusName
});
}
Next we define our new function. Create a new file called functions/create_order.go
.
package main
import (
"context"
"encoding/json"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchevents"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchevents/types"
"os"
)
var busName = os.Getenv("EVENT_BUS_NAME")
type CreateOrderRequest struct {
Amount int64 `json:"amount"`
LineItems []string `json:"line_items"`
}
func CreateOrderHandler(request events.APIGatewayProxyRequest) error {
// 1. Unmarshal request
var req CreateOrderRequest
if err := json.Unmarshal([]byte(request.Body), &req); err != nil {
return err
}
// 2. Set up EventBridge client
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
return err
}
client := cloudwatchevents.NewFromConfig(cfg)
// 3. Marshal order into JSON again
b, err := json.Marshal(req)
if err != nil {
return err
}
// 4. Publish events to EventBridge
_, err = client.PutEvents(context.Background(),
&cloudwatchevents.PutEventsInput{
Entries: []types.PutEventsRequestEntry{
{
EventBusName: aws.String(busName),
Source: aws.String("create_order_fn"),
DetailType: aws.String("order.created"),
Detail: aws.String(string(b)),
},
},
})
if err != nil {
return err
}
return nil
}
func main() {
lambda.Start(CreateOrderHandler)
}
Here we do the following:
- Unmarshal the request body from the API gateway into
CreateOrderRequest
. - Create an EventBridge client
- Marshal the JSON we want to send in our event
- Send our event to EventBridge
Note:
We could probably get away with creating a global client and re-use that between requests. During testing, I got an authentication error due to stale credentials. This might only be a test environment problem, but for now we will create the client on every request.
Once again, let SST deploy all updates.
Testing the event producer
Now we are ready to test our new event producer. There are at least two ways of doing this.
- Use curl, or your favorite HTTP client and make the request. Remember to update the API gateway URL.
POST https://4ty44qcuya.execute-api.us-east-1.amazonaws.com/order
Content-Type: application/json
{
"amount": 999,
"line_items": ["food", "shelter"]
}
- Or, the option I used: go to the SST console and make the request.
Going back to the terminal, we see the result:
# 1. create_order invoked via the API gateway
e31bc6c6-fe85-4e92-b30d-050bf33f0498 REQUEST magnus-eventbus-Stack-apiLambdaPOSTorderCF2979AB-jZgSzQgXMeRR [functions/create_order.go] invoked by API POST /order
e31bc6c6-fe85-4e92-b30d-050bf33f0498 RESPONSE null
# 2. log_events.go invoked (by event from event bus)
ea5ecb52-90f0-416a-aa69-1645ab6b965e REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedfu-O4LYRS80WS4K [functions/log_events.go] invoked
# 3. Log invocation
received event of type "order.created"
(events.CloudWatchEvent) {
Version: (string) (len=1) "0",
ID: (string) (len=36) "c3a5c355-f1b0-b642-a957-48dec75c8f8d",
DetailType: (string) (len=13) "order.created",
Source: (string) (len=15) "create_order_fn",
AccountID: (string) (len=12) "601855032707",
Time: (time.Time) 2022-06-28 20:46:55 +0000 UTC,
Region: (string) (len=9) "us-east-1",
Resources: ([]string) {
},
Detail: (json.RawMessage) (len=46 cap=48) {
00000000 7b 22 61 6d 6f 75 6e 74 22 3a 39 39 39 2c 22 6c |{"amount":999,"l|
00000010 69 6e 65 5f 69 74 65 6d 73 22 3a 5b 22 66 6f 6f |ine_items":["foo|
00000020 64 22 2c 22 73 68 65 6c 74 65 72 22 5d 7d |d","shelter"]}|
}
}
ea5ecb52-90f0-416a-aa69-1645ab6b965e RESPONSE null
Not only did we test event producing, but we ended up consuming the events as well! 🍾
Adding a new consumer
Now, imagine the following scenario. The accounting department contacts us:
We are happy that we have started receiving a lot of orders in our system, but where are the corresponding invoices?
We have forgotten our second event consumer, CreateInvoice
😱.
Lucky for us, one of the benefit of event based systems with a message bus, is that it (should be) easy to add a new consumers to the system, based on new requirements. First, we create our new function as functions/create_invoice.go
:
package main
import (
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
type OrderCreatedEvent struct {
Amount int64 `json:"amount"`
LineItems []string `json:"line_items"`
}
func CreateInvoiceHandler(request events.CloudWatchEvent) error {
var event OrderCreatedEvent
if err := json.Unmarshal(request.Detail, &event); err != nil {
return err
}
fmt.Printf("received event of type %q\n", request.DetailType)
fmt.Println("creating invoice for event", event)
return nil
}
func main() {
lambda.Start(CreateInvoiceHandler)
}
Then we add it as a target for our stack:
bus.addRules(stack, {
"order_created": {
pattern: {
detailType: ["order.created"],
},
targets: {
- function: "functions/log_events.go"
+ function: "functions/log_events.go",
+ invoiceFunction: "functions/create_invoice.go",
}
}
})
Now all we need to do is to create a new order
# 1. create_order invoked via the API gateway
60647f24-3e36-438c-964e-e6644e472f84 REQUEST magnus-eventbus-Stack-apiLambdaPOSTorderCF2979AB-jZgSzQgXMeRR [functions/create.go] invoked by API POST /order
60647f24-3e36-438c-964e-e6644e472f84 RESPONSE null
# 2. create_invoice.go invoked (by event from event bus)
5b939ded-8715-4ad4-90e4-775d783893b4 REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedin-awtbjGrQeKcw [functions/create_invoice.go] invoked
# 3. log_events.go invoked (by event from event bus)
525f7e61-e03a-4a31-a58b-4d9e9593bc91 REQUEST magnus-eventbus-Stack-TargetEventBusordercreatedfu-O4LYRS80WS4K [functions/log_events.go] invoked
received event of type "order.created"
...
525f7e61-e03a-4a31-a58b-4d9e9593bc91 RESPONSE null
# 4. Our invoice
received event of type "order.created"
creating invoice for event {999 [food shelter]}
5b939ded-8715-4ad4-90e4-775d783893b4 RESPONSE null
If we forward this log to the accounting department, they will surely be happy!
Conclusion
We have built an event-based system using Go and AWS services. SST helps us quickly set up the infrastructure, and simplifies things like IAM permissions and propagating environment variables.
There are a few improvements we could make to our system. One would be to put SQS queues between EventBridge and our event consuming Lambda function. This would make retries possible if the functions are unavailable for some reason. Another improvement could be to let our event consumers actually do something useful, like storing data to a DynamoDB table or sending e-mail notifications using SNS. But that’s for another time.
I hope you enjoyed this article. Feel free to reach out to me at @wahlstra.
Resources
- AWS Lambda function handler in Go
- YouTube: SST Weekly: Event Bus - inspiration for this video