t.marcusの外部記憶装置

備忘録とかちょっとした考えとかをつらつらと...

Google Cloud Bigtable (emulator) を Go から使う

ローカルでbigtable-emulatorを動かして、GoでCRUDしてみるテスト

// Bigtable example in Go.
//
// Install library
//   $ go get cloud.google.com/go/bigtable
//
// Start bigtable emulator by docker
//   $ docker run -it -p 8080:8080 spotify/bigtable-emulator:latest
//
// Set this environment variable before running
//   BIGTABLE_EMULATOR_HOST="127.0.0.1:8080"
//
// Reference URLs:
//   https://cloud.google.com/bigtable/docs/samples-go-hello
//   https://github.com/spotify/docker-bigtable
//   https://github.com/Shopify/bigtable-emulator

package main

import (
	"context"
	"fmt"

	"golang.org/x/oauth2"
	"google.golang.org/api/option"

	"cloud.google.com/go/bigtable"
)

// tblName is the name of the table the example creates and opens.
const tblName = "tbl"

// familyName is the column family created inside tblName and used for
// every write and read in the example.
const familyName = "cf1"

// rowKeyPrefix is the common prefix of the row keys; a suffix such as "1"
// or "2" is appended to form a full key.
const rowKeyPrefix = "rowKey"

// columnNamePrefix is the common prefix of the column names; a suffix such
// as "1" or "2" is appended to form a full column name.
const columnNamePrefix = "colKey"

// devTokenSource is a stateless stub that hands out empty OAuth2 tokens.
// main wraps it with option.WithTokenSource when building the Bigtable
// clients.
type devTokenSource struct{}

// Token returns a pointer to a zero-valued oauth2.Token and a nil error.
func (devTokenSource) Token() (*oauth2.Token, error) {
	return &oauth2.Token{}, nil
}

// sliceContains reports whether target equals at least one element of list.
// A nil or empty list yields false.
func sliceContains(list []string, target string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == target {
			return true
		}
	}
	return false
}

// adminCreate makes sure the example table and its column family exist.
//
// It lists the current tables through dumpTables (which also prints them),
// creates tblName when it is not in that list, creates familyName when it is
// not listed for tblName, and finally calls dumpTables once more to print the
// resulting layout. Any error returned by the admin client causes a panic.
func adminCreate(adminClient *bigtable.AdminClient) {
	ctx := context.Background()
	tables, familiesMap := dumpTables(adminClient)

	if !sliceContains(tables, tblName) {
		fmt.Println("Create table")
		if err := adminClient.CreateTable(ctx, tblName); err != nil {
			panic(err)
		}
	}

	// When the table was missing from the listing, familiesMap has no entry
	// for it; the lookup then yields a nil slice and the family is created.
	if !sliceContains(familiesMap[tblName], familyName) {
		fmt.Println("Create cf")
		if err := adminClient.CreateColumnFamily(ctx, tblName, familyName); err != nil {
			panic(err)
		}
	}

	dumpTables(adminClient)
}

// dumpTables prints every table known to adminClient together with its
// column families and returns what it found.
//
// The first result is the list of table names as returned by the admin
// client; the second maps each table name to its column family names.
// Any error from the admin client causes a panic.
func dumpTables(adminClient *bigtable.AdminClient) ([]string, map[string][]string) {
	ctx := context.Background()

	names, err := adminClient.Tables(ctx)
	if err != nil {
		panic(err)
	}

	familiesByTable := map[string][]string{}
	for i := range names {
		name := names[i]

		info, infoErr := adminClient.TableInfo(ctx, name)
		if infoErr != nil {
			panic(infoErr)
		}
		familiesByTable[name] = info.Families

		fmt.Printf("Table name : %s\n", name)
		fmt.Println("ColumnFamily")
		for _, cf := range info.Families {
			fmt.Printf("  - %s\n", cf)
		}
		fmt.Println()
	}

	return names, familiesByTable
}

