t.marcusの外部記憶装置

忘備録とかちょっとした考えとかをつらつらと...

GKEからPubsubへの接続でタイムアウトが発生する件

このアプリをビルド&AlpineのDockerイメージ化したものをGKEで動かして、Topicを取得するときに(内部的にはPubSubに接続する時)タイムアウトエラーで落ちてしまう現象が発生した

$ cat Dockerfile
FROM alpine:3.8

RUN apk add bash tree pstree

ENTRYPOINT [ "/opt/pubsub" ]


$ cat pubsub.go
package main

import (
	"cloud.google.com/go/pubsub"
	"context"
	"fmt"
	"os"
	"time"
)

func main() {
	projectName := os.Getenv("PROJECT")
	topicName := os.Getenv("TOPIC")
	subscriptionName := os.Getenv("SUBSCRIPTION")
	ctx := context.Background()
	ctx, _ = context.WithTimeout(ctx, 5*time.Second)
	client, err := pubsub.NewClient(ctx, projectName)
	if err != nil {
		panic(err)
	}
	topicExists, err := client.Topic(topicName).Exists(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(topicName, topicExists)

	subscription := client.Subscription(subscriptionName)
	subscriptionExists, err := subscription.Exists(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(subscriptionName, subscriptionExists)

	go func() {
		err = subscription.Receive(ctx, func(rcvCtx context.Context, message *pubsub.Message) {
			fmt.Println("RCV", message.ID, string(message.Data), message.Attributes)
		})
	}()

	time.Sleep(500 * time.Millisecond)

	if err != nil {
		panic(err)
	}

	fmt.Println("Wait for signal...")
	sigCh := make(chan os.Signal, 1)
	signal.Notify(
		sigCh,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)
	fmt.Println("SignalRcv ->", <-sigCh)
}

結論結論としては、Alpineに ca-certificates がなかったため、Pubsubのエンドポイント ( https://pubsub.googleapis.com )に接続しようとして接続できなかった模様。

なので apk add ca-certificates してパッケージをインストールすることでタイムアウト(に見えるhttps接続エラー)は出なくなった。

GooglePubsubEmulatorに任意のデータを書く(改)

tmarcus.hatenablog.com

前回のエントリだとGolangの開発環境や、GCPのサンプルプログラム取ってきたり、pip実行したりと、準備するものが多かったので、実行ファイル化したものを作りました。

github.com


実行方法はREADMEに

GooglePubsubEmulatorに任意のデータを書く(/w golang)

GCPのドキュメントサンプルでは任意のメッセージをpubsub(emulator)にpublishできなかったのでユーティリティコマンドを作ってみた

github.com


実行方法は以下の通り

Dockerでpubsub emulatorを起動

$ docker run --rm -p '8085;8085' pubsub

別コンソールでtopicとsubscriptionを作成

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123

$ git clone git@github.com:GoogleCloudPlatform/python-docs-samples.git
$ cd python-docs-samples.git
$ python publisher.py create mytopic
$ python subscriber.pymygcpproject-123123 create mytopic mytopic-subscription

別コンソールでsubscribeしてみる

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ export PUBSUB_PROJECT_ID=mygcpproject-123123
$ python subscriber.py mygcpproject-123123 receive mytopic-subscription


subscribe状態で別コンソールでpublishしてみる

$ export PUBSUB_EMULATOR_HOST=localhost:8085
$ cd path/to/workspace
$ go run main.go -project mygcpproject-123123 -mode publish_message \
    -topic mytopic -data 'Hello, world!' -attr '{"Hello":"world!"}'
# -> subscribeにpublishしたメッセージが出るはず

$ cat main.go
package main

import (
	"context"
	"encoding/json"
	"flag"
	"io/ioutil"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/satori/go.uuid"
)

func getClient(projectId string) *pubsub.Client {
	if client, err := pubsub.NewClient(context.Background(), projectId); err != nil {
		panic(err)
	} else {
		return client
	}
}

func publishMessage(projectId, topicName, data, attribute string) (messageID, serverID string) {
	if topicName == "" {
		panic("'topic' is required.")
	}
	if data == "" {
		panic("'data' is required.")
	}

	messageID = uuid.Must(uuid.NewV4()).String()

	ctx := context.Background()

	attributeMap := make(map[string]string)
	err := json.Unmarshal([]byte(attribute), &attributeMap)
	if err != nil {
		panic(err)
	}

	topic := getClient(projectId).Topic(topicName)
	result :=
		topic.Publish(
			ctx,
			&pubsub.Message{
				ID:         messageID,
				Data:       []byte(data),
				Attributes: attributeMap,
			})

	if serverID, err := result.Get(ctx); err != nil {
		panic(err)
	} else {
		return messageID, serverID
	}
}

func main() {
	mainFlagSet := flag.NewFlagSet("main", flag.ContinueOnError)
	mainFlagSet.SetOutput(ioutil.Discard)
	modePtr := mainFlagSet.String("mode", "", "Manipulation mode (publish_message)")
	projectPtr := mainFlagSet.String("project", "", "ProjectID")
	helpPtr := mainFlagSet.Bool("h", false, "Help")
	mainFlagSet.Parse(os.Args[1:])

	if *projectPtr == "" {
		panic("'project' is required.")
	}

	switch *modePtr {
	case "publish_message":
		publishMessageFlagSet := flag.NewFlagSet("publish-message", flag.ContinueOnError)
		publishMessageFlagSet.SetOutput(ioutil.Discard)
		topicPtr := publishMessageFlagSet.String("topic", "", "Topic")
		dataPtr := publishMessageFlagSet.String("data", "", "Data")
		attrPtr := publishMessageFlagSet.String("attr", "{}", "Attribute")
		publishMessageFlagSet.Parse(os.Args[5:])
		if *helpPtr {
			mainFlagSet.SetOutput(os.Stdout)
			publishMessageFlagSet.Usage()
		} else {
			publishMessage(*projectPtr, *topicPtr, *dataPtr, *attrPtr)
		}
		break

	default:
		if *helpPtr {
			mainFlagSet.SetOutput(os.Stdout)
			mainFlagSet.Usage()
		} else {
			mainFlagSet.SetOutput(os.Stderr)
			mainFlagSet.Usage()
			os.Exit(1)
		}
	}
}

Google Cloud bigtable(emulator)をgoから

ローカルでbigtable-emulator動かして、goでCRUDしてみるテスト

// Bigtable example by go.
//
// Install library
//   $ go get cloud.google.com/go/bigtable
//
// Start bigtable emulator by docker
//   $ docker run -it -p 8080:8080 spotify/bigtable-emulator:latest
//
// Need to set Environment variable
//   BIGTABLE_EMULATOR_HOST="127.0.0.1:8080"
//
// Reference URL)
//   https://cloud.google.com/bigtable/docs/samples-go-hello
//   https://github.com/spotify/docker-bigtable
//   https://github.com/Shopify/bigtable-emulator

package main

import (
	"context"
	"fmt"

	"golang.org/x/oauth2"
	"google.golang.org/api/option"

	"cloud.google.com/go/bigtable"
)

const (
	tblName          = "tbl"
	familyName       = "cf1"
	rowKeyPrefix     = "rowKey"
	columnNamePrefix = "colKey"
)

type devTokenSource struct{}

func (devTokenSource) Token() (*oauth2.Token, error) {
	return new(oauth2.Token), nil
}

func sliceContains(list []string, target string) bool {
	for _, s := range list {
		if s == target {
			return true
		}
	}
	return false
}

func adminCreate(adminClient *bigtable.AdminClient) {
	tables, familiesMap := dumpTables(adminClient)

	if !sliceContains(tables, tblName) {
		fmt.Println("Create table")
		err := adminClient.CreateTable(context.Background(), tblName)
		if err != nil {
			panic(err)
		}
		tables = append(tables, tblName)
	}

	families := familiesMap[tblName]
	if !sliceContains(families, familyName) {
		fmt.Println("Create cf")
		err := adminClient.CreateColumnFamily(context.Background(), tblName, familyName)
		if err != nil {
			panic(err)
		}
	}

	dumpTables(adminClient)
}

func dumpTables(adminClient *bigtable.AdminClient) ([]string, map[string][]string) {
	tables, err := adminClient.Tables(context.Background())
	if err != nil {
		panic(err)
	}

	familiesMap := make(map[string][]string)

	for _, table := range tables {
		tableInfo, err := adminClient.TableInfo(context.Background(), table)
		if err != nil {
			panic(err)
		}
		fmt.Printf("Table name : %s\n", table)

		familiesMap[table] = tableInfo.Families

		fmt.Println("ColumnFamily")
		for _, family := range tableInfo.Families {
			fmt.Printf("  - %s\n", family)
		}
		fmt.Println()
	}
	return tables, familiesMap
}

func scanRowPrefix(table *bigtable.Table, family, rowPrefix, column string) {
	err :=
		table.ReadRows(
			context.Background(),
			bigtable.PrefixRange(rowPrefix),
			func(row bigtable.Row) bool {
				for _, item := range row[family] {
					fmt.Printf("#%s\n", item)
				}
				item := row[family][0]
				fmt.Printf("Row=%s Column=%s Value=%s\n", item.Row, item.Column, string(item.Value))
				return true
			},
			bigtable.RowFilter(bigtable.ColumnFilter(column)))
	if err != nil {
		panic(err)
	}
}

func adminDelete(project, instance string) {

}

func write(table *bigtable.Table, family, row, column, value string) error {
	mut := bigtable.NewMutation()
	mut.Set(family, column, bigtable.Now(), []byte(value))
	rowErr, err := table.ApplyBulk(context.Background(), []string{row}, []*bigtable.Mutation{mut})
	if err != nil {
		return err
	}
	if rowErr != nil {
		return rowErr[0]
	}
	return nil
}

func main() {
	ctx := context.Background()
	project := "dev"
	instance := "dev"

	adminClient, err := bigtable.NewAdminClient(context.Background(), project, instance, option.WithTokenSource(&devTokenSource{}))
	if err != nil {
		panic(err)
	}

	adminCreate(adminClient)

	client, err := bigtable.NewClient(ctx, project, instance, option.WithTokenSource(&devTokenSource{}))
	if err != nil {
		panic(err)
	}

	table := client.Open(tblName)

	// Write to same key
	fmt.Println("Write")
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"1", "Hello11a")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"1", "Hello11b")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"2", "Hello12")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"2", columnNamePrefix+"1", "HelloB21")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"2", columnNamePrefix+"2", "HelloB22")
	if err != nil {
		panic(err)
	}
	fmt.Println()

	// Read
	fmt.Println("Read")
	result, err :=
		table.ReadRow(
			context.Background(),
			rowKeyPrefix+"1",
			bigtable.RowFilter(bigtable.ColumnFilter(columnNamePrefix+"1")))
	if err != nil {
		panic(err)
	}
	for _, v := range result[familyName] {
		fmt.Printf("#%s\n", v)
	}
	fmt.Println(string(result[familyName][0].Value))
	fmt.Println()

	// Scan
	fmt.Println("Scan")
	scanRowPrefix(table, familyName, rowKeyPrefix, columnNamePrefix+"1")
	fmt.Println()

	// Delete row range
	fmt.Println("DropRowRange")
	err = adminClient.DropRowRange(context.Background(), tblName, rowKeyPrefix+"2")
	if err != nil {
		panic(err)
	}
	fmt.Println()

	// Scan
	fmt.Println("Scan")
	scanRowPrefix(table, familyName, rowKeyPrefix, columnNamePrefix+"1")
	fmt.Println()

}

