diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index a0215aae..1a246b7c 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -306,6 +306,8 @@ def read(self) -> Optional[OAuthToken]:
             )
             self.session.open()
         except Exception as e:
+            # Respect user's telemetry preference even during connection failure
+            enable_telemetry = kwargs.get("enable_telemetry", True)
             TelemetryClientFactory.connection_failure_log(
                 error_name="Exception",
                 error_message=str(e),
@@ -316,6 +318,7 @@ def read(self) -> Optional[OAuthToken]:
                 user_agent=self.session.useragent_header
                 if hasattr(self, "session")
                 else None,
+                enable_telemetry=enable_telemetry,
             )
             raise e
diff --git a/src/databricks/sql/common/unified_http_client.py b/src/databricks/sql/common/unified_http_client.py
index d5f7d3c8..ef55564c 100644
--- a/src/databricks/sql/common/unified_http_client.py
+++ b/src/databricks/sql/common/unified_http_client.py
@@ -217,7 +217,7 @@ def _should_use_proxy(self, target_host: str) -> bool:
             logger.debug("Error checking proxy bypass for host %s: %s", target_host, e)
             return True
 
-    def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
+    def _get_pool_manager_for_url(self, url: str) -> Optional[urllib3.PoolManager]:
         """
         Get the appropriate pool manager for the given URL.
 
@@ -225,7 +225,7 @@ def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
             url: The target URL
 
         Returns:
-            PoolManager instance (either direct or proxy)
+            PoolManager instance (either direct or proxy), or None if client is closed
         """
         parsed_url = urllib.parse.urlparse(url)
         target_host = parsed_url.hostname
@@ -291,6 +291,14 @@ def request_context(
         # Select appropriate pool manager based on target URL
         pool_manager = self._get_pool_manager_for_url(url)
 
+        # DEFENSIVE: Check if pool_manager is None (client closing/closed)
+        # This prevents AttributeError race condition when telemetry cleanup happens
+        if pool_manager is None:
+            logger.debug(
+                "HTTP client closing or closed, cannot make request to %s", url
+            )
+            raise RequestError("HTTP client is closing or has been closed")
+
         response = None
         try:
diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py
index 523fcc1d..40816240 100644
--- a/src/databricks/sql/telemetry/telemetry_client.py
+++ b/src/databricks/sql/telemetry/telemetry_client.py
@@ -42,6 +42,7 @@
 from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
 from databricks.sql.common.unified_http_client import UnifiedHttpClient
 from databricks.sql.common.http import HttpMethod
+from databricks.sql.exc import RequestError
 from databricks.sql.telemetry.telemetry_push_client import (
     ITelemetryPushClient,
     TelemetryPushClient,
@@ -417,10 +418,38 @@ def export_latency_log(
         )
 
     def close(self):
-        """Flush remaining events before closing"""
+        """Flush remaining events before closing
+
+        IMPORTANT: This method does NOT close self._http_client.
+
+        Rationale:
+        - _flush() submits async work to the executor that uses _http_client
+        - If we closed _http_client here, async callbacks would fail with AttributeError
+        - Instead, we let _http_client live as long as needed:
+          * Pending futures hold references to self (via bound methods)
+          * This keeps self alive, which keeps self._http_client alive
+          * When all futures complete, Python GC will clean up naturally
+        - The __del__ method ensures eventual cleanup during garbage collection
+
+        This design prevents race conditions while keeping telemetry truly async.
+ """ logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex) self._flush() + def __del__(self): + """Cleanup when TelemetryClient is garbage collected + + This ensures _http_client is eventually closed when the TelemetryClient + object is destroyed. By this point, all async work should be complete + (since the futures held references keeping us alive), so it's safe to + close the http client. + """ + try: + if hasattr(self, "_http_client") and self._http_client: + self._http_client.close() + except Exception: + pass + class _TelemetryClientHolder: """ @@ -674,7 +703,8 @@ def close(host_url): ) try: TelemetryClientFactory._stop_flush_thread() - TelemetryClientFactory._executor.shutdown(wait=True) + # Use wait=False to allow process to exit immediately + TelemetryClientFactory._executor.shutdown(wait=False) except Exception as e: logger.debug("Failed to shutdown thread pool executor: %s", e) TelemetryClientFactory._executor = None @@ -689,9 +719,15 @@ def connection_failure_log( port: int, client_context, user_agent: Optional[str] = None, + enable_telemetry: bool = True, ): """Send error telemetry when connection creation fails, using provided client context""" + # Respect user's telemetry preference - don't force-enable + if not enable_telemetry: + logger.debug("Telemetry disabled, skipping connection failure log") + return + UNAUTH_DUMMY_SESSION_ID = "unauth_session_id" TelemetryClientFactory.initialize_telemetry_client( diff --git a/tests/e2e/test_concurrent_telemetry.py b/tests/e2e/test_concurrent_telemetry.py index bed348c2..f725d6a3 100644 --- a/tests/e2e/test_concurrent_telemetry.py +++ b/tests/e2e/test_concurrent_telemetry.py @@ -27,6 +27,7 @@ def run_in_threads(target, num_threads, pass_index=False): @pytest.mark.serial +@pytest.mark.xdist_group(name="serial_telemetry") class TestE2ETelemetry(PySQLPytestTestCase): @pytest.fixture(autouse=True) def telemetry_setup_teardown(self): diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 0a57edd3..b9f64b7e 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -44,6 +44,7 @@ def connection(self, extra_params=()): @pytest.mark.serial +@pytest.mark.xdist_group(name="serial_telemetry") class TestTelemetryE2E(TelemetryTestBase): """E2E tests for telemetry scenarios - must run serially due to shared host-level telemetry client"""