66分かかる同期処理を10分以内に短縮せよ!~商品情報同期システムでの、処理速度と運用の改善~

はじめに

この記事では、モノタロウの基幹系を構成するシステムの一つである、商品情報管理システム(PIM:Product Information Management システム)の導入プロジェクトで、商品情報を基幹系と同期するシステム(商品情報同期機能)の性能や運用環境の改善を行った話をご紹介します。

背景

モノタロウの基幹系は、長年内製のシステムで支えられてきました。基幹系のシステムは、少数のWebアプリケーションと多数のバッチから構成されています。中でも商品情報の管理に関するシステムは、在庫や仕入先に関するシステムと一体化していて、商品情報に関する数多くのマスタメンテナンス画面を備えたやや複雑なシステムです(図1)。

図1 基幹系の概略図

当社のシステムは、もともと自分たちのビジネスに必要な機能を提供する手頃なパッケージ製品がなかったため、すべてを内製でまかなってきたという経緯があります。 しかし、近年では高度な機能を備え、カスタマイズ性の高い基幹系のパッケージ製品が多く登場して、私たちのニーズを満たせるようになっています。 基幹系の業務の中でも、例えば商品情報のマスタデータ管理業務は、比較的標準化された業務です。こうした業務のためのシステムをパッケージに移行して、システムの内製を減らすことで、リソースをビジネスの成長につながる他の領域に集中したいという背景から、モノタロウでは基幹系の機能の一部をパッケージ製品に移行するプロジェクトを推進してきました。その中の一つが商品情報管理システムの移行プロジェクトです。

このプロジェクトでは、それまで内製の商品情報マスタDBと管理画面、バッチ群によって実現していた商品情報管理業務を、パッケージ上で実現することを目指しています。 とはいっても、商品情報管理にまつわる機能は幅広く、パッケージへは段階的に移行を行う必要があります。そのときに必要になるのが、新旧システム間のデータ同期です。

図2 商品情報管理システムと基幹系の同期

商品情報管理システム導入のPJでも、既存の基幹系とデータを同期する必要があり、図2のような同期のための仕組み(商品情報同期システム、以下、同期システム)を独自に開発しました。

しかし、この同期システムの開発が進んだところで、同期処理のスループットが想定より遅いことがわかってきました。設計上は、1万商品(と、それに関連する付帯情報すべて)の同期を9分以下で行える目論見だったのですが、最初の実装では66分もかかってしまい、大幅にオーバーでした。性能を上げるために並列化を試みたものの届かず、なんとかしたいということで、速度改善に取り組むことになりました。

ところで、速度改善に取り組み始めたとき、同期システムは本運用に向けた準備も進んでいたのですが、運用にまつわる問題がいくつかありました。 まず、同期システムはGoogle Cloud PlatformのGCEインスタンス上にdockerホストを立ち上げ、その上で動かしていたのですが、デプロイ手順が手動になっていました。並列化で台数が増えると、それらのホストに対して繰り返しリリース作業を行うのが大変になっていました。 また、Cloud Logging に出力していたログがプレーンテキスト形式で、絞り込みや検索がしづらい状態でした。 加えて、監視がインフラ側のみで、アプリケーションの挙動を監視する仕組みがなく、要求したスループットが出ているか、データ同期の遅延が起きていないかという状況の把握が困難でした。速度改善と合わせて、こうした課題の改善も行いました。

以降の節では、処理速度の改善と、開発・運用環境の改善について説明します。

処理速度の改善

同期システムの処理速度の改善にあたっては、以下の3つに取り組みました。

  1. プロファイリングとコードレビューによって、遅い処理を特定しロジックを改善しました。
  2. 並列化パラメタを調整して、効果的に並列処理が行われるようにしました。
  3. 1ホストの処理性能の壁を超えるために、同期処理を複数台での分散処理に変えました。

遅い処理の特定と改善

まず行ったのは計測でした。処理中の各メソッドの実行時間を計測してもらい、積み上げでどの部分が全体の処理時間にインパクトを与えているのかを調べました。 「実行時間が積み上げでせいぜい数秒のロジックの処理時間を50%削減する」よりも、「実行時間が数分にわたるロジックの処理時間を10%削減する」ほうがずっと削り代があります。短い時間で大幅な改善を(できれば必要最小限に)行いたかったので、削減効果が美味しそうなロジックに対象を絞りました。

