パイプライン モードのサポート#
バージョン 3.1 で追加.
パイプライン モード を使用すると、PostgreSQL クライアント アプリケーションが前回送信したクエリの結果を読み込む必要なく、クエリを送信できるようになります。パイプライン モードを活用することで、複数のクエリや結果が単一のネットワークラウンドトリップで送受信できるようになるため、クライアントがサーバーを待つ時間が短くなります。パイプライン モードにより、アプリケーションは大幅な性能向上が得られます。
パイプライン モードが最も役に立つのは、サーバーが遠い場合、つまりネットワーク レイテンシ (「ping 時間」) が高い場合や、多数の小さな操作が立て続けに実行される場合です。各クエリの実行時間がクライアント/サーバー間のラウンドトリップの何倍もかかる場合には、通常、パイプライン コマンドを使っても得られる恩恵は小さくなります。300 ms のラウンドトリップ時間がかかるサーバー上で実行される 100 ステートメントの操作は、パイプラインがないとネットワーク レイテンシだけで 30 秒かかることになるでしょう。パイプラインを使用すると、サーバーからの結果を待つのにわずか 0.3 秒しかかからなくなる可能性があります。
サーバーはステートメントを実行し、クライアントが送信した順序で結果を返します。サーバーはパイプラインの終わりを待たずに、パイプライン内ですぐにコマンドを実行し始めます。結果はサーバー側でバッファリングされ、サーバーは 同期ポイント が確立されたときにバッファをフラッシュすることに注意してください。
参考
以下の PostgreSQL のドキュメント
には、パイプライン モードが最も役に立つ場合と、エラー管理やトランザクションとのやり取りについて、多数の詳細が説明されています。
クライアント-サーバー間のメッセージ フロー#
パイプライン モードの仕組みについてよく理解するためには、PostgreSQL のクライアント-サーバー間のメッセージ フロー を詳しく見る必要があります。
通常のクエリの間、各ステートメントはクライアントからサーバーにリクエスト メッセージのストリームとして送られ、サーバーに対してこれまで送ったメッセージを処理するべきことを伝えるために Sync メッセージで終端されます。サーバーはステートメントを実行し、クライアントに対して結果をメッセージのストリームとして説明 (describe) して返し、クライアントに対してクライアントが新しいクエリを送っても良いということを伝えるために ReadyForQuery で終端します。
たとえば、次の (何も結果を返さない) ステートメントは、
conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
結果として、以下の2つのメッセージ グループになります。
方向 |
メッセージ |
---|---|
Python ▶ PostgreSQL |
|
PostgreSQL ◀ Python |
|
そして、次のクエリは、
conn.execute("SELECT data FROM mytable WHERE id = %s", [1])
結果として、以下の2つのメッセージ グループになります。
方向 |
メッセージ |
---|---|
Python ▶ PostgreSQL |
|
PostgreSQL ◀ Python |
|
2つのステートメントは連続的に送信されるため、1回ごとに通信のオーバーヘッドが4回も課されてしまいます。
パイプラインモードを使うと、クライアントは複数の操作を、サーバーへのメッセージのより長いストリームとしてまとめられるようになり、1つ以上のレスポンスを1つのバッチにまとまった1つのレスポンスとして受け取れるようになります。上記の2つの操作を次のようにパイプラインで実行した場合、
with conn.pipeline():
conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
conn.execute("SELECT data FROM mytable WHERE id = %s", [1])
結果として、クライアントとサーバー間で1つのラウンドトリップしかかからなくなるでしょう。
方向 |
メッセージ |
---|---|
Python ▶ PostgreSQL |
|
PostgreSQL ◀ Python |
|
パイプライン モードを使用する#
psycopg は Connection.pipeline()
メソッド経由でパイプライン モードをサポートしています。このメソッドはコンテクスト マネージャです。with
ブロックに入ると、Pipeline
オブジェクトが yield されます。ブロックの終わりで、コネクションは通常の操作モードを継続します。
パイプライン ブロック内では、1つ以上のカーソルで Connection.execute()
、Cursor.execute()
、executemany()
を普通に使用して、複数の操作を実行できます。
>>> with conn.pipeline():
... conn.execute("INSERT INTO mytable VALUES (%s)", ["hello"])
... with conn.cursor() as cur:
... cur.execute("INSERT INTO othertable VALUES (%s)", ["world"])
... cur.executemany(
... "INSERT INTO elsewhere VALUES (%s)",
... [("one",), ("two",), ("four",)])
ノーマル モードとは違い、psycopg はサーバーが各クエリの結果を受信するのを待ちません。クライアントは、サーバーが結果を出力バッファにフラッシュしたときにバッチで受信します。同じパイプライン内の2つ以上のカーソルを使用することで、2つ以上の結果を受信できます。
何らかのステートメントでエラーが発生した場合、サーバーは現在のトランザクションを中断し、キューにある後続のコマンドは、次の 同期ポイント まで実行しません。そのようなコマンドそれぞれに対して PipelineAborted
例外が発生します。クエリ処理は同期ポイントの後に再開します。
警告
特定の機能はパイプラインでは利用できません。これには次の機能が含まれます。
COPY は PostgreSQL のパイプライン モードではサポートされていません。
Cursor.stream()
はパイプライン モードでは意味がありません。(この機能はバッチとは逆です!)ServerCursor
は現在パイプライン モードでは実装されていません。
注釈
psycopg 3.1 以降、executemany()
内部でパイプライン モードを使用するようになります。その結果、executemany()
を1回だけ呼び出すためにパイプライン ブロックを扱う必要はありません。
同期ポイント#
クエリ結果のクライアントへのフラッシュは、同期ポイントが psycopg により確立される以下のようなタイミングで行われる可能性があります。
Pipeline.sync()
メソッドの使用時Pipeline
ブロックの最後場合によっては、ネストした
Pipeline
ブロックのオープン時Cursor.fetchone()
などの fetch メソッドの使用時 (クエリだけをフラッシュしますが、Sync は発行せず、パイプラインのステート エラーもリセットしません)
サーバーは、たとえば出力バッファがフルの場合など、自身の裁量でフラッシュを実行する可能性があります。
たとえ autocommit 中であっても、サーバーはパイプライン モードで送信されるステートメントを、暗黙のトランザクション内にラッピングすることに注意してください。これは、Sync を受信したときにしかコミットされません。そのため、ステートメントのグループ内で失敗すると、おそらく前回の Sync より後に実行されたステートメントの効果が無効化され、それが後続の Sync に伝搬します。
たとえば、以下のブロック内では、
>>> with psycopg.connect(autocommit=True) as conn:
... with conn.pipeline() as p, conn.cursor() as cur:
... try:
... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"])
... cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"])
... conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"])
... p.sync()
... except psycopg.errors.UndefinedTable:
... pass
... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"])
ブロック内で、two
の insert が原因の relation "no_such_table" does not exist
というエラーが、しかしおそらく sync()
によって発生するでしょう。ブロックの終わりでは、テーブルには以下のデータがあることになります。
=# SELECT * FROM mytable;
+----+------+
| id | data |
+----+------+
| 2 | four |
+----+------+
(1 row)
理由は次のとおりです。
シーケンス値 1 はステートメント
one
で消費されるが、レコードは同じ暗黙のトランザクション内でのエラーのため破棄された。パイプラインが中断されるため、ステートメント
three
は実行されない (したがって、シーケンスのアイテムを消費しない)。ステートメント
four
は、Sync が失敗したトランザクションを終了させた後、成功裏に実行される。
警告
サーバーエラーによって例外が発生する正確な Python ステートメントは、いくぶん不確定です。サーバーがバッファされた結果をどのタイミングでフラッシュするかによって異なるためです。
ステートメントのグループがサーバーによってアトミックに適用されることを保証したい場合は、commit()
or transaction()
などのトランザクション メソッドを活用してください。これらのメソッドもパイプラインを同期して、それまでに実行されたコマンドにエラーがあった場合に例外を発生させます。
注意事項#
警告
パイプライン モードは実験的な機能です。
パイプライン モードの動作、特に、エラー コンディションや並行性の周辺は、通常のリクエスト-レスポンスメッセージのパターンほど多くは探索されておらず、その非同期の性質により、本質的により複雑になります。
より経験を積んでフィードバックをもらうにつれて (フィードバックは歓迎です)、バグや欠点を発見し、現在のインターフェイスや動作を変更せざるを得なくなる可能性があります。
パイプライン モードは現在サポートされている PostgreSQL の全てのバージョンで利用可能ですが、利用するには、クライアントが PostgreSQL 14 以上の libpg を使用する必要があります。Pipeline.is_supported()
を使用すると、クライアントが正しいライブラリを持っていることを確認できます。