商品数の増加を見据えて商品情報作成処理をPythonからBigQueryに移行した話 | SQLによるバッチ処理で工夫した3つのポイント

こんにちは、EC基盤グループ 商品情報基盤チームの江村です。今回は私が所属している商品情報基盤チームで構築、運用を行っているシステムについてお話します。
モノタロウでは以前から記事になっていますが、検索システムの移行を行っており、現在商品検索ページの裏側の検索システムのSolrからElasticsearchへの切り替え*1が完了しました。
私が所属している商品情報基盤チームではElasticsearch、Spannerに入れるための商品情報の作成とSpannerおよび、Spannerからデータを取得するAPIの運用を行っています。今回はその中でもElasticsearch、SpannerのためのBigQueryでの商品情報作成処理について取り上げます。(詳しい検索部分の構成については以前の記事を参照ください)

システム移行の背景

システムの設計にも関わってくるためまずは簡単に移行の背景をお話します。
モノタロウでは現在1900万点に及ぶ商品を取り揃えており、会社の売上としても20%以上の成長が続いています。そのためシステムで扱うデータ量が今後も増えていくことが予想されており、数年前から検索システムの移行に取り組んでいます。商品情報を作成する部分についても同様で今後のモノタロウの成長を見据えて移行を行っています。

ではここからシステムの紹介、設計について紹介をしていきたいと思います。

移行による設計ポイント

「MySQL + Python」の処理を「BigQuery + SQL」に変更

最初の設計ポイントを説明するために、まずは移行元、移行先(私達のチームで管理)のシステムの構成を紹介します。 説明のため図、処理の流れは簡略化していますが、おおまかに以下のようになっています。

移行元

  1. BigQuery上でSQLを実行し中間テーブルを作成
  2. BigQuery上の中間テーブルをEC2上のMySQLにINSERT
  3. EC2上のMySQLのデータを元にPythonで商品情報を作成

ここから私達のチームで管理している移行先のシステムでは商品件数の増加にそなえて下記のような構成に変更を行いました。

移行先

  1. BigQuery上でSQLを実行し中間テーブルを作成
  2. BigQuery上の中間テーブルからさらに、BigQueryに商品データを作成

移行元では「BigQuery + SQL」、「MySQL + Python」を使用していたのに対し、移行時に「MySQL + Python」の処理もすべて「BigQuery + SQL」で処理を行うようにしました。
「MySQL + Python」から「BigQuery + SQL」に移行した背景は大きくは2点あります。1点目は元々一部の処理(移行元1)はBigQueryで処理していたので、使用するツールを統一するという目的がありました。そして、2点目としては背景でもデータ量の増加を見据えていると記載しましたが、BigQueryのスケーラビリティ、マネージドといった特徴を活かして今後の商品件数の増加に対応をするためです。

商品情報のフィールドごとにSQLファイルを分けた

続いては具体的な処理の分け方の話になります。設計の方針として、依存関係を減らすために商品情報のフィールドごとにSQLファイルを分けました。前提として商品情報のフィールドとは「商品名」、「属性」、「価格」のようなものです。
移行元のシステムではPython処理を行っている部分が1ファイル3000行ほどになっており、1つの関数で160行ほどのものがある、条件分岐も多い...などフィールドごとの依存関係がわかりづらくなっている状態でした。
そのためSQLファイルを下記のようなイメージにしています。

price.sql

SELECT
  item_id,
  price
FROM
  item_price
WHERE
  price_type = 1

item_attribute.sql

SELECT
  item_id,
  ARRAY_AGG(attribute) AS item_attributes
FROM
  item_attribute
GRPUP BY
  item_id

item.sql

SELECT
  item_id,
  price,
  item_attributes
FROM
  item_base
LEFT JOIN
  price
ON item_base.item_id = price.item_id
LEFT JOIN
  item_attribute
ON item_base.item_id = item_attribute.item_id

これにより、フィールドの修正時にも影響範囲を限定して修正を行うことができるようになりました。

バッチ処理として冪等性を保つようにした

バッチ処理では当たり前ですが、移行元の設計を引き継ぎ冪等性を保つように設定を入れました。日付ごとに商品情報を作っており、同じ日付であればすでに作成されているテーブルを上書きするようにしています。実現方法としてはDigdagのファイルの_export設定にbqの書き込み時の設定でWRITE_TRUNCATEを設定しています。

workflow.dig

timezone: Asia/Tokyo

_export:
    !include : 'variables.yaml'

    bq:
      write_disposition: WRITE_TRUNCATE

+main:
  +price:
    +execute:
      bq>: sql/price.sql
      destination_table: price