次に、処理時間の長いロジックのコードレビューを行いました。 システム間のデータ同期は、単純に言えば同期元の更新内容を同期先に書き込むだけの単純な作業です。そこに、同期データの検証や、同期先に合わせたデータの組み換えなどの処理が組み合わさってできています。 プログラムの処理時間が長くなる理由は、I/O待ちか、CPUを酷使する処理か、プログラムを動かす環境自体の制約(CPUを効率的に使えない言語やライブラリ)や、性能低下(例えば、スラッシングのような現象が起きている)です。 時間がかかっているロジックの処理内容と処理時間との関連を調べ、不自然な部分がないかコードを調べていきました。 「不自然」というのは、たとえば1万件の同期処理の中で何十万回も呼び出されていてトータルで時間を食っていたり、入出力を行っていないのに、1回の処理に何ミリ秒もかけている部分です。同期システムで行っている単純なデータの検証や組み換えでは、そんなに時間がかからないはずだからです。 深いネストを使って探索を行っている部分はないか、同じ値を計算する処理を繰り返し行っていないかといった検査を行いました。

その結果、いくつか処理性能に影響しそうな実装がみつかりました。 その一つは、例えば以下のように、木構造をした商品情報を再帰的に探索していて、何度も同じ情報を探索しているケースです。

   def top_entity(self, entity):
        """指定したEntityのRootEntityを取得
        """
        if self._entity_level(entity) == 1:
            return entity
        return self.top_entity(self.parent_entity(entity))

    def parent_entity(self, entity):
        """指定したEntityの親Entityを取得
        """
        for root_entity in self.metaData['entities']:
            for sub_entity in root_entity['entities']:
                if entity == sub_entity['identifier']:
                    return root_entity['identifier']
                for child_entity in sub_entity['entities']:
                    if entity == child_entity['identifier']:
                        return sub_entity['identifier']
        return None

このコードは、図3のように、木構造になったエンティティ(商品情報)の最上位(ルート)のエンティティを探すだけの機能ですが、すでに探索したことのあるエンティティのルートでも、繰り返し探索してしまいます。 しかも、中で使われている「親エンティティの探索」ロジックは、ルートから下って、標的のエンティティを子に持つ親を線形に探索しています。

このメソッドでほとんどのエンティティが探索されることはわかっているし、エンティティのデータはルートから網羅的に探索ができる構造なので、クラスの初期化時に、エンティティの構造を上からたどって、各エンティティのルートを一度だけ計算してキャッシュしておけば効率化できます。同時に親子のエンティティの関係もキャッシュしておけば、非効率な線形探索を辞書の参照に変えられ、計算量を減らせます。

図3 ツリー構造のルートの探索

経験的に、内製アプリケーションの実装では、目的の機能を実現するためにアルゴリズムを実直に書いた結果「入れ子の構造をループや再帰呼び出しで何度も探索する」「同じ探索を繰り返す」ことがよくあります。このようなロジック上の非効率な部分を一つ一つ取り除いてゆき、1商品あたりの同期時間を少しづつ削りました(図4)。

他にも、同期元の商品情報管理システムのAPIから、HTTP API呼び出しでデータを取得するときに、必要に応じて五月雨式に呼び出している部分がありました。HTTP APIは呼び出しの遅延が大きく、細かく呼び出すと処理速度の大きな足かせとなります。まとめて呼び出すことで、データ取得にかかる時間が大幅に短縮されました(図4、最後の短縮)。

図4 処理時間の改善。横軸は左端が最適化前、右がロジック改善済。 縦軸は10000件の処理時間(秒)。同期処理中のステップを積み上げで表示。

並列化パラメタの調整

速度改善に取り組み始めた時、すでに並列化による処理速度向上が試みられていたのですが、並列化のパラメタ(ワーカー数)に改善の余地がありました。

同期システムでは、マルチプロセスの並列化は主にSQLの実行に使われていました。そのため、並列化した部分の処理時間はほぼ I/O だったのですが、それに対してワーカー数が少なかったのです。 CPUを酷使するような処理を並列化するのなら、システムのコア数よりも多く並列処理を展開してもメリットはありませんが、I/O待ちが多い場合では、オーバーコミット気味に並列化を行ったほうがワーカーを効率的につかえます(図5)

図5 ワーカー数が少なすぎると並列化の効果が出ない

ワーカー数ハードコードされていた部分もあったので、可変にしてチューニング可能にし、最適な数値を探ってもらいました。

同期処理の分散処理化

