SQLを使った監視でデータ基盤の品質を向上させる

こんにちは、データ基盤グループの吉田(id:syou6162)です。データ基盤グループでは安定してデータを利用できるように様々な取り組みを行なっています。本エントリでは、データ品質に問題がある場合にすぐに気付けるようにしたSQLによる監視の仕組みを紹介します。

背景

データ基盤の運用をしていると、日々様々なトラブルと向き合う必要があります。例えば、以下のようなものがあります。

  • 他チームがdailyで転送しているデータがバッチの失敗により遅れている
  • TerraformなどのIaCで承認済みビューの権限管理を行なっているが、コードの設定ミスで意図せぬ状態になり、利用者が必要なデータにアクセスできない
  • オペレーションのミスにより、BigQueryの特定のエラーレートが急激に増加する
  • 重複を許さないはずのカラムに重複したデータが入っている
  • 値が0から100の間に収まるはずのカラムにマイナスの値が入っている

データ利用者からの指摘からこういったトラブルに気付くことも少なくありませんでした。データ基盤側でなるべく早期に気付ける仕組みを取り入れたいと思い、監視の基盤を作ることにしました。

SQLを使った監視基盤の構築

とはいえ、個別の項目に対してそれぞれbashやPythonなどのスクリプトを書くと、メンテナンスが大変です。本来やりたいこととしては「満たすべき条件を事前に記述しておいて、そこから逸脱した場合にアラートを出す」でした。「満たすべき条件を事前に記述する」の部分に関してはSQLで書けると、比較的学習コストが低くて済みます(個別のsdkの使い方などを学習する必要がないため)。

モノタロウではデータ基盤にBigQueryを採用していますが、BigQueryはデータセットやジョブなどの情報を提供してくれるINFORMATION_SCHEMAがあります。また、それ以外の情報もCloud Asset Inventoryから取得することができ、定期的にBigQueryにExportしています。これらの情報は全てSQLを通じてアクセスできるため、今回の監視の要件にとても適していました。

監視を行なうためのスクリプトとしては、以下の簡単なPythonスクリプトを用意しました。新たに監視項目を増やしたい場合、監視設定が書かれているyamlファイルを追加すればよいだけなので、メンテナンスもこのPythonスクリプトだけで済みます。このスクリプトをArgo Workflowsに乗せて、定期実行しています。

余談になりますが、最初はbqコマンドとjqコマンドを組合せたbashスクリプトで動かしていました。しかし、BigQuery側から5xxエラーが返ってきた場合などのエラーハンドリングのしやすさを考慮して、Pythonスクリプトに後日書き換えたという背景があります。

import glob
import yaml
import sys

from google.cloud import bigquery

files = glob.glob("sql_templates/**/*.yaml")
client = bigquery.Client()

is_failed = False

for file in files:
    print(f"Checking {file}...")
    with open(file, 'r') as yml:
        monitoring_items = yaml.safe_load(yml)
        for monitor in monitoring_items:
            query = monitor["sql"]
            query_job = client.query(query)
            try:
                rows = query_job.result()
                records = [dict(row) for row in rows]
                if records != monitor["expect"]:
                    is_failed = True

                    print("====================", file=sys.stderr)
                    print(f"Unexpected query result for {file}", file=sys.stderr)
                    print(f"Executed SQL:\n{query}", file=sys.stderr)
                    print(f"Expected result:\n{monitor['expect']}", file=sys.stderr)
                    print(f"Actual result:\n{records}", file=sys.stderr)
            except bigquery.exceptions.GoogleAPICallError as e:
                print(e)

if is_failed:
    sys.exit(1)

実際の監視の設定としては、以下のようなyamlファイルを書くだけです。

- description: SQLのサンプルです。サンプルなので、特にアクションを起こす必要はありません。
  sql: |-
    SELECT
      1 + 1 AS calc
  expect: [{"calc": 2}]

設定項目としては以下の3つがあります。

  • sql: 監視したい項目を記述したSQL
  • expect: SQLを実行した結果、期待される内容。一致しなかった場合にアラート通知される
  • description: 監視項目の説明やアラートが出た際に取るべきアクションが書かれる

通知自体はPythonスクリプトでは行なわず、STDERRからCloud Loggingに流すだけに留めており、ログをカウンタ指標に変換することでCloud Monitoringから通知をさせるという仕組みを取っています。Cloud Monitoringで通知先の設定を一元的に管理できたり、現在起きている障害をインシデントの一覧で把握できるといったメリットがあり、この方法を取っています。私のブログに詳細を書いているので、興味がある方はそちらをご覧ください。

