From a2fe99f8933530483e9f1414ee6674bb55d550de Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Thu, 4 Jun 2026 16:07:36 +0530 Subject: [PATCH 1/4] =?UTF-8?q?feat(kernel):=20wire=20CUJ-gap=20fixes=20?= =?UTF-8?q?=E2=80=94=20staging=20fail-loud,=20error=20context,=20sync=20ca?= =?UTF-8?q?ncel=20(#825)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(kernel): wire CUJ-gap fixes — staging fail-loud, error context, sync cancel Connector half of the API+CUJ audit fixes (kernel half: databricks-sql-kernel PR #121). Bumps KERNEL_REV to pick up the kernel surface. Staging fail-loud (kernel/client.py): - Volume/staging PUT/GET/REMOVE silently no-op'd on the kernel path (KernelResultSet.is_staging_operation is always False, so the connector's _handle_staging_operation never fired and no file was transferred). Detect the leading verb in execute_command and raise NotSupportedError so ETL fails loud instead of ingesting stale data. Error context (_errors.py): - Forward display_message / diagnostic_info / error_details_json (now exposed across the pyo3 boundary in #121) onto the re-raised PEP-249 exception, and populate ServerOperationError.context with "diagnostic-info" (Spark stack trace) + "operation-id" — matching the Thrift backend so callers reading err.context work identically. Sync cancel wiring (client.py, kernel/client.py): - cursor.cancel() was a silent no-op for the default blocking execute() (active_command_id is None until execute returns). The kernel backend now registers a detached StatementCanceller (keyed by the cursor) before the blocking execute and exposes cancel_running_cursor(cursor). Cursor.cancel() routes to that hook via getattr when there's no command id yet — opt-in, so Thrift/SEA backends are unaffected. Tests: unit (staging fail-loud, _is_staging_statement, cancel registry + routing, error context/diagnostic-info forwarding) and e2e (tz-aware TIMESTAMP, scientific DECIMAL, staging NotSupportedError, diagnostic-info context, cross-thread sync cancel interrupts a running query). All e2e verified live against dogfood with use_kernel=True. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * fix(kernel): address review — comment-prefixed staging + tolerant sync cancel Review fixes on PR #825 (+ KERNEL_REV bump to the amended kernel #121 which now folds the original message on attach failure and bounds error_details_json): P1 #1 — staging fail-loud missed comment-prefixed statements: _is_staging_statement took the first whitespace token without stripping SQL comments, so "-- upload\nPUT ..." / "/* c */ PUT ..." (common in ETL) classified as non-staging and slipped into the silent-no-op bug. Added _strip_leading_sql_comments (handles leading -- line and /* */ block comments, multiple/mixed) before extracting the verb. Tests for both comment forms, mixed, and verb-only-in-comment (must NOT match). P1 #2 — sync cancel could raise out of cursor.cancel(): cursor.cancel() is best-effort per PEP-249, but cancel_running_cursor re-raised a canceller failure (e.g. an early cancel before the server statement id is observed, or a transport hiccup) via the public cancel(). Now swallow+log and still return True (a canceller was present and attempted) so Cursor doesn't emit the misleading "no executing command" warning. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * chore: re-pin KERNEL_REV to merged kernel #121 (cbeaf44) #121 (tz-aware/scientific param binds, error context, sync cancel + Ctrl-C) is merged to kernel main. Re-pin from the orphaned branch HEAD (f62d941) to the merged squash SHA (cbeaf44) — content-identical, but reachable from main so no orphan-SHA risk. Verified against a wheel built from cbeaf44: connector unit + kernel e2e (tz-aware TIMESTAMP, scientific DECIMAL, staging fail-loud incl. comment-prefixed, diagnostic-info context) all pass. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --------- Signed-off-by: Vikrant Puppala --- KERNEL_REV | 2 +- src/databricks/sql/backend/kernel/_errors.py | 24 ++- src/databricks/sql/backend/kernel/client.py | 132 ++++++++++++ src/databricks/sql/client.py | 21 +- tests/e2e/test_kernel_backend.py | 119 ++++++++++- tests/unit/test_client.py | 37 +++- tests/unit/test_kernel_client.py | 201 +++++++++++++++++++ 7 files changed, 526 insertions(+), 10 deletions(-) diff --git a/KERNEL_REV b/KERNEL_REV index af059324d..37b717a45 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -101aa465e71991eec98102bba77aad2f7ad8faed +cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b diff --git a/src/databricks/sql/backend/kernel/_errors.py b/src/databricks/sql/backend/kernel/_errors.py index 78b542300..151bac96b 100644 --- a/src/databricks/sql/backend/kernel/_errors.py +++ b/src/databricks/sql/backend/kernel/_errors.py @@ -97,7 +97,23 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error": """ code = getattr(exc, "code", "Unknown") cls = _CODE_TO_EXCEPTION.get(code, DatabaseError) - new = cls(getattr(exc, "message", str(exc))) + + # For ServerOperationError, reproduce the Thrift backend's + # ``context`` dict so callers that read + # ``err.context["diagnostic-info"]`` (the Spark stack trace) / + # ``err.context["operation-id"]`` get the same shape on the kernel + # path. ``diagnostic_info`` is forwarded from the kernel error (it + # now crosses the PyO3 boundary; older wheels return ``None`` via + # ``getattr``, so this degrades gracefully). Matches + # thrift_backend.py's ServerOperationError construction. + context = None + if cls is ServerOperationError: + context = { + "operation-id": getattr(exc, "query_id", None), + "diagnostic-info": getattr(exc, "diagnostic_info", None), + } + new = cls(getattr(exc, "message", str(exc)), context) + for attr in ( "code", "sql_state", @@ -106,6 +122,12 @@ def reraise_kernel_error(exc: "_kernel.KernelError") -> "Error": "http_status", "retryable", "query_id", + # Extended server status now forwarded across the PyO3 boundary + # (kernel #121). ``getattr(..., None)`` keeps this forward-safe + # against an older wheel that doesn't set these attrs. + "display_message", + "diagnostic_info", + "error_details_json", ): setattr(new, attr, getattr(exc, attr, None)) return new diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index 7cdd484fa..08bb0d36b 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -71,6 +71,56 @@ # per-request skip-and-warn. _KERNEL_MANAGED_HEADERS = frozenset({"authorization", "x-databricks-org-id"}) +# Leading verbs of SQL volume/staging statements. Detected by the +# leading token (case-insensitive) so the kernel backend can fail loud +# on staging ops it can't service — see ``execute_command``. +_STAGING_VERBS = ("PUT", "GET", "REMOVE") + + +def _strip_leading_sql_comments(sql: str) -> str: + """Strip leading whitespace and SQL comments (``-- …`` line and + ``/* … */`` block, possibly several) from ``sql``, returning the + remainder. + + Needed so staging detection sees the real leading verb: a + comment-prefixed staging op (``-- upload\\nPUT …`` or + ``/* c */ PUT …``, common in ETL scripts) must still be classified + as staging, or it would slip past the guard into the silent-no-op + bug. Block comments do not nest in Databricks SQL, so a simple + scan-to-``*/`` is correct. + """ + i = 0 + n = len(sql) + while i < n: + if sql[i].isspace(): + i += 1 + elif sql.startswith("--", i): + # Line comment: skip to end of line (or string). + nl = sql.find("\n", i) + i = n if nl == -1 else nl + 1 + elif sql.startswith("/*", i): + # Block comment: skip to closing */ (or end if unterminated). + close = sql.find("*/", i + 2) + i = n if close == -1 else close + 2 + else: + break + return sql[i:] + + +def _is_staging_statement(operation: str) -> bool: + """True iff ``operation`` is a volume/staging statement (PUT / GET / + REMOVE). + + Strips leading whitespace + SQL comments first (so a comment- + prefixed staging op is still caught), then matches the leading token + only — so a normal query that merely *contains* the word (e.g. + ``SELECT 'GET' AS x``) isn't misflagged. + """ + stripped = _strip_leading_sql_comments(operation) + # First whitespace-delimited token, uppercased. + verb = stripped.split(None, 1)[0].upper() if stripped.strip() else "" + return verb in _STAGING_VERBS + # ─── Client ───────────────────────────────────────────────────────────────── @@ -172,6 +222,17 @@ def __init__( # path. Same lock as ``_async_handles``. self._closed_commands: Set[str] = set() self._async_handles_lock = threading.RLock() + # Sync-execute cancellers keyed by ``id(cursor)``. A blocking + # ``execute()`` sets ``cursor.active_command_id`` only AFTER it + # returns, so a concurrent ``cursor.cancel()`` (the documented + # cross-thread PEP-249 shape) has no command id to target while + # the query runs. We register a detached kernel + # ``StatementCanceller`` here just before the blocking call and + # drop it after; ``cancel_running_cursor`` (invoked by + # ``Cursor.cancel`` when there's no command id yet) fires it. + # Guarded by its own lock — cancel can race execute teardown. + self._sync_cancellers: Dict[int, Any] = {} + self._sync_cancellers_lock = threading.RLock() # ── Session lifecycle ────────────────────────────────────────── @@ -354,6 +415,24 @@ def execute_command( # ``_async_statements`` and closed by ``close_command``); the sync # path drops it in finally. ``close_stmt`` is the post-success # decision flag — it stays True on sync, flips to False on async. + # Volume/staging (PUT/GET/REMOVE) is not supported on the kernel + # path: the kernel returns the staging control row as a normal + # result set (``KernelResultSet.is_staging_operation`` is always + # False), so the connector's ``_handle_staging_operation`` never + # fires and NO file is transferred. Rather than silently no-op + # (the Thrift path performs the presigned-URL upload/download), + # fail loud at the call site so ETL scripts don't ingest + # stale/missing data. Detected by the leading SQL verb — the + # only signal available pre-execute, since the kernel exposes no + # staging marker today. + if _is_staging_statement(operation): + raise NotSupportedError( + "Volume / staging operations (PUT / GET / REMOVE) are not " + "supported on the kernel backend (use_kernel=True); the file " + "transfer would silently not happen. Use the Thrift backend " + "for staging operations." + ) + close_stmt = True try: try: @@ -382,10 +461,23 @@ def execute_command( self._async_statements[command_id.guid] = stmt close_stmt = False return None + # Register a detached canceller BEFORE the blocking + # execute so a concurrent ``cursor.cancel()`` can reach + # the running statement (its server id is populated mid- + # execute). Keyed by ``id(cursor)`` since no command id + # exists yet. Dropped in the finally. + try: + with self._sync_cancellers_lock: + self._sync_cancellers[id(cursor)] = stmt.canceller() + except Exception: + # Canceller is best-effort; never block execute on it. + pass executed = stmt.execute() except Exception as exc: raise _wrap_kernel_exception("execute_command", exc) from exc finally: + with self._sync_cancellers_lock: + self._sync_cancellers.pop(id(cursor), None) if close_stmt: # Sync path: ``Statement`` is a lifecycle owner separate # from the executed handle. Drop it here so the parent @@ -422,6 +514,46 @@ def cancel_command(self, command_id: CommandId) -> None: except Exception as exc: raise _wrap_kernel_exception("cancel_command", exc) from exc + def cancel_running_cursor(self, cursor: "Cursor") -> bool: + """Cancel an in-flight SYNC ``execute()`` on ``cursor``. + + Invoked by ``Cursor.cancel()`` when ``active_command_id`` is + still ``None`` — i.e. a blocking ``execute()`` hasn't returned, + so the command id isn't set yet but the server statement may be + running. Fires the detached ``StatementCanceller`` registered in + ``execute_command`` before the blocking call. + + Returns ``True`` if a canceller was found and fired (the + statement was in flight), ``False`` otherwise so ``Cursor`` can + emit its "no executing command" warning. Safe to call from + another thread. + + Tolerant by design: ``cursor.cancel()`` is a best-effort + PEP-249 method (callers don't expect it to raise), so a cancel + failure is logged and swallowed rather than propagated. This + also covers the early-cancel window — a cancel arriving before + the kernel has observed the server statement id is a no-op in + the kernel canceller, but if it ever raised (e.g. a transport + hiccup on the cancel RPC) we must not surface that out of + ``cancel()``. We still return ``True`` (a canceller was present + and we attempted it) so ``Cursor`` doesn't emit the misleading + "no executing command" warning. + """ + with self._sync_cancellers_lock: + canceller = self._sync_cancellers.get(id(cursor)) + if canceller is None: + return False + try: + canceller.cancel() + except Exception: + logger.warning( + "cancel_running_cursor: best-effort cancel of in-flight " + "sync statement failed; swallowing (cursor.cancel() is " + "tolerant by PEP-249 contract)", + exc_info=True, + ) + return True + def close_command(self, command_id: CommandId) -> None: with self._async_handles_lock: handle = self._async_handles.pop(command_id.guid, None) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 7fc815cd8..e66dd897c 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1704,11 +1704,22 @@ def cancel(self) -> None: """ if self.active_command_id is not None: self.backend.cancel_command(self.active_command_id) - else: - logger.warning( - "Attempting to cancel a command, but there is no " - "currently executing command" - ) + return + # No command id yet. A backend whose synchronous ``execute()`` + # blocks without first publishing a command id (the kernel + # backend) may still have a server statement in flight. Such a + # backend exposes ``cancel_running_cursor(cursor)`` -> bool to + # cancel it; it returns True if something was actually + # cancelled. Opt-in via getattr so the Thrift / SEA backends + # (which set ``active_command_id`` before blocking) are + # unaffected. + cancel_running_cursor = getattr(self.backend, "cancel_running_cursor", None) + if cancel_running_cursor is not None and cancel_running_cursor(self): + return + logger.warning( + "Attempting to cancel a command, but there is no " + "currently executing command" + ) def close(self) -> None: """Close cursor""" diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index 1e61bd7b8..95d7d942e 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -24,7 +24,7 @@ import pytest import databricks.sql as sql -from databricks.sql.exc import DatabaseError +from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError # Skip the whole module unless the kernel wheel is genuinely installed. # ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a @@ -361,3 +361,120 @@ def test_user_agent_entry_and_http_headers_round_trip(kernel_conn_params): with c.cursor() as cur: cur.execute("SELECT 1 AS n") assert cur.fetchall()[0][0] == 1 + + +# ── Parameter parity (tz-aware timestamp, scientific decimal) ────── + + +def test_tz_aware_timestamp_parameter_binds(conn): + """A tz-aware datetime parameter (datetime with tzinfo) binds on + the kernel path and resolves to the correct UTC instant. Previously + rejected at bind on kernel; works on Thrift. (kernel #121)""" + import datetime + + tzdt = datetime.datetime( + 2026, + 5, + 15, + 18, + 0, + 0, + tzinfo=datetime.timezone(datetime.timedelta(hours=5, minutes=30)), + ) + with conn.cursor() as cur: + cur.execute("SELECT ? AS ts", [tzdt]) + ts = cur.fetchall()[0][0] + # 18:00 +05:30 == 12:30 UTC. + assert (ts.hour, ts.minute) == (12, 30) + + +def test_scientific_notation_decimal_parameter_binds(conn): + """A Decimal whose str() is exponential (e.g. 1E-7) binds on the + kernel path. Previously rejected at bind; the server/Thrift accept + scientific-notation decimal literals. (kernel #121)""" + import decimal + from databricks.sql.parameters.native import DecimalParameter + + with conn.cursor() as cur: + # 1E+2 == 100, round-trips cleanly at scale 0. + cur.execute("SELECT ? AS d", [DecimalParameter(decimal.Decimal("1E+2"))]) + assert int(cur.fetchall()[0][0]) == 100 + + +# ── Staging / volume — fail loud, not silent no-op ──────────────── + + +def test_staging_put_raises_not_supported(conn): + """A PUT (volume/staging) statement fails loud on the kernel path + rather than silently no-opping (which would make ETL ingest + stale/missing data). (CUJ-gap audit)""" + with conn.cursor() as cur: + with pytest.raises(NotSupportedError, match="staging"): + cur.execute("PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'") + + +def test_comment_prefixed_staging_put_raises_not_supported(conn): + """A comment-prefixed staging op (common in ETL scripts) must also + fail loud — the leading-verb detection strips SQL comments first, so + it can't slip past into the silent-no-op bug (PR #825 review #1).""" + with conn.cursor() as cur: + with pytest.raises(NotSupportedError, match="staging"): + cur.execute( + "-- upload the daily extract\n" + "PUT '/tmp/x.csv' INTO '/Volumes/main/default/v/x.csv'" + ) + + +# ── Error fidelity — diagnostic-info reaches .context ───────────── + + +def test_server_error_exposes_diagnostic_info_context(conn): + """A server-side query failure surfaces as ServerOperationError + with the Spark diagnostic context in ``.context['diagnostic-info']`` + — Thrift parity (kernel #121 forwards diagnostic_info across pyo3; + the connector populates .context).""" + with conn.cursor() as cur: + with pytest.raises(ServerOperationError) as exc_info: + cur.execute("SELECT * FROM definitely_not_a_table_xyz_kernel_e2e") + err = exc_info.value + # context shape matches Thrift; diagnostic-info may be None if + # the server didn't attach one, but the KEY must exist. + assert "diagnostic-info" in err.context + assert "operation-id" in err.context + + +# ── Sync cancel (cursor.cancel() from another thread) ───────────── + + +def test_sync_cancel_interrupts_blocking_execute(conn): + """cursor.cancel() from another thread cancels a long-running + blocking execute() on the kernel path. Previously a silent no-op + (active_command_id was None until execute returned). (kernel #121 + StatementCanceller + connector cancel_running_cursor wiring)""" + import threading + import time + + cur = conn.cursor() + + # The kernel publishes the server statement id once the initial + # POST returns — within the server's default wait window (~10s). + # Cancel after that so the canceller has an id to target; a cancel + # before then is a no-op by design (id not yet known). Pick a query + # that runs well past this so the cancel lands mid-flight. + def cancel_after_delay(): + time.sleep(15.0) + cur.cancel() + + t = threading.Thread(target=cancel_after_delay) + t.start() + try: + # Cancel should make execute() raise rather than run to + # completion — proving the server-side statement was cancelled. + with pytest.raises(Exception): + cur.execute( + "SELECT count(*) FROM range(0, 1000000000000) " + "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1" + ) + finally: + t.join() + cur.close() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4a8cb0b68..16e705cec 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -371,13 +371,46 @@ def test_cancel_command_calls_the_backend(self): def test_cancel_command_will_issue_warning_for_cancel_with_no_executing_command( self, logger_instance ): - mock_thrift_backend = Mock() + # Backends like Thrift/SEA set active_command_id before blocking + # and do NOT define ``cancel_running_cursor``. Use ``spec`` so the + # mock doesn't auto-advertise that opt-in hook (a bare ``Mock()`` + # returns a truthy Mock for any attribute). + mock_thrift_backend = Mock(spec=["cancel_command"]) cursor = client.Cursor(Mock(), mock_thrift_backend) cursor.cancel() self.assertTrue(logger_instance.warning.called) self.assertFalse(mock_thrift_backend.cancel_command.called) + @patch("databricks.sql.client.logger") + def test_cancel_routes_to_cancel_running_cursor_when_no_command_id( + self, logger_instance + ): + """When there's no active_command_id (a blocking sync execute() + hasn't published one), cancel() routes to the backend's opt-in + ``cancel_running_cursor`` hook (the kernel backend). If the hook + cancels something (returns True), no warning is emitted.""" + backend = Mock(spec=["cancel_command", "cancel_running_cursor"]) + backend.cancel_running_cursor.return_value = True + cursor = client.Cursor(Mock(), backend) + cursor.cancel() + + backend.cancel_running_cursor.assert_called_once_with(cursor) + self.assertFalse(backend.cancel_command.called) + self.assertFalse(logger_instance.warning.called) + + @patch("databricks.sql.client.logger") + def test_cancel_warns_when_hook_finds_nothing_in_flight(self, logger_instance): + """If the hook returns False (nothing was in flight), the + existing 'no executing command' warning still fires.""" + backend = Mock(spec=["cancel_command", "cancel_running_cursor"]) + backend.cancel_running_cursor.return_value = False + cursor = client.Cursor(Mock(), backend) + cursor.cancel() + + backend.cancel_running_cursor.assert_called_once_with(cursor) + self.assertTrue(logger_instance.warning.called) + def test_version_is_canonical(self): version = databricks.sql.__version__ canonical_version_re = ( @@ -510,7 +543,7 @@ def test_column_name_api(self): expected_values = [["val1", 321, 52.32], ["val2", 2321, 252.32]] - for (row, expected) in zip(data, expected_values): + for row, expected in zip(data, expected_values): self.assertEqual(row.first_col, expected[0]) self.assertEqual(row.second_col, expected[1]) self.assertEqual(row.third_col, expected[2]) diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 8cff9b3d4..3cbf55540 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -41,6 +41,9 @@ def __init__( message: str = "boom", sql_state: Optional[str] = None, query_id: Optional[str] = None, + diagnostic_info: Optional[str] = None, + display_message: Optional[str] = None, + error_details_json: Optional[str] = None, ) -> None: super().__init__(message) self.code = code @@ -51,6 +54,11 @@ def __init__( self.http_status = None self.retryable = False self.query_id = query_id + # Extended server status forwarded across the PyO3 boundary + # (kernel #121). Defaults None so existing tests are unaffected. + self.diagnostic_info = diagnostic_info + self.display_message = display_message + self.error_details_json = error_details_json _fake_kernel_module = types.ModuleType("databricks_sql_kernel") @@ -139,6 +147,40 @@ def test_reraise_forwards_structured_attributes(): assert out.retryable is False +def test_reraise_forwards_extended_status_attributes(): + """display_message / diagnostic_info / error_details_json now cross + the PyO3 boundary (kernel #121) and must be forwarded onto the + re-raised exception so callers can read them.""" + err = _FakeKernelError( + code="SqlError", + message="boom", + diagnostic_info="org.apache.spark...stack", + display_message="user-facing msg", + error_details_json='{"k":1}', + ) + out = kernel_client._reraise_kernel_error(err) + assert out.diagnostic_info == "org.apache.spark...stack" + assert out.display_message == "user-facing msg" + assert out.error_details_json == '{"k":1}' + + +def test_server_operation_error_populates_context_like_thrift(): + """A SqlError maps to ServerOperationError; its ``context`` must + carry ``diagnostic-info`` (the Spark stack trace) and + ``operation-id``, matching the Thrift backend so callers reading + ``err.context["diagnostic-info"]`` work identically on use_kernel.""" + err = _FakeKernelError( + code="SqlError", + message="table not found", + query_id="q-123", + diagnostic_info="org.apache.spark...stack", + ) + out = kernel_client._reraise_kernel_error(err) + assert isinstance(out, ServerOperationError) + assert out.context["diagnostic-info"] == "org.apache.spark...stack" + assert out.context["operation-id"] == "q-123" + + def test_kernel_error_chains_through_wrap(): """``raise wrap_kernel_exception(...) from exc`` is the call-site pattern; ``__cause__`` must be set to the original ``KernelError`` @@ -371,6 +413,165 @@ def test_execute_command_forwards_query_tags(): assert stmt.execute.called +# --------------------------------------------------------------------------- +# Staging / volume operations — fail loud (not silently no-op) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "operation", + [ + "PUT '/local/f.csv' INTO '/Volumes/c/s/v/f.csv'", + " put '/local/f' into '/Volumes/...'", # leading ws + lowercase + "GET '/Volumes/c/s/v/f' TO '/local/f'", + "REMOVE '/Volumes/c/s/v/f'", + ], +) +def test_staging_operation_raises_not_supported(operation): + """Volume/staging PUT/GET/REMOVE must FAIL LOUD on the kernel path + (the kernel can't perform the presigned-URL transfer; silently + no-opping would make ETL ingest stale/missing data).""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + with pytest.raises(NotSupportedError, match="staging"): + c.execute_command( + operation=operation, + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + +@pytest.mark.parametrize( + "operation, is_staging", + [ + ("PUT '/f' INTO '/v'", True), + ("get '/v' to '/f'", True), + ("REMOVE '/v'", True), + # Comment-prefixed staging ops MUST still be caught — otherwise + # they slip into the silent-no-op bug this guard exists to close + # (regression: review #1 on PR #825). ETL scripts commonly + # prefix statements with comments. + ("-- upload the file\nPUT '/f' INTO '/v'", True), + ("/* staging */ PUT '/f' INTO '/v'", True), + ("/* c1 */\n -- c2\n get '/v' to '/f'", True), # mixed, multiple + (" \n\t PUT '/f' INTO '/v'", True), # leading whitespace only + ("SELECT 'GET' AS x", False), # word appears but not leading verb + ("SELECT * FROM puts", False), + ("-- PUT in a comment\nSELECT 1", False), # verb only in comment + ("/* PUT */ SELECT 1", False), + ("INSERT INTO t VALUES (1)", False), + ("", False), + ("-- just a comment", False), # comment only, no statement + ], +) +def test_is_staging_statement(operation, is_staging): + assert kernel_client._is_staging_statement(operation) is is_staging + + +# --------------------------------------------------------------------------- +# Sync cancel wiring (cursor.cancel() during a blocking execute()) +# --------------------------------------------------------------------------- + + +def test_cancel_running_cursor_fires_registered_canceller(): + """A canceller registered for a cursor (as execute_command does + before the blocking call) is fired by cancel_running_cursor, which + returns True.""" + c = _make_client() + cursor = MagicMock() + canceller = MagicMock() + with c._sync_cancellers_lock: + c._sync_cancellers[id(cursor)] = canceller + + assert c.cancel_running_cursor(cursor) is True + canceller.cancel.assert_called_once_with() + + +def test_cancel_running_cursor_returns_false_when_none_registered(): + """No in-flight sync statement for this cursor -> False so the + Cursor can emit its 'no executing command' warning.""" + c = _make_client() + assert c.cancel_running_cursor(MagicMock()) is False + + +def test_cancel_running_cursor_swallows_cancel_errors(): + """cursor.cancel() is best-effort (PEP-249); a failing canceller + (e.g. an early cancel before the statement id is observed, or a + transport hiccup on the cancel RPC) must NOT propagate out of + cancel(). It's swallowed+logged, and we still return True so the + Cursor doesn't emit the misleading 'no executing command' warning + (regression: review #2 on PR #825).""" + c = _make_client() + cursor = MagicMock() + canceller = MagicMock() + canceller.cancel.side_effect = RuntimeError("cancel RPC failed") + with c._sync_cancellers_lock: + c._sync_cancellers[id(cursor)] = canceller + + # Does not raise, returns True (a canceller was present + attempted). + assert c.cancel_running_cursor(cursor) is True + canceller.cancel.assert_called_once_with() + + +def test_execute_command_registers_and_clears_sync_canceller(): + """The sync execute() path registers a StatementCanceller keyed by + the cursor before blocking, and clears it in the finally — so a + concurrent cancel can reach it mid-flight, and it doesn't leak.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + canceller = MagicMock() + stmt = MagicMock() + stmt.canceller.return_value = canceller + seen_during_execute = {} + + def fake_execute(): + # The canceller is registered *during* the blocking execute. + with c._sync_cancellers_lock: + seen_during_execute["registered"] = ( + c._sync_cancellers.get(id(cursor)) is canceller + ) + return MagicMock( + statement_id="stmt-id", + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + + stmt.execute.side_effect = fake_execute + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert seen_during_execute["registered"] is True + # Cleared after execute returns — no leak. + with c._sync_cancellers_lock: + assert id(cursor) not in c._sync_cancellers + + def test_get_columns_accepts_none_catalog(): """The kernel's `list_columns` honours `catalog=None` by issuing `SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should From 015ee476532e5ac586eb59f830411bf1b9b91b45 Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Fri, 5 Jun 2026 14:28:47 +0530 Subject: [PATCH 2/4] test(e2e/mst): DESCRIBE QUERY is now allowed in transactions (#836) test(e2e/mst): DESCRIBE QUERY now allowed in transactions The server's MSTCheckRule allowlist has broadened to include DESCRIBE QUERY (DescribeQueryCommand), mirroring the earlier SHOW COLUMNS change. It no longer throws inside an active transaction, so the prior test_describe_query_blocked assertion (DID NOT RAISE) was stale. Flip it to test_describe_query_not_blocked using _assert_not_blocked (verifies it succeeds and returns >0 rows) and move DESCRIBE QUERY from the Blocked to the Allowed list in the class docstring. Verified against a live DBSQL warehouse: the full TestMstBlockedSql class (9 tests) passes. Signed-off-by: Vikrant Puppala --- tests/e2e/test_transactions.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/e2e/test_transactions.py b/tests/e2e/test_transactions.py index fd4fd73ab..a634885ab 100644 --- a/tests/e2e/test_transactions.py +++ b/tests/e2e/test_transactions.py @@ -565,17 +565,19 @@ class TestMstBlockedSql: "Only SELECT / INSERT / MERGE / UPDATE / DELETE / DESCRIBE TABLE are supported." The server has since broadened the allowlist to include SHOW COLUMNS - (ShowDeltaTableColumnsCommand), observed on current DBSQL warehouses. + (ShowDeltaTableColumnsCommand) and DESCRIBE QUERY (DescribeQueryCommand), + observed on current DBSQL warehouses. Blocked (throw + abort txn): - SHOW TABLES, SHOW SCHEMAS, SHOW CATALOGS, SHOW FUNCTIONS - - DESCRIBE QUERY, DESCRIBE TABLE EXTENDED + - DESCRIBE TABLE EXTENDED - SELECT FROM information_schema - Thrift Get{Catalogs,Schemas,Tables,Columns} RPCs (see TestMstMetadata) Allowed: - DESCRIBE TABLE (basic form) - SHOW COLUMNS + - DESCRIBE QUERY """ def _assert_blocked_and_txn_aborted(self, mst_conn_params, fq_table, blocked_sql): @@ -652,10 +654,14 @@ def test_show_columns_not_blocked(self, mst_conn_params, mst_table): mst_conn_params, fq_table, f"SHOW COLUMNS IN {fq_table}" ) - def test_describe_query_blocked(self, mst_conn_params, mst_table): - """DESCRIBE QUERY is blocked in MST (DescribeQueryCommand).""" + def test_describe_query_not_blocked(self, mst_conn_params, mst_table): + """DESCRIBE QUERY succeeds in MST — now allowed by the server's MSTCheckRule allowlist. + + Previously blocked (DescribeQueryCommand), the server has since broadened + the allowlist to include it, mirroring the earlier SHOW COLUMNS change. + """ fq_table, _ = mst_table - self._assert_blocked_and_txn_aborted( + self._assert_not_blocked( mst_conn_params, fq_table, f"DESCRIBE QUERY SELECT * FROM {fq_table}", From a3e882ee6f17b725eecd4bc731738a5d37f9988c Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Fri, 5 Jun 2026 22:36:57 +0530 Subject: [PATCH 3/4] feat(kernel): surface kernel logs through Python logging on use_kernel (#824) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(kernel): surface kernel logs through Python logging on use_kernel When the kernel backend loads, auto-initialize the kernel's tracing -> Python logging bridge so `use_kernel=True` users see kernel logs with no extra setup. Kernel logs land under the `databricks.sql.kernel` logger (a child of the connector's `databricks.sql.*` namespace), so an existing `logging.getLogger("databricks.sql").setLevel(...)` cascades to them. - `_errors.py` calls `databricks_sql_kernel.init_logging()` once at extension load (it's the canonical kernel-import site). The call is `getattr`-guarded so an older kernel wheel without the function still works — just without kernel logs. - e2e tests assert kernel records reach the `databricks.sql.kernel` logger (and the pyo3 boundary under `databricks.sql.kernel.pyo3`) and that the level set on the logger is respected. Creds-gated per the existing kernel e2e convention. Requires the companion kernel/pyo3 change (databricks-sql-kernel#120) that exposes `init_logging()`. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * fix(kernel): harden log-bridge init + address review feedback Review feedback from PR #824 (gopalldb): - P1: guard the init_logging() call against throwing, not just a missing function. A panic across the PyO3 boundary surfaces as pyo3_runtime.PanicException (a BaseException, not Exception), so a bare call could escape module import and fail every use_kernel=True connection over a non-essential logging feature. Wrap in try/except BaseException, log a debug breadcrumb, continue. This also neutralizes the idempotency concern regardless of the Rust impl. - P2: soften the level-control test docstring to make clear it asserts the effective customer-visible outcome (sub-threshold records don't surface), not source-side suppression — caplog filters after the FFI. - P2: downgrade the databricks.sql.kernel.pyo3 assertion to a soft warning so a benign kernel change to the boundary breadcrumb target doesn't break the connector e2e suite. The core databricks.sql.kernel contract is still hard-asserted. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * chore: bump KERNEL_REV to merged kernel main (f4ee6fe) for the logging bridge The kernel tracing -> Python logging bridge (init_logging / pyo3-log, routing under databricks.sql.kernel) landed in kernel #120 — AFTER the previously-pinned 101aa46 (#118). The kernel-e2e built a wheel without the bridge, so test_kernel_logs_reach_python_logging failed with 'no records delivered' (assert []). Bump to f4ee6fe (current kernel main: includes #118, #120, #123, #125). Verified live against a wheel built from f4ee6fe: the bridge delivers records to the databricks.sql.kernel logger, and both test_kernel_logs_reach_python_logging and test_kernel_log_level_is_respected pass. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --------- Signed-off-by: Vikrant Puppala --- KERNEL_REV | 2 +- src/databricks/sql/backend/kernel/_errors.py | 27 +++++++ tests/e2e/test_kernel_backend.py | 85 ++++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) diff --git a/KERNEL_REV b/KERNEL_REV index 37b717a45..696572aef 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b +f4ee6fec78aabce8c0ea9c1ff47fc11b8191d013 diff --git a/src/databricks/sql/backend/kernel/_errors.py b/src/databricks/sql/backend/kernel/_errors.py index 151bac96b..334866a37 100644 --- a/src/databricks/sql/backend/kernel/_errors.py +++ b/src/databricks/sql/backend/kernel/_errors.py @@ -38,6 +38,8 @@ from __future__ import annotations +import logging + from databricks.sql.exc import ( DatabaseError, Error, @@ -57,6 +59,31 @@ "(into the same venv as databricks-sql-connector)." ) from exc +# Route the kernel's Rust-side logs into Python's ``logging`` as soon as +# the extension loads. The kernel emits under the ``databricks.sql.kernel`` +# logger (a child of the connector's ``databricks.sql`` namespace), so a +# customer who configures ``databricks.sql`` logging gets kernel logs for +# free with no extra setup. +# +# This is a best-effort, non-essential feature: it must never take down +# ``use_kernel=True`` for a process. ``getattr`` guards against an older +# kernel wheel that predates the function. The ``try`` guards against the +# call itself throwing — note ``except BaseException`` is deliberate: a +# panic raised across the PyO3 boundary surfaces as +# ``pyo3_runtime.PanicException``, which derives from ``BaseException`` +# (not ``Exception``), so a narrower clause would let it escape module +# import and fail every kernel-backed connection. The kernel side is +# idempotent and returns rather than panics on a double install, but we +# do not rely on that here — the guard holds regardless of the Rust impl. +_kernel_init_logging = getattr(_kernel, "init_logging", None) +if _kernel_init_logging is not None: + try: + _kernel_init_logging() + except BaseException as exc: # noqa: BLE001 - see comment above re: PanicException + logging.getLogger(__name__).debug( + "kernel log bridge init failed; continuing without it: %r", exc + ) + # Map a kernel `code` slug to the PEP 249 exception class that best # captures it. The match isn't a perfect 1:1 — PEP 249 has a diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index 95d7d942e..8d988646d 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -127,6 +127,91 @@ def test_fetchall_arrow(conn): assert table.column_names == ["a", "b"] +# ─── Logging (Rust kernel -> Python logging bridge) ────────────────────────── +# +# Layer 3 of the logger-name drift guard (see also the Rust tests +# `klog::tests::klog_emits_contract_target` and +# `logging::tests::kernel_target_matches_contract` in the kernel repo). +# Asserts the *customer-facing* contract end-to-end: kernel logs reach +# Python `logging` under the `databricks.sql.kernel` logger, respect the +# level set on it, and the pyo3 boundary surfaces under +# `databricks.sql.kernel.pyo3`. If the kernel's tracing target or the +# pyo3-log wiring ever drifts, these fail. + +import logging + + +def test_kernel_logs_reach_python_logging(kernel_conn_params, caplog): + """A query at DEBUG produces records on the `databricks.sql.kernel` + logger — proving the tracing -> log -> pyo3-log -> logging chain.""" + with caplog.at_level(logging.DEBUG, logger="databricks.sql.kernel"): + c = sql.connect(**kernel_conn_params) + try: + with c.cursor() as cur: + cur.execute("SELECT 1 AS a") + cur.fetchall() + finally: + c.close() + + kernel_records = [ + r for r in caplog.records if r.name.startswith("databricks.sql.kernel") + ] + assert kernel_records, ( + "expected log records under the 'databricks.sql.kernel' logger; " + "the kernel tracing -> Python logging bridge did not deliver any" + ) + # The core kernel logger (not just any child) must be present — this + # is the customer-facing contract. + assert any( + r.name == "databricks.sql.kernel" for r in kernel_records + ), "expected core kernel records on the exact 'databricks.sql.kernel' logger" + # The pyo3-boundary breadcrumb (`databricks.sql.kernel.pyo3`) is a + # nice-to-have, but the exact sub-target is a kernel-internal naming + # detail — assert softly so a benign kernel change to the boundary + # breadcrumbs doesn't break the connector e2e suite. + if not any(r.name == "databricks.sql.kernel.pyo3" for r in kernel_records): + import warnings + + warnings.warn( + "no 'databricks.sql.kernel.pyo3' boundary records seen; " + "the kernel may have changed its pyo3 breadcrumb target", + stacklevel=2, + ) + + +def test_kernel_log_level_is_respected(kernel_conn_params, caplog): + """At WARNING on the kernel logger, no DEBUG/INFO kernel records reach + `caplog.records` — i.e. level control on `databricks.sql.kernel` + behaves like any other Python logger. + + Scope note: `caplog.at_level` sets the logger's level and attaches a + handler, so this asserts the *effective* outcome a customer sees + (sub-threshold records don't surface), not specifically that the Rust + side suppressed them at source. A DEBUG record that crossed the FFI + would still be dropped by Python's level check before reaching + `caplog`. Source-side suppression (and its per-record FFI cost + avoidance) is covered by the kernel-side filtering, not asserted + here.""" + with caplog.at_level(logging.WARNING, logger="databricks.sql.kernel"): + c = sql.connect(**kernel_conn_params) + try: + with c.cursor() as cur: + cur.execute("SELECT 1 AS a") + cur.fetchall() + finally: + c.close() + + debug_records = [ + r + for r in caplog.records + if r.name.startswith("databricks.sql.kernel") and r.levelno < logging.WARNING + ] + assert not debug_records, ( + "DEBUG/INFO kernel records leaked through at WARNING level: " + f"{[(r.name, r.levelname, r.getMessage()) for r in debug_records]}" + ) + + # ── Metadata ────────────────────────────────────────────────────── From 85f8ba38048a6d18ad6f20611f98d174a73b6ec5 Mon Sep 17 00:00:00 2001 From: Vikrant Puppala Date: Fri, 5 Jun 2026 23:22:53 +0530 Subject: [PATCH 4/4] fix(kernel): stop premature sync statement close (H4); bump KERNEL_REV to batch 2 (#830) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(kernel): stop premature sync statement close (H4); bump KERNEL_REV to batch 2 Connector half of the kernel batch-2 fixes (kernel PR #123). Bumps KERNEL_REV to pick up the batch-2 kernel surface. H4 — don't close the kernel Statement at sync execute-return: execute_command's finally used to `stmt.close()` immediately after `stmt.execute()` succeeded. For a large CloudFetch result with paginated chunk links (all_fetched=false), the kernel fetches later links lazily (get_result_chunks against the LIVE statement) during consumption, so a premature CloseStatement broke those fetches. The kernel now auto-closes the server statement when its result stream drains (ExecutedStatement::next_batch end-of-stream), with the executed-handle Drop as the backstop for partial/abandoned reads. So the connector now flips close_stmt=False on a successful execute and only closes on the error path (no executed handle was produced). The other batch-2 fixes (cancelled-class -> OperationalError, U2M refresh fail-fast, metadata statement close-on-drop, per-binding OAuth client_id) are entirely kernel-side and need no connector code beyond the KERNEL_REV bump. Tests: unit (sync execute does-not-close on success / does-close on failure) + e2e (large multi-chunk result drains without premature close + cursor reuse; server cancel maps to OperationalError not ProgrammingError). All e2e verified live against dogfood. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * chore: re-pin KERNEL_REV to #123 HEAD after cancelled-test fix #123 picked up a follow-up commit fixing the wiremock cancelled-state assertions (ErrorCode::Cancelled). Bump the placeholder pin so the connector CI builds the corrected kernel. Still to be re-pinned to the squash-merge SHA before #830 merges. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * chore: re-pin KERNEL_REV to merged kernel #123 (f4ee6fe) Kernel batch-2 (#123) is merged to kernel main (f4ee6fe, current main HEAD). Re-pin from the orphaned branch HEAD (4f7fbe7) to the merged SHA — reachable from main, no orphan-SHA risk. Verified against a wheel built from f4ee6fe: connector unit (102) + kernel e2e (H4 large-result drain + reuse, server-cancel -> OperationalError, staging fail-loud, diagnostic-info) all pass against the real merged kernel. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala * docs: drop internal 'H4' audit shorthand from comments/docstrings Address review: 'H4' is internal audit shorthand, meaningless in the public connector codebase. Reword the affected comment + two test docstrings to describe the behavior directly (premature sync CloseStatement broke lazy CloudFetch chunk-link fetches). No code change. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala --------- Signed-off-by: Vikrant Puppala --- src/databricks/sql/backend/kernel/client.py | 21 +++++-- tests/e2e/test_kernel_backend.py | 58 +++++++++++++++++- tests/unit/test_kernel_client.py | 68 +++++++++++++++++++++ 3 files changed, 142 insertions(+), 5 deletions(-) diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index 08bb0d36b..cb3b0b7ba 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -473,16 +473,29 @@ def execute_command( # Canceller is best-effort; never block execute on it. pass executed = stmt.execute() + # Execute succeeded: the kernel now owns the statement + # lifecycle. It auto-closes the server statement when the + # result stream is fully drained (``ExecutedStatement:: + # next_batch`` end-of-stream), with the executed handle's + # ``Drop`` as the backstop for partial/abandoned reads. + # So we must NOT close ``stmt`` here: a premature + # ``CloseStatement`` at execute-return broke lazy + # CloudFetch chunk-link fetches (``get_result_chunks`` + # against the live statement) for large paginated-link + # results. Closing here is left ONLY for the error path + # below, where no executed handle / result set was + # produced to reap it. + close_stmt = False except Exception as exc: raise _wrap_kernel_exception("execute_command", exc) from exc finally: with self._sync_cancellers_lock: self._sync_cancellers.pop(id(cursor), None) if close_stmt: - # Sync path: ``Statement`` is a lifecycle owner separate - # from the executed handle. Drop it here so the parent - # doesn't outlive its caller. Swallow close errors — - # they're not actionable. + # Reached only when ``stmt.execute()`` did not succeed + # (or async, which flipped the flag earlier): no executed + # handle owns the statement, so close it here to avoid a + # leak. Swallow close errors — not actionable. try: stmt.close() except Exception: diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index 8d988646d..f25c60630 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -24,7 +24,12 @@ import pytest import databricks.sql as sql -from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError +from databricks.sql.exc import ( + DatabaseError, + NotSupportedError, + OperationalError, + ServerOperationError, +) # Skip the whole module unless the kernel wheel is genuinely installed. # ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a @@ -563,3 +568,54 @@ def cancel_after_delay(): finally: t.join() cur.close() + + +# ── Batch 2 ──────────────────────────────────────────────────────── + + +def test_large_result_drains_without_premature_close(conn): + """A large multi-chunk result fully drains even though the connector + no longer closes the statement at execute-return — the kernel + auto-closes on drain. Guards the regression where a premature + CloseStatement broke lazy CloudFetch chunk-link fetches.""" + n = 5_000_000 + with conn.cursor() as cur: + cur.execute(f"SELECT id, cast(id AS string) s FROM range({n})") + rows = cur.fetchall() + assert len(rows) == n + # Cursor is reusable after the auto-close fired on the prior result. + cur.execute("SELECT 42 AS n") + assert cur.fetchall()[0][0] == 42 + + +def test_server_cancel_maps_to_operational_error(conn): + """A server-side cancel surfaces as OperationalError (cancelled + class), not ProgrammingError. We trigger it via a cross-thread + cancel of a running query; the raised exception must be in the + OperationalError family, not ProgrammingError.""" + import threading + import time + + from databricks.sql.exc import ProgrammingError + + cur = conn.cursor() + + def cancel_after_delay(): + time.sleep(15.0) + cur.cancel() + + t = threading.Thread(target=cancel_after_delay) + t.start() + try: + with pytest.raises(Exception) as exc_info: + cur.execute( + "SELECT count(*) FROM range(0, 1000000000000) " + "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1" + ) + # The cancellation must not masquerade as a caller-argument + # (ProgrammingError) error. It should be operational. + assert not isinstance(exc_info.value, ProgrammingError) + assert isinstance(exc_info.value, (OperationalError, DatabaseError)) + finally: + t.join() + cur.close() diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 3cbf55540..42ac36197 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -572,6 +572,74 @@ def fake_execute(): assert id(cursor) not in c._sync_cancellers +def test_sync_execute_does_not_close_statement_on_success(): + """On a successful sync execute(), the connector must NOT close the + parent kernel Statement — the kernel now auto-closes the server + statement when the result stream drains (with the executed handle's + Drop as backstop). A premature close() here broke lazy CloudFetch + chunk-link fetches for large paginated-link results.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + stmt = MagicMock() + stmt.execute.return_value = MagicMock( + statement_id="stmt-id", + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + # The kernel owns the statement lifecycle post-execute; connector + # leaves it alone (kernel auto-close-on-drain + Drop backstop). + stmt.close.assert_not_called() + + +def test_sync_execute_closes_statement_on_failure(): + """On the error path (execute raised, no executed handle / result + set produced), the connector still closes the parent Statement so + it isn't leaked.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + stmt = MagicMock() + stmt.execute.side_effect = RuntimeError("boom") + c._kernel_session.statement.return_value = stmt + + with pytest.raises(Exception): + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + stmt.close.assert_called_once_with() + + def test_get_columns_accepts_none_catalog(): """The kernel's `list_columns` honours `catalog=None` by issuing `SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should