GooglePubsubEmulatorに任意のデータを書く(w/ golang)
GCPのドキュメントサンプルでは任意のメッセージをpubsub(emulator)にpublishできなかったのでユーティリティコマンドを作ってみた
実行方法は以下の通り
Dockerでpubsub emulatorを起動
$ docker run --rm -p 8085:8085 pubsub
別コンソールでtopicとsubscriptionを作成
$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123
$ git clone git@github.com:GoogleCloudPlatform/python-docs-samples.git
$ cd python-docs-samples/pubsub/cloud-client
$ python publisher.py mygcpproject-123123 create mytopic
$ python subscriber.py mygcpproject-123123 create mytopic mytopic-subscription
別コンソールでsubscribeしてみる
$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123
$ python subscriber.py mygcpproject-123123 receive mytopic-subscription
subscribe状態で別コンソールでpublishしてみる
$ export PUBSUB_EMULATOR_HOST=localhost:8085 $ cd path/to/workspace $ go run main.go -project mygcpproject-123123 -mode publish_message \ -topic mytopic -data 'Hello, world!' -attr '{"Hello":"world!"}' # -> subscribeにpublishしたメッセージが出るはず $ cat main.go package main import ( "context" "encoding/json" "flag" "io/ioutil" "os" "cloud.google.com/go/pubsub" "github.com/satori/go.uuid" ) func getClient(projectId string) *pubsub.Client { if client, err := pubsub.NewClient(context.Background(), projectId); err != nil { panic(err) } else { return client } } func publishMessage(projectId, topicName, data, attribute string) (messageID, serverID string) { if topicName == "" { panic("'topic' is required.") } if data == "" { panic("'data' is required.") } messageID = uuid.Must(uuid.NewV4()).String() ctx := context.Background() attributeMap := make(map[string]string) err := json.Unmarshal([]byte(attribute), &attributeMap) if err != nil { panic(err) } topic := getClient(projectId).Topic(topicName) result := topic.Publish( ctx, &pubsub.Message{ ID: messageID, Data: []byte(data), Attributes: attributeMap, }) if serverID, err := result.Get(ctx); err != nil { panic(err) } else { return messageID, serverID } } func main() { mainFlagSet := flag.NewFlagSet("main", flag.ContinueOnError) mainFlagSet.SetOutput(ioutil.Discard) modePtr := mainFlagSet.String("mode", "", "Manipulation mode (publish_message)") projectPtr := mainFlagSet.String("project", "", "ProjectID") helpPtr := mainFlagSet.Bool("h", false, "Help") mainFlagSet.Parse(os.Args[1:]) if *projectPtr == "" { panic("'project' is required.") } switch *modePtr { case "publish_message": publishMessageFlagSet := flag.NewFlagSet("publish-message", flag.ContinueOnError) publishMessageFlagSet.SetOutput(ioutil.Discard) topicPtr := publishMessageFlagSet.String("topic", "", "Topic") dataPtr := publishMessageFlagSet.String("data", "", "Data") attrPtr := publishMessageFlagSet.String("attr", "{}", "Attribute") publishMessageFlagSet.Parse(os.Args[5:]) 
if *helpPtr { mainFlagSet.SetOutput(os.Stdout) publishMessageFlagSet.Usage() } else { publishMessage(*projectPtr, *topicPtr, *dataPtr, *attrPtr) } break default: if *helpPtr { mainFlagSet.SetOutput(os.Stdout) mainFlagSet.Usage() } else { mainFlagSet.SetOutput(os.Stderr) mainFlagSet.Usage() os.Exit(1) } } }