LIVESENSE ENGINEER BLOG

リブセンスエンジニアの活動や注目していることを発信しています

ECSでDigdagからEmbulkを並列実行する

これは Livesense Advent Calendar 2023 DAY 20の記事です。

データ分析基盤で開発をしている晝間です。 マサカリが怖くてブログ執筆を避けてましたが、初めての記事投稿になります。

リブセンスのデータ基盤では、RDSからRedshiftへのデータ同期に、DigdagとEmbulkを使っています。 Embulkで複数の事業部からDB情報を取得し、Redshiftへデータを投入しています。 DigdagとEmbulkは長年1つのVM上で長年運用してきましたが、それをECS化したという記事になります。 だいぶ前にはなりますが、DigdagとEmbulkについて書かれた記事があります。 以下の記事も読んでいただけると、前回から本記事で変わった箇所が分かります。

analytics.livesense.co.jp

背景と目的

今までのDigdagとEmbulkではリソースをうまく使えていない状態にありました。 多くのバッチ処理を動かすためには大きなリソースを必要としますが、バッチ処理が動いていないときにはそこまでのリソースは不要です。 必要なときにリソースを使える状態にできれば、高い費用対効果が得られます。

そこでDigdagだけを常駐させ、Embulkは必要なときに起動するシステムを組み直しました。

事業部DBではTailscale Subnet Routerが稼働しているため、Embulk側もTailscaleを利用しDBへ接続するように変更しました。 TailscaleはWireGuardベースのVPNサービスになります。

tailscale.com

ゴール

  • 並列実行可能な仕組みにする
  • DBへはTailscaleを利用して接続する

また、ECSを使う上でFargateだけでなくEC2も利用しました。 Fargateではprivilegedモードが利用できないため、Tailscaleをセットアップできなかったためです。 その解決策としてEC2を使用しました。

やったこと

FargateではDigdagが起動し、バッチ処理開始ではECSタスク(Embulk)をEC2上で実行します。 バッチ処理を終えEC2が不要になったら破棄するようにしました。

このEC2のスケーリングはAuto Scaling GroupをECSに紐付けることで実現できます。

docs.aws.amazon.com

またスケーリングするEC2のオンデマンドインスタンスとスポットインスタンスの割合もコントロールできます(キャパシティプロバイダー戦略)。 そこで今回はコストも抑えたかったので、スポットも混ぜる戦略を取りました。

以下はイメージになりますが、バッチ処理が動いていないときはFargateでDigdagのみが起動しています。

動作するEmbulkが無いケース

バッチ処理が動いているときでは下図のようになります。

Embulkが起動しているケース

DigdagにECSについての設定をする

Digdag設定ファイル

Digdagの設定ファイルのECSを利用するように記載します。

以下はDigdag公式を参考に作成したdigdag.propertiesのECS設定部分です。

agent.command_executor.ecs.name = digdag-ecs-cluster

agent.command_executor.ecs.digdag-ecs-cluster.region = ap-northeast-1
agent.command_executor.ecs.digdag-ecs-cluster.max_retries = 60
agent.command_executor.ecs.digdag-ecs-cluster.capacity_provider_name = capacity-provider-name
agent.command_executor.ecs.digdag-ecs-cluster.use_environment_file = true

agent.command_executor.ecs.temporal_storage.type = s3
agent.command_executor.ecs.temporal_storage.s3.bucket = ecs-tmp-bucket
agent.command_executor.ecs.temporal_storage.s3.direct_upload_expiration = 21600

実はv0.10.4以降からECS周りで便利機能が追加されていたので、今回のECS化ではv0.10.5を使用しました。

access_keyといった秘匿情報は用いず、IAMインスタンスプロファイルの権限を参照するようにしています。アクセスキーを使わずに済むのは保守運用の面で非常に助かります。

また、Digdag側ではキャパシティプロバイダーに関する引数もあり、キャパシティプロバイダー名を渡すと利用できることが分かりました。

加えて、ECSタスクで起動したEmbulkタスクで環境変数を使いたい場面もあります。特にEmbulkのテンプレート機能(liquid)を利用しているケースです。 そういう場合は ecs.<name>.use_environment_file の値をtrueにすると、envと _exportで定義された値がEmbulkタスクへ渡されます。これをうまく使うことで共通モジュールの作成や環境変数の管理が楽になります。

digファイルに記載する内容

DigdagのworkflowにはDigdag公式にもある通り、以下のようにECSタスク定義のARNを記載します。 それだけでDigdagのECS機能を利用することができます。

# 公式に書いてあるものと同じ
_export:
  ecs:
    task_definition_arn: "arn:aws:ecs:us-east-1:..."

おわりに

Digdag v0.10.5を利用するなかで、いくつか知ったことをECS化で活かしてみました。 Digdagも工夫次第で便利に使えるんだ、というのが伝われば幸いです。

21日目のアドベントカレンダーもお楽しみに!