From 1bd1c19ea1c029e28c7ed33e1bc922f30c247e66 Mon Sep 17 00:00:00 2001 From: Siddhant Shah Date: Tue, 17 Feb 2026 19:25:45 +0530 Subject: [PATCH] fix(open-9027): azure openai tracer fix for chunks --- .../lib/integrations/async_openai_tracer.py | 29 +++++++++++++++--- src/openlayer/lib/integrations/groq_tracer.py | 30 ++++++++++++++++--- .../lib/integrations/openai_tracer.py | 23 ++++++++++---- 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/src/openlayer/lib/integrations/async_openai_tracer.py b/src/openlayer/lib/integrations/async_openai_tracer.py index cef05ba4..01a73ba8 100644 --- a/src/openlayer/lib/integrations/async_openai_tracer.py +++ b/src/openlayer/lib/integrations/async_openai_tracer.py @@ -206,6 +206,11 @@ async def handle_async_streaming_create( num_of_completion_tokens = i + 1 i += 1 + choices = getattr(chunk, "choices", None) + if not choices: + yield chunk + continue + delta = chunk.choices[0].delta if delta.content: @@ -235,7 +240,13 @@ async def handle_async_streaming_create( if collected_output_data: output_data = "".join(collected_output_data) else: - collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + if collected_function_call["arguments"]: + try: + collected_function_call["arguments"] = json.loads( + collected_function_call["arguments"] + ) + except json.JSONDecodeError: + pass output_data = collected_function_call trace_args = create_trace_args( @@ -543,6 +554,12 @@ async def handle_async_streaming_parse( num_of_completion_tokens = i + 1 i += 1 + # Skip chunks with empty choices (e.g., Azure OpenAI heartbeat chunks) + choices = getattr(chunk, "choices", None) + if not choices: + yield chunk + continue + delta = chunk.choices[0].delta if delta.content: @@ -578,9 +595,13 @@ async def handle_async_streaming_parse( if collected_output_data: output_data = "".join(collected_output_data) else: - collected_function_call["arguments"] = json.loads( - collected_function_call["arguments"] - ) + if collected_function_call["arguments"]: + try: + collected_function_call["arguments"] = json.loads( + collected_function_call["arguments"] + ) + except json.JSONDecodeError: + pass output_data = collected_function_call trace_args = create_trace_args( diff --git a/src/openlayer/lib/integrations/groq_tracer.py b/src/openlayer/lib/integrations/groq_tracer.py index fc359427..eb1403da 100644 --- a/src/openlayer/lib/integrations/groq_tracer.py +++ b/src/openlayer/lib/integrations/groq_tracer.py @@ -133,6 +133,11 @@ def stream_chunks( if i > 0: num_of_completion_tokens = i + 1 + choices = getattr(chunk, "choices", None) + if not choices: + yield chunk + continue + delta = chunk.choices[0].delta if delta.content: @@ -161,7 +166,13 @@ def stream_chunks( if collected_output_data: output_data = "".join(collected_output_data) else: - collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + if collected_function_call["arguments"]: + try: + collected_function_call["arguments"] = json.loads( + collected_function_call["arguments"] + ) + except json.JSONDecodeError: + pass output_data = collected_function_call # Get usage data from the last chunk @@ -321,14 +332,25 @@ def parse_non_streaming_output_data( output_data = output_content.strip() elif output_function_call or output_tool_calls: if output_function_call: + args_str = getattr(output_function_call, "arguments", "") or "" + try: + arguments = json.loads(args_str) if args_str.strip() else {} + except json.JSONDecodeError: + arguments = args_str function_call = { "name": output_function_call.name, - "arguments": json.loads(output_function_call.arguments), + "arguments": arguments, } else: + func = output_tool_calls[0].function + args_str = getattr(func, "arguments", "") or "" + try: + arguments = json.loads(args_str) if args_str.strip() else {} + except json.JSONDecodeError: + arguments = args_str function_call = { - "name": output_tool_calls[0].function.name, - "arguments": json.loads(output_tool_calls[0].function.arguments), + "name": func.name, + "arguments": arguments, } output_data = function_call else: diff --git a/src/openlayer/lib/integrations/openai_tracer.py b/src/openlayer/lib/integrations/openai_tracer.py index 3e6384b5..37e6fe58 100644 --- a/src/openlayer/lib/integrations/openai_tracer.py +++ b/src/openlayer/lib/integrations/openai_tracer.py @@ -216,7 +216,8 @@ def stream_chunks( num_of_completion_tokens = i + 1 # Skip chunks with empty choices (e.g., Azure OpenAI heartbeat chunks) - if not chunk.choices: + choices = getattr(chunk, "choices", None) + if not choices: yield chunk continue @@ -1344,14 +1345,25 @@ def parse_non_streaming_output_data( # Function/tool call response if output_function_call or output_tool_calls: if output_function_call: + args_str = getattr(output_function_call, "arguments", "") or "" + try: + arguments = json.loads(args_str) if args_str.strip() else {} + except json.JSONDecodeError: + arguments = args_str return { "name": output_function_call.name, - "arguments": json.loads(output_function_call.arguments), + "arguments": arguments, } else: + func = output_tool_calls[0].function + args_str = getattr(func, "arguments", "") or "" + try: + arguments = json.loads(args_str) if args_str.strip() else {} + except json.JSONDecodeError: + arguments = args_str return { - "name": output_tool_calls[0].function.name, - "arguments": json.loads(output_tool_calls[0].function.arguments), + "name": func.name, + "arguments": arguments, } return None @@ -1417,7 +1429,8 @@ def stream_parse_chunks( num_of_completion_tokens = i + 1 # Skip chunks with empty choices (e.g., Azure OpenAI heartbeat chunks) - if not chunk.choices: + choices = getattr(chunk, "choices", None) + if not choices: yield chunk continue