Description
Issue
Text chunks are getting lost when streaming via TextStreamWriter. The last few words/characters of the stream sometimes don't make it to the receiver.
What's happening
TextStreamWriter.write() uses a lock (_write_lock) to prevent concurrent writes, but aclose() doesn't wait for it. So if aclose() is called while a write() is still sending chunks (for example, a write() running in a separate task), the stream trailer is sent immediately and the receiver closes the stream before all chunks have arrived.
Here's the flow:
- write("distinctive characteristics?") starts sending chunks (holds the lock)
- The stream ends and aclose() is called
- aclose() sends the trailer right away (it doesn't wait for the lock)
- The receiver sees the trailer and closes the stream
- The final chunks arrive too late → lost
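To check the flow above in isolation, here is a minimal, self-contained asyncio model of it. ToyReceiver and ToyTextWriter are hypothetical stand-ins, not LiveKit classes: _send_chunk() just sleeps to simulate network latency, and the receiver drops anything that arrives after the trailer. The model assumes write() runs in its own task (e.g. via asyncio.create_task), because a write() that is awaited to completion before aclose() in the same coroutine cannot overlap with it.

```python
import asyncio

# Hypothetical toy model of the race; these are NOT LiveKit classes.


class ToyReceiver:
    """Collects chunks until the trailer arrives; anything later is dropped."""

    def __init__(self) -> None:
        self.chunks: list[str] = []
        self.closed = False

    def deliver(self, kind: str, payload: str = "") -> None:
        if self.closed:
            return  # late chunk: the receiver already closed the stream
        if kind == "trailer":
            self.closed = True
        else:
            self.chunks.append(payload)


class ToyTextWriter:
    """Mimics the reported behaviour: write() takes the lock, aclose() does not."""

    def __init__(self, receiver: ToyReceiver, chunk_size: int = 4, delay: float = 0.02) -> None:
        self._receiver = receiver
        self._chunk_size = chunk_size
        self._delay = delay  # simulated per-chunk network latency (seconds)
        self._write_lock = asyncio.Lock()
        self._closed = False

    async def _send_chunk(self, chunk: str) -> None:
        await asyncio.sleep(self._delay)
        self._receiver.deliver("chunk", chunk)

    async def _send_trailer(self) -> None:
        self._receiver.deliver("trailer")

    async def write(self, text: str) -> None:
        async with self._write_lock:
            for i in range(0, len(text), self._chunk_size):
                await self._send_chunk(text[i : i + self._chunk_size])

    async def aclose(self) -> None:
        if self._closed:
            raise RuntimeError("Stream already closed")
        self._closed = True
        await self._send_trailer()  # does not wait for _write_lock


async def race_demo(text: str) -> str:
    rx = ToyReceiver()
    writer = ToyTextWriter(rx)
    task = asyncio.create_task(writer.write(text))  # write() runs concurrently
    await asyncio.sleep(0.05)  # only the first couple of chunks are out by now
    await writer.aclose()  # trailer overtakes the remaining chunks
    await task
    return "".join(rx.chunks)


async def sequential_demo(text: str) -> str:
    rx = ToyReceiver()
    writer = ToyTextWriter(rx)
    await writer.write(text)  # finishes before aclose() starts
    await writer.aclose()
    return "".join(rx.chunks)


if __name__ == "__main__":
    text = "distinctive characteristics?"
    print("concurrent:", repr(asyncio.run(race_demo(text))))
    print("sequential:", repr(asyncio.run(sequential_demo(text))))
```

In this model, sequential_demo() delivers the full text, while race_demo() returns only a prefix of it. That matches the intermittent, latency-dependent symptom: the more time each chunk spends in flight, the more of the tail the trailer can overtake.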
In our case, text that should read "distinctive characteristics?" shows up as just "distinctive" on the frontend.
Where it happens
In livekit/rtc/data_stream.py:
TextStreamWriter.write() - has the lock:
```python
async def write(self, text: str):
    async with self._write_lock:  # Waits for other writes
        for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
            # (excerpt: chunk_msg is built from chunk here; omitted)
            await self._send_chunk(chunk_msg)
```
BaseStreamWriter.aclose() - doesn't use the lock:
```python
async def aclose(self, *, reason: str = "", attributes=None):
    if self._closed:
        raise RuntimeError("Stream already closed")
    self._closed = True
    await self._send_trailer(...)  # Sent immediately
```
Quick fix
Override aclose() in TextStreamWriter to wait for the lock:
```python
class TextStreamWriter(BaseStreamWriter):
    async def aclose(self, *, reason: str = "", attributes=None):
        # Hold the write lock while closing: this waits for any in-flight
        # write() to finish and keeps a new write() from starting before
        # the trailer has been sent.
        async with self._write_lock:
            await super().aclose(reason=reason, attributes=attributes)
```
Or move _write_lock to BaseStreamWriter so all stream types can use it.
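A toy sketch of the alternative of moving _write_lock into BaseStreamWriter, using hypothetical stand-ins (ToyReceiver, ToyBaseStreamWriter, ToyTextWriter) rather than LiveKit's actual classes. The assumption is that aclose() acquires the lock and sends the trailer while still holding it, and that write() checks the closed flag under the same lock, so the trailer can never overtake a chunk.

```python
import asyncio

# Hypothetical toy model of the "lock in the base class" option; NOT LiveKit's classes.


class ToyReceiver:
    """Collects chunks until the trailer arrives; anything later is dropped."""

    def __init__(self) -> None:
        self.chunks: list[str] = []
        self.closed = False

    def deliver(self, kind: str, payload: str = "") -> None:
        if self.closed:
            return  # late chunk: the receiver already closed the stream
        if kind == "trailer":
            self.closed = True
        else:
            self.chunks.append(payload)


class ToyBaseStreamWriter:
    """Owns the write lock, so every subclass serializes writes and close."""

    def __init__(self, receiver: ToyReceiver, delay: float = 0.02) -> None:
        self._receiver = receiver
        self._delay = delay  # simulated per-chunk network latency (seconds)
        self._write_lock = asyncio.Lock()
        self._closed = False

    async def _send_chunk(self, chunk: str) -> None:
        await asyncio.sleep(self._delay)
        self._receiver.deliver("chunk", chunk)

    async def _send_trailer(self) -> None:
        self._receiver.deliver("trailer")

    async def aclose(self) -> None:
        # Waits for any in-flight write() and sends the trailer while still
        # holding the lock, so no chunk can be sent after the trailer.
        async with self._write_lock:
            if self._closed:
                raise RuntimeError("Stream already closed")
            self._closed = True
            await self._send_trailer()


class ToyTextWriter(ToyBaseStreamWriter):
    def __init__(self, receiver: ToyReceiver, chunk_size: int = 4, delay: float = 0.02) -> None:
        super().__init__(receiver, delay)
        self._chunk_size = chunk_size

    async def write(self, text: str) -> None:
        async with self._write_lock:
            if self._closed:
                raise RuntimeError("Cannot write to a closed stream")
            for i in range(0, len(text), self._chunk_size):
                await self._send_chunk(text[i : i + self._chunk_size])


async def demo(text: str) -> str:
    rx = ToyReceiver()
    writer = ToyTextWriter(rx)
    task = asyncio.create_task(writer.write(text))
    await asyncio.sleep(0.05)  # close while write() is still mid-stream
    await writer.aclose()  # blocks until write() releases the lock
    await task
    return "".join(rx.chunks)


if __name__ == "__main__":
    print(asyncio.run(demo("distinctive characteristics?")))  # distinctive characteristics?
```

With the lock in the base class, any other writer subclass gets the same ordering guarantee as long as its write() also takes _write_lock.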
Context
Running into this with the LiveKit Agents SDK when streaming text transcriptions. It happens intermittently and is more common under network latency.
Versions:
- livekit: 0.18.3
- Python: 3.13