Subscribe to Messages from a Topic
Subscribing to receive message from a topic with the GDK takes three steps:
- Open a subscription to a topic with the Pub/Sub service of your choice (once per subscription).
- Receive and acknowledge messages from the topic. After completing any work related to the message, use the Ack method to prevent it from being redelivered.
Opening a Subscription🔗
The first step in subscribing to receive messages from a topic is
to instantiate a portable *pubsub.Subscription
for your service.
The easiest way to do so is to use pubsub.OpenSubscription
and a service-specific URL pointing to the topic, making sure you
“blank import” the driver package to link it in.
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/<driver>"
)
...
ctx := context.Background()
subs, err := pubsub.OpenSubscription(ctx, "<driver-url>")
if err != nil {
return fmt.Errorf("could not open topic subscription: %v", err)
}
defer subs.Shutdown(ctx)
// subs is a *pubsub.Subscription; see usage below
...
See Concepts: URLs for general background and the guide below for URL usage for each supported service.
Alternatively, if you need fine-grained
control over the connection settings, you can call the constructor function in
the driver package directly (like gcppubsub.OpenSubscription
).
import "github.com/sraphs/gdk/pubsub/<driver>"
...
subs, err := <driver>.OpenSubscription(...)
...
You may find the wire
package useful for managing your initialization code
when switching between different backing services.
See the guide below for constructor usage for each supported service.
Receiving and Acknowledging Messages🔗
A simple subscriber that operates on messages serially looks like this:
// Loop on received messages.
for {
msg, err := subscription.Receive(ctx)
if err != nil {
// Errors from Receive indicate that Receive will no longer succeed.
log.Printf("Receiving message: %v", err)
break
}
// Do work based on the message, for example:
fmt.Printf("Got message: %q\n", msg.Body)
// Messages must always be acknowledged with Ack.
msg.Ack()
}
If you want your subscriber to operate on incoming messages concurrently, you can start multiple goroutines:
// Loop on received messages. We can use a channel as a semaphore to limit how
// many goroutines we have active at a time as well as wait on the goroutines
// to finish before exiting.
const maxHandlers = 10
sem := make(chan struct{}, maxHandlers)
recvLoop:
for {
msg, err := subscription.Receive(ctx)
if err != nil {
// Errors from Receive indicate that Receive will no longer succeed.
log.Printf("Receiving message: %v", err)
break
}
// Wait if there are too many active handle goroutines and acquire the
// semaphore. If the context is canceled, stop waiting and start shutting
// down.
select {
case sem <- struct{}{}:
case <-ctx.Done():
break recvLoop
}
// Handle the message in a new goroutine.
go func() {
defer func() { <-sem }() // Release the semaphore.
defer msg.Ack() // Messages must always be acknowledged with Ack.
// Do work based on the message, for example:
fmt.Printf("Got message: %q\n", msg.Body)
}()
}
// We're no longer receiving messages. Wait to finish handling any
// unacknowledged messages by totally acquiring the semaphore.
for n := 0; n < maxHandlers; n++ {
sem <- struct{}{}
}
Note that the semantics of message delivery can vary by backing service.
Other Usage Samples🔗
Supported Pub/Sub Services🔗
RabbitMQ🔗
The GDK can receive messages from an AMQP 0.9.1 queue, the dialect of
AMQP spoken by RabbitMQ. A RabbitMQ URL only includes the queue name.
The RabbitMQ’s server is discovered from the RABBIT_SERVER_URL
environment
variable (which is something like amqp://guest:guest@localhost:5672/
).
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/rabbitpubsub"
)
// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will Dial the RabbitMQ server at the URL in the environment
// variable RABBIT_SERVER_URL and open the queue "myqueue".
subscription, err := pubsub.OpenSubscription(ctx, "rabbit://myqueue")
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
RabbitMQ Constructor🔗
The rabbitpubsub.OpenSubscription
constructor opens a RabbitMQ queue.
You must first create an *amqp.Connection
to your RabbitMQ instance.
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sraphs/gdk/pubsub/rabbitpubsub"
)
rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
return err
}
defer rabbitConn.Close()
subscription := rabbitpubsub.OpenSubscription(rabbitConn, "myqueue", nil)
defer subscription.Shutdown(ctx)
NATS🔗
The GDK can publish to a NATS subject. A NATS URL only includes the
subject name. The NATS server is discovered from the NATS_SERVER_URL
environment variable (which is something like nats://nats.example.com
).
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/natspubsub"
)
// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and receive messages with subject "example.my-subject".
subscription, err := pubsub.OpenSubscription(ctx, "nats://example.my-subject")
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
NATS guarantees at-most-once delivery; it will never redeliver a message.
Therefore, Message.Ack
is a no-op.
To parse messages published via the GDK, the NATS driver
will first attempt to decode the payload using gob. Failing that, it will
return the message payload as the Data
with no metadata to accomodate
subscribing to messages coming from a source not using the GDK.
NATS Constructor🔗
The natspubsub.OpenSubscription
constructor opens a NATS subject as a
topic. You must first create an *nats.Conn
to your NATS instance.
import (
"context"
"github.com/nats-io/nats.go"
"github.com/sraphs/gdk/pubsub/natspubsub"
)
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
return err
}
defer natsConn.Close()
subscription, err := natspubsub.OpenSubscription(
natsConn,
"example.my-subject",
nil)
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
Kafka🔗
The GDK can receive messages from a Kafka cluster.
A Kafka URL includes the consumer group name, plus at least one instance
of a query parameter specifying the topic to subscribe to.
The brokers in the Kafka cluster are discovered from the
KAFKA_BROKERS
environment variable (which is a comma-delimited list of
hosts, something like 1.2.3.4:9092,5.6.7.8:9092
).
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/kafkapubsub"
)
// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// The host + path are used as the consumer group name.
// The "topic" query parameter sets one or more topics to subscribe to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
subscription, err := pubsub.OpenSubscription(ctx,
"kafka://my-group?topic=my-topic")
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
Kafka Constructor🔗
The kafkapubsub.OpenSubscription
constructor creates a consumer in a
consumer group, subscribed to one or more topics.
In addition to the list of brokers, you’ll need a *sarama.Config
, which
exposes many knobs that can affect performance and semantics; review and set
them carefully. kafkapubsub.MinimalConfig
provides a minimal config to
get you started.
import (
"context"
"github.com/sraphs/gdk/pubsub/kafkapubsub"
)
// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()
// Construct a *pubsub.Subscription, joining the consumer group "my-group"
// and receiving messages from "my-topic".
subscription, err := kafkapubsub.OpenSubscription(
addrs, config, "my-group", []string{"my-topic"}, nil)
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
In-Memory🔗
The GDK includes an in-memory Pub/Sub provider useful for local testing.
The names in mem://
URLs are a process-wide namespace, so subscriptions to
the same name will receive messages posted to that topic. For instance, if
you open a topic mem://topicA
and open two subscriptions with
mem://topicA
, you will have two subscriptions to the same topic.
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/mempubsub"
)
// Create a topic.
topic, err := pubsub.OpenTopic(ctx, "mem://topicA")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
// Create a subscription connected to that topic.
subscription, err := pubsub.OpenSubscription(ctx, "mem://topicA")
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
In-Memory Constructor🔗
To create a subscription to an in-memory Pub/Sub topic, pass the topic you
created into the mempubsub.NewSubscription
function.
You will also need to pass an acknowledgement deadline: once a message is
received, if it is not acknowledged after the deadline elapses, then it will be
redelivered.
import (
"context"
"time"
"github.com/sraphs/gdk/pubsub/mempubsub"
)
// Construct a *pubsub.Topic.
topic := mempubsub.NewTopic()
defer topic.Shutdown(ctx)
// Construct a *pubsub.Subscription for the topic.
subscription := mempubsub.NewSubscription(topic, 1*time.Minute /* ack deadline */)
defer subscription.Shutdown(ctx)
Redis🔗
The GDK can publish to a Redis subject. A Redis URL only includes the
subject name. The Redis server is discovered from the REDIS_SERVER_URL
environment variable (which is something like redis://redis.example.com
).
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/redispubsub"
)
// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will Dial the Redis server at the URL in the environment variable
// REDIS_SERVER_URL and receive messages with nodeID "node-1" and subject "example.my-topic".
subscription, err := pubsub.OpenSubscription(ctx, "redis://node-1?topic=my-topic")
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
Because Redis does not natively support metadata, messages sent to Redis will be encoded with gob.
Redis Constructor🔗
The redispubsub.OpenSubscription
constructor opens a Redis subject as a topic. You
must first create an *redis.Client
to your Redis instance.
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/sraphs/gdk/pubsub/redispubsub"
)
opt, err := redis.ParseURL("redis://localhost:6379")
if err != nil {
return err
}
client := redis.NewClient(opt)
defer client.Close()
subscription, err := redispubsub.OpenSubscription(client, "node-1", []string{"example.my-topic"}, nil)
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
Pulsar🔗
The GDK can publish to a Pulsar subject. A Pulsar URL only includes the
subject name. The Pulsar server is discovered from the Pulsar_SERVER_URL
environment variable (which is something like pulsar://localhost:6650
).
import (
"context"
"github.com/sraphs/gdk/pubsub"
_ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
)
// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// The host + path are used as the consumer group name.
// The "topic" query parameter sets one or more topics to subscribe to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
subscription, err := pubsub.OpenSubscription(ctx,
"pulsar://my-sub?topic=my-topic")
if err != nil {
return err
}
defer subscription.Shutdown(ctx)
Pulsar Constructor🔗
The pulsarpubsub.OpenSubscription
constructor opens a Pulsar topic to publish
messages to.
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/sraphs/gdk/pubsub/pulsarpubsub"
)
localPulsarURL := "pulsar://localhost:6650"
config := pulsarpubsub.MinimalConfig(localPulsarURL)
client, err := pulsar.NewClient(config)
if err != nil {
return err
}
// Construct a *pubsub.Subscription, use the SubscriptionName "my-sub"
// and receiving messages from "my-topic".
subscription, err := pulsarpubsub.OpenSubscription(client, &pulsarpubsub.SubscriptionOptions{
ConsumerOptions: pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
},
KeyName: "",
})
if err != nil {
return err
}
defer subscription.Shutdown(ctx)