Google Pub/Sub with dead letter queues

April 23, 2022

Today we will build a simple system with Google Pub/Sub to explore the basic components needed for an asynchronous publisher-subscriber pattern. We will use Terraform for creating the infrastructure, and Go for the application code. All code here.

At the end of this post, you should know how to set up a Pub/Sub topic with a dead letter queue, how to publish and consume messages, and how to monitor and handle messages that end up in the dead letter queue.

Background

Asynchronous communication and event-driven systems are common in modern software development. Most cloud providers offer messaging services for these use cases.

One common pattern is the publisher-subscriber pattern, where (usually) one publisher publishes messages to a certain topic, and one or many subscribers listen for these events. The publisher does not know who is subscribing, and in fact does not know if there are any subscribers at all. This can be used as glue between your own services and those of the cloud provider.

Dead letter queues (DLQ)

A problem in messaging systems is “what do you do with bad messages?”. Let’s say a bad producer publishes a message that the consumer can’t parse, or the consumer has lost the connection to its datastore. We have at least two options:

Discard the messages - with this option, we risk losing data that is actually valid.

Retry forever - with this option, we might also ultimately lose data if the messaging service fills up. This could also cause increased latency in the system.

A third option is a dead letter queue (DLQ). The main purpose of a dead letter queue is to handle messages that cannot be handled by the consumer. After a certain amount of time or retries, we can send messages “somewhere else”, where the problematic messages can be handled manually. For example, an operator can inspect and drop the messages, or resend them to the service after a bug-fix has been released.

Pub/Sub with DLQ on GCP

In GCP, it works as follows:


  1. An application publishes a message to a topic, e.g. user-created
  2. The consuming application is subscribed to this topic via a subscription app.user-created
  3. When the retry limit is reached, the message is moved over to the dead letter queue topic, app.user-created.dlq.
  4. The messages end up in the subscription app.user-created.dlq, where they can be handled manually by an operator.

Infrastructure

We use Terraform to set up the necessary infrastructure in GCP.

Next we set up our main topic and subscription. There is a Terraform module that simplifies the setup, called terraform-google-modules/pubsub/google.

Note: For this demo, we keep retries few and short. Normally I would configure exponential back-off over a longer period.

terraform {
}

locals {
  topic_name  = "user-created"
  app_name    = "app"
}

# Declared here so the project can be supplied at apply time.
variable "project_id" {
  type = string
}

provider "google" {
  project = var.project_id
}

module "pubsub-main" {
  source     = "terraform-google-modules/pubsub/google"
  project_id = var.project_id

  topic              = local.topic_name
  pull_subscriptions = [
    {
      name                  = "${local.app_name}.${local.topic_name}"
      dead_letter_topic     = module.pubsub-dlq.id
      service_account       = google_service_account.pubsub_sa.email
      max_delivery_attempts = 5
      maximum_backoff       = "10s"
      minimum_backoff       = "1s"
    }
  ]
}

Above, we set dead_letter_topic = module.pubsub-dlq.id. This references the DLQ topic, which we set up in the following section.

module "pubsub-dlq" {
  source     = "terraform-google-modules/pubsub/google"
  project_id = var.project_id

  topic              = "${local.app_name}.${local.topic_name}.dlq"
  pull_subscriptions = [
    {
      name = "${local.app_name}.${local.topic_name}.dlq"
    }
  ]
}

Then we run terraform apply.
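For reference, a minimal way to do this from the command line. I assume here that the project ID is supplied via -var, and reuse the project ID from the gcloud commands later in this post:

terraform init
terraform apply -var="project_id=b32-demo-projects"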

Publish messages

Now that we have our infrastructure in place, we can publish a couple of messages. The easiest way to publish messages to Pub/Sub is with the gcloud CLI.

gcloud pubsub topics publish \
  projects/b32-demo-projects/topics/user-created  --message="alive"
gcloud pubsub topics publish \
  projects/b32-demo-projects/topics/user-created  --message="dead"

Consuming

Next up is consuming. Here is a small consumer written in Go that will pull messages from our subscription. It will Nack (negatively acknowledge) any message starting with “dead”. All other messages are Acked (acknowledged).

package main

import (
	"bytes"
	"context"
	"fmt"

	"cloud.google.com/go/pubsub"
)

// ProcessMessages pulls messages from the given subscription and
// Acks or Nacks them depending on their content.
func ProcessMessages(ctx context.Context, projectID, subID string) error {
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	subscription := client.Subscription(subID)
	return subscription.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// DeliveryAttempt is only set when the subscription has a dead letter
		// policy, which we configured in Terraform above.
		txt := fmt.Sprintf("Received message: %q (attempt %d)", string(msg.Data), *msg.DeliveryAttempt)

		// Simulate a message the consumer cannot handle.
		if bytes.HasPrefix(msg.Data, []byte("dead")) {
			fmt.Println(txt, " - NACK")
			msg.Nack()
			return
		}

		fmt.Println(txt, " - ACK")
		msg.Ack()
	})
}

*Note: we use Google’s Pub/Sub client, cloud.google.com/go/pubsub*

Now we can run our app.
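As a minimal sketch, a main that wires up the consumer could look like the following; the project ID matches the one used in the gcloud commands above, and the subscription name matches the Terraform setup:

package main

import (
	"context"
	"log"
)

func main() {
	ctx := context.Background()

	// Project and subscription created by the Terraform setup above.
	// Receive blocks until the context is cancelled or an error occurs.
	if err := ProcessMessages(ctx, "b32-demo-projects", "app.user-created"); err != nil {
		log.Fatal(err)
	}
}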

We can see that we receive the two messages we published earlier, “alive” and “dead”. The former is Acked, while “dead” gets Nacked. After attempt 5, it disappears. Is it lost forever? No! It was automatically sent over to our DLQ. Success!
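For illustration, given the Println calls in the consumer above, the output looks roughly like this:

Received message: "alive" (attempt 1)  - ACK
Received message: "dead" (attempt 1)  - NACK
Received message: "dead" (attempt 2)  - NACK
...
Received message: "dead" (attempt 5)  - NACK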

What now?

Before we wrap up, we need to cover two things.

  1. How do we know that something is stuck in the DLQ?
  2. What do we do when it is stuck there?

Monitoring

The answer to the first question is “monitoring”, which GCP has good support for! The GCP mobile app is surprisingly good for this purpose, I think. I will not cover the details, but here is an additional Terraform snippet I used to set up monitoring for our app. It checks whether num_undelivered_messages on the DLQ subscription is greater than 0.

The notification_channel refers to the GCP mobile app on my phone. I set this up separately.

resource "google_monitoring_alert_policy" "alert_policy" {
  display_name = "Messages on dead-letter queue (app: ${local.app_name}, topic: ${local.topic_name})"
  combiner     = "OR"
  conditions {
    display_name = "Cloud Pub/Sub Subscription - Unacked messages"
    condition_threshold {
      filter     = "resource.type = \"pubsub_subscription\" AND resource.labels.subscription_id = \"${module.pubsub-dlq.subscription_names[0]}\" AND metric.type = \"pubsub.googleapis.com/subscription/num_undelivered_messages\""
      duration   = "120s"
      comparison = "COMPARISON_GT"
      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_MAX"
      }
    }
  }
  notification_channels = [
    "projects/b32-demo-projects/notificationChannels/6746093406737316903"
  ]

  user_labels = {
    app : local.app_name
  }
}

Handling stuck messages

When an operator has received the alarm from the monitor, they can manually inspect the messages in the DLQ. If a message is completely invalid, they can simply pull and acknowledge that specific message in the Cloud Console.
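This can also be done with the gcloud CLI. A sketch, assuming the DLQ subscription name from the Terraform setup, that pulls and acknowledges a single message:

gcloud pubsub subscriptions pull \
  projects/b32-demo-projects/subscriptions/app.user-created.dlq \
  --limit=1 --auto-ack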

If the message ended up in the DLQ because of a bug in the application, we can republish it to the main topic with the following code.

// RepublishMessages drains the DLQ subscription and republishes each
// message to the main topic.
func RepublishMessages(ctx context.Context, projectID, dlqSubID, toTopicID string) error {
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	subscription := client.Subscription(dlqSubID)
	topic := client.Topic(toTopicID)

	return subscription.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// Republish a copy of the message (data and attributes) to the main topic.
		result := topic.Publish(ctx, &pubsub.Message{
			Data:       msg.Data,
			Attributes: msg.Attributes,
		})
		if _, err := result.Get(ctx); err != nil {
			// Publishing failed; leave the message on the DLQ and retry later.
			msg.Nack()
			return
		}

		fmt.Printf("Republished message: %q\n", string(msg.Data))
		msg.Ack()
	})
}

A few caveats on this method:

  1. If the message still cannot be processed by the application, it will be sent to the DLQ again, meaning we can end up in an infinite loop DLQ -> main -> DLQ -> ....
  2. If there are more subscribers to the main topic user-created, they will also receive this message. This might not be what you intended. This can be solved in a few ways, one of them being message filtering (see the sketch after this list).
  3. In the case of missing data in the message, we could use the function above as a quick fix to add the missing data. In my experience, this gets messy quite quickly. I prefer one of the following:
    1. The publisher adds the missing data and resends the message
    2. The consumer supports missing data by adding defaults
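As a sketch of the filtering approach: Pub/Sub filters match on message attributes and are set on the subscription. If the republisher sets a (hypothetical) republished attribute on the messages it publishes, other subscribers can exclude them. The subscription and attribute names below are illustrative, not part of the setup above:

resource "google_pubsub_subscription" "other_consumer" {
  name  = "other-app.user-created" # hypothetical second subscriber
  topic = "user-created"

  # Skip messages that were republished from the DLQ.
  filter = "NOT attributes:republished"
}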

Conclusion

That’s all. Now we have a simple publisher-subscriber setup that we can use for our application. We even have a dead letter queue, with monitoring, in case something goes wrong.

Good luck!