diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index ad536999..7e06c822 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -7,7 +7,7 @@ on:
   pull_request:
 
 jobs:
-  run-e2e-tests:
+  run-non-telemetry-tests:
     runs-on: ubuntu-latest
     environment: azure-prod
     env:
@@ -54,9 +54,49 @@ jobs:
       #----------------------------------------------
       # run test suite
       #----------------------------------------------
-      - name: Run e2e tests (excluding daily-only tests)
+      - name: Run non-telemetry e2e tests
         run: |
-          # Exclude telemetry E2E tests from PR runs (run daily instead)
+          # Exclude all telemetry tests - they run in a separate job for isolation
           poetry run python -m pytest tests/e2e \
             --ignore=tests/e2e/test_telemetry_e2e.py \
-            -n auto
\ No newline at end of file
+            --ignore=tests/e2e/test_concurrent_telemetry.py \
+            -n auto --dist=loadgroup
+
+  run-telemetry-tests:
+    runs-on: ubuntu-latest
+    needs: run-non-telemetry-tests  # Run after the non-telemetry tests complete
+    environment: azure-prod
+    env:
+      DATABRICKS_SERVER_HOSTNAME: ${{ secrets.DATABRICKS_HOST }}
+      DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
+      DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+      DATABRICKS_CATALOG: peco
+      DATABRICKS_USER: ${{ secrets.TEST_PECO_SP_ID }}
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v4
+      - name: Set up python
+        id: setup-python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
+      - name: Install Poetry
+        uses: snok/install-poetry@v1
+        with:
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v4
+        with:
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
+      - name: Install dependencies
+        run: poetry install --no-interaction --all-extras
+      - name: Run telemetry tests in isolation
+        run: |
+          # Run telemetry tests in a fresh Python process with complete isolation
+          # Use --dist=loadgroup to respect @pytest.mark.xdist_group markers
+          poetry run python -m pytest tests/e2e/test_concurrent_telemetry.py tests/e2e/test_telemetry_e2e.py \
+            -n auto --dist=loadgroup -v
\ No newline at end of file
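Both jobs pass `--dist=loadgroup`, which only changes pytest-xdist's scheduling for tests that carry an `@pytest.mark.xdist_group` marker: every test sharing a group name is routed to a single worker and therefore runs serially within the group, while ungrouped tests still fan out across workers. A minimal sketch of that mechanism (test names are hypothetical):

```python
# Run with: pytest -n auto --dist=loadgroup
# pytest-xdist sends all tests in the "serial_telemetry" group to one worker,
# so they can never execute concurrently with each other.
import pytest


@pytest.mark.xdist_group(name="serial_telemetry")
def test_telemetry_case_one():
    assert True  # shares a worker with every other "serial_telemetry" test


@pytest.mark.xdist_group(name="serial_telemetry")
def test_telemetry_case_two():
    assert True  # guaranteed not to overlap with test_telemetry_case_one
```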
diff --git a/.github/workflows/test-telemetry-only.yml b/.github/workflows/test-telemetry-only.yml
new file mode 100644
index 00000000..9c616de1
--- /dev/null
+++ b/.github/workflows/test-telemetry-only.yml
@@ -0,0 +1,57 @@
+name: Test Telemetry Only (Debug)
+
+on:
+  push:
+    branches:
+      - fix/telemetry-lifecycle-issues-729-731
+  pull_request:
+    branches:
+      - fix/telemetry-lifecycle-issues-729-731
+  workflow_dispatch:  # Allow manual trigger
+
+jobs:
+  test-telemetry:
+    runs-on: ubuntu-latest
+    environment: azure-prod
+    env:
+      DATABRICKS_SERVER_HOSTNAME: ${{ secrets.DATABRICKS_HOST }}
+      DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
+      DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+      DATABRICKS_CATALOG: peco
+      DATABRICKS_USER: ${{ secrets.TEST_PECO_SP_ID }}
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v4
+
+      - name: Set up python
+        id: setup-python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
+
+      - name: Install system dependencies
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y libkrb5-dev
+
+      - name: Install Poetry
+        uses: snok/install-poetry@v1
+        with:
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v4
+        with:
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
+
+      - name: Install dependencies
+        run: poetry install --no-interaction --all-extras
+
+      - name: Run telemetry test only (with xdist auto to mimic real CI)
+        run: |
+          # Use --dist=loadgroup to respect @pytest.mark.xdist_group markers
+          poetry run python -m pytest tests/e2e/test_concurrent_telemetry.py::TestE2ETelemetry::test_concurrent_queries_sends_telemetry -n auto --dist=loadgroup -v -s
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index a0215aae..1a246b7c 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -306,6 +306,8 @@ def read(self) -> Optional[OAuthToken]:
             )
             self.session.open()
         except Exception as e:
+            # Respect user's telemetry preference even during connection failure
+            enable_telemetry = kwargs.get("enable_telemetry", True)
             TelemetryClientFactory.connection_failure_log(
                 error_name="Exception",
                 error_message=str(e),
@@ -316,6 +318,7 @@ def read(self) -> Optional[OAuthToken]:
                 user_agent=self.session.useragent_header
                 if hasattr(self, "session")
                 else None,
+                enable_telemetry=enable_telemetry,
             )
             raise e
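The `client.py` change above means a failed connection attempt no longer force-enables telemetry for the host. A hedged sketch of the resulting behavior from the caller's side (hostname, HTTP path, and token are illustrative placeholders, not real values):

```python
from databricks import sql

try:
    conn = sql.connect(
        server_hostname="example.cloud.databricks.com",  # placeholder
        http_path="/sql/1.0/warehouses/abc123",          # placeholder
        access_token="invalid-token",                    # placeholder
        enable_telemetry=False,  # opt-out is now honored on the failure path too
    )
except Exception:
    # Previously, connection_failure_log() would initialize a telemetry client
    # even for opted-out users; with this change it returns early instead.
    pass
```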
diff --git a/src/databricks/sql/common/unified_http_client.py b/src/databricks/sql/common/unified_http_client.py
index d5f7d3c8..ef55564c 100644
--- a/src/databricks/sql/common/unified_http_client.py
+++ b/src/databricks/sql/common/unified_http_client.py
@@ -217,7 +217,7 @@ def _should_use_proxy(self, target_host: str) -> bool:
             logger.debug("Error checking proxy bypass for host %s: %s", target_host, e)
             return True
 
-    def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
+    def _get_pool_manager_for_url(self, url: str) -> Optional[urllib3.PoolManager]:
         """
         Get the appropriate pool manager for the given URL.
 
@@ -225,7 +225,7 @@ def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
             url: The target URL
 
         Returns:
-            PoolManager instance (either direct or proxy)
+            PoolManager instance (either direct or proxy), or None if the client is closed
         """
         parsed_url = urllib.parse.urlparse(url)
         target_host = parsed_url.hostname
@@ -291,6 +291,14 @@ def request_context(
         # Select appropriate pool manager based on target URL
         pool_manager = self._get_pool_manager_for_url(url)
 
+        # DEFENSIVE: pool_manager is None while the client is closing or closed.
+        # This prevents an AttributeError race when telemetry cleanup runs concurrently.
+        if pool_manager is None:
+            logger.debug(
+                "HTTP client closing or closed, cannot make request to %s", url
+            )
+            raise RequestError("HTTP client is closing or has been closed")
+
         response = None
         try:
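The `Optional` return type plus the explicit `None` check is a standard guard for teardown races: once `close()` clears the pool managers, a concurrent caller gets a clean `RequestError` instead of an `AttributeError` deep inside urllib3. A generic, self-contained sketch of the pattern (stand-in class, not the driver's actual implementation):

```python
import threading
from typing import Optional


class ClosableHttpClient:
    """Stand-in illustrating the guard; the real client wraps urllib3 pool managers."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pool: Optional[object] = object()  # stand-in for urllib3.PoolManager

    def _get_pool(self) -> Optional[object]:
        with self._lock:
            return self._pool  # None once close() has run

    def close(self) -> None:
        with self._lock:
            self._pool = None

    def request(self, url: str) -> str:
        pool = self._get_pool()
        if pool is None:
            # Mirrors the RequestError raised in request_context() above
            raise RuntimeError("HTTP client is closing or has been closed")
        return f"would dispatch request to {url}"
```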
diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py
index 523fcc1d..40816240 100644
--- a/src/databricks/sql/telemetry/telemetry_client.py
+++ b/src/databricks/sql/telemetry/telemetry_client.py
@@ -42,6 +42,7 @@
 from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
 from databricks.sql.common.unified_http_client import UnifiedHttpClient
 from databricks.sql.common.http import HttpMethod
+from databricks.sql.exc import RequestError
 from databricks.sql.telemetry.telemetry_push_client import (
     ITelemetryPushClient,
     TelemetryPushClient,
@@ -417,10 +418,38 @@ def export_latency_log(
         )
 
     def close(self):
-        """Flush remaining events before closing"""
+        """Flush remaining events before closing.
+
+        IMPORTANT: This method does NOT close self._http_client.
+
+        Rationale:
+        - _flush() submits async work to the executor that uses _http_client
+        - If we closed _http_client here, async callbacks would fail with AttributeError
+        - Instead, we let _http_client live as long as needed:
+          * Pending futures hold references to self (via bound methods)
+          * This keeps self alive, which keeps self._http_client alive
+          * When all futures complete, Python GC will clean up naturally
+        - The __del__ method ensures eventual cleanup during garbage collection
+
+        This design prevents race conditions while keeping telemetry truly async.
+        """
         logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
         self._flush()
 
+    def __del__(self):
+        """Clean up when the TelemetryClient is garbage collected.
+
+        This ensures _http_client is eventually closed when the TelemetryClient
+        object is destroyed. By that point, all async work should be complete
+        (the pending futures held references that kept us alive), so it is safe
+        to close the HTTP client.
+        """
+        try:
+            if hasattr(self, "_http_client") and self._http_client:
+                self._http_client.close()
+        except Exception:
+            pass
+
 
 class _TelemetryClientHolder:
     """
@@ -674,7 +703,8 @@ def close(host_url):
             )
             try:
                 TelemetryClientFactory._stop_flush_thread()
-                TelemetryClientFactory._executor.shutdown(wait=True)
+                # Use wait=False so pending telemetry cannot block process exit
+                TelemetryClientFactory._executor.shutdown(wait=False)
             except Exception as e:
                 logger.debug("Failed to shutdown thread pool executor: %s", e)
             TelemetryClientFactory._executor = None
@@ -689,9 +719,15 @@ def connection_failure_log(
         port: int,
         client_context,
         user_agent: Optional[str] = None,
+        enable_telemetry: bool = True,
     ):
         """Send error telemetry when connection creation fails, using provided client context"""
 
+        # Respect the user's telemetry preference - don't force-enable
+        if not enable_telemetry:
+            logger.debug("Telemetry disabled, skipping connection failure log")
+            return
+
         UNAUTH_DUMMY_SESSION_ID = "unauth_session_id"
 
         TelemetryClientFactory.initialize_telemetry_client(
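The lifetime argument in `close()`/`__del__` rests on a detail worth seeing concretely: the executor holds the submitted bound method, the bound method holds `self`, so the object (and its HTTP client) cannot be collected while work is pending. A toy demonstration (illustrative names; prompt `__del__` timing is only guaranteed under CPython's reference counting):

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FakeHttpClient:
    def close(self) -> None:
        print("http client closed")


class TelemetryLike:
    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._http_client = FakeHttpClient()
        # The bound method self._send keeps a strong reference to self,
        # so the executor's pending work item pins this object alive.
        executor.submit(self._send)

    def _send(self) -> None:
        time.sleep(0.2)
        print("async send finished; _http_client was still alive")

    def __del__(self) -> None:
        self._http_client.close()


executor = ThreadPoolExecutor(max_workers=1)
TelemetryLike(executor)  # deliberately unbound: only the pending work keeps it alive
executor.shutdown(wait=True)  # task drains, refcount hits zero, __del__ runs
```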
""" + # Extra flush right before test starts to clear any events that accumulated + # between fixture cleanup and now (e.g., from other tests on same worker) + if TelemetryClientFactory._executor: + TelemetryClientFactory._executor.shutdown(wait=True) + TelemetryClientFactory._executor = None + TelemetryClientFactory._clients.clear() + TelemetryClientFactory._initialized = False + num_threads = 30 capture_lock = threading.Lock() captured_telemetry = [] @@ -139,6 +162,18 @@ def execute_query_worker(thread_id): assert "errors" not in response or not response["errors"] if "numProtoSuccess" in response: total_successful_events += response["numProtoSuccess"] + + # DEBUG: Print detailed counts to understand CI vs local difference + print(f"\n{'='*60}") + print(f"TELEMETRY DEBUG - Understanding 88 vs 60 discrepancy") + print(f"{'='*60}") + print(f"Client side captured_telemetry: {len(captured_telemetry)} events") + print(f"Client side captured_futures: {len(captured_futures)} futures") + print(f"Server side captured_responses: {len(captured_responses)} responses") + print(f"Server reported total_successful_events: {total_successful_events}") + print(f"Expected events: {num_threads * 2}") + print(f"{'='*60}\n") + assert total_successful_events == num_threads * 2 assert ( diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 0a57edd3..83c2dbf8 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -44,23 +44,45 @@ def connection(self, extra_params=()): @pytest.mark.serial +@pytest.mark.xdist_group(name="serial_telemetry") class TestTelemetryE2E(TelemetryTestBase): """E2E tests for telemetry scenarios - must run serially due to shared host-level telemetry client""" @pytest.fixture(autouse=True) def telemetry_setup_teardown(self): """Clean up telemetry client state before and after each test""" + # Clean up BEFORE test starts + # Use wait=True to ensure all pending telemetry from previous tests completes + if TelemetryClientFactory._executor: + TelemetryClientFactory._executor.shutdown(wait=True) # WAIT for pending telemetry + TelemetryClientFactory._executor = None + TelemetryClientFactory._stop_flush_thread() + TelemetryClientFactory._flush_event.clear() # Clear the event flag + TelemetryClientFactory._clients.clear() + TelemetryClientFactory._initialized = False + + # Clear feature flags cache before test starts + from databricks.sql.common.feature_flag import FeatureFlagsContextFactory + with FeatureFlagsContextFactory._lock: + FeatureFlagsContextFactory._context_map.clear() + if FeatureFlagsContextFactory._executor: + FeatureFlagsContextFactory._executor.shutdown(wait=False) + FeatureFlagsContextFactory._executor = None + try: yield finally: + # Clean up AFTER test ends + # Use wait=True to ensure this test's telemetry completes if TelemetryClientFactory._executor: - TelemetryClientFactory._executor.shutdown(wait=True) + TelemetryClientFactory._executor.shutdown(wait=True) # WAIT for this test's telemetry TelemetryClientFactory._executor = None TelemetryClientFactory._stop_flush_thread() + TelemetryClientFactory._flush_event.clear() # Clear the event flag + TelemetryClientFactory._clients.clear() TelemetryClientFactory._initialized = False - # Clear feature flags cache to prevent state leakage between tests - from databricks.sql.common.feature_flag import FeatureFlagsContextFactory + # Clear feature flags cache after test ends with FeatureFlagsContextFactory._lock: FeatureFlagsContextFactory._context_map.clear() if 
diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py
index 0a57edd3..83c2dbf8 100644
--- a/tests/e2e/test_telemetry_e2e.py
+++ b/tests/e2e/test_telemetry_e2e.py
@@ -44,23 +44,45 @@ def connection(self, extra_params=()):
 
 
 @pytest.mark.serial
+@pytest.mark.xdist_group(name="serial_telemetry")
 class TestTelemetryE2E(TelemetryTestBase):
     """E2E tests for telemetry scenarios - must run serially due to shared host-level telemetry client"""
 
     @pytest.fixture(autouse=True)
     def telemetry_setup_teardown(self):
         """Clean up telemetry client state before and after each test"""
+        # Clean up BEFORE the test starts.
+        # Use wait=True to ensure all pending telemetry from previous tests completes.
+        if TelemetryClientFactory._executor:
+            TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for pending telemetry
+            TelemetryClientFactory._executor = None
+        TelemetryClientFactory._stop_flush_thread()
+        TelemetryClientFactory._flush_event.clear()  # Clear the event flag
+        TelemetryClientFactory._clients.clear()
+        TelemetryClientFactory._initialized = False
+
+        # Clear the feature flags cache before the test starts
+        from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
+        with FeatureFlagsContextFactory._lock:
+            FeatureFlagsContextFactory._context_map.clear()
+            if FeatureFlagsContextFactory._executor:
+                FeatureFlagsContextFactory._executor.shutdown(wait=False)
+                FeatureFlagsContextFactory._executor = None
+
         try:
             yield
         finally:
+            # Clean up AFTER the test ends.
+            # Use wait=True to ensure this test's telemetry completes.
             if TelemetryClientFactory._executor:
-                TelemetryClientFactory._executor.shutdown(wait=True)
+                TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for this test's telemetry
                 TelemetryClientFactory._executor = None
             TelemetryClientFactory._stop_flush_thread()
+            TelemetryClientFactory._flush_event.clear()  # Clear the event flag
+            TelemetryClientFactory._clients.clear()
             TelemetryClientFactory._initialized = False
 
-            # Clear feature flags cache to prevent state leakage between tests
-            from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
+            # Clear the feature flags cache after the test ends
             with FeatureFlagsContextFactory._lock:
                 FeatureFlagsContextFactory._context_map.clear()
                 if FeatureFlagsContextFactory._executor: