読者です 読者をやめる 読者になる 読者になる

scramble cadenza

技術ネタのガラクタ置き場

delayed_job まとめ

ruby 小ネタ delayed_job

イントロ

collectiveidea/delayed_job

なんとなく使ってきて、なんとなくわかってるつもりの delayed_job について、改めてまとめてみたもの。
休日暇だったのでソースコード読んでみたり、step 実行したりして、何処に何が書いてあるかをざっくりとまとめたものです。

環境

  • rails (4.2.1)
  • delayed_job (4.0.6)
  • delayed_job_active_record (4.0.3)
  • daemons (1.2.3)

導入

  • Gemfile
gem "delayed_job_active_record"
  • ActiveJob

ActiveJob のバックエンドとして、delayed_job を指定する

config.active_job.queue_adapter = :delayed_job
  • command
rails generate delayed_job:active_record
rake db:migrate

基本的な仕組み

Queue

  • queue は database に保存される
  • テーブル名は delayed_jobs
  • rails generate delayed_job:active_record で自動生成される

serialize

  • ruby の object を yaml 形式で dump し、DB に保存する
    • handler カラムに保存する
  • 取り出す時に dump された yaml を load し、ruby のオブジェクトに戻す

worker

  • worker が delayed_jobs テーブルを数秒おきにポーリングする
  • レコードがあったら、yaml をロードして ruby のオブジェクトに戻し job を実行する
    • job を実行し終わったらレコードは削除される
  • worker はポーリングを続ける、の繰り返し

worker の詳細

要の worker について詳しく見てみる。

woker の起動方法

  • rake jobs:work
  • bin/delayed_job start

の二通り

共通してやっていること

両者の違いは?

  • rake jobs:work は worker を起動するだけ
    • worker プロセスは一つだけ
  • bin/delayed_job は 起動オプションを受け取り、柔軟な worker プロセスの設定ができる
    • daemons gem がないと 起動不可能

なので

  • 開発環境で軽く使う場合は rake jobs:work(もしくは rake jobs:workoff)
  • 本番環境で運用する場合は、オプションを指定して bin/delayed_job

のようなイメージ

worker の option

Delayed::Worker クラスのアクセサとして定義する。 README に書いてあるとおり、rails なら config/initializers の下に適当に書いておけばいい。

  • default_queue_name
    • queue の名前。queue カラムに入る。
  • destroy_failed_jobs
    • true だと、失敗した job を DB から削除する(デフォルト true)
  • max_run_time
    • job の timeout を設定(デフォルトは4時間)
    • この時間より長いジョブは Timeout::Error を継承した DelayedJob::WorkerTimeout をraise する
  • raise_signal_exceptions

bin/delayed_job のオプション

https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/command.rb#L26-L71

書いてあるとおり。重要そうなものをピックアップ

  • --number_of_workers
    • 起動する worker 数
    • ただし --identifier option との併用は不可(identifier 指定とプロセスの自動連番が衝突するから)
    • stop と start の worker 数が等しくないと、bin/delayed_job stop で stop できない
  • --pid-dir
    • pid ファイルのディレクトリを指定
  • --log-dir
    • log ファイルのディレクトリを指定
  • --identifier
    • プロセス名を指定できる。指定すると delayed_job.#{@options[:identifier]} というプロセス名になる
  • --prefix
    • プロセス名を指定できる。指定すると File.join(options[:prefix], process_name) というプロセス名になる

bin/delayed_job stop で何が起こるか

起動中の delayed_job プロセスには TERM が送られる。(daemons gem の仕事) https://github.com/thuehlinger/daemons/blob/v1.2.3/lib/daemons/application.rb#L374

発生する例外

起動中のプロセスが TERM, INT を受け取った時、job の挙動はDelayed::Worker.raise_signal_exceptions の値で変わる

  • false
    • TERM を受け取った場合
      • 例外は出ない
    • INT を受け取った場合
      • 例外は出ない
  • :term
    • TERM を受け取った場合
      • SignalException を raise
    • INT を受け取った場合
      • 例外は出ない
  • true
    • TERM を受け取った場合
      • SignalException を raise
    • INT を受け取った場合
      • SignalException を raise

コードを見たほうが腑に落ちるかも https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/worker.rb#L132-L142

例外が出た後に何が起こるか

callback について

  • error のフックとしては :before, :after が空席
  • 書き方は以下の様な感じ

    Delayed::Worker.lifecycle.before(:error) do |worker, job| # puts worker # puts job end

  • ブロック引数に run_callbacks(:error, self, job) の第二引数以降、即ち Delayed::Worker, Delayed::Job インスタンスを受け取れる

  • ただし、callback を定義する場所に注意
    • 実質 config/initializer しかない?
    • アプリ側で動的に callback を定義しても、既に起動中の worker には伝わらない
    • ワーカーに依存しない処理が書ける場所

Capistrano

https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/recipes.rb

  • 各 host で bin/delayed_job コマンド + オプションを実行する

だけ。

つまり bin/delayed_job の章で述べたとおり、capistrano で deploy した時、実行中の job は中断し DB に戻される。 そして新しいコードを load した状態で、再度 job が実行される、という流れになる。

細かすぎて伝わらない delayed_job のネタ

実は Delayed::Job というクラスはない

正確には collectiveidea/delayed_job に定義されていない、ということ。

実体は collectiveidea/delayed_job_active_record になっていて、このクラスが include Delayed::Backend::Base している

>> Delayed::Job
=> Delayed::Backend::ActiveRecord::Job

なので Delayed::Job インスタンスメソッドを調べる時は delayed_job_active_recordDelayed::Backend::Base を見ると良い。

find_available メソッドの謎

Delayed::Backend::Base.reserve に定義されている find_available メソッド、よく調べてみたけど、実は何処にも定義されていない。なので NoMethodError が発生するはず。 https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/backend/base.rb#L43

けど上記で述べたとおり、実体は collectiveidea/delayed_job_active_record のオブジェクトになっていて、https://github.com/collectiveidea/delayed_job_active_record/blob/v4.0.6/lib/delayed/backend/active_record.rb#L43reserve を上書きしているから、実質問題は起きていない。

delayed_job バックエンドによっては find_available を上書きしているものもあるらしい

まとめ

  • delayed_job の仕組みについて調べました
    • DB に queue を溜めて、worker がそれを取り出して実行する
  • worker の起動方法は2種類ある
    • rakebin/delayed_job の2つ
  • worker の設定は Delayed::Worker クラスのアクセサとして定義
    • worker の設定は沢山あるんやで
  • worker を stop させると、worker が既に動いていても DB に一旦戻されて再実行を行う仕組みになっている
    • デフォルト