t.marcusの外部記憶装置

忘備録とかちょっとした考えとかをつらつらと...

GKEからPubsubへの接続でタイムアウトが発生する件

このアプリをビルド&AlpineのDockerイメージ化したものをGKEで動かして、Topicを取得するときに(内部的にはPubSubに接続する時)タイムアウトエラーで落ちてしまう現象が発生した

$ cat Dockerfile
FROM alpine:3.8

RUN apk add bash tree pstree

ENTRYPOINT [ "/opt/pubsub" ]


$ cat pubsub.go
package main

import (
	"cloud.google.com/go/pubsub"
	"context"
	"fmt"
	"os"
	"time"
)

func main() {
	projectName := os.Getenv("PROJECT")
	topicName := os.Getenv("TOPIC")
	subscriptionName := os.Getenv("SUBSCRIPTION")
	ctx := context.Background()
	ctx, _ = context.WithTimeout(ctx, 5*time.Second)
	client, err := pubsub.NewClient(ctx, projectName)
	if err != nil {
		panic(err)
	}
	topicExists, err := client.Topic(topicName).Exists(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(topicName, topicExists)

	subscription := client.Subscription(subscriptionName)
	subscriptionExists, err := subscription.Exists(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(subscriptionName, subscriptionExists)

	go func() {
		err = subscription.Receive(ctx, func(rcvCtx context.Context, message *pubsub.Message) {
			fmt.Println("RCV", message.ID, string(message.Data), message.Attributes)
		})
	}()

	time.Sleep(500 * time.Millisecond)

	if err != nil {
		panic(err)
	}

	fmt.Println("Wait for signal...")
	sigCh := make(chan os.Signal, 1)
	signal.Notify(
		sigCh,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)
	fmt.Println("SignalRcv ->", <-sigCh)
}

結論結論としては、Alpineに ca-certificates がなかったため、Pubsubのエンドポイント ( https://pubsub.googleapis.com )に接続しようとして接続できなかった模様。

なので apk add ca-certificates してパッケージをインストールすることでタイムアウト(に見えるhttps接続エラー)は出なくなった。

GooglePubsubEmulatorに任意のデータを書く(改)

tmarcus.hatenablog.com

前回のエントリだとGolangの開発環境や、GCPのサンプルプログラム取ってきたり、pip実行したりと、準備するものが多かったので、実行ファイル化したものを作りました。

github.com


実行方法はREADMEに

GooglePubsubEmulatorに任意のデータを書く(/w golang)

GCPのドキュメントサンプルでは任意のメッセージをpubsub(emulator)にpublishできなかったのでユーティリティコマンドを作ってみた

github.com


実行方法は以下の通り

Dockerでpubsub emulatorを起動

$ docker run --rm -p '8085;8085' pubsub

別コンソールでtopicとsubscriptionを作成

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123

$ git clone git@github.com:GoogleCloudPlatform/python-docs-samples.git
$ cd python-docs-samples.git
$ python publisher.py create mytopic
$ python subscriber.pymygcpproject-123123 create mytopic mytopic-subscription

別コンソールでsubscribeしてみる

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123
$ python subscriber.py mygcpproject-123123 receive mytopic-subscription


subscribe状態で別コンソールでpublishしてみる

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ cd path/to/workspace
$ go run main.go -project mygcpproject-123123 -mode publish_message \
    -topic mytopic -data 'Hello, world!' -attr '{"Hello":"world!"}'
# -> subscribeにpublishしたメッセージが出るはず

$ cat main.go
package main

import (
	"context"
	"encoding/json"
	"flag"
	"io/ioutil"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/satori/go.uuid"
)

func getClient(projectId string) *pubsub.Client {
	if client, err := pubsub.NewClient(context.Background(), projectId); err != nil {
		panic(err)
	} else {
		return client
	}
}

func publishMessage(projectId, topicName, data, attribute string) (messageID, serverID string) {
	if topicName == "" {
		panic("'topic' is required.")
	}
	if data == "" {
		panic("'data' is required.")
	}

	messageID = uuid.Must(uuid.NewV4()).String()

	ctx := context.Background()

	attributeMap := make(map[string]string)
	err := json.Unmarshal([]byte(attribute), &attributeMap)
	if err != nil {
		panic(err)
	}

	topic := getClient(projectId).Topic(topicName)
	result :=
		topic.Publish(
			ctx,
			&pubsub.Message{
				ID:         messageID,
				Data:       []byte(data),
				Attributes: attributeMap,
			})

	if serverID, err := result.Get(ctx); err != nil {
		panic(err)
	} else {
		return messageID, serverID
	}
}

func main() {
	mainFlagSet := flag.NewFlagSet("main", flag.ContinueOnError)
	mainFlagSet.SetOutput(ioutil.Discard)
	modePtr := mainFlagSet.String("mode", "", "Manipulation mode (publish_message)")
	projectPtr := mainFlagSet.String("project", "", "ProjectID")
	helpPtr := mainFlagSet.Bool("h", false, "Help")
	mainFlagSet.Parse(os.Args[1:])

	if *projectPtr == "" {
		panic("'project' is required.")
	}

	switch *modePtr {
	case "publish_message":
		publishMessageFlagSet := flag.NewFlagSet("publish-message", flag.ContinueOnError)
		publishMessageFlagSet.SetOutput(ioutil.Discard)
		topicPtr := publishMessageFlagSet.String("topic", "", "Topic")
		dataPtr := publishMessageFlagSet.String("data", "", "Data")
		attrPtr := publishMessageFlagSet.String("attr", "{}", "Attribute")
		publishMessageFlagSet.Parse(os.Args[5:])
		if *helpPtr {
			mainFlagSet.SetOutput(os.Stdout)
			publishMessageFlagSet.Usage()
		} else {
			publishMessage(*projectPtr, *topicPtr, *dataPtr, *attrPtr)
		}
		break

	default:
		if *helpPtr {
			mainFlagSet.SetOutput(os.Stdout)
			mainFlagSet.Usage()
		} else {
			mainFlagSet.SetOutput(os.Stderr)
			mainFlagSet.Usage()
			os.Exit(1)
		}
	}
}

Google Cloud bigtable(emulator)をgoから

ローカルでbigtable-emulator動かして、goでCRUDしてみるテスト

// Bigtable example by go.
//
// Install library
//   $ go get cloud.google.com/go/bigtable
//
// Start bigtable emulator by docker
//   $ docker run -it -p 8080:8080 spotify/bigtable-emulator:latest
//
// Need to set Environment variable
//   BIGTABLE_EMULATOR_HOST="127.0.0.1:8080"
//
// Reference URL)
//   https://cloud.google.com/bigtable/docs/samples-go-hello
//   https://github.com/spotify/docker-bigtable
//   https://github.com/Shopify/bigtable-emulator

package main

import (
	"context"
	"fmt"

	"golang.org/x/oauth2"
	"google.golang.org/api/option"

	"cloud.google.com/go/bigtable"
)

const (
	tblName          = "tbl"
	familyName       = "cf1"
	rowKeyPrefix     = "rowKey"
	columnNamePrefix = "colKey"
)

type devTokenSource struct{}

func (devTokenSource) Token() (*oauth2.Token, error) {
	return new(oauth2.Token), nil
}

func sliceContains(list []string, target string) bool {
	for _, s := range list {
		if s == target {
			return true
		}
	}
	return false
}

func adminCreate(adminClient *bigtable.AdminClient) {
	tables, familiesMap := dumpTables(adminClient)

	if !sliceContains(tables, tblName) {
		fmt.Println("Create table")
		err := adminClient.CreateTable(context.Background(), tblName)
		if err != nil {
			panic(err)
		}
		tables = append(tables, tblName)
	}

	families := familiesMap[tblName]
	if !sliceContains(families, familyName) {
		fmt.Println("Create cf")
		err := adminClient.CreateColumnFamily(context.Background(), tblName, familyName)
		if err != nil {
			panic(err)
		}
	}

	dumpTables(adminClient)
}

func dumpTables(adminClient *bigtable.AdminClient) ([]string, map[string][]string) {
	tables, err := adminClient.Tables(context.Background())
	if err != nil {
		panic(err)
	}

	familiesMap := make(map[string][]string)

	for _, table := range tables {
		tableInfo, err := adminClient.TableInfo(context.Background(), table)
		if err != nil {
			panic(err)
		}
		fmt.Printf("Table name : %s\n", table)

		familiesMap[table] = tableInfo.Families

		fmt.Println("ColumnFamily")
		for _, family := range tableInfo.Families {
			fmt.Printf("  - %s\n", family)
		}
		fmt.Println()
	}
	return tables, familiesMap
}

func scanRowPrefix(table *bigtable.Table, family, rowPrefix, column string) {
	err :=
		table.ReadRows(
			context.Background(),
			bigtable.PrefixRange(rowPrefix),
			func(row bigtable.Row) bool {
				for _, item := range row[family] {
					fmt.Printf("#%s\n", item)
				}
				item := row[family][0]
				fmt.Printf("Row=%s Column=%s Value=%s\n", item.Row, item.Column, string(item.Value))
				return true
			},
			bigtable.RowFilter(bigtable.ColumnFilter(column)))
	if err != nil {
		panic(err)
	}
}

func adminDelete(project, instance string) {

}

func write(table *bigtable.Table, family, row, column, value string) error {
	mut := bigtable.NewMutation()
	mut.Set(family, column, bigtable.Now(), []byte(value))
	rowErr, err := table.ApplyBulk(context.Background(), []string{row}, []*bigtable.Mutation{mut})
	if err != nil {
		return err
	}
	if rowErr != nil {
		return rowErr[0]
	}
	return nil
}

func main() {
	ctx := context.Background()
	project := "dev"
	instance := "dev"

	adminClient, err := bigtable.NewAdminClient(context.Background(), project, instance, option.WithTokenSource(&devTokenSource{}))
	if err != nil {
		panic(err)
	}

	adminCreate(adminClient)

	client, err := bigtable.NewClient(ctx, project, instance, option.WithTokenSource(&devTokenSource{}))
	if err != nil {
		panic(err)
	}

	table := client.Open(tblName)

	// Write to same key
	fmt.Println("Write")
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"1", "Hello11a")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"1", "Hello11b")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"2", "Hello12")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"2", columnNamePrefix+"1", "HelloB21")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"2", columnNamePrefix+"2", "HelloB22")
	if err != nil {
		panic(err)
	}
	fmt.Println()

	// Read
	fmt.Println("Read")
	result, err :=
		table.ReadRow(
			context.Background(),
			rowKeyPrefix+"1",
			bigtable.RowFilter(bigtable.ColumnFilter(columnNamePrefix+"1")))
	if err != nil {
		panic(err)
	}
	for _, v := range result[familyName] {
		fmt.Printf("#%s\n", v)
	}
	fmt.Println(string(result[familyName][0].Value))
	fmt.Println()

	// Scan
	fmt.Println("Scan")
	scanRowPrefix(table, familyName, rowKeyPrefix, columnNamePrefix+"1")
	fmt.Println()

	// Delete row range
	fmt.Println("DropRowRange")
	err = adminClient.DropRowRange(context.Background(), tblName, rowKeyPrefix+"2")
	if err != nil {
		panic(err)
	}
	fmt.Println()

	// Scan
	fmt.Println("Scan")
	scanRowPrefix(table, familyName, rowKeyPrefix, columnNamePrefix+"1")
	fmt.Println()

}

やりかけメモ

func injectEnv(c AppConfig) AppConfig {
	injectEnv2(&c)
	return c
}

func injectEnv2(c interface{}) {
	fmt.Println(">>")
	tmpC := c
	if reflect.TypeOf(c).Kind() == reflect.Ptr {
		tmpC = reflect.Indirect(reflect.ValueOf(c)).Interface()
	}
	t := reflect.TypeOf(tmpC)
	v := reflect.ValueOf(tmpC)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fv := v.Field(i)

		fmt.Println("===")

		fmt.Printf("Name: %s\n", f.Name)
		fmt.Printf("Type: %s(%s)\n", f.Type, f.Type.Kind())

		switch f.Type.Kind() {
		case reflect.Struct:
			injectEnv2(fv.Interface())
			break
		default:
			break
		}
	}
	fmt.Println("<<")
}