[Python] luigiとSQLAlchemyでUpsert
luigiのタスクのoutput
をDB登録とする時、前回までではSQLAlchemy
との連携を使用した。
その中でタスクのcomplete
判定は以下のようになっている。
complete
判定用にtable_updates
テーブルをDB上に持つ。このテーブルは最初に参照された時に自動生成される。- 判定は
table_updates
テーブルにupdate_id
をキーとしたレコードの有無で判断する。あれば実行済みだ。 update_id
はデフォルトでは<タスクネームスペース>.<タスク名>_<パラメータ(の一部)>_<パラメータのmd5ハッシュ>
をキーとして使用する。complete
がFalse
の場合、データを対象テーブルにInsertする。
ただしこのデフォルト設定にはいくつか問題点がある。
update_idが固定
固定というわけではないが、タスクとパラメータが同一ならcomplete
はTrue
を返してしまい、run
が走らずに依存元タスクへ処理が戻ってしまう。
これの対応は簡単で、update_id
をオーバーライドして自分で定義してしまえばいい。前回の例ならば「ファイルパス」をキーとしてしまえばよい。
def update_id(self):
return self.path
Insertが失敗する可能性がある
update_id
は実際に登録するデータのキーと紐づいているわけではないので、complete
がFalse
になったとしてもデータ登録がDuplicate Key Error
になる可能性がある。ここはupdate_id
をプライマリーキーの値にする(もしくはユニークとなる値)よう自力で設定するしかない。
InsertではなくUpdateしたい
今回の本題がこれ
データをInsertしてDuplicate Key Error
になる時は、そもそもInsertではなくUpdateしたいという場合だ。(いわゆるUpsert
)
MySQLのON DUPLICATE KEY UPDATE
機能をSQLAlchemyから呼び出して対応してみたい。
sqla.CopyToTableのソースコードを確認すると、Insert処理をしているのはcopy
メソッドであることがわかる。このメソッドのコメントに以下のようなことが書いてある。
A task that needs row updates instead of insertions should overload this method.
Updateにしたければオーバーロードしろと
オーバーライドじゃないのか…?と思いつつ実装してみる
オーバーライド後
前回のコードに適用するとこんな感じだ
from sqlalchemy.dialects.mysql import insert
class Archive(sqla.CopyToTable):
path = luigi.Parameter()
reflect = True
connection_string = 'mysql://<user>:<password>@<server>/<db_name>?charset=utf8'
table = "archives"
def requires(self):
return BuildRecord(path)
def update_id(self):
return self.path
def copy(self, conn, ins_rows, table_bound):
bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns)
ins = insert(table_bound).values(bound_cols)
on_duplicate_key = ins.on_duplicate_key_update(
filename=ins.inserted.filename,
filepath=ins.inserted.filepath,
created_at=ins.inserted.created_at,
updated_at=ins.inserted.updated_at
)
conn.execute(on_duplicate_key, ins_rows)
オーバーライド前はこのようなコードだったので、一行ずつ比較して見てみたい。
# luigi/contrib/sqla.py
def copy(self, conn, ins_rows, table_bound):
bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns)
ins = table_bound.insert().values(bound_cols)
conn.execute(ins, ins_rows)
1行目
bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns)
投入されるデータ(ins_rows
)はカラム名に_
がついているので、そのままではカラム名不一致でテーブルに投入できない。その為、カラム名の対応表として上記のdictを作成している。
この行は特にいじらず、このままにしておく。
2行目
ins = table_bound.insert().values(bound_cols)
values
にカラム名の対応表(1行目で作成したdict)を設定することで、パラメータを更新している。しかしこのままではMySQLに対応していないのでsqlalchemy.dialects.mysql
のinsert
を利用して再定義する。
from sqlalchemy.dialects.mysql import insert
ins = insert(table_bound).values(bound_cols)
on_duplicate_key = ins.on_duplicate_key_update(
filename=ins.inserted.filename,
filepath=ins.inserted.filepath,
created_at=ins.inserted.created_at,
updated_at=ins.inserted.updated_at
)
するとon_duplicate_key_update
メソッドが使用できるので、Duplicate Key
の時にUpdateをするカラムを設定することができる。
3行目
conn.execute(ins, ins_rows)
ここは単純にSQLを流している。以下のように書き換えてあげよう。
conn.execute(on_duplicate_key, ins_rows)
以上でUpsert
を実装することができた。
いままでUpsert
って使ったことがなかったけど、これってluigiだけじゃなくて普通のSQLAlchemyでも使えるね
MySQL以外にも、PostgreSQLやSQLiteでもできるらしいが、今回はここまで
参考
実行環境
- Windows 10
- Python 3.6.3
- luigi 2.7.2
- SQLAlchemy 1.2.1