[Python] luigiで動的アウトプット(DB登録)
前回の残課題
Archive
はDB登録する処理だから、output
はMySqlTargetにしたほうがいい
どうやってoutput
をDB登録にするか、普段SQLAlchemy
を使っているのでSQLAlchemy
との連携をやりたいと思う。
前回のコード
前回のコードは以下のものだ、output
がファイル出力になっているのでこれをDB登録に対応させていく
import luigi
class Archive(luigi.Task):
def run(self):
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()
filecopy = FileCopy(input_path)
yield filecopy
output_path = filecopy.output()
# ファイルアーカイブ処理
with self.output().open('w') as f:
f.write('Archive done')
def output(self):
return luigi.LocalTarget('archive-output-file')
参考にしたのは
Python: データパイプライン構築用フレームワーク Luigi を使ってみるとUsing SQLAlchemy in Luigi Workflow Pipelineと公式ドキュメント
データベースの準備
SQLAlchemy Migrateを使ってMySQL
のデータベースを準備する。
テーブル定義はこんな感じ
from sqlalchemy import *
from migrate import *
from sqlalchemy.dialects.mysql import BIGINT, DATETIME, TEXT
meta = MetaData()
table = Table(
'archives', meta,
Column('id', BIGINT(unsigned=True), primary_key=True),
Column('filename', TEXT, nullable=False, unique=True),
Column('filepath', TEXT, nullable=False),
Column('created_at', DATETIME, nullable=False),
Column('updated_at', DATETIME, nullable=False),
mysql_charset='utf8')
def upgrade(migrate_engine):
meta.bind = migrate_engine
table.create()
def downgrade(migrate_engine):
meta.bind = migrate_engine
table.drop()
Dynamic dependencies
ベーシックなサンプルは以下の通りだ(テーブルは既にあるのでreflect = True
のパターン)
class SQLATask(sqla.CopyToTable):
reflect = True
connection_string = "sqlite://"
table = "item_property"
def rows(self):
for row in [("item1" "property1"), ("item2", "property2")]:
yield row
しかしこのサンプルをそのまま適用することはできない…動的依存関係の解決の為にrun()
を定義する必要があるからだ。
なので、ここではデータを登録するタスクとデータを準備するタスクを分けるサンプルが参考になる。
データを準備するタスクとしてBuildRecord
タスクを定義し、動的依存関係の解決はそのタスクのrun()
で行うのだ。
import luigi
from luigi.contrib import sqla
from luigi.mock import MockTarget
from pathlib import Path
from datetime import datetime
class BuildRecord(luigi.Task):
def run(self):
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()
filecopy = FileCopy(input_path)
yield filecopy
output_path = Path(filecopy.output().path)
with self.output().open('w') as f:
f.write('0\t{0}\t{1}\t{2}\t{3}'.format(
output_path.name, output_path.as_posix(), datetime.now(), datetime.now()))
def output(self):
return MockTarget("BuildRecord")
class Archive(sqla.CopyToTable):
reflect = True
connection_string = 'mysql://<user>:<password>@<server>/<db_name>?charset=utf8'
table = "archives"
def requires(self):
return BuildRecord()
これでDB登録できるようなった!
基本的な流れは今まで通り、Archive
タスクを実行するとrequires
でBuildRecord
を呼び、BuildRecord
のrun
でデータを作成、Archive
に戻ってSQLAlchemyでDB登録という形だ。
細かな注意点は以下の通りだ
- 登録するデータは
\t
で区切った文字列で渡す。今回は1行のみだが、複数行渡す場合は\n
で改行する。 - 区切り文字
\t
はcolumn_separator
オプションで変更可能 id
はAUTO INCREMENTだが、登録データを空白にするとカラムがずれるので0
を指定してあげる。そうするとInsertする時にちゃんとAUTO INCREMENTしてくれる。
sqla.CopyToTable
のoutput
Archive
タスクはoutput
としてレコードをDBに登録することができたが、このレコードを削除したら次にタスクを回した時再度登録されるだろうか?
実際やってみると再登録はされない。それどころかArchive
処理自体が走らなくなる。
sqla.CopyToTable
のcomplete
をどうやって判定しているかという話になるが、update_id
というメソッドで判定しており、内部でtask_id
を参照している。task_id
はタスクを走らせた時のログに出てくるArchive__fbe666d9fe
のような値で、<タスク>_<パラメータ>_<パラメータのmd5ハッシュ>
(正確じゃないかもしれないけどこんな感じ)というフォーマットになっている。
つまりタスクとパラメータが同一ならキーが変わらないのでsqla.CopyToTable
は走らない。
そしてこの情報はtable_updates
というテーブルが自動的に作成されて保存されている。
今回の要件だとタスクのIDよりもレコードの内容でcomplete
を判定して欲しい。具体的にはユニークであるfilename
のカラムだ。
しかしArchive
タスクでupdate_id
メソッドをオーバーライドしようとしても、complete
を判定するタイミングでfilename
はわからない。ジレンマの再来だ。
残課題
update_id
をオーバーライドして、レコード内容をもとにcomplete
判定をするようにしたい。現状ではパラメータを渡さなければ一回限りの実行しかしないし、何かしらパラメータを渡しても無条件でrun
が走る形になってしまう。
余談
サンプルでもMockTarget
使ってる…MockTarget
はユニットテスト用途と言われているけど、やっぱりoutput
がファイルである必要性が無いタスクはMockTarget
使ったほうがスマートだよね
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2
- SQLAlchemy 1.2.1
- sqlalchemy-migrate 0.11.0