Package
`azure-eventhub` 5.15.1 (pyamqp), affects both sync and async paths.
Describe the bug
`Connection._disconnect()` early-returns when state is already `END`:
```python
# azure/eventhub/_pyamqp/aio/_connection_async.py:242
async def _disconnect(self) -> None:
    """Disconnect the transport and set state to END."""
    if self.state == ConnectionState.END:
        return
    await self._set_state(ConnectionState.END)
    await self._transport.close()
```
`Connection.close()` (line 842) wraps the close handshake in `try/except/finally`. The `except` sets state to `END` on any error during close; the `finally` then calls `_disconnect()`:
```python
# azure/eventhub/_pyamqp/aio/_connection_async.py:842
async def close(self, error: Optional[AMQPError] = None, wait: bool = False) -> None:
    try:
        ...  # send Close frame, transition state, _wait_for_response
    except Exception as exc:  # pylint:disable=broad-except
        _LOGGER.info("An error occurred when closing the connection: %r", exc, ...)
        await self._set_state(ConnectionState.END)  # <-- state is now END
    finally:
        await self._disconnect()  # <-- early-returns; transport.close() never runs
```
So when any error occurs during the AMQP close handshake (timeout, network error, peer already closed, etc.), the underlying transport is abandoned. The `aiohttp.ClientSession` is then only released when the connection object is garbage-collected, which produces a stream of `Unclosed client session` warnings and delays reclamation of TCP/SSL state.
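The control flow can be reproduced without a broker. The following is a simplified, hypothetical model. `BuggyConnection`, `FakeTransport` and the trimmed `ConnectionState` enum are illustrative stand-ins, not pyamqp classes. The model copies the guard in `_disconnect()` and the `try/except/finally` shape of `close()`, forces the handshake to fail, and counts how often the transport is closed.

```python
import asyncio
from enum import Enum


class ConnectionState(Enum):
    # Only the members this model needs; the real enum has more.
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class FakeTransport:
    """Stand-in for the real transport; only counts close() calls."""

    def __init__(self):
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1


class BuggyConnection:
    """Mirrors the current _disconnect()/close() control flow."""

    def __init__(self, transport):
        self.state = ConnectionState.OPENED
        self._transport = transport

    async def _set_state(self, new_state):
        self.state = new_state

    async def _disconnect(self):
        if self.state == ConnectionState.END:
            return  # also taken on the exception path below
        await self._set_state(ConnectionState.END)
        await self._transport.close()

    async def close(self):
        try:
            await self._set_state(ConnectionState.CLOSE_SENT)
            # Simulated handshake failure (timeout, network error, ...).
            raise TimeoutError("no Close frame from peer")
        except Exception:  # mirrors the broad except in Connection.close()
            await self._set_state(ConnectionState.END)
        finally:
            await self._disconnect()


async def main():
    transport = FakeTransport()
    conn = BuggyConnection(transport)
    await conn.close()
    return conn.state, transport.close_calls


state, calls = asyncio.run(main())
print(state, calls)  # ConnectionState.END 0
```

Under these assumptions, the transport's `close()` is never reached. The `except` branch moves the state to `END`, so the `finally` call to `_disconnect()` takes the early return.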
Reproduction
Setup: a long-lived Cloud Run instance runs an EventHub consumer/producer over `TransportType.AmqpOverWebsocket` with intermittent network disruption. On instance shutdown we observe 8–17 `Unclosed client session` log entries at once, one per partition. Each partition's connection fails its close handshake, so `_disconnect()` early-returns.
Sync version has the identical bug
`azure/eventhub/_pyamqp/_connection.py:260` mirrors the async version exactly:
```python
def _disconnect(self) -> None:
    if self.state == ConnectionState.END:
        return
    self._set_state(ConnectionState.END)
    self._transport.close()
```
The sync transport (`_transport.py:743`) does not hold an aiohttp `ClientSession`, so the user-visible leak is less severe. The logic flaw is identical, however, and a fix should cover both paths.
Why the early return exists (happy path tracing)
The early return is not arbitrary — it prevents double-close in the happy path. Tracing `Connection.close(wait=True)`:
- Send Close frame, state → `CLOSE_SENT`
- `_wait_for_response(wait=True, END)` blocks
- Peer sends Close back; the frame loop calls `_incoming_close()` (line 470)
- `_incoming_close` sees state in `disconnect_states` and calls `_disconnect()`
- `_disconnect()` sees state ≠ END, sets state to END, calls `transport.close()` ✓
- `_wait_for_response` returns; control returns to `close()`'s try block
- `finally` calls `_disconnect()` again — state is now END, early-returns
So the early return is correct for the happy path; the bug is that it also fires on the exception path where `_disconnect()` has not yet been called.
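The trace above can be replayed with a small event-logging model. This is a hypothetical simplification: `TracingConnection` is not a pyamqp class, `_incoming_close()` is invoked directly in place of the frame loop and `_wait_for_response()`, and a `TimeoutError` stands in for any handshake failure.

```python
import asyncio
from enum import Enum


class ConnectionState(Enum):
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class TracingConnection:
    """Hypothetical model that logs each step of close(wait=True)."""

    def __init__(self):
        self.state = ConnectionState.OPENED
        self.events = []

    async def _disconnect(self):
        if self.state == ConnectionState.END:
            self.events.append("_disconnect: early return")
            return
        self.state = ConnectionState.END
        self.events.append("transport.close()")

    async def _incoming_close(self):
        # In the real code this runs from the frame loop when the peer's
        # Close frame arrives while state is in disconnect_states.
        self.events.append("_incoming_close")
        await self._disconnect()

    async def close(self, peer_replies=True):
        try:
            self.state = ConnectionState.CLOSE_SENT
            self.events.append("send Close")
            if not peer_replies:
                raise TimeoutError("no Close frame from peer")
            await self._incoming_close()  # stands in for _wait_for_response
        except Exception:
            self.state = ConnectionState.END
            self.events.append("except: state -> END")
        finally:
            await self._disconnect()


async def run(peer_replies):
    conn = TracingConnection()
    await conn.close(peer_replies=peer_replies)
    return conn.events


print(asyncio.run(run(True)))
# ['send Close', '_incoming_close', 'transport.close()', '_disconnect: early return']
print(asyncio.run(run(False)))
# ['send Close', 'except: state -> END', '_disconnect: early return']
```

In the happy path, `_disconnect()` runs twice and the second call correctly early-returns. In the failure path, the only `_disconnect()` call is the one that early-returns, so `transport.close()` never appears in the log.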
Two possible fixes
Option A — minimal patch (suggested in original report):
```python
async def _disconnect(self) -> None:
    if self.state == ConnectionState.END:
        # Already END: still attempt to close the transport, ignoring errors.
        try:
            await self._transport.close()
        except Exception:  # pylint: disable=broad-except
            pass
        return
    await self._set_state(ConnectionState.END)
    await self._transport.close()
```
Smallest diff. However, this causes `transport.close()` to be called twice in the happy path (once from `_incoming_close`, once from `Connection.close`'s finally). That is only safe if `transport.close()` is idempotent — which it currently is not for `WebSocketTransportAsync` (it calls `self.sock.close()` without a None-check). See PR #46829 which makes the transport-level close idempotent and None-safe.
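The following hypothetical model makes the double close concrete. The names are illustrative, not pyamqp's. It applies Option A to the happy path and counts how often the underlying socket is closed, first with a transport that forwards every `close()` and then with a None-safe, idempotent `close()`. The detach-then-close guard is only one way to get idempotency. It is an assumption, not the change made in PR #46829.

```python
import asyncio
from enum import Enum


class ConnectionState(Enum):
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class FakeSocket:
    """Counts how many times the socket is actually closed."""

    def __init__(self):
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1


class UnguardedTransport:
    """close() forwards to the socket every time (no None-check, no flag)."""

    def __init__(self, sock):
        self.sock = sock

    async def close(self):
        await self.sock.close()


class IdempotentTransport:
    """Hypothetical None-safe close: detach the socket, then close it once."""

    def __init__(self, sock):
        self.sock = sock

    async def close(self):
        sock, self.sock = self.sock, None
        if sock is not None:
            await sock.close()


class OptionAConnection:
    def __init__(self, transport):
        self.state = ConnectionState.CLOSE_SENT
        self._transport = transport

    async def _disconnect(self):
        # Option A: even when state is already END, try to close the transport.
        if self.state == ConnectionState.END:
            try:
                await self._transport.close()
            except Exception:
                pass
            return
        self.state = ConnectionState.END
        await self._transport.close()

    async def happy_path_close(self):
        await self._disconnect()  # from _incoming_close
        await self._disconnect()  # from the finally block in close()


async def count_socket_closes(transport_cls):
    sock = FakeSocket()
    await OptionAConnection(transport_cls(sock)).happy_path_close()
    return sock.close_calls


print(asyncio.run(count_socket_closes(UnguardedTransport)))   # 2
print(asyncio.run(count_socket_closes(IdempotentTransport)))  # 1
```

With the unguarded transport, Option A reaches the socket twice in the happy path. The idempotent variant turns the second call into a no-op. This is why Option A is only safe alongside a transport-level fix.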
Option B — track close at the connection level (defense-in-depth):
```python
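def __init__(self, *args, **kwargs):  # existing parameters unchanged (elided here)
    ...  # existing initialization
    self._transport_closed = False

async def _disconnect(self) -> None:
    # Guard on a dedicated flag, not on state, so the except path in close()
    # (which already set END) still closes the transport exactly once.
    if self._transport_closed:
        return
    self._transport_closed = True
    if self.state != ConnectionState.END:
        await self._set_state(ConnectionState.END)
    try:
        await self._transport.close()
    except Exception as e:  # pylint: disable=broad-except
        _LOGGER.debug("Error closing transport: %r", e, extra=self._network_trace_params)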
```
Slightly larger diff, but it does not depend on the transport-level fix landing first, and it guarantees `transport.close()` runs exactly once regardless of code path.
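The exactly-once claim can be checked with a self-contained model. It copies the Option B `_disconnect()` and drives `close()` through both the happy path and a failed handshake. `OptionBConnection` and `CountingTransport` are hypothetical stand-ins, and `_incoming_close()` is called directly instead of from a frame loop. The `extra=self._network_trace_params` logging argument is dropped because the model has no trace parameters.

```python
import asyncio
import logging
from enum import Enum

_LOGGER = logging.getLogger(__name__)


class ConnectionState(Enum):
    OPENED = "OPENED"
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class CountingTransport:
    def __init__(self):
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1


class OptionBConnection:
    """Hypothetical model of Option B: a flag, not the state, guards close()."""

    def __init__(self, transport):
        self.state = ConnectionState.OPENED
        self._transport = transport
        self._transport_closed = False

    async def _set_state(self, new_state):
        self.state = new_state

    async def _disconnect(self):
        if self._transport_closed:
            return
        self._transport_closed = True
        if self.state != ConnectionState.END:
            await self._set_state(ConnectionState.END)
        try:
            await self._transport.close()
        except Exception as e:  # pylint: disable=broad-except
            _LOGGER.debug("Error closing transport: %r", e)

    async def _incoming_close(self):
        await self._disconnect()

    async def close(self, peer_replies):
        try:
            await self._set_state(ConnectionState.CLOSE_SENT)
            if not peer_replies:
                raise TimeoutError("no Close frame from peer")
            await self._incoming_close()  # stands in for _wait_for_response
        except Exception:
            await self._set_state(ConnectionState.END)
        finally:
            await self._disconnect()


async def closes_for(peer_replies):
    transport = CountingTransport()
    await OptionBConnection(transport).close(peer_replies)
    return transport.close_calls


print(asyncio.run(closes_for(True)), asyncio.run(closes_for(False)))  # 1 1
```

The guard is a dedicated flag rather than `state`, so the `except` branch setting `END` no longer suppresses the transport close. The sync `_connection.py` version would presumably take the same shape with `async`/`await` removed.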
Related
Environment
- Python 3.12
- `azure-eventhub==5.15.1`
- `aiohttp==3.13.5`
- Cloud Run (long-lived instances)
- Transport: `TransportType.AmqpOverWebsocket`