delayed_job まとめ
イントロ
なんとなく使ってきて、なんとなくわかってるつもりの delayed_job について、改めてまとめてみたもの。
休日暇だったのでソースコード読んでみたり、step 実行したりして、何処に何が書いてあるかをざっくりとまとめたものです。
環境
rails (4.2.1)delayed_job (4.0.6)delayed_job_active_record (4.0.3)daemons (1.2.3)
導入
- Gemfile
gem "delayed_job_active_record"
- ActiveJob
ActiveJob のバックエンドとして、delayed_job を指定する
config.active_job.queue_adapter = :delayed_job
- command
rails generate delayed_job:active_record rake db:migrate
基本的な仕組み
Queue
- queue は database に保存される
- テーブル名は
delayed_jobs- scheme は gory-details に書かれている
rails generate delayed_job:active_recordで自動生成される
serialize
- ruby の object を yaml 形式で dump し、DB に保存する
handlerカラムに保存する
- 取り出す時に dump された yaml を load し、ruby のオブジェクトに戻す
worker
- worker が
delayed_jobsテーブルを数秒おきにポーリングする - レコードがあったら、yaml をロードして ruby のオブジェクトに戻し job を実行する
- job を実行し終わったらレコードは削除される
- worker はポーリングを続ける、の繰り返し
worker の詳細
要の worker について詳しく見てみる。
woker の起動方法
rake jobs:workbin/delayed_job start
の二通り
共通してやっていること
- worker プロセスの起動
rake jobs:workbin/delayed_job start- https://github.com/collectiveidea/delayed_job/blob/v.4.0.6/lib/delayed/command.rb
- 起動した worker プロセス自体の管理は
daemonsgem の仕事
両者の違いは?
rake jobs:workは worker を起動するだけ- worker プロセスは一つだけ
bin/delayed_jobは 起動オプションを受け取り、柔軟な worker プロセスの設定ができるdaemonsgem がないと 起動不可能
なので
- 開発環境で軽く使う場合は
rake jobs:work(もしくはrake jobs:workoff) - 本番環境で運用する場合は、オプションを指定して
bin/delayed_job
のようなイメージ
worker の option
Delayed::Worker クラスのアクセサとして定義する。
README に書いてあるとおり、rails なら config/initializers の下に適当に書いておけばいい。
default_queue_namequeueの名前。queueカラムに入る。
destroy_failed_jobs- true だと、失敗した job を DB から削除する(デフォルト true)
max_run_time- job の timeout を設定(デフォルトは4時間)
- この時間より長いジョブは
Timeout::Errorを継承したDelayedJob::WorkerTimeoutをraise する
raise_signal_exceptions- シグナル
INT,TERMを受け取った時の挙動を制御する - https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/worker.rb#L53-L58 にコメントで書かれている
- シグナル
bin/delayed_job のオプション
https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/command.rb#L26-L71
書いてあるとおり。重要そうなものをピックアップ
--number_of_workers- 起動する worker 数
- ただし
--identifieroption との併用は不可(identifier 指定とプロセスの自動連番が衝突するから) - stop と start の worker 数が等しくないと、
bin/delayed_job stopで stop できない
--pid-dir- pid ファイルのディレクトリを指定
--log-dir- log ファイルのディレクトリを指定
--identifier- プロセス名を指定できる。指定すると
delayed_job.#{@options[:identifier]}というプロセス名になる
- プロセス名を指定できる。指定すると
--prefix- プロセス名を指定できる。指定すると
File.join(options[:prefix], process_name)というプロセス名になる
- プロセス名を指定できる。指定すると
bin/delayed_job stop で何が起こるか
起動中の delayed_job プロセスには TERM が送られる。(daemons gem の仕事)
https://github.com/thuehlinger/daemons/blob/v1.2.3/lib/daemons/application.rb#L374
発生する例外
起動中のプロセスが TERM, INT を受け取った時、job の挙動はDelayed::Worker.raise_signal_exceptions の値で変わる
falseTERMを受け取った場合- 例外は出ない
INTを受け取った場合- 例外は出ない
:termTERMを受け取った場合SignalExceptionを raise
INTを受け取った場合- 例外は出ない
trueTERMを受け取った場合SignalExceptionを raise
INTを受け取った場合SignalExceptionを raise
コードを見たほうが腑に落ちるかも https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/worker.rb#L132-L142
例外が出た後に何が起こるか
:errorの callback が登録されていれば、それを実行する- Job を DB に戻して、再度実行し直す(
handle_failed_job)
callback について
- error のフックとしては
:before,:afterが空席handle_failed_jobの前と後に実行されるcallback- callback の定義方法はコードは
Lifecycleというクラスに書かれてある
書き方は以下の様な感じ
Delayed::Worker.lifecycle.before(:error) do |worker, job| # puts worker # puts job endブロック引数に
run_callbacks(:error, self, job)の第二引数以降、即ちDelayed::Worker,Delayed::Jobインスタンスを受け取れる- ただし、
callbackを定義する場所に注意- 実質
config/initializerしかない? - アプリ側で動的に callback を定義しても、既に起動中の worker には伝わらない
- ワーカーに依存しない処理が書ける場所
- 実質
Capistrano
https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/recipes.rb
- 各 host で
bin/delayed_jobコマンド + オプションを実行する
だけ。
つまり bin/delayed_job の章で述べたとおり、capistrano で deploy した時、実行中の job は中断し DB に戻される。
そして新しいコードを load した状態で、再度 job が実行される、という流れになる。
細かすぎて伝わらない delayed_job のネタ
実は Delayed::Job というクラスはない
正確には collectiveidea/delayed_job に定義されていない、ということ。
実体は collectiveidea/delayed_job_active_record になっていて、このクラスが include Delayed::Backend::Base している
>> Delayed::Job => Delayed::Backend::ActiveRecord::Job
なので Delayed::Job インスタンスのメソッドを調べる時は delayed_job_active_record か Delayed::Backend::Base を見ると良い。
find_available メソッドの謎
Delayed::Backend::Base.reserve に定義されている find_available メソッド、よく調べてみたけど、実は何処にも定義されていない。なので NoMethodError が発生するはず。
https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/backend/base.rb#L43
けど上記で述べたとおり、実体は collectiveidea/delayed_job_active_record のオブジェクトになっていて、https://github.com/collectiveidea/delayed_job_active_record/blob/v4.0.6/lib/delayed/backend/active_record.rb#L43 で reserve を上書きしているから、実質問題は起きていない。
delayed_job バックエンドによっては find_available を上書きしているものもあるらしい
まとめ
- delayed_job の仕組みについて調べました
- DB に queue を溜めて、worker がそれを取り出して実行する
- worker の起動方法は2種類ある
rakeとbin/delayed_jobの2つ
- worker の設定は
Delayed::Workerクラスのアクセサとして定義- worker の設定は沢山あるんやで
- worker を stop させると、worker が既に動いていても DB に一旦戻されて再実行を行う仕組みになっている
- デフォルト