
Connection._disconnect() early-return skips transport.close() on close-handshake errors, leaking aiohttp ClientSession #46830

@MuhammadAliShahzad

Description


Package

`azure-eventhub` 5.15.1 (pyamqp), affects both sync and async paths.

Describe the bug

`Connection._disconnect()` early-returns when state is already `END`:

```python
# azure/eventhub/_pyamqp/aio/_connection_async.py:242

async def _disconnect(self) -> None:
    """Disconnect the transport and set state to END."""
    if self.state == ConnectionState.END:
        return
    await self._set_state(ConnectionState.END)
    await self._transport.close()
```

`Connection.close()` (line 842) wraps the close handshake in `try/except/finally`. The `except` sets state to `END` on any error during close; the `finally` then calls `_disconnect()`:

```python
# azure/eventhub/_pyamqp/aio/_connection_async.py:842

async def close(self, error: Optional[AMQPError] = None, wait: bool = False) -> None:
    try:
        # ... send Close frame, transition state, _wait_for_response ...
        ...
    except Exception as exc:  # pylint:disable=broad-except
        _LOGGER.info("An error occurred when closing the connection: %r", exc, ...)
        await self._set_state(ConnectionState.END)  # <-- state is now END
    finally:
        await self._disconnect()  # <-- early-returns, transport.close() never runs
```

So when any error occurs during the AMQP close handshake (timeout, network error, peer already closed, etc.), the underlying transport is abandoned. The `aiohttp.ClientSession` is then only released when the connection object is garbage-collected, which produces a stream of `Unclosed client session` warnings and delays reclamation of TCP/SSL state.
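The control flow above can be reproduced without Event Hubs or aiohttp. The sketch below is a simplified model, not SDK code: `ConnectionState`, `FakeTransport` and `BuggyConnection` are hypothetical stand-ins that copy only the `_disconnect()` early return and the `try/except/finally` shape of `close()`, with a `ConnectionResetError` raised to simulate a failed close handshake.

```python
import asyncio
from enum import Enum


class ConnectionState(Enum):
    # Simplified stand-in for pyamqp's ConnectionState; only the states used here.
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class FakeTransport:
    """Hypothetical transport that only records how many times close() ran."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        self.close_calls += 1


class BuggyConnection:
    """Mirrors the _disconnect()/close() control flow quoted in this issue."""

    def __init__(self) -> None:
        self.state = ConnectionState.OPENED
        self._transport = FakeTransport()

    async def _set_state(self, new_state: ConnectionState) -> None:
        self.state = new_state

    async def _disconnect(self) -> None:
        if self.state == ConnectionState.END:
            return  # early return: transport.close() is skipped
        await self._set_state(ConnectionState.END)
        await self._transport.close()

    async def close(self) -> None:
        try:
            await self._set_state(ConnectionState.CLOSE_SENT)
            # Simulate the close handshake failing (timeout, network error, ...).
            raise ConnectionResetError("peer went away")
        except Exception:  # pylint:disable=broad-except
            await self._set_state(ConnectionState.END)
        finally:
            await self._disconnect()


conn = BuggyConnection()
asyncio.run(conn.close())
print(conn.state.name, conn._transport.close_calls)  # END 0
```

The connection reports `END`, but the transport's `close()` was never called, which is the leak described above.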

Reproduction

Long-lived Cloud Run instance running an EventHub consumer/producer over `TransportType.AmqpOverWebsocket` with intermittent network disruption. We observe 8–17 `Unclosed client session` log entries at once on instance shutdown, one per partition, because each partition's connection close-handshake fails and `_disconnect()` early-returns.

Sync version has the identical bug

`azure/eventhub/_pyamqp/_connection.py:260` mirrors the async version exactly:

```python
def _disconnect(self) -> None:
    if self.state == ConnectionState.END:
        return
    self._set_state(ConnectionState.END)
    self._transport.close()
```

The sync transport (`_transport.py:743`) does not hold an aiohttp `ClientSession`, so the user-visible leak is less severe, but the logic flaw is identical and a fix should cover both.
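A sync model of the same flaw, under the same assumptions (hypothetical `FakeSyncTransport` and `BuggySyncConnection`, simplified `ConnectionState`), shows that the exception path in `_connection.py` also never reaches `transport.close()`:

```python
from enum import Enum


class ConnectionState(Enum):
    # Simplified stand-in for pyamqp's ConnectionState.
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class FakeSyncTransport:
    """Hypothetical sync transport that counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


class BuggySyncConnection:
    """Mirrors the sync _disconnect() quoted above plus close()'s try/except/finally."""

    def __init__(self) -> None:
        self.state = ConnectionState.OPENED
        self._transport = FakeSyncTransport()

    def _set_state(self, new_state: ConnectionState) -> None:
        self.state = new_state

    def _disconnect(self) -> None:
        if self.state == ConnectionState.END:
            return  # same early return as the async version
        self._set_state(ConnectionState.END)
        self._transport.close()

    def close(self) -> None:
        try:
            self._set_state(ConnectionState.CLOSE_SENT)
            # Simulate the peer never answering the Close frame.
            raise TimeoutError("no Close frame from peer")
        except Exception:  # pylint:disable=broad-except
            self._set_state(ConnectionState.END)
        finally:
            self._disconnect()


conn = BuggySyncConnection()
conn.close()
print(conn._transport.close_calls)  # 0
```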

Why the early return exists (happy path tracing)

The early return is not arbitrary — it prevents double-close in the happy path. Tracing `Connection.close(wait=True)`:

  1. Send Close frame, state → `CLOSE_SENT`
  2. `_wait_for_response(wait=True, END)` blocks
  3. Peer sends Close back; the frame loop calls `_incoming_close()` (line 470)
  4. `_incoming_close` sees state in `disconnect_states` and calls `_disconnect()`
  5. `_disconnect()` sees state ≠ END, sets state to END, calls `transport.close()` ✓
  6. `_wait_for_response` returns; control returns to `close()`'s try block
  7. `finally` calls `_disconnect()` again — state is now END, early-returns

So the early return is correct for the happy path; the bug is that it also fires on the exception path where `_disconnect()` has not yet been called.
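The happy-path trace can be checked with a similarly simplified model. Here `_wait_for_response()` is a hypothetical stand-in that immediately delivers the peer's Close to `_incoming_close()`, and `DISCONNECT_STATES` is reduced to `CLOSE_SENT`; the real frame loop and state set in pyamqp are more involved. The counter confirms steps 5 and 7: the transport is closed once, and the second `_disconnect()` returns early.

```python
import asyncio
from enum import Enum


class ConnectionState(Enum):
    # Simplified stand-in for pyamqp's ConnectionState.
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class FakeTransport:
    """Hypothetical transport that counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        self.close_calls += 1


class HappyPathConnection:
    # Hypothetical, reduced version of pyamqp's disconnect_states.
    DISCONNECT_STATES = {ConnectionState.CLOSE_SENT}

    def __init__(self) -> None:
        self.state = ConnectionState.OPENED
        self._transport = FakeTransport()

    async def _set_state(self, new_state: ConnectionState) -> None:
        self.state = new_state

    async def _disconnect(self) -> None:
        if self.state == ConnectionState.END:
            return
        await self._set_state(ConnectionState.END)
        await self._transport.close()

    async def _incoming_close(self) -> None:
        # Step 4: the peer's Close arrives while we are in CLOSE_SENT.
        if self.state in self.DISCONNECT_STATES:
            await self._disconnect()  # step 5: closes the transport

    async def _wait_for_response(self) -> None:
        # Steps 2-3: stand-in for the frame loop delivering the peer's Close.
        await self._incoming_close()

    async def close(self) -> None:
        try:
            await self._set_state(ConnectionState.CLOSE_SENT)  # step 1
            await self._wait_for_response()  # step 6 returns here
        except Exception:  # pylint:disable=broad-except
            await self._set_state(ConnectionState.END)
        finally:
            await self._disconnect()  # step 7: early-returns


conn = HappyPathConnection()
asyncio.run(conn.close())
print(conn._transport.close_calls)  # 1
```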

Two possible fixes

Option A: minimal patch (suggested in the original report):

```python
async def _disconnect(self) -> None:
    if self.state == ConnectionState.END:
        # State may already be END because close()'s except branch set it;
        # still attempt to release the transport.
        try:
            await self._transport.close()
        except Exception:
            pass
        return
    await self._set_state(ConnectionState.END)
    await self._transport.close()
```

Smallest diff. However, this causes `transport.close()` to be called twice in the happy path (once from `_incoming_close`, once from the `finally` in `Connection.close`). That is only safe if `transport.close()` is idempotent, which it currently is not for `WebSocketTransportAsync` (it calls `self.sock.close()` without a None-check). See PR #46829, which makes the transport-level close idempotent and None-safe.
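For Option A to be safe, the second `transport.close()` must be a no-op. One common way to get that, sketched here as an assumption rather than a description of what PR #46829 actually changes, is to detach the socket reference before closing it. `FakeSocket` and `IdempotentTransport` are hypothetical; `FakeSocket` raises on a second close to stand in for a non-idempotent resource.

```python
import asyncio
from typing import Optional


class FakeSocket:
    """Hypothetical socket: a second close() raises, like a non-idempotent resource."""

    def __init__(self) -> None:
        self.close_count = 0

    async def close(self) -> None:
        if self.close_count:
            raise RuntimeError("socket already closed")
        self.close_count += 1


class IdempotentTransport:
    """Sketch of a None-safe, idempotent close(); illustrative only."""

    def __init__(self) -> None:
        self.sock: Optional[FakeSocket] = FakeSocket()

    async def close(self) -> None:
        # Detach the socket before awaiting so any later call sees None and returns.
        sock, self.sock = self.sock, None
        if sock is None:
            return
        await sock.close()


async def main() -> FakeSocket:
    transport = IdempotentTransport()
    sock = transport.sock
    assert sock is not None
    # Option A's happy path: close() from _incoming_close, then again from finally.
    await transport.close()
    await transport.close()
    return sock


sock = asyncio.run(main())
print(sock.close_count)  # 1
```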

Option B: track close at the connection level (defense in depth):

```python
def __init__(self, ...):
    ...  # existing initialization
    self._transport_closed = False

async def _disconnect(self) -> None:
    if self._transport_closed:
        return
    self._transport_closed = True
    if self.state != ConnectionState.END:
        await self._set_state(ConnectionState.END)
    try:
        await self._transport.close()
    except Exception as e:  # pylint: disable=broad-except
        _LOGGER.debug("Error closing transport: %r", e, extra=self._network_trace_params)
```

Slightly larger diff, but it does not depend on the transport-level fix landing first, and it guarantees `transport.close()` runs exactly once regardless of code path.
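Option B can be exercised against both code paths with the same kind of model. `OptionBConnection` is hypothetical: its `_disconnect()` is copied from the proposal (minus `extra=self._network_trace_params`), and the `peer_replies` flag chooses between the happy path (the peer's Close triggers `_disconnect()` inside `_wait_for_response()`) and the error path (a timeout during the handshake).

```python
import asyncio
import logging
from enum import Enum

_LOGGER = logging.getLogger(__name__)


class ConnectionState(Enum):
    # Simplified stand-in for pyamqp's ConnectionState.
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class FakeTransport:
    """Hypothetical transport that counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        self.close_calls += 1


class OptionBConnection:
    def __init__(self, peer_replies: bool) -> None:
        self.state = ConnectionState.OPENED
        self._transport = FakeTransport()
        self._transport_closed = False
        self._peer_replies = peer_replies  # hypothetical switch: happy vs error path

    async def _set_state(self, new_state: ConnectionState) -> None:
        self.state = new_state

    async def _disconnect(self) -> None:
        if self._transport_closed:
            return
        self._transport_closed = True
        if self.state != ConnectionState.END:
            await self._set_state(ConnectionState.END)
        try:
            await self._transport.close()
        except Exception as e:  # pylint: disable=broad-except
            _LOGGER.debug("Error closing transport: %r", e)

    async def _wait_for_response(self) -> None:
        if self._peer_replies:
            # Happy path: the peer's Close makes _incoming_close() call _disconnect().
            await self._disconnect()
        else:
            raise asyncio.TimeoutError("no Close frame from peer")

    async def close(self) -> None:
        try:
            await self._set_state(ConnectionState.CLOSE_SENT)
            await self._wait_for_response()
        except Exception:  # pylint:disable=broad-except
            await self._set_state(ConnectionState.END)
        finally:
            await self._disconnect()


async def run(peer_replies: bool) -> int:
    conn = OptionBConnection(peer_replies)
    await conn.close()
    return conn._transport.close_calls


print(asyncio.run(run(True)), asyncio.run(run(False)))  # 1 1
```

Because `_transport_closed` is set before the first `await`, a second `_disconnect()` that starts while the first is still in progress on the same event loop also returns immediately, so the flag guards against concurrent callers as well as sequential ones.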


Environment

  • Python 3.12
  • `azure-eventhub==5.15.1`
  • `aiohttp==3.13.5`
  • Cloud Run (long-lived instances)
  • Transport: `TransportType.AmqpOverWebsocket`

Metadata


Assignees

No one assigned

Labels

Client, Event Hubs, Service Attention, bug, customer-reported, needs-team-attention
