COPY TO と COPY FROM を使用する#
psycopg を使うと、PostgreSQL の COPY プロトコル を使用した操作ができるようになります。COPY
は、データベースにデータを読み込むための (そして SQL の創造性をいくらか発揮すれば、データを変更するための)、最も効率のよい方法の1つです。
Copy は、Cursor.copy()
メソッドに COPY ... FROM STDIN
または COPY ... TO STDOUT
という形式のクエリを渡して、with
ブロック内で Copy
オブジェクトを使用することでサポートされます。
with cursor.copy("COPY table_name (col1, col2) FROM STDIN") as copy:
# 'copy' オブジェクトに write()/write_row() を使用してデータを渡す
COPY ステートメントは、次のように psycopg.sql
モジュールのオブジェクトを使用して動的に作れます。
with cursor.copy(
sql.SQL("COPY {} TO STDOUT").format(sql.Identifier("table_name"))
) as copy:
# 'copy' オブジェクトから read()/read_row() を使用してデータを読み込む
バージョン 3.1 で変更: execute()
のように copy()
に引数を渡すこともできます。
with cur.copy("COPY (SELECT * FROM table_name LIMIT %s) TO STDOUT", (3,)) as copy:
# たかだか3レコードまでが期待される
コネクションは通常のトランザクション動作の影響を受けるため、コネクションが autocommit になっていない限り、COPY 操作の最後でも、保留中の変更をまだコミットする必要があり、その変更をロールバックすることもできます。詳細については、トランザクションの管理 を参照してください。
行ごとにデータを書き込む#
copy 操作を使用すると、Python のイテラブル (タプルのリスト、またはシーケンスの任意のイテラブル) からデータをデータベースに読み込めます。Python の値は通常のクエリの場合と同じように適応されます。このような操作を実行するには、Cursor.copy()
で COPY ... FROM STDIN
を使用し、with
ブロック内で結果として得たオブジェクトに write_row()
を使います。操作はブロックを出るときに完了します。
records = [(10, 20, "hello"), (40, None, "world")]
with cursor.copy("COPY sample (col1, col2, col3) FROM STDIN") as copy:
for record in records:
copy.write_row(record)
もしブロック内で例外が発生した場合は、操作は中断され、それまでに挿入されたレコードは破棄されます。
Copy
から行ごとに読み書きを行うためには、FORMAT CSV
、DELIMITER
、NULL
などの COPY
のオプションを指定してはいけません。これらの詳細については今は置いておきます。ありがとう :)
行ごとにデータを読み込む#
逆の操作、つまり rows()
を繰り返すことで COPY ... TO STDOUT
操作から行を読み込むことも可能です。ただし、これは通常は実行したいことではありません。普通のクエリ処理のほうが使いやすいでしょう。
現在、PostgreSQL は COPY TO
に完全な型情報を与えてくれないため、返された行には、フォーマットに従った未パースのデータが文字列かバイトとして得られるだけです。
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
for row in copy.rows():
print(row) # 未パースのデータが返る: ('10', '2046-12-24')
読み込み前に set_types()
を使うことで結果を改善できますが、自分自身で指定する必要があります。
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
copy.set_types(["int4", "date"])
for row in copy.rows():
print(row) # (10, datetime.date(2046, 12, 24))
ブロックごとにコピーする#
もしデータが copy に適した方法ですでにフォーマットされている場合には (たとえば、前回の COPY TO
操作の結果として得られたファイルに由来する場合など)、代わりに Copy.write()
を使用してデータベースにロードできます。
with open("data", "r") as f:
with cursor.copy("COPY data FROM STDIN") as copy:
while data := f.read(BLOCK_SIZE):
copy.write(data)
この場合には、入力データが copy()
内の操作が期待するものと互換性がある限り、COPY
の任意のオプションとフォーマットが使えます。データを str
として渡せるのは、copy が FORMAT TEXT
の場合で、データを bytes
として渡せるのは、FORMAT TEXT
と FORMAT BINARY
の場合です。
COPY
フォーマット内でデータを生成するためには、COPY ... TO
STDOUT
ステートメントを使い、結果として得られた bytes
オブジェクトのストリームを生成する Copy
オブジェクトをイテレートできます。
with open("data.out", "wb") as f:
with cursor.copy("COPY table_name TO STDOUT") as copy:
for data in copy:
f.write(data)
バイナリ コピー#
バイナリ コピーは、COPY
ステートメント内で FORMAT BINARY
を指定することでサポートされます。write_row()
でバイナリデータをインポートするためには、データベースに渡されたすべての型に、バイバリ ダンバー (binary dumper) が登録されている必要があります。データが write()
を使用して ブロックごとに コピーされた場合、これは必要ありません。
警告
PostgreSQLはバイナリ モードで データを読み込むときには特に注意が必要で、cast のルールが適用されません。つまり、たとえば100という値を integer
カラムに渡そうとしても 失敗する ということです。psycopg はこれを smallint
の値として渡すため、サーバーは期待したサイズと一致しないという理由でリジェクトするためです。
Copy
の set_types()
メソッドを使用して読み込む型を注意深く指定すれば、この問題は回避できます。
参考
バイナリ クエリに関する詳しい情報については、バイナリ パラメータと結果 を参照してください。
非同期コピーのサポート#
非同期の操作は、AsyncConnection
によって取得されたオブジェクトを使用して、上記と同じパターンを使用してサポートされます。たとえば、f
が COPY
のデータを返す非同期の read()
メソッドをサポートするオブジェクトである場合、完全に非同期なコピー操作は次のようになるでしょう。
async with cursor.copy("COPY data FROM STDIN") as copy:
while data := await f.read():
await copy.write(data)
AsyncCopy
オブジェクトのドキュメンテーションでは、非同期メソッドと、それに対応する同期の Copy
との違いが説明されています。
参考
非同期オブジェクトの使用に関する詳しい情報は、非同期の操作 を参照してください。
例: サーバーを横断するテーブルのコピー#
サーバーを横断してテーブルまたはテーブルの一部をコピーするためには、2つの異なるコネクションの上の COPY 操作を使用できます。1つ目のコネクションから読み込み、2つ目のコネクションに書き込みます。
with psycopg.connect(dsn_src) as conn1, psycopg.connect(dsn_tgt) as conn2:
with conn1.cursor().copy("COPY src TO STDOUT (FORMAT BINARY)") as copy1:
with conn2.cursor().copy("COPY tgt FROM STDIN (FORMAT BINARY)") as copy2:
for data in copy1:
copy2.write(data)
通常、FORMAT BINARY
を使用するとパフォーマンスが向上しますが、コピー元とコピー先のスキーマが 完全に同一である 場合にのみ機能します。もしテーブルが 互換性がある だけなら (たとえば、integer
フィールドを 送り先の bigint
フィールドにコピーしようとしている場合)、BINARY
オプションを削除し、テキストベースのコピーを実行する必要があります。詳細は バイナリ コピー を参照してください。
同様のパターンは、async copy を実行するために、async objects を使うように書き換えられます。