t.marcusの外部記憶装置

忘備録とかちょっとした考えとかをつらつらと...

GooglePubsubEmulatorに任意のデータを書く(w/ golang)

GCPのドキュメントサンプルでは任意のメッセージをpubsub(emulator)にpublishできなかったのでユーティリティコマンドを作ってみた

github.com


実行方法は以下の通り

Dockerでpubsub emulatorを起動

$ docker run --rm -p '8085:8085' pubsub

別コンソールでtopicとsubscriptionを作成

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123

$ git clone git@github.com:GoogleCloudPlatform/python-docs-samples.git
$ cd python-docs-samples/pubsub/cloud-client
$ python publisher.py mygcpproject-123123 create mytopic
$ python subscriber.py mygcpproject-123123 create mytopic mytopic-subscription

別コンソールでsubscribeしてみる

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123
$ python subscriber.py mygcpproject-123123 receive mytopic-subscription


subscribe状態で別コンソールでpublishしてみる

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ cd path/to/workspace
$ go run main.go -project mygcpproject-123123 -mode publish_message \
    -topic mytopic -data 'Hello, world!' -attr '{"Hello":"world!"}'
# -> subscribeにpublishしたメッセージが出るはず

$ cat main.go
package main

import (
	"context"
	"encoding/json"
	"flag"
	"io/ioutil"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/satori/go.uuid"
)

// getClient creates a Pub/Sub client for the given project using a background
// context. It panics if the client cannot be constructed, so callers always
// receive a usable client.
func getClient(projectId string) *pubsub.Client {
	c, err := pubsub.NewClient(context.Background(), projectId)
	if err != nil {
		panic(err)
	}
	return c
}

// publishMessage publishes a single message to topicName in the given project
// and blocks until the server has acknowledged it.
//
// Parameters:
//   - projectId: project the client is created for.
//   - topicName: topic to publish to; must be non-empty.
//   - data:      message payload; must be non-empty.
//   - attribute: JSON object of string keys to string values used as the
//     message attributes. An empty string means "no attributes".
//
// Returns:
//   - messageID: a locally generated UUID (v4) for the caller's own bookkeeping.
//     It is NOT attached to the published message: pubsub.Message.ID is
//     documented as read-only and assigned by the server.
//   - serverID:  the ID returned by the publish result.
//
// Panics if topicName or data is empty, if attribute is not valid JSON for a
// string map, or if client creation or publishing fails.
func publishMessage(projectId, topicName, data, attribute string) (messageID, serverID string) {
	if topicName == "" {
		panic("'topic' is required.")
	}
	if data == "" {
		panic("'data' is required.")
	}

	messageID = uuid.Must(uuid.NewV4()).String()

	ctx := context.Background()

	attributeMap := make(map[string]string)
	// An empty string is not valid JSON; treat it as "no attributes" rather
	// than failing with "unexpected end of JSON input".
	if attribute != "" {
		if err := json.Unmarshal([]byte(attribute), &attributeMap); err != nil {
			panic(err)
		}
	}

	// Release the client's connections and the topic's publish goroutines on
	// every exit path, including panics.
	client := getClient(projectId)
	defer client.Close()

	topic := client.Topic(topicName)
	defer topic.Stop()

	result := topic.Publish(ctx, &pubsub.Message{
		Data:       []byte(data),
		Attributes: attributeMap,
	})

	// Get blocks until the publish has completed (or failed).
	serverID, err := result.Get(ctx)
	if err != nil {
		panic(err)
	}
	return messageID, serverID
}

// main parses the command line and dispatches on -mode.
//
// Global flags: -mode, -project, -h. The only mode is "publish_message", which
// additionally accepts -topic, -data and -attr. Global flags must appear before
// the first mode-specific flag so that the mode can be determined.
//
// With -h, usage is written to stdout and the process exits normally. With an
// unknown or missing mode, usage is written to stderr and the exit status is 1.
func main() {
	args := os.Args[1:]

	mainFlagSet := flag.NewFlagSet("main", flag.ContinueOnError)
	mainFlagSet.SetOutput(ioutil.Discard)
	modePtr := mainFlagSet.String("mode", "", "Manipulation mode (publish_message)")
	projectPtr := mainFlagSet.String("project", "", "ProjectID")
	helpPtr := mainFlagSet.Bool("h", false, "Help")
	// First pass only discovers the mode. Parsing stops with an error at the
	// first mode-specific flag, which is expected here, so the error is
	// deliberately ignored; the mode's own flag set reports real errors below.
	_ = mainFlagSet.Parse(args)

	switch *modePtr {
	case "publish_message":
		publishMessageFlagSet := flag.NewFlagSet("publish-message", flag.ContinueOnError)
		publishMessageFlagSet.SetOutput(ioutil.Discard)
		// Re-register the global flags so the complete command line can be
		// parsed in one go, instead of slicing os.Args at a hard-coded offset
		// (which panics on short command lines and breaks with -flag=value).
		publishMessageFlagSet.String("mode", "", "Manipulation mode (publish_message)")
		publishMessageFlagSet.StringVar(projectPtr, "project", "", "ProjectID")
		publishMessageFlagSet.BoolVar(helpPtr, "h", false, "Help")
		topicPtr := publishMessageFlagSet.String("topic", "", "Topic")
		dataPtr := publishMessageFlagSet.String("data", "", "Data")
		attrPtr := publishMessageFlagSet.String("attr", "{}", "Attribute")
		err := publishMessageFlagSet.Parse(args)

		// Help takes priority over validation so that -h always works.
		if *helpPtr || err == flag.ErrHelp {
			publishMessageFlagSet.SetOutput(os.Stdout)
			publishMessageFlagSet.Usage()
			return
		}
		if err != nil {
			os.Stderr.WriteString(err.Error() + "\n")
			publishMessageFlagSet.SetOutput(os.Stderr)
			publishMessageFlagSet.Usage()
			os.Exit(1)
		}
		if *projectPtr == "" {
			panic("'project' is required.")
		}
		publishMessage(*projectPtr, *topicPtr, *dataPtr, *attrPtr)

	default:
		if *helpPtr {
			mainFlagSet.SetOutput(os.Stdout)
			mainFlagSet.Usage()
			return
		}
		mainFlagSet.SetOutput(os.Stderr)
		mainFlagSet.Usage()
		os.Exit(1)
	}
}