[Python] luigiとSQLAlchemyでUpsert

[Python] luigiとSQLAlchemyでUpsert

luigiのタスクのoutputをDB登録とする時、前回までではSQLAlchemyとの連携を使用した。
その中でタスクのcomplete判定は以下のようになっている。

  • complete判定用にtable_updatesテーブルをDB上に持つ。このテーブルは最初に参照された時に自動生成される。
  • 判定はtable_updatesテーブルにupdate_idをキーとしたレコードの有無で判断する。あれば実行済みだ。
  • update_idはデフォルトでは<タスクネームスペース>.<タスク名>_<パラメータ(の一部)>_<パラメータのmd5ハッシュ>をキーとして使用する。
  • completeFalseの場合、データを対象テーブルにInsertする。

ただしこのデフォルト設定にはいくつか問題点がある。

 

update_idが固定

固定というわけではないが、タスクとパラメータが同一ならcompleteTrueを返してしまい、runが走らずに依存元タスクへ処理が戻ってしまう。
これの対応は簡単で、update_idをオーバーライドして自分で定義してしまえばいい。前回の例ならば「ファイルパス」をキーとしてしまえばよい。

    def update_id(self):
        return self.path

 

Insertが失敗する可能性がある

update_idは実際に登録するデータのキーと紐づいているわけではないので、completeFalseになったとしてもデータ登録がDuplicate Key Errorになる可能性がある。ここはupdate_idをプライマリーキーの値にする(もしくはユニークとなる値)よう自力で設定するしかない。

 

InsertではなくUpdateしたい

今回の本題がこれ
データをInsertしてDuplicate Key Errorになる時は、そもそもInsertではなくUpdateしたいという場合だ。(いわゆるUpsert

MySQLのON DUPLICATE KEY UPDATE機能をSQLAlchemyから呼び出して対応してみたい。

sqla.CopyToTableのソースコードを確認すると、Insert処理をしているのはcopyメソッドであることがわかる。このメソッドのコメントに以下のようなことが書いてある。

A task that needs row updates instead of insertions should overload this method.

Updateにしたければオーバーロードしろと
オーバーライドじゃないのか…?と思いつつ実装してみる

 

オーバーライド後

前回のコードに適用するとこんな感じだ

from sqlalchemy.dialects.mysql import insert

class Archive(sqla.CopyToTable):
    path = luigi.Parameter()
    reflect = True
    connection_string = 'mysql://<user>:<password>@<server>/<db_name>?charset=utf8'
    table = "archives"

    def requires(self):
        return BuildRecord(path)

    def update_id(self):
        return self.path

    def copy(self, conn, ins_rows, table_bound):
        bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns)
        ins = insert(table_bound).values(bound_cols)
        on_duplicate_key = ins.on_duplicate_key_update(
            filename=ins.inserted.filename,
            filepath=ins.inserted.filepath,
            created_at=ins.inserted.created_at,
            updated_at=ins.inserted.updated_at
        )
        conn.execute(on_duplicate_key, ins_rows)

オーバーライド前はこのようなコードだったので、一行ずつ比較して見てみたい。

# luigi/contrib/sqla.py
    def copy(self, conn, ins_rows, table_bound):
        bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns)
        ins = table_bound.insert().values(bound_cols)
        conn.execute(ins, ins_rows)

 

1行目

bound_cols = dict((c, sqlalchemy.bindparam("_" + c.key)) for c in table_bound.columns)

投入されるデータ(ins_rows)はカラム名に_がついているので、そのままではカラム名不一致でテーブルに投入できない。その為、カラム名の対応表として上記のdictを作成している。
この行は特にいじらず、このままにしておく。

 

2行目

ins = table_bound.insert().values(bound_cols)

valuesにカラム名の対応表(1行目で作成したdict)を設定することで、パラメータを更新している。しかしこのままではMySQLに対応していないのでsqlalchemy.dialects.mysqlinsertを利用して再定義する。

from sqlalchemy.dialects.mysql import insert

ins = insert(table_bound).values(bound_cols)
on_duplicate_key = ins.on_duplicate_key_update(
    filename=ins.inserted.filename,
    filepath=ins.inserted.filepath,
    created_at=ins.inserted.created_at,
    updated_at=ins.inserted.updated_at
)

するとon_duplicate_key_updateメソッドが使用できるので、Duplicate Keyの時にUpdateをするカラムを設定することができる。

 

3行目

conn.execute(ins, ins_rows)

ここは単純にSQLを流している。以下のように書き換えてあげよう。

conn.execute(on_duplicate_key, ins_rows)

以上でUpsertを実装することができた。
いままでUpsertって使ったことがなかったけど、これってluigiだけじゃなくて普通のSQLAlchemyでも使えるね

MySQL以外にも、PostgreSQLやSQLiteでもできるらしいが、今回はここまで

 

参考

 

実行環境

  • Windows 10
  • Python 3.6.3
  • luigi 2.7.2
  • SQLAlchemy 1.2.1
%d人のブロガーが「いいね」をつけました。