[Python] luigiで動的アウトプット(DB登録)
前回の残課題
ArchiveはDB登録する処理だから、outputはMySqlTargetにしたほうがいい
どうやってoutputをDB登録にするか、普段SQLAlchemyを使っているのでSQLAlchemyとの連携をやりたいと思う。
前回のコード
前回のコードは以下のものだ、outputがファイル出力になっているのでこれをDB登録に対応させていく
import luigi
class Archive(luigi.Task):
def run(self):
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()
filecopy = FileCopy(input_path)
yield filecopy
output_path = filecopy.output()
# ファイルアーカイブ処理
with self.output().open('w') as f:
f.write('Archive done')
def output(self):
return luigi.LocalTarget('archive-output-file')
参考にしたのは
Python: データパイプライン構築用フレームワーク Luigi を使ってみるとUsing SQLAlchemy in Luigi Workflow Pipelineと公式ドキュメント
データベースの準備
SQLAlchemy Migrateを使ってMySQLのデータベースを準備する。
テーブル定義はこんな感じ
from sqlalchemy import *
from migrate import *
from sqlalchemy.dialects.mysql import BIGINT, DATETIME, TEXT
meta = MetaData()
table = Table(
'archives', meta,
Column('id', BIGINT(unsigned=True), primary_key=True),
Column('filename', TEXT, nullable=False, unique=True),
Column('filepath', TEXT, nullable=False),
Column('created_at', DATETIME, nullable=False),
Column('updated_at', DATETIME, nullable=False),
mysql_charset='utf8')
def upgrade(migrate_engine):
meta.bind = migrate_engine
table.create()
def downgrade(migrate_engine):
meta.bind = migrate_engine
table.drop()
Dynamic dependencies
ベーシックなサンプルは以下の通りだ(テーブルは既にあるのでreflect = Trueのパターン)
class SQLATask(sqla.CopyToTable):
reflect = True
connection_string = "sqlite://"
table = "item_property"
def rows(self):
for row in [("item1" "property1"), ("item2", "property2")]:
yield row
しかしこのサンプルをそのまま適用することはできない…動的依存関係の解決の為にrun()を定義する必要があるからだ。
なので、ここではデータを登録するタスクとデータを準備するタスクを分けるサンプルが参考になる。
データを準備するタスクとしてBuildRecordタスクを定義し、動的依存関係の解決はそのタスクのrun()で行うのだ。
import luigi
from luigi.contrib import sqla
from luigi.mock import MockTarget
from pathlib import Path
from datetime import datetime
class BuildRecord(luigi.Task):
def run(self):
pickup = Pickup()
yield pickup
with pickup.output().open('r') as f:
input_path = f.read()
filecopy = FileCopy(input_path)
yield filecopy
output_path = Path(filecopy.output().path)
with self.output().open('w') as f:
f.write('0\t{0}\t{1}\t{2}\t{3}'.format(
output_path.name, output_path.as_posix(), datetime.now(), datetime.now()))
def output(self):
return MockTarget("BuildRecord")
class Archive(sqla.CopyToTable):
reflect = True
connection_string = 'mysql://<user>:<password>@<server>/<db_name>?charset=utf8'
table = "archives"
def requires(self):
return BuildRecord()
これでDB登録できるようなった!
基本的な流れは今まで通り、Archiveタスクを実行するとrequiresでBuildRecordを呼び、BuildRecordのrunでデータを作成、Archiveに戻ってSQLAlchemyでDB登録という形だ。
細かな注意点は以下の通りだ
- 登録するデータは
\tで区切った文字列で渡す。今回は1行のみだが、複数行渡す場合は\nで改行する。 - 区切り文字
\tはcolumn_separatorオプションで変更可能 idはAUTO INCREMENTだが、登録データを空白にするとカラムがずれるので0を指定してあげる。そうするとInsertする時にちゃんとAUTO INCREMENTしてくれる。
sqla.CopyToTableのoutput
ArchiveタスクはoutputとしてレコードをDBに登録することができたが、このレコードを削除したら次にタスクを回した時再度登録されるだろうか?
実際やってみると再登録はされない。それどころかArchive処理自体が走らなくなる。
sqla.CopyToTableのcompleteをどうやって判定しているかという話になるが、update_idというメソッドで判定しており、内部でtask_idを参照している。task_idはタスクを走らせた時のログに出てくるArchive__fbe666d9feのような値で、<タスク>_<パラメータ>_<パラメータのmd5ハッシュ>(正確じゃないかもしれないけどこんな感じ)というフォーマットになっている。
つまりタスクとパラメータが同一ならキーが変わらないのでsqla.CopyToTableは走らない。
そしてこの情報はtable_updatesというテーブルが自動的に作成されて保存されている。
今回の要件だとタスクのIDよりもレコードの内容でcompleteを判定して欲しい。具体的にはユニークであるfilenameのカラムだ。
しかしArchiveタスクでupdate_idメソッドをオーバーライドしようとしても、completeを判定するタイミングでfilenameはわからない。ジレンマの再来だ。
残課題
update_idをオーバーライドして、レコード内容をもとにcomplete判定をするようにしたい。現状ではパラメータを渡さなければ一回限りの実行しかしないし、何かしらパラメータを渡しても無条件でrunが走る形になってしまう。
余談
サンプルでもMockTarget使ってる…MockTargetはユニットテスト用途と言われているけど、やっぱりoutputがファイルである必要性が無いタスクはMockTarget使ったほうがスマートだよね
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2
- SQLAlchemy 1.2.1
- sqlalchemy-migrate 0.11.0

![[Python] luigiで動的アウトプット(DB登録)](https://skullware.net/blog/wp-content/uploads/2018/04/luigi-1558018_960_720-800x518.jpg)