マルチプロセスやマルチスレッドを駆使して、CPU時間を最大限に活用しつつ処理しても、システムのコア数などの制約で、やはり要求性能には届かない場合があります。 同期システムのチューニングでも、上で説明した最適化の組み合わせで、処理時間を60%程度削減できたのですが、1台で制限時間内に同期を完了させるには足りませんでした。 そこで、図6のように、同期処理を行うためのクラスタを構成して、複数台のマシンで処理を実行することにしました。

図6 5台のクラスタで同期処理を実行する

これまでの最適化で、同期処理の制限時間内で余裕をもって2000件の商品情報を同期できる状態までこぎつけていたので、5台のクラスタで2000件づつを分散処理する方式とし、要求性能をクリアするところまでこぎつけました。

開発・運用環境の改善

背景でも触れましたが、同期処理の最適化と平行して、開発・運用環境の改善も行いました。

当時、同期処理の機能面の開発はほぼ完了しつつあり、本番運用に向けた準備が始まっていたのですが、ログが分類されておらず読みづらかったり、同期処理の進捗状況を監視する方法がなく、開発でも連続運用でも不便な部分がありました。 また、デプロイの一連の手順が手動ステップで構成されていて、デプロイ手順が煩雑でした。複数台で分散処理する構成に変わり、7台のホストに順にデプロイしなければなりませんでした。デプロイターゲットも開発・ステージング・本番環境をふくめ複数ある中で、ターゲットごとの違いを確認しながら手動でデプロイを行っていくのは大変な作業でした。 そこで、以下のような運用環境の改善に取り組みました。

  1. リクエスト情報やリクエスト追跡用のIDを埋め込んだ構造化ログを書けるようにし、Cloud Loggingにログを集約する仕組みを作りました。
  2. Datadogを導入し、アプリの状態をDatadogでモニタできるようにしました。
  3. 同期処理クラスタのホストにコンテナをデプロイするためのツールを開発し、デプロイ作業の自動化を図りました。

ログの構造化と追跡用のIDの埋め込み

同期システムは GCP 上で構築されていたので、サービスアカウントを使えば比較的簡単に Cloud Logging にログを書き込むことができました。 Cloud Loggingでは、アクセスログのように、HTTPのレスポンスステータスやレイテンシといった情報を構造化して送ると、自動的にログのフィールドとして認識され、ログコンソールで絞り込みできます。そこで、図6の黄色線の経路のように、同期システムのログフォーマッタをカスタマイズして、ログを JSON 形式で出力するようにしました。当初、この方法はうまく行ったかのように見えたのですが、実は Docker を使って構築したコンテナ環境には、長いログを書くと分割されるという制約があり、分断されて JSON として認識されないログがたくさんできてしまいました。そのため、図7の青色線の経路のように、fluentd をサイドカーコンテナとして立ち上げ、同期システムから fluentd プロトコルで直接構造化ログを送信する形にしました。

図7 ログ収集の仕組み

構造化ログに対応したことで、アプリケーションの付帯情報をログに簡単に付加できるようになりました。リクエストの追跡がその一つです。 同期処理を複数台に分散する構成になったことで、同期システムには「同期開始を指示するバッチ」「同期開始指示を受けるマスタ」「分散で同期処理を行うワーカー」の3つのプレイヤーができ、それぞれがHTTP APIで通信しています。バッチから同期開始を指示するときにHTTPヘッダで追跡用のID=Request IDを送信しておき、マスタやワーカーではヘッダを伝播しつつ、ログを書く時にRequest IDをフィールドとして含めることで、Cloud Logging上では同期の一連の動作を串刺しで検索できるようになりました。

図8 request_idの伝播とログの串刺し検索

Datadogによるアプリケーション監視

実際に本運用が始まると、同期の失敗や障害の発生を検知して、速やかにリカバリを実施する必要があります。また、同期処理のスループットや処理ホストの負荷を監視して、性能不足におちいる前に対処しなければなりません。ホストの死活監視だけでなく、アプリケーションの挙動も込みで監視を行いたかったので、社内でも広く利用されているDatadogを組み込みました。

Datadogによる監視は、Agentと呼ばれるプログラムを監視対象のホストに配置することで、Agent経由でシステムの状態値(メトリクス)を収集します。同期システムも含め、商品情報管理システムに関連する他のホストの多くにもAgentを配置しました。Datadog Agentには、一定時間ごとに様々なアプリケーションのメトリクスを取得するAgent Checkというプラグインの仕組みがあり、同期システムでも、同期対象のレコードの状態(同期中か、完了したか、失敗したか、など)をDBに問い合わせるAgent Checkを書くことで、同期処理のスループットを監視できるようにしました。

