Skip to main content

PubSub

FTL has first-class support for PubSub, modelled on the concepts of topics (where events are sent) and subscribers (a verb which consumes events). Subscribers are, as you would expect, sinks. Each subscriber is a cursor over the topic it is associated with. Each topic may have multiple subscriptions. Each published event has an at least once delivery guarantee for each subscription.

A topic can be exported to allow other modules to subscribe to it. Subscriptions are always private to their module.

When a subscription is first created in an environment, it can start consuming from the beginning of the topic or only consume events published afterwards.

Topics allow configuring the number of partitions and how each event should be mapped to a partition, allowing for greater throughput. Subscriptions will consume in order within each partition. There are cases where a small amount of progress on a subscription will be lost, so subscriptions should be able to handle receiving some events that have already been consumed.

First, declare a new topic:

package payments

import (
"github.com/block/ftl/go-runtime/ftl"
)

// Define an event type
type Invoice struct {
InvoiceNo string
}

//ftl:topic partitions=1
type Invoices = ftl.TopicHandle[Invoice, ftl.SinglePartitionMap[Invoice]]

If you want multiple partitions in the topic, you'll also need to write a partition mapper:

package payments

import (
"github.com/block/ftl/go-runtime/ftl"
)

// Define an event type
type Invoice struct {
InvoiceNo string
}

type PartitionMapper struct{}

var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{}

func (PartitionMapper) PartitionKey(event PubSubEvent) string {
return event.Time.String()
}

//ftl:topic partitions=10
type Invoices = ftl.TopicHandle[Invoice, PartitionMapper]

Note that the name of the topic as represented in the FTL schema is the lower camel case version of the type name.

The Invoices type is a handle to the topic. It is a generic type that takes two arguments: the event type and the partition map type. The partition map type is used to map events to partitions.

Then define a Sink to consume from the topic:

// Configure initial event consumption with either from=beginning or from=latest
//
//ftl:subscribe payments.invoices from=beginning
func SendInvoiceEmail(ctx context.Context, in Invoice) error {
// ...
}

Events can be published to a topic by injecting the topic type into a verb:

//ftl:verb
func PublishInvoice(ctx context.Context, topic Invoices) error {
topic.Publish(ctx, Invoice{...})
// ...
}