やりかけメモ

func injectEnv(c AppConfig) AppConfig {
	injectEnv2(&c)
	return c
}

func injectEnv2(c interface{}) {
	fmt.Println(">>")
	tmpC := c
	if reflect.TypeOf(c).Kind() == reflect.Ptr {
		tmpC = reflect.Indirect(reflect.ValueOf(c)).Interface()
	}
	t := reflect.TypeOf(tmpC)
	v := reflect.ValueOf(tmpC)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fv := v.Field(i)

		fmt.Println("===")

		fmt.Printf("Name: %s\n", f.Name)
		fmt.Printf("Type: %s(%s)\n", f.Type, f.Type.Kind())

		switch f.Type.Kind() {
		case reflect.Struct:
			injectEnv2(fv.Interface())
			break
		default:
			break
		}
	}
	fmt.Println("<<")
}

gRPC over SSL/TLS with ClientCertification

gRPCを利用したサービスを作ることになったので、その際に調べた認証・認可周りのメモ

キーワード:grpc, grpcs, client認証

gRPCにおける認証方法

gRPCの認証は公式サイトによると以下の方式がビルトインされてるらしい

また、認証情報は以下の方式で設定できる

  • Channel credential
    • SSL証明書などのように gRPC Channel に紐付ける方式
  • Call credential
    • Request時にClientContextに対して設定する方式

