小さく始めるETL基盤 Embluk/Digdag/ECS/Glue/Athena

aumoにはETL基盤が無かったのでまずは小さく作りました。 ETL界隈ではさんざっぱら擦られまくったEmbulk+Digdag構成ではありますが、導入の背景や工夫した点などをお伝えできればと思います。

皆様いかがお過ごしでしょうか。こんにちはエンジニアの堀江です。aumoでは主に推薦エンジンなどを担当しています。
気がつけば年度も半ばで時が経つのは早いものですね。パラリンピックが終わった後の下半期は何が流行るでしょう?私はスラッシュメタルが流行ると思います。

リフマスターを何人か挙げろと言われたら、私はまずTESTAMENTのエリックピーターソンを挙げます。

ETL導入の動機

aumoの推薦エンジンの根幹は機械学習が主となります。(一部ルールベースも掛け合わせています。早く断捨離したいですね。)
効率的な学習や推論を行うためには、予め目的に沿った整えられたデータカタログが不可欠になると思います。

これまでのaumoにはETL/DWH基盤がなく、推薦エンジン側の実装でデータカタログを準備していたので、下記の問題を抱えていました。

  • 推薦ロジック到達前のデータ準備で時間を要してしまう。開発時が特に辛くて待ち疲れするし、開発速度も渋る。
  • データ抽出と推薦ロジックが混在した実装になってしまっている。推薦ロジックの本質とは乖離したコードが増え見通しが悪い。
  • 推薦エンジンから参照されるデータソースが各種分散しているので、推薦エンジンのデータアクセス部分が複雑化している。
  • 推薦エンジン都合のインデックスをRDBに貼らざるを得ないケースがある。読み込み負荷も気にしたくないが、専用レプリカ立てるにしても事業とシステムのコストバランス的に過剰だったりする。

ETLで実現したかった事

ETLの目的といえば文字通り、Extract(抽出)/Transform(変形)/Load(読み込み)になりますが、前述の問題を解消するために今回のプロジェクトで定義した具体的な要件は下記となります。

各データストアのコピー・加工

各データストアからデータレイクに対し定時的にデータをコピーしたい。また加工やサマライズなどの事前処理においては、推薦エンジン側での処理を取り止め、ETL基盤ににその処理を移譲させたい。そのようにする事で推薦エンジンが抱えていた責務を減らす。

スモールスタート

工数的な問題もあるが、まずはETLの効果を早めに体感したいので、まずはガチらずに小さく作るところから始める。
最初はシンプルなジョブスケジューラからETL処理を蹴る事ができればOKとしたい。

Overview

いきなり俯瞰的な説明になりますが、システム構成のBefore/Afterです。

Before

ETL導入前の構成です。推薦エンジンがETL的な責務を負わなければいけないので辛い状況です。

After

移行後の構成です。前段のETL基盤がハードワークしてくれるおかげで、推薦エンジンの責務が減りました。

各ツール・実行環境について

ここでETL構築に利用した各ツールおよび、実行環境について紹介致します。

Embulk

オープンソースのデータバルクローダーです。スクリタブルというよりはコンフィギュラブルな記法で、異種ストレージ間のデータコピーや、軽いフィルタ・データ加工などを簡潔に定義することができます。
今回は主に下記のプラグインを利用しました。

プラグイン名備考
embulk-input-mysql
embulk-input-bigquery
embulk-output-s3_parquetS3にparquet形式でデータ出力する。Glueにテーブルを作る機能が超強くてETLの要と言って良い。

Digdag

オープンソースのワークフローエンジンです。
Emublkをジョブとして蹴るほか、Embulk単体では実現が難しい部分を補助する目的で採用しています。Sentryとの連携や、テーブルを順番にコピーするなどの繰り返し制御などで活用しています。

ECS Task

Emublk + Digdagの実行環境です。cron的な定時処理が必要なので、Fargateのタスクスケジュールサポートを利用しています。
選定理由については検討を重ねたと言うよりは、組織のスキルセット的に手に馴染んだものをって感じです。

Athena/Glue DataCatalog/S3

Athena + Glue + S3 これら三位一体で、推薦エンジンから参照されるDWHを構成します。各々の役割は下記となります。

サービス名備考
S3ETLから生成されたデータが置かれる物理的なストレージ
AthenaいわゆるマネージドなPresto。S3上のデータをSQLライクに検索するI/Fを提供する。使った時だけ課金
Glue Data CatalogS3上の物理データとテーブル定義のマッピングを持ち、この定義を行う事でAthenaからのデータ利用が可能となる。これらの定義は上述で紹介したembulk-output-s3_parquetから作成される。

Terraform

AWSマネージドサービスのオーケストレーションに使用しています。

実装手順