また、Datadogには、Cloud Loggingからログを取り込む仕組みがあり、ログの一部だけをフィルタしてDatadogに同期しておくことで、ログの内容に基づいて、Datadogからアラートを発報できるようにしました。アラートは社内のSlackやメールで受け取れます。 Datadog上にダッシュボードを作ることで、ホストの状態、同期システム間のAPI呼び出しのレイテンシ、進行中の同期処理の件数といった情報をひと目で確認できるようになりました(図9)。

図9 Datadogの導入

fabricによるデプロイ手順の自動化

同期システムは、Google Computing Engine インスタンス上に作ったDockerホスト上でコンテナとして動かす設計になっていました。デプロイ手順は、コンテナホストにログインして、そこでライブラリの入ったコンテナをビルドし、ホスト上に置かれたソースコードをマウントして、Docker Composeでデプロイするという手順になっていました(図10)。

図10 改善前のデプロイ方式

これだと、デプロイのたびにコンテナホストにコードをチェックアウトする必要があり、コンテナはデプロイの都度違うソースコードを参照する可能性があるため、再現性がありません。一連の手順が手動操作になっていたことも問題でした。 分散処理のためにデプロイ対象の台数が増えたため、同じイメージをすべてのホストで実行できる必要もありました。 今思えば、一気にKubernetesに移行できていれば、こうしたデプロイの課題を簡単に解決できたのかもしれませんが、できるだけコンテナアプリケーションのメリットを生かして、デプロイを省力化する方法を考えた結果、図9のような、Pythonベースのタスク実行ツールfabricを使って各ホストにコンテナをpullさせるデプロイ方式にたどり着きました。

デプロイのとき、不具合が見つかって切り戻しが行われることは少なくないので、各ホストでソースコードを更新する方法はどうしてもやめたいと考えていました。このフローでは、図11のように、ビルド用のホストであらかじめコンテナをビルドしておき(①)、レジストリに配置(②)しておいてから、リリースツール(③)で各コンテナホストで docker-compose コマンドを使ってコンテナを展開します。 対象ホストにログインしてコマンドを実行する作業には、SSH経由の操作をPythonプログラムで書けるfabricが非常に役立ちました。 コンテナレジストリを経由した配布とリリースツールの導入によって、リリース手順が自動化され、開発・ステージング・本番環境といったリリースターゲットの切り替えも、ツール内のマニフェストで制御できるようになりました。

図11 コンテナレジストリとデプロイ手順の自動化

こうした改善を通して、同期システムの運用に必要なリリースと監視の仕組みの土台ができました。

まなび

今回は、最適化に着手したとき、プロファイリングの手段と実測のデータがある程度揃っていて、取っ掛かりが楽でした。ほぼメソッドレベルで実行時間を追跡できたので、メソッドの役割と照らし合わせて、処理の内容に比べて想定外に時間を使っているようなコードに注目することができました。 言語やライブラリの機能で並列・並行処理が簡単に書けるようになっていますが、その仕組みや制約を理解した上でパラメタ設定をすることで、その効果を大きく引き出せました。

最適化と並行してログや監視の枠組み、そしてデプロイツールの実装を進めましたが、これらはプロジェクトの初期に導入されていればよかったね、というのが正直な感想です。 初期に確立していれば、開発・実行・検証のサイクルを効率的に回せたし、後付けだと導入するときの制約や工数が大きいと感じました。

あと、これは後で痛感したことなのですが、Kubernetesベースで開発していれば、クラスタ化やログ・デプロイツール関連の課題はスタンダードな方法でスマートに解決出来たのではないかと思います。 まとめ 商品情報管理システムの導入プロジェクトで、同期システムの性能や運用環境の改善を行った話を紹介しました。

性能面では、処理時間のプロファイリングに基づいてレビューを行い、ボトルネックとなっていた部分を改善していきました。ロジックやデータの入出力方法を見直したほか、分散処理を取り入れることで、目標としていた処理性能に到達できました。 運用面では、ログ出力の高度化やDatadogの導入で、アプリケーションの不具合や障害の検出・原因の特定をしやすくしました。また、デプロイ手順を自動化してリリースにかかっていた時間を短縮できました。

モノタロウでは、ECサイトや基幹系システムの様々な分野を支えるエンジニア・データサイエンティストを募集しています。

カジュアル面談も実施していますので、ご興味がありましたら、ぜひ カジュアルMTG登録フォームよりご応募ください!