実際の監視項目例

実際の例を挙げてみましょう。

他チームがdailyで転送しているデータがバッチの失敗により遅れていないか

データ転送が失敗 / 必要以上に遅延していないかを監視する項目です。BigQueryのINFORMATION_SCHEMA.TABLESにテーブルのメタ情報が格納されているため、簡単なクエリで遅延を発見できます。

- description: |-
    my-project.my_dataset内のMyTableテーブル(日付別テーブル)に最新のスナップショットが転送されているか。バッチの若干の転送の遅れは許容したいため、前日のテーブルが存在しているかを確認する
  sql: |-
    SELECT
      COUNT(*) AS cnt
    FROM
      `my-project.my_dataset.INFORMATION_SCHEMA.TABLES`
    WHERE
      table_name = "MyTABLE_" || FORMAT_DATE("%Y%m%d", DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 1 DAY))
      AND table_type = "BASE TABLE"
  expect: [{"cnt": 1}]

BigQueryのエラーレートが急激に増加していないか

何かのオペレーション起因でジョブのエラーレートが想定以上に増えていないかを監視します。こちらもINFORMATION_SCHEMA.JOBS_BY_ORGANIZATIONで情報が取れるため、簡単に実装することができます。error_result.messageに詳細なエラーメッセージが入っているため、より細かいエラー毎の挙動を監視したい場合はこれを利用するとよいでしょう。

- description: |-
    Jobで権限エラーになっている割合0.1%以下になっているか確認する
    エラーになる場合は、xxxで具体的なエラーを確認してください。
  sql: |-
    SELECT 
      COUNTIF(error_result.reason = "accessDenied") / COUNT(*) < 0.001 AS access_denied_error_rate_lower_than_0_0_0_1,
    FROM 
      `region-us`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
    WHERE 
      end_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 30 MINUTE)
  expect: [{"access_denied_error_rate_lower_than_0_0_0_1": true}]

承認済みビューの設定が意図せず消えていないか

特定のメンバーにのみ情報を公開したい場合や、さらに公開する情報を行や列単位で制限したい場合、承認済みビューを設定することがあると思います。 *1。スクリプトやIaCで承認済みビューを設定する場合、設定のミスにより意図せず既存の設定を消してしまう場合があります。レビューで気付けるとよいですが、そうでない場合でもすぐに気付けるように監視項目として設定しています。

一般的な権限設定であればCloud Asset InventoryからBigQueryにExportしておけばよいですし、承認済みビューの場合はCloud Asset Inventoryでカバーしきれていない部分があったため、独自にExportしています。そのため、INFORMATION_SCHEMAを使った場合ほどすぐに使ってもらえるものにはなっていないですが、雰囲気を感じ取ってもらえればと思います。

- description: |-
    my-project.my_datasetに対する承認済みビューの数が100以上あるかを確認。100を切っている場合、承認済みビューに関するオペレーションで設定が吹き飛んでいる可能性がある(Terraformの設定ミス、Python SDK経由などが考えられる)
  sql: |-
    WITH
      latest_bq_auth_view_datetime AS (
        SELECT
          MAX(DATETIME_TRUNC(DATETIME(record_time), HOUR)) AS record_time
        FROM
          my-project.bq_authview_info.bq_authview_access
        GROUP BY
          DATETIME_TRUNC(DATETIME(record_time), HOUR)
        ORDER BY
          record_time DESC
        LIMIT
          1
      )
    SELECT
      COUNT(*) >= 100 AS has_more_than_100_auth_views
    FROM
      my-project.bq_authview_info.bq_authview_access
    WHERE
      DATETIME_TRUNC(DATETIME(record_time), HOUR) = (SELECT record_time FROM latest_bq_auth_view_datetime)
      AND view_project_id = "my-project"
      AND view_dataset_id = "my_dataset"
  expect: [{"has_more_than_100_auth_views": true}]

今後の展望

「このカラムは0から100までの値を取る」などの監視もこれまで同様に行なうこともできますが、そういった制約はデータ管理者だけでなく、データ活用者側にも有用な情報です。データ活用者にもこういったメタデータの情報を提供できるように、データ基盤グループではdbtDataformの採用を検討しています。

モノタロウでは社員の約6割がBigQueryでクエリを実行し、日々意思決定を行なっているデータドリブンな会社であり、データ基盤グループに期待される役割も大きくなっています。データ品質を高めることでデータ活用者の生産性を挙げることに興味を持つ方のご応募をお待ちしております。

*1:BigQueryでは承認済みビュー以外にも行レベル列レベルのセキュリティの設定も可能です