Publish Messages to a Topic

Publishing a message to a topic with the GDK takes two steps:

  1. Open a topic with the Pub/Sub provider of your choice (once per topic).
  2. Send messages on the topic.

Opening a Topic🔗

The first step in publishing messages to a topic is to instantiate a portable *pubsub.Topic for your service.

The easiest way to do so is to use pubsub.OpenTopic and a service-specific URL pointing to the topic, making sure you “blank import” the driver package to link it in.

import (
    "context"

    "github.com/sraphs/gdk/pubsub"
    _ "github.com/sraphs/gdk/pubsub/<driver>"
)
...
ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "<driver-url>")
if err != nil {
    return fmt.Errorf("could not open topic: %v", err)
}
defer topic.Shutdown(ctx)
// topic is a *pubsub.Topic; see usage below
...

See Concepts: URLs for general background and the guide below for URL usage for each supported service.

Alternatively, if you need fine-grained control over the connection settings, you can call the constructor function in the driver package directly (like gcppubsub.OpenTopic).

import "github.com/sraphs/gdk/pubsub/<driver>"
...
topic, err := <driver>.OpenTopic(...)
...

You may find the wire package useful for managing your initialization code when switching between different backing services.

See the guide below for constructor usage for each supported service.

Sending Messages on a Topic🔗

Sending a message on a Topic looks like this:

err := topic.Send(ctx, &pubsub.Message{
	Body: []byte("Hello, World!\n"),
	// Metadata is optional and can be nil.
	Metadata: map[string]string{
		// These are examples of metadata.
		// There is nothing special about the key names.
		"language":   "en",
		"importance": "high",
	},
})
if err != nil {
	return err
}

Note that the semantics of message delivery can vary by backing service.

Other Usage Samples🔗

Supported Pub/Sub Services🔗

RabbitMQ🔗

The GDK can publish to an AMQP 0.9.1 fanout exchange, the dialect of AMQP spoken by RabbitMQ. A RabbitMQ URL only includes the exchange name. The RabbitMQ’s server is discovered from the RABBIT_SERVER_URL environment variable (which is something like amqp://guest:guest@localhost:5672/).

import (
	"context"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/rabbitpubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the RabbitMQ server at the URL in the environment
// variable RABBIT_SERVER_URL and open the exchange "myexchange".
topic, err := pubsub.OpenTopic(ctx, "rabbit://myexchange")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

RabbitMQ Constructor🔗

The rabbitpubsub.OpenTopic constructor opens a RabbitMQ exchange. You must first create an *amqp.Connection to your RabbitMQ instance.

import (
	"context"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/sraphs/gdk/pubsub/rabbitpubsub"
)

rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	return err
}
defer rabbitConn.Close()
topic := rabbitpubsub.OpenTopic(rabbitConn, "myexchange", nil)
defer topic.Shutdown(ctx)

NATS🔗

The GDK can publish to a NATS subject. A NATS URL only includes the subject name. The NATS server is discovered from the NATS_SERVER_URL environment variable (which is something like nats://nats.example.com).

import (
	"context"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/natspubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and send messages with subject "example.my-subject".
topic, err := pubsub.OpenTopic(ctx, "nats://example.my-subject")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Because NATS does not natively support metadata, messages sent to NATS will be encoded with gob.

NATS Constructor🔗

The natspubsub.OpenTopic constructor opens a NATS subject as a topic. You must first create an *nats.Conn to your NATS instance.

import (
	"context"

	"github.com/nats-io/nats.go"
	"github.com/sraphs/gdk/pubsub/natspubsub"
)

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	return err
}
defer natsConn.Close()

topic, err := natspubsub.OpenTopic(natsConn, "example.my-subject", nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Kafka🔗

The GDK can publish to a Kafka cluster. A Kafka URL only includes the topic name. The brokers in the Kafka cluster are discovered from the KAFKA_BROKERS environment variable (which is a comma-delimited list of hosts, something like 1.2.3.4:9092,5.6.7.8:9092).

import (
	"context"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/kafkapubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// The host + path are the topic name to send to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Kafka Constructor🔗

The kafkapubsub.OpenTopic constructor opens a Kafka topic to publish messages to. Depending on your Kafka cluster configuration (see auto.create.topics.enable), you may need to provision the topic beforehand.

In addition to the list of brokers, you’ll need a *sarama.Config, which exposes many knobs that can affect performance and semantics; review and set them carefully. kafkapubsub.MinimalConfig provides a minimal config to get you started.

import (
	"context"

	"github.com/sraphs/gdk/pubsub/kafkapubsub"
)

// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()

// Construct a *pubsub.Topic.
topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

In-Memory🔗

The GDK includes an in-memory Pub/Sub provider useful for local testing. The names in mem:// URLs are a process-wide namespace, so subscriptions to the same name will receive messages posted to that topic. This is detailed more in the subscription guide.

import (
	"context"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/mempubsub"
)

topic, err := pubsub.OpenTopic(ctx, "mem://topicA")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

In-Memory Constructor🔗

To create an in-memory Pub/Sub topic, use the mempubsub.NewTopic function. You can use the returned topic to create in-memory subscriptions, as detailed in the subscription guide.

import (
	"context"

	"github.com/sraphs/gdk/pubsub/mempubsub"
)

topic := mempubsub.NewTopic()
defer topic.Shutdown(ctx)

Redis🔗

The GDK can publish to a Redis subject. A Redis URL only includes the subject name. The Redis server is discovered from the REDIS_SERVER_URL environment variable (which is something like redis://redis.example.com).

import (
	"context"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/redispubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the Redis server at the URL in the environment variable
// REDIS_SERVER_URL and send messages with subject "example.my-topic".
topic, err := pubsub.OpenTopic(ctx, "redis://example.my-topic")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Because Redis does not natively support metadata, messages sent to Redis will be encoded with gob.

Redis Constructor🔗

The redispubsub.OpenTopic constructor opens a Redis subject as a topic. You must first create an *redis.Client to your Redis instance.

import (
	"context"

	"github.com/go-redis/redis/v8"
	"github.com/sraphs/gdk/pubsub/redispubsub"
)

opt, err := redis.ParseURL("redis://localhost:6379")
if err != nil {
	return err
}
client := redis.NewClient(opt)
defer client.Close()

topic, err := redispubsub.OpenTopic(client, "example.my-topic", nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Pulsar🔗

The GDK can publish to a Pulsar subject. A Pulsar URL only includes the subject name. The Pulsar server is discovered from the Pulsar_SERVER_URL environment variable (which is something like pulsar://localhost:6650).

import (
	"context"

	"github.com/sraphs/gdk/pubsub"
	_ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// The host + path are the topic name to send to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
topic, err := pubsub.OpenTopic(ctx, "pulsar://my-topic")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Pulsar Constructor🔗

The pulsarpubsub.OpenTopic constructor opens a Pulsar topic to publish messages to.

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

localPulsarURL := "pulsar://localhost:6650"
config := pulsarpubsub.MinimalConfig(localPulsarURL)
client, err := pulsar.NewClient(config)
if err != nil {
	return err
}

// Construct a *pubsub.Topic.
topic, err := pulsarpubsub.OpenTopic(client, &pulsarpubsub.TopicOptions{
	ProducerOptions: pulsar.ProducerOptions{
		Topic: "my-topic",
	},
	KeyName: "",
})
if err != nil {
	return err
}
defer topic.Shutdown(ctx)