今回はgRPCServiceをグローバルに公開する予定があったのと通信を暗号化したかったので、クライアントに自己署名証明書(通称、オレオレ証明書)を払い出して、その証明書のCommonNameで認可を行う SSL/TLS + Channel credential の方式で試してみた。

[利用方法のイメージ]
               +------- Internet ---------+               
+---------+    |                          |
| Client  |  -------(gRPC over SSL/TLS)---------> [gRPCService]
+---------+    |    ( with ClientCert )   | :443
               +--------------------------+

準備

オレオレ認証局

クライアントに払い出す証明書を生成する必要があるので、オレオレ認証局オレオレ証明書の発行はこちらのQiita記事を参考にキーの出力フォーマットをなどを調整したものをGithubに上げた

https://github.com/tmarcus87/grpc-training-self-signed-ca

クライアントコード

Serverが1台の場合

以下のような感じでSSLで待ち受けるサーバを実装して起動すると動くはず

Serverが複数台の場合

公式のドキュメントによるとLoadBalancingは以下の3種の方法が提案されている

  • Proxy方式
  • 負荷分散に対応したClientを利用する方式 GoとJavaだとそういう対応クライアントがあるらしい(未検証)
  • 外部のLoadBalcningServiceを利用する方式
    • ちゃんと読んでないけど、EtcdとかZooKeeperとかEurekaとか使ってDNS応答を変化させてやる方式?

