非同期の操作#

psycopg の ConnectionCursor には、対応する AsyncConnectionAsyncCursor があり、asyncio インターフェイスをサポートしています。

非同期オブジェクトのデザインは、同期オブジェクトとほぼ同じです。つまり、非同期オブジェクトを使用するために必要なのは、あちこちに await キーワードを散りばめることだけです。

async with await psycopg.AsyncConnection.connect(
        "dbname=test user=postgres") as aconn:
    async with aconn.cursor() as acur:
        await acur.execute(
            "INSERT INTO test (num, data) VALUES (%s, %s)",
            (100, "abc'def"))
        await acur.execute("SELECT * FROM test")
        await acur.fetchone()
        # (1, 100, "abc'def") を返す
        async for record in acur:
            print(record)

バージョン 3.1 で変更: AsyncConnection.connect() は DNS の名前解決をノンブロッキングな方法で行います。

警告

バージョン 3.1 より前では、AsyncConnection.connect() は DNS の名前解決でブロックされる可能性があります。それを避けるには、hostaddr 接続パラメータを設定 するか、これを自動化するために resolve_hostaddr_async() を使用する必要があります。

警告

Windows では、psycopg はデフォルトの ProactorEventLoop と互換性がありません。たとえば、 SelectorEventLoop などの別のループを使ってください。

たとえば、プログラムの初期で次のように使用できます。

asyncio.set_event_loop_policy(
    asyncio.WindowsSelectorEventLoopPolicy()
)

with async コネクション#

基本的な使い方 で見たように、コネクションとカーソルはコンテクストマネージャとして振る舞えるため、次のように実行できます。

with psycopg.connect("dbname=test user=postgres") as conn:
    with conn.cursor() as cur:
        cur.execute(...)
    # コンテクストを離れるとすぐカーソルがクローズされる
# トランザクションがコミットされて、コネクションが閉じる

非同期のコネクションは ほとんど 期待通りのものですが、完全ではありません。connect()cursor()コンテクストを返さない ということに注意してください。どちらも コンテクストとして使用できるオブジェクト を返すファクトリーメソッドです。なぜなら、手動でオブジェクトを処理して、必要なときにだけ close() すると便利なユースケースがいくつかあるためです。

結果として async with connect() を使うことはできず、代わりに次に示すように2段階で使う必要があります。

aconn = await psycopg.AsyncConnection.connect()
async with aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)

これはさらに、次のように async with await に短縮できます。

async with await psycopg.AsyncConnection.connect() as aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)

しかし、これ以上は短くできません。2重の async をする必要があります。

AsyncConnection.cursor() 関数は async 関数ではないため (決して I/O を実行しません)、そこに await が必要ないことに注意してください。結果として、普通の async with コンテクストマネージャが使用できます。

非同期操作の中断#

メインスレッドで実行されている普通のコネクション上の Ctrl-C によって、長時間実行中の操作が中断された場合、操作はキャンセルされ、コネクションはエラー状態になります。エラー状態からは、通常の rollback() で回復できます。

非同期タスクがキャンセルされた場合に、そのコネクション上のすべての操作も同様にキャンセルされるという点で、非同期コネクションも同様の動作を提供します。キャンセルは、Ctrl-C や同様のシグナルにより間接的に起こることも、通常の方法で Python Task をキャンセルすることで直接的に起こることもあります。psycopg が Python 標準の CancelledError に遭遇したときは、PostgreSQL の postmaster に操作をキャンセルするように依頼します。

たとえタスクが最終的に CancelledError により途中で終了した場合でも、Python Task のキャンセルによって操作が完了しないことが保証されないことに注意してください。ステートメントの最終的な結果を知りたい場合は、タスクのキャンセルの代替手段として Connection.cancel() を呼び出すことを検討してください。

以前のバージョンの psycopg では、コネクションを手動でキャンセルするためにシグナル ハンドラを設定することを推奨していました。これはもう必要ありません。

サーバー メッセージ#

PostgreSQL は、ちょうど実行された操作に関する警告やデバッグ情報などの 参考になるメッセージ を、クエリの結果とともに送ることができます。操作が成功したときでも通知が現れることがありますが、エラーを表しているわけではありません。その一部は次のように psql によって報告されるため、おそらく馴染みがあるでしょう。

$ psql
=# ROLLBACK;
WARNING:  there is no transaction in progress
ROLLBACK

メッセージは、PL/pgSQL の 'RAISE' ステートメント によって送られることもあります(EXCEPTION より低いレベルの場合。それ以外の場合は適切な DatabaseError が発生します)。受け取ったメッセージのレベルは client_min_messages 設定でコントロールできます。

デフォルトでは、メッセージは受信後に無視されます。クライアントでメッセージを処理したい場合は、Connection.add_notice_handler() 関数を使って、どんなメッセージを受信した場合にも呼び出される関数を登録できます。メッセージは、メッセージテキストや severity などのサーバーから渡されたすべての情報を含む Diagnostic のインスタンスとして、そのコールバック関数に渡されます。このオブジェクトは、サーバーによって起こされたエラーの diag 属性で見つかるものと同じです。

>>> import psycopg

>>> def log_notice(diag):
...     print(f"The server says: {diag.severity} - {diag.message_primary}")

