BigQueryエミュレータを使ったETLのインテグレーションテスト

TL;DR

BigQuery Emulatorfake-gcs-server を組み合わせることでbqコマンドでCSVファイルを読み込んでETLのインテグレーションテストができた。

はじめに

こんにちは。先日こちらの記事を書いたCTO-Officeの藤本です。そこでは書ききれなかったETLについて書いておきたいと思います。

ビッグデータを扱うETLのテストを行いたい場合に、DBからExtractするEの部分など、ユニットテストやモックでは担保できないところが出てきます。

そのようなインテグレーションテストに、OSSのBigQuery Emulatorを活用できる場合があります。

背景

モノタロウでは、マイクロサービスで実装したGoのロジックをApache Beam Go SDKでラップして、Cloud Dataflowで定期実行することで商品の出荷目安のデータを生成して様々なチャネルで活用しています。

こうしたビジネス影響の大きいETLパイプラインでは、マイクロサービスとロジックを共有することはメリットがある一方で、マイクロサービスの変更に追随しつつ品質を担保することが課題となります。

マイクロサービスのロジックはユニットテストである程度網羅されているため、ETLパイプラインの構築やクエリを含むインテグレーションテストは有力な手法の一つです。

そこで、今回はBigQuery Emulatorを使ったインテグレーションテストを導入しました。

BigQuery Emulatorとは

Googleから公式には提供されていませんが、goccyさんをはじめ有志によって開発されています。(以下BQ Emuと省略)

提供されているイメージで簡単に起動することができ、Docker Composeであれば、

docker-compose.yml

services:
  bq:
    image: ghcr.io/goccy/bigquery-emulator:latest
    platform: linux/amd64
    ports:
      - "9050:9050"
      - "9060:9060"
    command: "bigquery-emulator --project=test"

で、

docker-compose up

とすれば起動します。

ではbqコマンドを使って基本的な操作を試してみましょう。

スキーマは、

schema/fruits.json

[
  {
    "name": "id",
    "type": "INT64"
  },
  {
    "name": "name",
    "type": "STRING"
  },
  {
    "name": "unit",
    "type": "STRING"
  }
]

を使います。(BigQueryのスキーマについては公式ドキュメントをご参照ください)

まず、データセットを作ってみます。

❯ bq mk --api http://0.0.0.0:9050 --project_id=test --dataset products                                             
Dataset 'test:products' successfully created.

上記のスキーマファイルを使ってテーブル作成

❯ bq mk --api http://0.0.0.0:9050 --project_id=test --table products.fruits ./schema/fruits.json                          
Table 'test:products.fruits' successfully created.

INSERTしてみます。

❯ bq --api http://0.0.0.0:9050 query --project_id=test "INSERT products.fruits(id, name, unit) VALUES (1, 'りんご', '個')"

SELECTしてみます。

❯ bq --api http://0.0.0.0:9050 query --project_id=test "SELECT * FROM products.fruits"                                 

+----+--------+------+
| id |  name  | unit |
+----+--------+------+
|  1 | りんご | 個   |
+----+--------+------+

このように、基本的なコマンドは普通に使えます。

CSVデータをロードする

BQ Emu では --data-from-yaml オプションでスキーマとレコードを渡して初期化することもできますが、テストケースのデータをyamlで管理するのはそれなりに手間なのでcsvをロードしたいケースもあると思います。

例えば、こういうCSVファイルがあったとします。

data/bucket/fruits.csv

id,name,unit
1,りんご,個
2,すいか,玉
3,バナナ,本
4,ぶどう,房

bqコマンドでローカルのCSVファイルを読み込もうとするとどうなるでしょうか?

❯ bq load --api http://0.0.0.0:9050 --project_id=test \
  --source_format=CSV --skip_leading_rows=1 \
  products.fruits ./data/bucket/fruits.csv
BigQuery error in load operation: unspecified job configuration query

見たこともないエラーが出ました。ダメそうです。かといってプログラムを書くのも面倒なので、別の方法を考えたいと思います。

本物のBQではGCSからデータをロードすることができて、BQ Emuも fake-gcs-server からロードすることができます。

先ほどの構成に追加するとこうなります。

docker-compose.yml

services:
  bq:
    image: ghcr.io/goccy/bigquery-emulator:latest
    platform: linux/amd64
    ports:
      - "9050:9050"
      - "9060:9060"
    environment:
      - STORAGE_EMULATOR_HOST=http://gcs:4443
    command: "bigquery-emulator --project=test"
  gcs:
    image: fsouza/fake-gcs-server:latest
    ports:
      - 4443:4443
    volumes:
      - type: bind
        source: ./data
        target: /data
    command: -scheme http

gcsを追加して、bqの方にSTORAGE_EMULATOR_HOSTの環境変数を追加しました。

では、先ほどのfruits.csvを読み込んでみましょう。

❯ bq mk --api http://0.0.0.0:9050 --project_id=test --dataset products
Dataset 'test:products' successfully created.

❯ bq mk --api http://0.0.0.0:9050 --project_id=test --table products.fruits ./schema/fruits.json
Table 'test:products.fruits' successfully created.

❯ bq load --api http://0.0.0.0:9050 --project_id=test --source_format=CSV \
  products.fruits gs://bucket/fruits.csv