Proxy以外の方式では1IP:1Serverである必要があるっぽい?

Proxy方式でnginxを利用する場合以下の選択肢がある

  • L4 load balancing
    • TCPセッション単位のバランシング
    • TCPセッション単位のバランシングになるので、クライアント数が少ないと負荷分散が難しそう
    • SSL終端とクライアント認証はgRPCServerで行う
  • L7 load balancing
    • gRPCリクエスト単位のバランシングになる(設定次第?)
    • SSL終端とクライアント認証はnginxで行う
    • nginxのconfigでクライアント証明書の情報をアプリ側にmetadata(http header)として流す
    • nginx <--> gRPCサーバ間も暗号化しようと思えば可能ぽっい(あくまでnginx<--> grpcServer間の暗号化でclient<-->nginx間をのデータをそのままバイパスできないっぽい)

// todo Plain接続の場合のアプリ側のコードを書く

// todo L4 Load balancingの場合のnginx.confを貼り付ける

L7 Load balancingの場合のnginx.confは以下の通り

[nginxの設定]
$ cat /etc/nginx/nginx.conf

user  nginx;
worker_processes  1;

error_log  /var/log/nginx/error.log debug;
pid        /var/run/nginx.pid;

events {
    worker_connections  1024;
}

http {
    upstream grpc_servers {
        server grpc_server1:5000;
        server grpc_server2:5000;
    }

    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''        close;
    }

    server {
        listen 443 ssl http2;

        ssl_certificate /opt/server/dev-gateway-api.example.com.crt;
        ssl_certificate_key /opt/server/dev-gateway-api.example.com.pem;

        ssl_client_certificate /opt/ca/ca.crt;
        ssl_verify_client on;

        proxy_set_header ssl-client-cert        $ssl_client_escaped_cert;
        proxy_set_header ssl-client-verify      $ssl_client_verify;
        proxy_set_header ssl-client-subject-dn  $ssl_client_s_dn;
        proxy_set_header ssl-client-issuer-dn   $ssl_client_i_dn;
        grpc_set_header  ssl-client-cert        $ssl_client_escaped_cert;
        grpc_set_header  ssl-client-verify      $ssl_client_verify;
        grpc_set_header  ssl-client-subject-dn  $ssl_client_s_dn;
        grpc_set_header  ssl-client-issuer-dn   $ssl_client_i_dn;

        location /error502grpc {
            internal;
            default_type application/grpc;
            add_header grpc-status 14;
            add_header grpc-message "unavailable";
            return 204;
        }

        location / {
            grpc_pass grpc://grpc_servers;
            
            error_page 502 = /error502grpc;
        }
    }
}

サーバ側で以下のデータがmetadataから取れる

こんな感じの関数をUnaryServerInterceptorに差し込めば
func getMetaFromHeader(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        json, _ := json.Marshal(md)
        fmt.Println(string(json))
    }
    return handler(ctx, req)
}

こんなデータが取れるはず

{
  ":authority": [
    "grpc_servers"
  ],
  "content-type": [
    "application/grpc"
  ],
  "ssl-client-cert": [
    "-----BEGIN%20CERTIFICATE-----なんやかんや証明書の中身%0A-----END%20CERTIFICATE-----%0A"
  ],
  "ssl-client-issuer-dn": [
    "emailAddress=grpc@example.com,CN=gateway-gateway.example.com,OU=DevelopmentHQ,O=Hoge\\,Inc.,L=Sumida-ku,ST=Tokyo,C=JP"
  ],
  "ssl-client-subject-dn": [
    "emailAddress=service1@example.com,CN=service1,OU=DevelopmentHQ,O=Hoge\\,Inc.,L=Sumida-ku,ST=Tokyo,C=JP"
  ],
  "ssl-client-verify": [
    "SUCCESS"
  ],
  "user-agent": [
    "grpc-go/1.15.0"
  ]
}