>>> conn = psycopg.connect(autocommit=True)
>>> conn.add_notice_handler(log_notice)

>>> cur = conn.execute("ROLLBACK")
The server says: WARNING - there is no transaction in progress
>>> print(cur.statusmessage)
ROLLBACK

警告

コールバック関数が受け取った Diagnostic オブジェクトは、コールバックの処理が完了した後にデータの割り当て解除されるため、コールバック関数の終了後に使用してはいけません。後で情報を使う必要がある場合は、Diagnostic オブジェクト全体を使い回す代わりに、要求された属性を取り出してそれを使ってください。

非同期通知#

psycopg は、PostgreSQL コマンド LISTENNOTIFY により提供される機能(?)を使用して、他のデータベースセッションとの非同期な対話が可能です。この形式の通信を使用する方法の例については、PostgreSQL のドキュメンテーションを参照してください。

セッションが通知と対話する方法のため (NOTIFY のドキュメンテーションを参照)、通知をタイムリーに受信または送信したい場合は、コネクションを autocommit モードに保つ必要があります。

通知は Notify のインスタンスとして受信されます。もし通知を受信するためだけのコネクションを予約したい場合は、最も簡単な方法は Connection.notifies ジェネレータを使用することです。ジェネレータは close() を使用して停止できます。

注釈

通知を処理するのに AsyncConnection は必要ありません。普通のブロッキングな Connection は完全に有効です。

次の例は、通知を出力して、"stop" というメッセージを受信したときに停止するコードです。

import psycopg
conn = psycopg.connect("", autocommit=True)
conn.execute("LISTEN mychan")
gen = conn.notifies()
for notify in gen:
    print(notify)
    if notify.payload == "stop":
        gen.close()
print("there, I stopped")

次のように psql セッションで NOTIFY をいくつか実行すると、

=# NOTIFY mychan, 'hello';
NOTIFY
=# NOTIFY mychan, 'hey';
NOTIFY
=# NOTIFY mychan, 'stop';
NOTIFY

Python プロセスからは、たとえば次のような出力を得られるでしょう。

Notify(channel='mychan', payload='hello', pid=961823)
Notify(channel='mychan', payload='hey', pid=961823)
Notify(channel='mychan', payload='stop', pid=961823)
there, I stopped

あるいは、add_notify_handler() を使用して、普通のクエリ処理中にどんな通知を受信した場合にも呼び出されコールバック関数を登録することもできます。その後、コネクションを普通に使うことができます。この場合には、通知は即座には受信されず、クエリなどのコネクション操作中にだけ受信されるということに注意してください。

conn.add_notify_handler(lambda n: print(f"got this: {n}"))

# meanwhile in psql...
# =# NOTIFY mychan, 'hey';
# NOTIFY

print(conn.execute("SELECT 1").fetchone())
# got this: Notify(channel='mychan', payload='hey', pid=961823)
# (1,)

コネクションの切断の検知#

データベースとのコネクションが失われたときに、即座に検知できると便利な場合があります。これを行う容赦のない方法の1つは、SELECT 1 の無限ストリームを実行するループの中で、コネクションをポーリングすることです……。そのようなことをするのは やめてください。ポーリングは、非常に 時代遅れの手法です。さらに、ポーリングは非常率であり (本当にほしいものがサーバー-クライアントによる数字の1のジェネレータでない限り)、無意味なトラフィックを生み出し、そして平均でポーリング時間の半分の遅延で1つの切断を検出できるだけです。

より効率よくタイムリーにサーバーの切断を検知する方法は、追加のコネクションを作り、このコネクションが何か言うことがあるという OS からの通知を待つことです。その時にだけ何らかのチェックを実行できます。スレッド (または asyncio タスク) を専用に割り当てて、このコネクション上で待ちます。このようなスレッドは、OS によって起こされるまでは、何もアクティビティを実行しません。

普通の (asyncio ではない) プログラムの場合には、selectors モジュールが利用できます。Connectionfileno() メソッドを実装しているため、それをファイル ライクなオブジェクトとしてそのまま登録できます。もし残りのプログラムが何か他にするべきことができたとしても、そのコードは専用のスレッドで (そして専用のコネクションを使用して) 実行できます。

import selectors

sel = selectors.DefaultSelector()
sel.register(conn, selectors.EVENT_READ)
while True:
    if not sel.select(timeout=60.0):
        continue  # 1分以内に FD のアクティビティが何も検出されなかった

    # アクティビティが検出された。コネクションはまだ OK?
    try:
        conn.execute("SELECT 1")
    except psycopg.OperationalError:
        # 切断されてしまったため、パニックになるなど、何か役に立つことをする
        logger.error("we lost our database!")
        sys.exit(1)

asyncio のプログラムでは、代わりに Task を専用に割り当てて、add_reader を使用して同じようなことをします。

import asyncio

ev = asyncio.Event()
loop = asyncio.get_event_loop()
loop.add_reader(conn.fileno(), ev.set)

while True:
    try:
        await asyncio.wait_for(ev.wait(), 60.0)
    except asyncio.TimeoutError:
        continue  # 1分以内に FD のアクティビティが何も検出されなかった

    # アクティビティが検出された。コネクションはまだ OK?
    try:
        await conn.execute("SELECT 1")
    except psycopg.OperationalError:
        # 何が起きたか推測する
        ...