❯ bq --api http://0.0.0.0:9050 query --project_id=test "SELECT * FROM products.fruits"
+----+--------+------+
| id |  name  | unit |
+----+--------+------+
|  1 | りんご | 個   |
|  2 | すいか | 玉   |
|  3 | バナナ | 本   |
|  4 | ぶどう | 房   |
+----+--------+------+

問題なく入りました。

GitHub Actionsのトリガーで実行できるようにする

ローカルのbqコマンドは通りましたが、インテグレーションテストは自動化したいものです。

これまでの手順をシェルスクリプトにするとこのようになります。

tool/runner.sh

#!/bin/bash

DATASET="products"
TABLE_NAME="fruits"
SCHEMA_FILE="${TABLE_NAME}.json"
CSV_FILE="${TABLE_NAME}.csv"

bq mk --api ${ENDPOINT} --project_id=${PROJECT} --dataset ${DATASET}

if [[ -f ${SCHEMA_PATH}/${SCHEMA_FILE} && -f ${BUCKET_PATH}/${CSV_FILE} ]]; then
    echo "Creating table ${TABLE_NAME}"
    bq mk --api ${ENDPOINT} --project_id=${PROJECT} --table ${DATASET}.${TABLE_NAME} \
        ${SCHEMA_PATH}/${SCHEMA_FILE}

    echo "Loading ${CSV_FILE} to ${TABLE_NAME}"
    bq load --api ${ENDPOINT} --project_id=${PROJECT} --source_format=CSV \
        ${DATASET}.${TABLE_NAME} gs://bucket/${CSV_FILE}
fi

bq query --api ${ENDPOINT} --project_id=${PROJECT} \
    "SELECT * FROM ${PROJECT}.${DATASET}.${TABLE_NAME}"

これを、Docker Composeで実行すれば良さそうです。

他のコンテナの起動を待ってから処理を開始します。(先ほどと同じところは省略しています)

docker-compose.yml

services:
  bq:
      ...
  gcs:
      ...
  runner:
    image: google/cloud-sdk:alpine
    depends_on:
      bq:
        condition: service_started
      gcs:
        condition: service_started
    command: "/app/tool/runner.sh"
    volumes:
      - type: bind
        source: ./schema
        target: /app/schema
      - type: bind
        source: ./data
        target: /app/data
      - type: bind
        source: ./tool
        target: /app/tool
      - type: bind
        source: ~/.config/gcloud
        target: /root/.config/gcloud
    environment:
      ENDPOINT: http://bq:9050
      PROJECT: test
      SCHEMA_PATH: /app/schema
      BUCKET_PATH: /app/data/bucket

これを以下のように実行します。

❯ docker-compose run runner
Dataset 'test:products' successfully created.
Creating table fruits
Table 'test:products.fruits' successfully created.
Loading fruits.csv to fruits
+----+--------+------+
| id |  name  | unit |
+----+--------+------+
|  1 | りんご | 個   |
|  2 | すいか | 玉   |
|  3 | バナナ | 本   |
|  4 | ぶどう | 房   |
+----+--------+------+

期待通り入ったようです。でも、上のdocker-compose.yamlをよく見ると ~/.config/gcloudがbind されていますね。実は、bqコマンドはgcloudの設定がないとエラーになってしまいます。

このままではGitHub Actionsで動かすときにエラーになってしまいます。

力技で恐縮ですが、モノタロウではGitHub EnterpriseのWorkload IdentityでGCPのService Accountのクレデンシャルを取得することができるので、(本来の目的とは違う気はしつつも)それで回避しました。

GitHub Actionsのyamlに以下のようなstepを挟み込むことで、gcloudの設定ファイルが作られました。(環境固有なのでオプションは省略しています)

      - uses: actions/checkout@v4

      ...

      - id: auth
        name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ...
          service_account: ...

      ...

      - name: 'Set up Cloud SDK'
        uses: 'google-github-actions/setup-gcloud@v2'

まとめ

一筋縄ではいきませんでしたが、GHAのトリガーで実行できるようになったので、ETLパイプラインがデグレードする心配からは解放されました。

やはり、bqコマンドのようにエミュレータを想定していないツールをどうするかが悩みの種です。 公式のツールを使ってスマートにテストできれば最高ですが、活用できるものをうまく使ってチャレンジすることで道が開けることもあるのではないでしょうか。

モノタロウでは随時エンジニアの採用募集をしております。この記事に興味を持っていただけた方や、モノタロウのエンジニアと話してみたい!という方はカジュアル面談 登録フォームからご応募お待ちしております。

おまけ:Apache Beam GoからBQ Emuを使う

ここまで書いてきたように、BQ Emuをクライアントがサポートしていないことはしばしばあります。バイナリではどうしようもありませんが、ソースコードがあれば該当モジュールに手を入れることで対応できる場合があります。

例えば、Apache Beam Go SDKのbigqueryioはapi-endpointのオプションがなくそのままでは接続できませんが、bigqueryioに以下のようにオプションを追加したらできました。

bigqueryread.go

func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X)) error {
    opts := []option.ClientOption{}
    if f.Options.Endpoint != "" {
        opts = append(opts,
            option.WithEndpoint(f.Options.Endpoint),
            option.WithoutAuthentication())
    }
    client, err := bigquery.NewClient(ctx, f.Project,
        opts...)