ここまでは弊社のETL基盤について俯瞰的な説明をしてきました。ここからはMySQLからS3/Glueへのデータコピーを例題として、断片的なコードを交えて具体的な実装の手順を説明します。

[Embulk] 設定ファイルの記述

Embulkはread/writeするストレージの種類に応じて、プラグインを予めインストールしておく必要があります。Rubyistには馴染みのある手順ですね。

embulk gem install embulk-input-mysql
embulk gem install embulk-output-s3_parquet

プラグインの数が増えたりバージョンのピンニングなどをしたい場合は、Gemfileを利用するのがベターかと思います。

# Gemfile
source 'https://rubygems.org/'

gem 'embulk', '0.9.23'
gem 'embulk-input-mysql'
gem 'embulk-output-s3_parquet'
embulk bundle install

プラグインのインストールを終えたら、設定ファイルを記述します。
(秘匿すべき情報や複雑な部分については省略しています。)

# mysql_to_s3.yml.liquid
in:
  type: mysql
  # MySQL接続情報
  user: {{env.DB_USER}}
  database: {{env.DB_NAME}}
  host: {{env.DB_HOST}}
  # テーブル名
  table: {{env.TABLE_NAME}}

out:
  type: s3_parquet
  auth_method: env
  region: ap-northeast-1
  # データのロード先
  bucket: {{env.S3_BUCKET}}
  path_prefix: {{env.S3_PATH]}{{env.DB_NAME}}{{env.TABLE_NAME}}/table
  file_ext: snappy.parquet
  compression_codec: snappy
  default_timezone: Asia/Tokyo
  # Glueにテーブルを作る
  catalog:
    database: {{env.GLUE_DB_NAME}}
    table: {{env.TABLE_NAME}

Embulkの設定ファイルは原則的にYAML形式なのですが、Embulkに内包されるテンプレートエンジンliquidにより、変数のバインドや制御構文を利用できます。
上述のenv.で始まる変数名は環境変数を表します。弊社では環境に依存するパラメータは、コンテナ起動オプションとして注入する方式にしています。
また、各々名前の異なるテーブル毎に設定ファイルを複数設けるのは現実的な手段ではないので、テーブル名も環境変数として定義しています。ただしこちらの環境変数はコンテナ起動時ではなく、これから後述するDigdagで制御しています。

設定ファイルを記述したら、いよいよEmbulkの実行が可能です。

TABLE_NAME=my_table_name digdag preview mysql_to_s3.yml.liquid
TABLE_NAME=my_table_name digdag run mysql_to_s3.yml.liquid

[Digdag] 設定ファイルの記述

前述の通りDigdagは、Embullk単体では対応できない処理の埋め合わせとして利用しています。
下記はテーブル名をリスト化し、ループ制御で順次Embulkにテーブル名を注入し、実行するための設定ファイルです。
(秘匿すべき情報や複雑な部分については省略しています。)

# mysql_to_s3.dig
_export:
  # テーブル一覧
  tables: [
      areas, articles, spots
  ]

+exec_embulk:
  for_each>:
    table: ${tables}
  _do:
   # テーブル名を環境変数として注入
    _export:
      TABLE_NAME:  ${table}
  # embulkをkick
  sh>: embulk run /path/to/mysql_to_s3.yml.liquid

DigdagからEmbulkを実行する段取りができました。

digdag run mysql_to_s3.dig

[Terraform/ECS Task] Digdagの実行をスケジューリングする

ETLジョブのオーケストレーションには、Terraformを利用しています。現在どのようなジョブが有効なのかコード面での見通しを良くするために、実行すべきコマンドはハードコードせずに変数として集約しています。

variable schduled_tasks {
    default = {
        import_article_click_times =  {
            description         = "BigQueryから記事閲覧履歴をS3/Glue/Athenaにインポートする"
            command             = ["digdag", "run", "--project", "digdag/statistics", "article_click_event.dig", "-p", "from_days_ago=1"]
            schedule_expression = "cron(0 7 * * ? *)"
        }
        incremental_import_media_rds = {
            description         = "MediaのDBテーブルをS3/Glue/Athenaに差分インポートする"
            command             = ["digdag", "run", "--project", "digdag/importer",  "media_rds_tables.dig",  "-p", "mode=incremental"]
            schedule_expression = "cron(0 7 * * ? *)"
        }
    }
}

resource "aws_cloudwatch_event_target" "event_target" {
    for_each = var.schduled_tasks
    target_id = "${var.family}-${each.key}"
    ... なんかすごい設定 ...
}

まとめ

ETLを設けることで、推薦ロジックが非常に簡潔になりました。推薦ロジックの実装者も私本人ですが、機械学習の数理に集中できる環境が作れたと思います。また、将来分業するにしても良い布石が作れたと思います。

そしてスラッシュメタルって本当に良いものですね。さよなら、さよなら。