// scanRowPrefix prints cells from the rows whose key starts with rowPrefix.
//
// The read is restricted by a column filter built from column. For each row
// delivered, every cell stored under family is dumped on a line starting with
// "#", followed by one summary line (row, column, value) for the first cell
// of that family. A row delivered without any cell under family is skipped
// rather than indexed, which would panic with an index-out-of-range error.
// An error from the read causes a panic.
func scanRowPrefix(table *bigtable.Table, family, rowPrefix, column string) {
	printRow := func(row bigtable.Row) bool {
		items := row[family]
		if len(items) == 0 {
			// Nothing in this function guarantees that a delivered row has
			// cells under family, so there is no first cell to summarise.
			return true
		}

		// NOTE(review): %s is applied to the whole item value here; confirm
		// this raw dump format is what is wanted for non-string fields.
		for _, item := range items {
			fmt.Printf("#%s\n", item)
		}

		first := items[0]
		fmt.Printf("Row=%s Column=%s Value=%s\n", first.Row, first.Column, string(first.Value))
		return true
	}

	err := table.ReadRows(
		context.Background(),
		bigtable.PrefixRange(rowPrefix),
		printRow,
		bigtable.RowFilter(bigtable.ColumnFilter(column)),
	)
	if err != nil {
		panic(err)
	}
}

// adminDelete is an empty placeholder: it accepts a project and an instance
// name and does nothing with them.
func adminDelete(project, instance string) {}

// write stores value in the cell addressed by row, family and column.
//
// The mutation is stamped with bigtable.Now() and sent through ApplyBulk as a
// one-element batch. It returns nil when the mutation was accepted. Otherwise
// it returns either the error of the call itself or the error reported for
// the single row entry, wrapped with the cell coordinates; the underlying
// error stays reachable through errors.Is / errors.As.
func write(table *bigtable.Table, family, row, column, value string) error {
	mut := bigtable.NewMutation()
	mut.Set(family, column, bigtable.Now(), []byte(value))

	rowErrs, err := table.ApplyBulk(context.Background(), []string{row}, []*bigtable.Mutation{mut})
	if err != nil {
		return fmt.Errorf("writing row %q column %s:%s: %w", row, family, column, err)
	}
	// Check the length rather than nil-ness so that indexing can never go out
	// of range, and only wrap an entry that actually carries an error.
	if len(rowErrs) > 0 && rowErrs[0] != nil {
		return fmt.Errorf("writing row %q column %s:%s: %w", row, family, column, rowErrs[0])
	}
	return nil
}

func main() {
	ctx := context.Background()
	project := "dev"
	instance := "dev"

	adminClient, err := bigtable.NewAdminClient(context.Background(), project, instance, option.WithTokenSource(&devTokenSource{}))
	if err != nil {
		panic(err)
	}

	adminCreate(adminClient)

	client, err := bigtable.NewClient(ctx, project, instance, option.WithTokenSource(&devTokenSource{}))
	if err != nil {
		panic(err)
	}

	table := client.Open(tblName)

	// Write to same key
	fmt.Println("Write")
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"1", "Hello11a")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"1", "Hello11b")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"1", columnNamePrefix+"2", "Hello12")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"2", columnNamePrefix+"1", "HelloB21")
	if err != nil {
		panic(err)
	}
	err = write(table, familyName, rowKeyPrefix+"2", columnNamePrefix+"2", "HelloB22")
	if err != nil {
		panic(err)
	}
	fmt.Println()

	// Read
	fmt.Println("Read")
	result, err :=
		table.ReadRow(
			context.Background(),
			rowKeyPrefix+"1",
			bigtable.RowFilter(bigtable.ColumnFilter(columnNamePrefix+"1")))
	if err != nil {
		panic(err)
	}
	for _, v := range result[familyName] {
		fmt.Printf("#%s\n", v)
	}
	fmt.Println(string(result[familyName][0].Value))
	fmt.Println()

	// Scan
	fmt.Println("Scan")
	scanRowPrefix(table, familyName, rowKeyPrefix, columnNamePrefix+"1")
	fmt.Println()

	// Delete row range
	fmt.Println("DropRowRange")
	err = adminClient.DropRowRange(context.Background(), tblName, rowKeyPrefix+"2")
	if err != nil {
		panic(err)
	}
	fmt.Println()

	// Scan
	fmt.Println("Scan")
	scanRowPrefix(table, familyName, rowKeyPrefix, columnNamePrefix+"1")
	fmt.Println()

}