これにより、バッチ処理が途中で失敗したときも状態を気にせず、インプットとなる日付情報だけを気にすればよく運用時に考えることが減るというメリットがあります。

移行元のシステムと日々データの比較を行い開発した

データ作成処理の設計ではないですが、システムの移行を安全に行う上での仕組みを紹介します。「MySQL + Python」の処理を「BigQuery + SQL」に置き換えたため作成された商品情報が移行の際に差異がないことを確認する必要があります。今回はデータの作成処理とは別にデータの比較処理を別のバッチとして構築を行いました。移行元のテーブルは社内の別の仕組みでBigQueryに結果を出力しており、移行先のテーブルとBigQueryを使って比較をしています。(モノタロウでは様々なデータをBigQueryに出力しています
データの比較処理はフィールドごとに行い、いくつの商品で差がでているかやフィールドの差異をBigQueryのテーブルに書き込んでいます。

データの比較処理構成

実際のSQLがだいぶ長いため簡略化していますが以下のイメージです。

  1. 移行元、移行先のテーブルをSELECT
  2. フィールドの結果をJSONにして、ハッシュ化
  3. それぞれのテーブルのIDをキーとしてハッシュの内容を比較

data_check.sql

WITH
  # 1. 移行元、移行先のテーブルをSELECT
  before_data AS (
  SELECT
    item_id AS key,
    price
  FROM
    before_item ), # 移行元のテーブル
  after_data AS (
  SELECT
    item_id AS key,
    price
  FROM
    after_item ), # 移行先のテーブル
  
  # 2. フィールドの結果をJSONにして、ハッシュ化
  before AS (
  SELECT
    key,
    SHA1(TO_JSON_STRING(STRUCT(d))) AS hash_string, # JSONにしてハッシュ化
  FROM
    before_data AS d ),
  after AS (
  SELECT
    key,
    SHA1(TO_JSON_STRING(STRUCT(d))) AS hash_string, # JSONにしてハッシュ化
  FROM
    after_data AS d ),

  # 3. それぞれのテーブルのIDをキーとしてハッシュの内容を比較
  diff AS (
  SELECT
    key,
    CASE
      WHEN before.key IS NULL THEN 'Add' # 移行元になければAdd
      WHEN after.key IS NULL THEN 'Delete' # 移行先になければDelete
      WHEN before.key = after.key AND before.hash_string = after.hash_string THEN 'Equal' # ハッシュが同じならEqual
    ELSE
    'Change' # 該当しない→ハッシュが異なるのでChange
  END
    AS result,
  FROM … # before, afterのテーブルをkeyを元にして結合

そして、差異が出た場合には、Slackにアラート通知が来るようにしています。この仕組みにより日々の開発の中で継続的に検証を行うことができ安全に移行をしています。

Slack通知

移行で苦労した点

今回の移行では当初から設計面で大きな変更はありませんでしたが、設計以外の部分でいくつか苦労した点がありました。

データの比較処理での移行元との差異

具体的には配列内の順序が異なる、値が重複しているなどです。これは移行元のPythonをSQLに置き換えていく中で、コードの理解が浅く処理が抜けている場合や、そもそも移行元の処理の不備がある場合、今回の構成では表されていない商品情報の入力時点での予期しないデータの混入など様々な要因がありました。
また日々のデータ比較なので、季節性の商品など実装後にすぐにデータの差異に気づけないこともありました。

後続システムのと強い依存関係

以前出された記事にもありましたが作成した商品情報を利用する後続のElasticsearch、Spannerへのデータ投入処理と密結合になっており変更が行いづらくなっているといった点があります。

こういった苦労した点の中で細かいデータの差異については各グループ、チームと連携を行うことで1つ1つ地道にデータの差異を解消していくことで初回のリリースを迎えることができましたが、仕組みとしてデータの品質の保証、後続システムとの依存関係の削減についてはまだ解決はできておらず今後の改善ポイントとして残っています。

今後の取り組み

最初のリリースは無事に達成できましたが、まだ並行稼動状態であり、プロジェクトの目標としてはシステムの完全な移行を目指しています。そのためにもデータの移行元との前後比較ではなく、自分たちのシステムだけでデータの品質を保証しなければなりません。具体的にはBigQueryでの季節性など特定のロジックのテストの検証を予定しています。その他にも後続の処理との依存関係を減らす構成変更などチームでは多くの改善に取り組み始めています。

最後に、本記事で紹介したBigQueryを用いたデータパイプライン構築、改善に興味がある方はぜひカジュアル面談でお話できたらと思います!

*1:その他のページについてはまだSolrベースのため現在は並行稼動中です