From 3a46b704dad023b89688c71b9d14df5f4894b7d7 Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:18:32 +0300 Subject: [PATCH 1/3] Add ADBC coverage matrix and live Azure checks --- README.md | 31 +- docs/adbc-coverage.md | 47 +++ src/lakehouse/server.py | 126 +++++++-- tests/jdbc/pom.xml | 3 + .../FlightSqlJdbcAzurePersistenceTest.java | 155 ++++++++++ tests/test_e2e.py | 150 +++++++++- tests/test_e2e_ducklake.py | 3 + tests/test_e2e_tls.py | 4 + tests/test_live_azure_backend.py | 267 +++++++++++++++++- tests/test_server.py | 88 +++++- 10 files changed, 819 insertions(+), 55 deletions(-) create mode 100644 docs/adbc-coverage.md create mode 100644 tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java diff --git a/README.md b/README.md index 7b942c6..dc27b02 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,8 @@ mvn -q -Dexec.mainClass=lakehouse.AzureDemo test-compile exec:java The `MAVEN_OPTS` flag is required for Apache Arrow on Java 17+. +For ADBC support details, see the [ADBC coverage matrix](docs/adbc-coverage.md). + ### Run the live backend tests The live backend pytest is opt-in because it queries the deployed Azure Container App and reads the `lakehouse-password` secret from Key Vault: @@ -157,7 +159,7 @@ LAKEHOUSE_LIVE_BACKEND=1 LAKEHOUSE_LIVE_BACKEND_ADBC_BASIC=1 \ uv run pytest -q tests/test_live_azure_backend.py ``` -The ADBC Basic check is marked `xfail` because that is the known client path currently failing against the deployed Container App. A result such as `1 passed, 1 xfailed` means the supported bearer smoke test passed and the tracked ADBC Basic issue reproduced as expected. If that changes to `1 passed, 1 xpassed`, the ADBC Basic path has started working and the `xfail` marker should be removed. +The ADBC Basic check is marked `xfail` because that is the known client path currently failing against the deployed Container App. A result with all bearer-path tests passing and one `xfailed` direct-Basic test means the supported ADBC live checks passed and the tracked ADBC Basic issue reproduced as expected. If that changes to `xpassed`, the ADBC Basic path has started working and the `xfail` marker should be removed. If you want one copy/paste block for the demo itself: @@ -250,16 +252,26 @@ pip install adbc-driver-flightsql Connect to your Azure deployment: ```python -import base64 +import pyarrow.flight as fl import adbc_driver_flightsql.dbapi as flight_sql from adbc_driver_flightsql import DatabaseOptions endpoint = "grpc+tls://ca-lakehouse-xxxxx.centralus.azurecontainerapps.io:443" -token = base64.b64encode(b"lakehouse:").decode() + +# Supported Azure path: use PyArrow for the Basic-token handshake, then +# pass the returned Bearer token to ADBC. +client = fl.connect(endpoint) +header_name, bearer_header = client.authenticate_basic_token("lakehouse", "") +if isinstance(header_name, bytes): + header_name = header_name.decode() +if isinstance(bearer_header, bytes): + bearer_header = bearer_header.decode() +if header_name.lower() != "authorization" or not bearer_header.startswith("Bearer "): + raise RuntimeError("Basic auth did not return a Bearer authorization header") conn = flight_sql.connect( endpoint, - db_kwargs={DatabaseOptions.AUTHORIZATION_HEADER.value: f"Basic {token}"}, + db_kwargs={DatabaseOptions.AUTHORIZATION_HEADER.value: bearer_header}, ) cursor = conn.cursor() @@ -269,6 +281,8 @@ print(cursor.fetchall()) You should see the rows inserted by the JDBC demo, or an empty result if you have not run it yet. +The direct Basic-auth ADBC path is tracked separately as an opt-in `xfail` live test. Do not use it as the supported Azure ADBC example until that test is promoted. + ## What Just Happened? ```text @@ -499,15 +513,18 @@ docker run -p 31337:31337 -v ./data:/data lakehouse serve \ ## Flight SQL Protocol Support -Lakehouse implements all standard Flight SQL RPCs: +Lakehouse implements the following Flight SQL operations: | Category | Supported Operations | | -------- | -------------------- | -| **Queries** | `CommandStatementQuery`, `CommandStatementUpdate`, `CommandStatementSubstraitPlan` | +| **Queries** | `CommandStatementQuery`, `CommandStatementUpdate` | | **Prepared Statements** | `ActionCreatePreparedStatementRequest`, `ActionClosePreparedStatementRequest`, `CommandPreparedStatementQuery`, `CommandPreparedStatementUpdate` | | **Catalog Metadata** | `CommandGetCatalogs`, `CommandGetDbSchemas`, `CommandGetTables`, `CommandGetTableTypes`, `CommandGetPrimaryKeys`, `CommandGetExportedKeys`, `CommandGetImportedKeys`, `CommandGetCrossReference` | | **SQL Info** | `CommandGetSqlInfo`, `CommandGetXdbcTypeInfo` | -| **Transactions** | `ActionBeginTransactionRequest`, `ActionEndTransactionRequest`, `ActionBeginSavepointRequest`, `ActionEndSavepointRequest` | +| **Schemas** | Flight `GetSchema` for statement queries, prepared statements, and metadata commands | +| **Transactions** | `ActionBeginTransactionRequest`, `ActionEndTransactionRequest` | + +Substrait and savepoint commands are intentionally not listed as supported. See the [ADBC coverage matrix](docs/adbc-coverage.md) for ADBC-specific support and limitations. --- diff --git a/docs/adbc-coverage.md b/docs/adbc-coverage.md new file mode 100644 index 0000000..4df10dd --- /dev/null +++ b/docs/adbc-coverage.md @@ -0,0 +1,47 @@ +# ADBC Coverage Matrix + +This matrix documents ADBC compatibility for this repository's Flight SQL server backed by DuckDB/DuckLake. It is scoped to server-visible behavior through the Python `adbc-driver-flightsql` client, not to implementing the ADBC C ABI inside this server. + +Primary sources: +- [ADBC API Standard](https://arrow.apache.org/adbc/current/format/specification.html) +- [ADBC Flight SQL driver](https://arrow.apache.org/adbc/current/driver/flight_sql.html) +- [ADBC driver implementation status](https://arrow.apache.org/adbc/current/driver/status.html) +- [Canonical `adbc.h`](https://raw.githubusercontent.com/apache/arrow-adbc/main/c/include/arrow-adbc/adbc.h) +- [Arrow Flight SQL protocol](https://arrow.apache.org/docs/format/FlightSql.html) + +Status values: +- `covered`: implemented and exercised by repo tests or live smoke tests. +- `partial`: implemented for the main path, with known limitations. +- `unsupported-by-server`: the Flight SQL server does not implement this feature. +- `not-supported-by-python-flightsql-driver`: the server may expose a Flight SQL feature, but the Python ADBC Flight SQL driver does not currently expose a matching ADBC path. +- `client-only/N/A`: ADBC driver manager, client ABI, or client option behavior that this server does not implement. +- `needs-verification`: not enough repo evidence to claim support. + +| ADBC area | Status | Notes | +| --- | --- | --- | +| ADBC object and driver ABI: database, connection, statement lifecycle, driver loading, option getter/setter variants, `AdbcError` layout | `client-only/N/A` | These are ADBC driver/driver-manager responsibilities from the ADBC API and `adbc.h`, not Flight SQL server APIs. | +| Connection URI and basic client setup | `covered` | Python `adbc-driver-flightsql` connects to local and deployed Flight SQL endpoints. | +| Authorization header bearer flow | `covered` | Supported Azure path is PyArrow Basic-token bootstrap followed by ADBC Bearer auth. | +| Direct ADBC Basic auth against Azure Container App | `needs-verification` | Tracked as an opt-in `xfail` path; do not document as the supported Azure ADBC path. | +| TLS server authentication | `covered` | Covered by TLS e2e tests and live Azure smoke path. | +| mTLS client certificates, OAuth flows, cookies, timeout knobs, queue size, custom call headers | `client-only/N/A` | Mostly Flight SQL driver options. Only server-observable effects should be promoted to support claims after dedicated tests. | +| SQL query execution and Arrow result fetch | `covered` | Maps to Flight SQL statement query `GetFlightInfo`/`DoGet`. | +| SQL update, DDL/DML, and row counts | `covered` | Maps to Flight SQL update `DoPut` with update-result metadata. | +| Execute schema / Flight `GetSchema` | `covered` | Server implements Flight `GetSchema` for statement queries, prepared statements, and metadata commands, with direct unit and ADBC `execute_schema` acceptance coverage. | +| Prepared statement create/close | `covered` | Server supports Flight SQL prepared statement actions. | +| Prepared statement query/update with single-row parameter batches | `covered` | Supported by current handler behavior and tests. | +| Prepared statement multi-row parameter batches | `partial` | Current implementation should be treated as single-row oriented until multi-row parameter binding is tested or implemented. | +| Metadata: `GetInfo`, catalogs, schemas, tables, table types, table schema, XDBC type info | `covered` | Maps to Flight SQL metadata commands and local ADBC `GetObjects` depth/filter tests. | +| Metadata: primary keys, imported/exported keys, cross-reference | `covered` | Server has Flight SQL handlers and unit coverage. | +| ADBC `GetObjects` table constraints | `not-supported-by-python-flightsql-driver` | The Flight SQL driver docs note `AdbcConnectionGetObjects()` does not currently populate column constraint info such as primary/foreign keys. | +| ADBC metadata pattern/filter semantics | `partial` | Flight SQL driver docs note catalog filters are simple string matches, not `LIKE` patterns. | +| Transactions: begin/end, commit, rollback, autocommit discovery | `covered` | Current Flight SQL and SQL transaction `SqlInfo` IDs are emitted from the generated proto constants, and ADBC DBAPI rollback/autocommit discovery is covered. | +| Savepoints | `unsupported-by-server` | Server reports savepoints as unsupported. | +| Cancellation via ADBC and Flight SQL actions | `partial` | Server supports deprecated `CancelQuery` behavior returning cancellation state, but does not guarantee true DuckDB query interruption and does not claim modern `CancelFlightInfo` coverage. | +| Partitioned or incremental results | `partial` | ADBC `ExecutePartitions`/`ReadPartition` is covered for the server's single-endpoint result path. Multi-endpoint distributed optimization and incremental execution are not implemented. | +| Bulk ingest via Flight SQL `CommandStatementIngest` | `covered` | Server-side ingest handlers and tests cover create/append/replace/fail behavior. | +| Bulk ingest through Python `adbc-driver-flightsql` | `partial` | The current local `adbc-driver-flightsql` exposes and passes `adbc_ingest`; the test xfails if a driver/environment returns `NotSupportedError` because the upstream Flight SQL driver docs still state bulk ingestion is not currently implemented. | +| ADBC statistics APIs: `GetStatistics`, `GetStatisticNames` | `unsupported-by-server` | No server support is claimed for ADBC statistics metadata. | +| Substrait plans | `unsupported-by-server` | Substrait commands are intentionally not implemented and should not be listed as supported protocol surface. | +| Rich ADBC error details | `needs-verification` | Server maps errors to Flight/gRPC status paths, but ADBC driver-specific error-detail propagation has no dedicated coverage. | +| Flight SQL session options | `unsupported-by-server` | Session option get/set/erase lifecycle is not part of the current server support claim. | diff --git a/src/lakehouse/server.py b/src/lakehouse/server.py index d2084d5..1b7f4c2 100644 --- a/src/lakehouse/server.py +++ b/src/lakehouse/server.py @@ -35,7 +35,7 @@ import pyarrow.flight as flight from lakehouse.dispatch import FlightSqlServer -from lakehouse.proto import fs, pack_any +from lakehouse.proto import fs, pack_any, unpack_any from lakehouse.session import SessionManager logger = logging.getLogger(__name__) @@ -324,8 +324,8 @@ def _prepare_get_tables_query( [ pa.field("catalog_name", pa.utf8()), pa.field("db_schema_name", pa.utf8()), - pa.field("table_name", pa.utf8()), - pa.field("table_type", pa.utf8()), + pa.field("table_name", pa.utf8(), nullable=False), + pa.field("table_type", pa.utf8(), nullable=False), ] ) @@ -333,13 +333,13 @@ def _prepare_get_tables_query( [ pa.field("catalog_name", pa.utf8()), pa.field("db_schema_name", pa.utf8()), - pa.field("table_name", pa.utf8()), - pa.field("table_type", pa.utf8()), - pa.field("table_schema", pa.binary()), + pa.field("table_name", pa.utf8(), nullable=False), + pa.field("table_type", pa.utf8(), nullable=False), + pa.field("table_schema", pa.binary(), nullable=False), ] ) -_TABLE_TYPES_SCHEMA = pa.schema([pa.field("table_type", pa.utf8())]) +_TABLE_TYPES_SCHEMA = pa.schema([pa.field("table_type", pa.utf8(), nullable=False)]) _PRIMARY_KEYS_SCHEMA = pa.schema( [ @@ -726,24 +726,37 @@ def _build_xdbc_type_info_table( ] ) -# SqlInfo enum constants (matching Flight SQL protobuf values) -_FLIGHT_SQL_SERVER_NAME = 0 -_FLIGHT_SQL_SERVER_VERSION = 1 -_FLIGHT_SQL_SERVER_ARROW_VERSION = 2 -_FLIGHT_SQL_SERVER_READ_ONLY = 500 -_FLIGHT_SQL_SERVER_SQL = 501 -_FLIGHT_SQL_SERVER_SUBSTRAIT = 502 -_FLIGHT_SQL_SERVER_TRANSACTION = 504 -_FLIGHT_SQL_SERVER_CANCEL = 505 -_FLIGHT_SQL_SERVER_BULK_INGESTION = 507 -_FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED = 508 -_FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT = 100 -_FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT = 101 -_SQL_DDL_CATALOG = 500 -_SQL_DDL_SCHEMA = 501 -_SQL_DDL_TABLE = 502 -_SQL_IDENTIFIER_QUOTE_CHAR = 503 -_SQL_IDENTIFIER_CASE = 504 +# SqlInfo enum constants (matching the generated Flight SQL protobuf values) +_FLIGHT_SQL_SERVER_NAME = fs.FLIGHT_SQL_SERVER_NAME +_FLIGHT_SQL_SERVER_VERSION = fs.FLIGHT_SQL_SERVER_VERSION +_FLIGHT_SQL_SERVER_ARROW_VERSION = fs.FLIGHT_SQL_SERVER_ARROW_VERSION +_FLIGHT_SQL_SERVER_READ_ONLY = fs.FLIGHT_SQL_SERVER_READ_ONLY +_FLIGHT_SQL_SERVER_SQL = fs.FLIGHT_SQL_SERVER_SQL +_FLIGHT_SQL_SERVER_SUBSTRAIT = fs.FLIGHT_SQL_SERVER_SUBSTRAIT +_FLIGHT_SQL_SERVER_TRANSACTION = fs.FLIGHT_SQL_SERVER_TRANSACTION +_FLIGHT_SQL_SERVER_CANCEL = fs.FLIGHT_SQL_SERVER_CANCEL +_FLIGHT_SQL_SERVER_BULK_INGESTION = fs.FLIGHT_SQL_SERVER_BULK_INGESTION +_FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED = ( + fs.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED +) +_FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT = fs.FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT +_FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT = fs.FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT +_SQL_DDL_CATALOG = fs.SQL_DDL_CATALOG +_SQL_DDL_SCHEMA = fs.SQL_DDL_SCHEMA +_SQL_DDL_TABLE = fs.SQL_DDL_TABLE +_SQL_IDENTIFIER_CASE = fs.SQL_IDENTIFIER_CASE +_SQL_IDENTIFIER_QUOTE_CHAR = fs.SQL_IDENTIFIER_QUOTE_CHAR +_SQL_DEFAULT_TRANSACTION_ISOLATION = fs.SQL_DEFAULT_TRANSACTION_ISOLATION +_SQL_TRANSACTIONS_SUPPORTED = fs.SQL_TRANSACTIONS_SUPPORTED +_SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS = ( + fs.SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS +) +_SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT = ( + fs.SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT +) +_SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED = ( + fs.SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED +) def _build_sql_info_table( @@ -764,12 +777,25 @@ def _build_sql_info_table( (_FLIGHT_SQL_SERVER_READ_ONLY, 1, False), (_FLIGHT_SQL_SERVER_SQL, 1, True), (_FLIGHT_SQL_SERVER_SUBSTRAIT, 1, False), - (_FLIGHT_SQL_SERVER_TRANSACTION, 2, 1), # TRANSACTION supported + ( + _FLIGHT_SQL_SERVER_TRANSACTION, + 3, + fs.SQL_SUPPORTED_TRANSACTION_TRANSACTION, + ), (_FLIGHT_SQL_SERVER_CANCEL, 1, True), (_FLIGHT_SQL_SERVER_BULK_INGESTION, 1, True), (_FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED, 1, True), (_FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT, 3, 0), # no timeout (_FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT, 3, 0), # no timeout + (_SQL_DEFAULT_TRANSACTION_ISOLATION, 3, fs.SQL_TRANSACTION_SERIALIZABLE), + (_SQL_TRANSACTIONS_SUPPORTED, 1, True), + ( + _SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS, + 3, + 1 << fs.SQL_TRANSACTION_SERIALIZABLE, + ), + (_SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT, 1, False), + (_SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED, 1, False), ] # Filter if specific IDs requested @@ -876,6 +902,54 @@ def shutdown(self) -> None: self._db.close() super().shutdown() + def get_schema( + self, + context: flight.ServerCallContext, + descriptor: flight.FlightDescriptor, + ) -> flight.SchemaResult: + """Return the schema for supported Flight SQL descriptors. + + ``GetSchema`` must be side-effect-free. In particular, prepared + statements with empty schemas can be DDL/DML; do not route through + ``get_flight_info_prepared_statement`` because that method eagerly + executes such statements for ADBC query execution compatibility. + """ + command = unpack_any(descriptor.command) + + if isinstance(command, fs.CommandStatementQuery): + conn = self._get_session(context) + schema_query = f"SELECT * FROM ({command.query}) AS __schema_probe LIMIT 0" + return flight.SchemaResult(_execute_query(conn, schema_query).schema) + + if isinstance(command, fs.CommandPreparedStatementQuery): + session_id = _get_session_id(context) + handle = command.prepared_statement_handle.decode("utf-8") + meta = self._prepared_meta.get((session_id, handle)) + return flight.SchemaResult(meta.schema if meta is not None else pa.schema([])) + + if isinstance(command, fs.CommandGetCatalogs): + return flight.SchemaResult(_CATALOGS_SCHEMA) + if isinstance(command, fs.CommandGetDbSchemas): + return flight.SchemaResult(_DB_SCHEMAS_SCHEMA) + if isinstance(command, fs.CommandGetTables): + schema = _TABLES_SCHEMA_WITH_SCHEMA if command.include_schema else _TABLES_SCHEMA + return flight.SchemaResult(schema) + if isinstance(command, fs.CommandGetTableTypes): + return flight.SchemaResult(_TABLE_TYPES_SCHEMA) + if isinstance(command, fs.CommandGetXdbcTypeInfo): + return flight.SchemaResult(_XDBC_TYPE_INFO_SCHEMA) + if isinstance(command, fs.CommandGetSqlInfo): + return flight.SchemaResult(_SQL_INFO_SCHEMA) + if isinstance(command, fs.CommandGetPrimaryKeys): + return flight.SchemaResult(_PRIMARY_KEYS_SCHEMA) + if isinstance(command, fs.CommandGetImportedKeys | fs.CommandGetExportedKeys): + return flight.SchemaResult(_FK_KEYS_SCHEMA) + if isinstance(command, fs.CommandGetCrossReference): + return flight.SchemaResult(_FK_KEYS_SCHEMA) + + msg = f"Unsupported Flight SQL command for get_schema: {type(command).__name__}" + raise NotImplementedError(msg) + # ═══════════════════════════════════════════════════════════════════════ # get_flight_info handlers # ═══════════════════════════════════════════════════════════════════════ diff --git a/tests/jdbc/pom.xml b/tests/jdbc/pom.xml index 208be1d..ec6243b 100644 --- a/tests/jdbc/pom.xml +++ b/tests/jdbc/pom.xml @@ -54,6 +54,9 @@ ${flight.url} + ${flight.user} + ${flight.password} + ${live.azure.jdbc.required} ${tls.ca.cert} diff --git a/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java b/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java new file mode 100644 index 0000000..510ba05 --- /dev/null +++ b/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java @@ -0,0 +1,155 @@ +/* + * Live JDBC persistence test for the deployed Azure Container App. + * + *

System properties: + *

+ * + *

Run with: + *

+ *     mvn test -Dflight.url=grpc+tls://ca-example.region.azurecontainerapps.io:443 \
+ *              -Dflight.user=lakehouse \
+ *              -Dflight.password=... \
+ *              -Dtest=FlightSqlJdbcAzurePersistenceTest
+ * 
+ */ +package lakehouse; + +import static org.junit.jupiter.api.Assertions.*; + +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.UUID; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; + +class FlightSqlJdbcAzurePersistenceTest { + private static final String ALIAS = "lakehouse"; + private static final String SCHEMA = "main"; + private static final String DEFAULT_USER = "lakehouse"; + private static final int DEFAULT_TLS_PORT = 443; + private static final int DEFAULT_PLAINTEXT_PORT = 31337; + + private record EndpointTarget(String host, int port, boolean useEncryption) {} + + private static String fq(String table) { + return ALIAS + "." + SCHEMA + "." + table; + } + + private static EndpointTarget parseEndpoint(String rawEndpoint) { + String endpoint = rawEndpoint.trim(); + boolean useEncryption = true; + int defaultPort = DEFAULT_TLS_PORT; + + if (endpoint.startsWith("grpc+tls://")) { + endpoint = endpoint.substring("grpc+tls://".length()); + useEncryption = true; + defaultPort = DEFAULT_TLS_PORT; + } else if (endpoint.startsWith("grpc://")) { + endpoint = endpoint.substring("grpc://".length()); + useEncryption = false; + defaultPort = DEFAULT_PLAINTEXT_PORT; + } + + String host = endpoint; + int port = defaultPort; + int colon = endpoint.lastIndexOf(':'); + if (colon > 0 && colon < endpoint.length() - 1) { + host = endpoint.substring(0, colon); + String portText = endpoint.substring(colon + 1); + try { + port = Integer.parseInt(portText); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid endpoint port: " + portText, e); + } + } + + if (host.isBlank()) { + throw new IllegalArgumentException("Endpoint host must not be empty."); + } + return new EndpointTarget(host, port, useEncryption); + } + + private static String enc(String value) { + return URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + private static boolean isConfigured(String value) { + return value != null && !value.isBlank() && !value.startsWith("${"); + } + + private static Connection connect() throws SQLException { + String endpoint = System.getProperty("flight.url"); + String password = System.getProperty("flight.password"); + String username = System.getProperty("flight.user", DEFAULT_USER); + boolean required = Boolean.getBoolean("live.azure.jdbc.required"); + if (!isConfigured(username)) { + username = DEFAULT_USER; + } + + if (required) { + assertTrue(isConfigured(endpoint), "flight.url is required"); + assertTrue(isConfigured(password), "flight.password is required"); + } else { + Assumptions.assumeTrue(isConfigured(endpoint), "flight.url is required"); + Assumptions.assumeTrue(isConfigured(password), "flight.password is required"); + } + + EndpointTarget target = parseEndpoint(endpoint); + String jdbc = "jdbc:arrow-flight-sql://" + target.host() + ":" + target.port() + + "?useEncryption=" + target.useEncryption() + + "&user=" + enc(username) + + "&password=" + enc(password); + + return DriverManager.getConnection(jdbc); + } + + private static void dropBestEffort(String table) { + try (Connection conn = connect(); + Statement st = conn.createStatement()) { + st.execute("DROP TABLE IF EXISTS " + table); + } catch (Exception ignored) { + // Cleanup is best-effort; the primary assertion failure should stay visible. + } + } + + @Test + void persistsWritesAcrossJdbcConnections() throws SQLException { + String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12); + String table = fq("live_jdbc_persist_" + suffix); + + try { + try (Connection writer = connect(); + Statement st = writer.createStatement()) { + writer.setAutoCommit(true); + st.execute("DROP TABLE IF EXISTS " + table); + st.execute("CREATE TABLE " + table + " (id INT, label TEXT)"); + st.execute("INSERT INTO " + table + " VALUES (101, 'alpha'), (202, 'beta')"); + } + + try (Connection reader = connect(); + Statement st = reader.createStatement(); + ResultSet rs = st.executeQuery("SELECT id, label FROM " + table + " ORDER BY id")) { + assertTrue(rs.next()); + assertEquals(101, rs.getInt("id")); + assertEquals("alpha", rs.getString("label")); + + assertTrue(rs.next()); + assertEquals(202, rs.getInt("id")); + assertEquals("beta", rs.getString("label")); + + assertFalse(rs.next()); + } + } finally { + dropBestEffort(table); + } + } +} diff --git a/tests/test_e2e.py b/tests/test_e2e.py index b7e20cd..4f05a96 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -34,12 +34,14 @@ from __future__ import annotations import base64 +import contextlib import socket import threading import time import adbc_driver_flightsql import adbc_driver_flightsql.dbapi as flightsql +import adbc_driver_manager import grpc import pyarrow as pa import pytest @@ -76,6 +78,14 @@ def _start_server(server: DuckDBFlightSqlServer) -> threading.Thread: return t +def _connect_autocommit(uri: str, **kwargs): + """Open a Flight SQL DBAPI connection for tests that assume autocommit.""" + conn = flightsql.connect(uri, **kwargs) + with contextlib.suppress(adbc_driver_manager.NotSupportedError): + conn.adbc_connection.set_autocommit(True) + return conn + + # ─────────────────────────────────────────────────────────────────────────── # Fixtures — plain server (no auth) # ─────────────────────────────────────────────────────────────────────────── @@ -96,7 +106,7 @@ def plain_server(): def plain_conn(plain_server): """ADBC connection to the plain server.""" _srv, port = plain_server - conn = flightsql.connect(f"grpc://127.0.0.1:{port}") + conn = _connect_autocommit(f"grpc://127.0.0.1:{port}") yield conn conn.close() @@ -123,7 +133,7 @@ def seeded_server(): def seeded_conn(seeded_server): """ADBC connection to the seeded server.""" _srv, port = seeded_server - conn = flightsql.connect(f"grpc://127.0.0.1:{port}") + conn = _connect_autocommit(f"grpc://127.0.0.1:{port}") yield conn conn.close() @@ -176,7 +186,7 @@ def auth_conn(auth_server): """ _srv, port = auth_server basic_token = base64.b64encode(f"{_TEST_USERNAME}:{_TEST_PASSWORD}".encode()).decode() - conn = flightsql.connect( + conn = _connect_autocommit( f"grpc://127.0.0.1:{port}", db_kwargs={ adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER.value: ( @@ -301,6 +311,40 @@ def test_get_objects_catalogs(self, plain_conn): assert table.num_rows >= 0 assert "catalog_name" in table.column_names + def test_get_objects_db_schemas_filter(self, seeded_conn): + """ADBC GetObjects applies a simple schema-name filter.""" + reader = seeded_conn.adbc_get_objects( + depth="db_schemas", + db_schema_filter="main", + ) + table = reader.read_all() + + memory_catalog = next( + row for row in table.to_pylist() if row["catalog_name"] == "memory" + ) + schemas = memory_catalog["catalog_db_schemas"] + assert schemas == [{"db_schema_name": "main", "db_schema_tables": None}] + + def test_get_objects_tables_filter(self, seeded_conn): + reader = seeded_conn.adbc_get_objects( + depth="tables", + table_name_filter="test_data", + ) + table = reader.read_all() + + memory_catalog = next( + row for row in table.to_pylist() if row["catalog_name"] == "memory" + ) + tables = memory_catalog["catalog_db_schemas"][0]["db_schema_tables"] + assert tables == [ + { + "table_name": "test_data", + "table_type": "BASE TABLE", + "table_columns": None, + "table_constraints": None, + } + ] + def test_get_table_types(self, plain_conn): table_types = plain_conn.adbc_get_table_types() assert isinstance(table_types, list) @@ -310,6 +354,23 @@ def test_get_info(self, plain_conn): assert isinstance(info, dict) assert len(info) > 0 + def test_transaction_capability_discovery_allows_autocommit_toggle(self, plain_conn): + """ADBC transaction APIs discover SqlInfo entries and support rollback.""" + plain_conn.execute("CREATE TABLE t_adbc_txn_probe (id INT)").close() + + plain_conn.adbc_connection.set_autocommit(False) + try: + plain_conn.execute("INSERT INTO t_adbc_txn_probe VALUES (1)").close() + plain_conn.rollback() + finally: + plain_conn.adbc_connection.set_autocommit(True) + + cur = plain_conn.execute("SELECT id FROM t_adbc_txn_probe") + try: + assert cur.fetchall() == [] + finally: + cur.close() + def test_get_table_schema_preseeded(self, seeded_conn): """Get schema for a pre-created table.""" schema = seeded_conn.adbc_get_table_schema("test_data") @@ -319,6 +380,16 @@ def test_get_table_schema_preseeded(self, seeded_conn): assert "name" in field_names assert "value" in field_names + def test_execute_schema(self, plain_conn): + """ADBC execute_schema uses Flight GetSchema for query schema discovery.""" + cur = plain_conn.cursor() + try: + schema = cur.adbc_execute_schema("SELECT 1 AS value, 'x' AS label") + finally: + cur.close() + + assert schema.names == ["value", "label"] + # ═══════════════════════════════════════════════════════════════════════════ # 7.4 — Auth flow (header-based Basic auth) @@ -347,7 +418,7 @@ def test_auth_select_preseeded(self, auth_server): srv._db.execute("INSERT INTO auth_data VALUES (1, 'secret')") basic_token = base64.b64encode(f"{_TEST_USERNAME}:{_TEST_PASSWORD}".encode()).decode() - conn = flightsql.connect( + conn = _connect_autocommit( f"grpc://127.0.0.1:{port}", db_kwargs={ adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER.value: ( @@ -580,21 +651,78 @@ def test_multiple_ddl_sequence(self, plain_conn): # ── Transaction rollback ───────────────────────────────────────── - def test_transaction_rollback(self, plain_conn): - """ROLLBACK undoes changes made in the transaction.""" + def test_dbapi_commit_and_rollback(self, plain_conn): + """DBAPI commit/rollback use ADBC transaction actions.""" plain_conn.execute("CREATE TABLE t_txn (id INT)").close() - plain_conn.execute("INSERT INTO t_txn VALUES (1)").close() - # Start explicit transaction, insert, then rollback - plain_conn.execute("BEGIN TRANSACTION").close() - plain_conn.execute("INSERT INTO t_txn VALUES (2)").close() - plain_conn.execute("ROLLBACK").close() + plain_conn.adbc_connection.set_autocommit(False) + try: + plain_conn.execute("INSERT INTO t_txn VALUES (1)").close() + plain_conn.commit() + + plain_conn.execute("INSERT INTO t_txn VALUES (2)").close() + plain_conn.rollback() + finally: + plain_conn.adbc_connection.set_autocommit(True) cur = plain_conn.execute("SELECT id FROM t_txn ORDER BY id") rows = cur.fetchall() assert rows == [(1,)] cur.close() + def test_execute_partitions_single_endpoint(self, seeded_conn): + """ADBC partition execution round-trips through the single Flight endpoint.""" + cur = seeded_conn.cursor() + try: + partitions, schema = cur.adbc_execute_partitions( + "SELECT id, name FROM test_data ORDER BY id" + ) + + assert len(partitions) == 1 + assert schema.names == ["id", "name"] + + cur.adbc_read_partition(partitions[0]) + assert cur.fetchall() == [(1, "alice"), (2, "bob"), (3, "carol")] + finally: + cur.close() + + def test_adbc_cancel_idle_statement_is_non_disruptive(self, plain_conn): + """Current ADBC cancel behavior is safe when no operation is in flight.""" + cur = plain_conn.cursor() + try: + cur.adbc_cancel() + cur.execute("SELECT 1 AS value") + assert cur.fetchall() == [(1,)] + finally: + cur.close() + + def test_adbc_ingest_python_driver_path(self, plain_conn): + """Exercise Python Flight SQL ADBC ingest, or xfail if the driver lacks it.""" + data = pa.table({"id": [1, 2], "label": ["one", "two"]}) + cur = plain_conn.cursor() + try: + try: + rows_inserted = cur.adbc_ingest( + "t_adbc_ingest", + data, + mode="create", + ) + except adbc_driver_manager.NotSupportedError as exc: + pytest.xfail( + "Python Flight SQL ADBC driver does not support ingest in this " + f"environment: {exc}" + ) + + assert rows_inserted in (-1, 2) + finally: + cur.close() + + cur = plain_conn.execute("SELECT id, label FROM t_adbc_ingest ORDER BY id") + try: + assert cur.fetchall() == [(1, "one"), (2, "two")] + finally: + cur.close() + # ═══════════════════════════════════════════════════════════════════════════ # 7.6 — Health check diff --git a/tests/test_e2e_ducklake.py b/tests/test_e2e_ducklake.py index 0204339..2a2230c 100644 --- a/tests/test_e2e_ducklake.py +++ b/tests/test_e2e_ducklake.py @@ -44,6 +44,7 @@ import time import adbc_driver_flightsql.dbapi as flightsql +import adbc_driver_manager import duckdb import pyarrow as pa import pytest @@ -183,6 +184,8 @@ def ducklake_conn(ducklake_server): """ADBC connection to the DuckLake-enabled server.""" _srv, port = ducklake_server conn = flightsql.connect(f"grpc://127.0.0.1:{port}") + with contextlib.suppress(adbc_driver_manager.NotSupportedError): + conn.adbc_connection.set_autocommit(True) yield conn conn.close() diff --git a/tests/test_e2e_tls.py b/tests/test_e2e_tls.py index 1a219ba..6aa0095 100644 --- a/tests/test_e2e_tls.py +++ b/tests/test_e2e_tls.py @@ -9,6 +9,7 @@ from __future__ import annotations +import contextlib import ipaddress import os import shutil @@ -21,6 +22,7 @@ import adbc_driver_flightsql import adbc_driver_flightsql.dbapi as flightsql +import adbc_driver_manager import pytest from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization @@ -162,6 +164,8 @@ def tls_conn(tls_server, tls_artifacts): adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS.value: ca_bytes, }, ) + with contextlib.suppress(adbc_driver_manager.NotSupportedError): + conn.adbc_connection.set_autocommit(True) yield conn conn.close() diff --git a/tests/test_live_azure_backend.py b/tests/test_live_azure_backend.py index 895ed94..4b4762b 100644 --- a/tests/test_live_azure_backend.py +++ b/tests/test_live_azure_backend.py @@ -7,9 +7,9 @@ The separate ADBC Basic-auth check is gated by ``LAKEHOUSE_LIVE_BACKEND_ADBC_BASIC`` and is marked ``xfail`` because ADBC's Basic-to-Bearer exchange is the currently -tracked client path that fails against the deployed Container App. A result like -``1 passed, 1 xfailed`` means the supported bearer smoke test passed and the -known ADBC Basic issue reproduced as expected. +tracked client path that fails against the deployed Container App. A result +with all bearer-path tests passing and one xfailed direct-Basic test means the +supported ADBC live checks passed and the known ADBC Basic issue reproduced. """ from __future__ import annotations @@ -19,6 +19,7 @@ import re import shutil import subprocess +import uuid import pytest @@ -26,6 +27,7 @@ _LIVE_BACKEND_FLAG = "LAKEHOUSE_LIVE_BACKEND" _LIVE_BACKEND_ADBC_BASIC_FLAG = "LAKEHOUSE_LIVE_BACKEND_ADBC_BASIC" +_JDBC_DIR = os.path.join(os.path.dirname(__file__), "jdbc") pytestmark = pytest.mark.skipif( os.environ.get(_LIVE_BACKEND_FLAG) != "1", @@ -136,28 +138,31 @@ def _connect_with_adbc_basic(endpoint: str, password: str): ) -def _run_live_query(connect): +def _set_autocommit_if_supported(conn, enabled: bool) -> None: + import adbc_driver_manager + + with contextlib.suppress(adbc_driver_manager.NotSupportedError): + conn.adbc_connection.set_autocommit(enabled) + + +def _run_live_check_with_credentials(connect, check): resolution = resolve_container_app_env() if not resolution.ready: pytest.skip(resolution.skip_reason("Azure Container App azd outputs missing")) conn = None - cursor = None failure: str | None = None password = "" try: endpoint = _discover_endpoint(resolution.values) password = _read_password(resolution.values) conn = connect(endpoint, password) - cursor = conn.execute("SELECT 1 AS value") - assert cursor.fetchall() == [(1,)] + _set_autocommit_if_supported(conn, True) + check(conn, endpoint, password) except Exception as exc: detail = _redact_auth_material(str(exc), password)[:500] failure = f"{type(exc).__name__}: {detail}" finally: - if cursor is not None: - with contextlib.suppress(Exception): - cursor.close() if conn is not None: with contextlib.suppress(Exception): conn.close() @@ -166,10 +171,252 @@ def _run_live_query(connect): pytest.fail(f"Live Azure backend query failed ({failure})", pytrace=False) +def _run_live_check(connect, check): + def check_without_credentials(conn, _endpoint, _password): + check(conn) + + _run_live_check_with_credentials(connect, check_without_credentials) + + +def _run_live_query(connect): + def check(conn): + cursor = conn.execute("SELECT 1 AS value") + try: + assert cursor.fetchall() == [(1,)] + finally: + cursor.close() + + _run_live_check(connect, check) + + +def _run_live_jdbc_test(test_class: str) -> None: + if shutil.which("mvn") is None: + pytest.skip("Maven (mvn) not found") + + resolution = resolve_container_app_env() + if not resolution.ready: + pytest.skip(resolution.skip_reason("Azure Container App azd outputs missing")) + + password = "" + try: + endpoint = _discover_endpoint(resolution.values) + password = _read_password(resolution.values) + result = subprocess.run( + [ + "mvn", + "-q", + "test", + f"-Dflight.url={endpoint}", + "-Dflight.user=lakehouse", + f"-Dflight.password={password}", + "-Dlive.azure.jdbc.required=true", + f"-Dtest={test_class}", + ], + cwd=_JDBC_DIR, + capture_output=True, + text=True, + timeout=240, + ) + except Exception as exc: + detail = _redact_auth_material(str(exc), password)[:500] + pytest.fail( + f"Live Azure JDBC test failed ({type(exc).__name__}: {detail})", + pytrace=False, + ) + + if result.returncode != 0: + stdout = _redact_auth_material(result.stdout, password) + stderr = _redact_auth_material(result.stderr, password) + print("=== Maven stdout ===") + print(stdout[-3000:] if len(stdout) > 3000 else stdout) + print("=== Maven stderr ===") + print(stderr[-3000:] if len(stderr) > 3000 else stderr) + + assert result.returncode == 0, f"mvn test failed (exit {result.returncode})" + + +def _unique_table(prefix: str) -> str: + return f"live_adbc_{prefix}_{uuid.uuid4().hex[:12]}" + + +def _ducklake_table(table_name: str) -> str: + return f"lakehouse.main.{table_name}" + + +def _find_get_objects_table(table, table_name: str): + for catalog in table.to_pylist(): + for schema in catalog.get("catalog_db_schemas") or []: + for found_table in schema.get("db_schema_tables") or []: + if found_table["table_name"] == table_name: + return found_table + return None + + def test_deployed_container_app_accepts_pyarrow_bootstrapped_bearer_query(): _run_live_query(_connect_with_pyarrow_bootstrapped_bearer) +def test_deployed_container_app_supports_adbc_execute_schema(): + def check(conn): + cursor = conn.cursor() + try: + schema = cursor.adbc_execute_schema("SELECT 1 AS value, 'ok' AS label") + finally: + cursor.close() + + assert schema.names == ["value", "label"] + + _run_live_check(_connect_with_pyarrow_bootstrapped_bearer, check) + + +def test_deployed_container_app_supports_adbc_get_objects_table_filter(): + table_name = _unique_table("objects") + other_table_name = _unique_table("objects_other") + + def check(conn): + try: + conn.execute(f"DROP TABLE IF EXISTS {table_name}").close() + conn.execute(f"DROP TABLE IF EXISTS {other_table_name}").close() + conn.execute(f"CREATE TABLE {table_name} (id INT, label TEXT)").close() + conn.execute(f"CREATE TABLE {other_table_name} (id INT)").close() + + reader = conn.adbc_get_objects(depth="tables", table_name_filter=table_name) + table = reader.read_all() + found_table = _find_get_objects_table(table, table_name) + other_table = _find_get_objects_table(table, other_table_name) + + assert found_table is not None + assert found_table["table_type"] in {"BASE TABLE", "TABLE"} + assert found_table["table_columns"] is None + assert found_table["table_constraints"] is None + assert other_table is None + finally: + with contextlib.suppress(Exception): + conn.execute(f"DROP TABLE IF EXISTS {table_name}").close() + with contextlib.suppress(Exception): + conn.execute(f"DROP TABLE IF EXISTS {other_table_name}").close() + + _run_live_check(_connect_with_pyarrow_bootstrapped_bearer, check) + + +def test_deployed_container_app_supports_adbc_transaction_commit_and_rollback(): + table_name = _unique_table("txn") + fq_table_name = _ducklake_table(table_name) + + def check(conn, endpoint, password): + verify_conn = None + cursor = None + try: + conn.execute(f"DROP TABLE IF EXISTS {fq_table_name}").close() + conn.execute(f"CREATE TABLE {fq_table_name} (id INT)").close() + + conn.adbc_connection.set_autocommit(False) + conn.execute(f"INSERT INTO {fq_table_name} VALUES (1)").close() + conn.commit() + conn.execute(f"INSERT INTO {fq_table_name} VALUES (2)").close() + conn.rollback() + conn.adbc_connection.set_autocommit(True) + + verify_conn = _connect_with_pyarrow_bootstrapped_bearer(endpoint, password) + _set_autocommit_if_supported(verify_conn, True) + cursor = verify_conn.execute(f"SELECT id FROM {fq_table_name} ORDER BY id") + assert cursor.fetchall() == [(1,)] + finally: + if cursor is not None: + with contextlib.suppress(Exception): + cursor.close() + if verify_conn is not None: + with contextlib.suppress(Exception): + verify_conn.close() + with contextlib.suppress(Exception): + conn.adbc_connection.set_autocommit(True) + with contextlib.suppress(Exception): + conn.execute(f"DROP TABLE IF EXISTS {fq_table_name}").close() + + _run_live_check_with_credentials(_connect_with_pyarrow_bootstrapped_bearer, check) + + +def test_deployed_container_app_persists_writes_after_disconnect(): + table_name = _unique_table("persist") + fq_table_name = _ducklake_table(table_name) + + def check(conn, endpoint, password): + reader_conn = None + cleanup_conn = None + cursor = None + dropped = False + try: + conn.execute(f"DROP TABLE IF EXISTS {fq_table_name}").close() + conn.execute(f"CREATE TABLE {fq_table_name} (id INT, label TEXT)").close() + conn.execute( + f"INSERT INTO {fq_table_name} VALUES (101, 'alpha'), (202, 'beta')" + ).close() + conn.close() + + reader_conn = _connect_with_pyarrow_bootstrapped_bearer(endpoint, password) + _set_autocommit_if_supported(reader_conn, True) + cursor = reader_conn.execute( + f"SELECT id, label FROM {fq_table_name} ORDER BY id" + ) + + assert cursor.fetchall() == [(101, "alpha"), (202, "beta")] + finally: + if cursor is not None: + with contextlib.suppress(Exception): + cursor.close() + if reader_conn is not None: + with contextlib.suppress(Exception): + reader_conn.execute(f"DROP TABLE IF EXISTS {fq_table_name}").close() + dropped = True + with contextlib.suppress(Exception): + reader_conn.close() + if not dropped: + cleanup_conn = _connect_with_pyarrow_bootstrapped_bearer(endpoint, password) + with contextlib.suppress(Exception): + cleanup_conn.execute(f"DROP TABLE IF EXISTS {fq_table_name}").close() + dropped = True + with contextlib.suppress(Exception): + cleanup_conn.close() + + _run_live_check_with_credentials(_connect_with_pyarrow_bootstrapped_bearer, check) + + +def test_deployed_container_app_persists_jdbc_writes_after_disconnect(): + _run_live_jdbc_test("FlightSqlJdbcAzurePersistenceTest") + + +def test_deployed_container_app_supports_adbc_execute_partitions(): + def check(conn): + cursor = conn.cursor() + try: + partitions, schema = cursor.adbc_execute_partitions( + "SELECT 1 AS value UNION ALL SELECT 2 AS value ORDER BY value" + ) + + assert len(partitions) == 1 + assert schema.names == ["value"] + + cursor.adbc_read_partition(partitions[0]) + assert cursor.fetchall() == [(1,), (2,)] + finally: + cursor.close() + + _run_live_check(_connect_with_pyarrow_bootstrapped_bearer, check) + + +def test_deployed_container_app_accepts_idle_adbc_cancel(): + def check(conn): + cursor = conn.cursor() + try: + cursor.adbc_cancel() + cursor.execute("SELECT 1 AS value") + assert cursor.fetchall() == [(1,)] + finally: + cursor.close() + + _run_live_check(_connect_with_pyarrow_bootstrapped_bearer, check) + + @pytest.mark.skipif( os.environ.get(_LIVE_BACKEND_ADBC_BASIC_FLAG) != "1", reason=f"set {_LIVE_BACKEND_ADBC_BASIC_FLAG}=1 to exercise ADBC Basic auth", diff --git a/tests/test_server.py b/tests/test_server.py index 9d8b6d7..e2482ab 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -266,6 +266,51 @@ def test_schema_matches_query_columns(self, server_with_data, ctx): assert info.schema.names == ["value"] +class TestGetSchema: + """Tests for the Flight ``GetSchema`` RPC.""" + + def test_statement_query_schema(self, server_with_data, ctx): + cmd = fs.CommandStatementQuery(query="SELECT id, name FROM test_table") + descriptor = _make_descriptor(cmd) + + result = server_with_data.get_schema(ctx, descriptor) + + assert isinstance(result, flight.SchemaResult) + assert result.schema.names == ["id", "name"] + + def test_prepared_statement_schema_without_executing_ddl(self, server, ctx): + create_req = fs.ActionCreatePreparedStatementRequest( + query="CREATE TABLE schema_probe_side_effect (id INT)" + ) + prepared = server.create_prepared_statement(ctx, create_req) + cmd = fs.CommandPreparedStatementQuery( + prepared_statement_handle=prepared.prepared_statement_handle + ) + descriptor = _make_descriptor(cmd) + + result = server.get_schema(ctx, descriptor) + + assert isinstance(result, flight.SchemaResult) + assert result.schema == pa.schema([]) + with pytest.raises(duckdb.CatalogException): + server._get_session(ctx).execute("SELECT * FROM schema_probe_side_effect") + + def test_metadata_schema(self, server, ctx): + cmd = fs.CommandGetTables(include_schema=True) + descriptor = _make_descriptor(cmd) + + result = server.get_schema(ctx, descriptor) + + assert isinstance(result, flight.SchemaResult) + assert result.schema.names == [ + "catalog_name", + "db_schema_name", + "table_name", + "table_type", + "table_schema", + ] + + class TestGetFlightInfoCatalogs: def test_returns_catalogs_schema(self, server, ctx): cmd = fs.CommandGetCatalogs() @@ -877,10 +922,51 @@ def test_server_name_is_lakehouse(self): assert value_col[0].as_py() == "lakehouse" def test_read_only_is_false(self): - table = _build_sql_info_table([500]) + table = _build_sql_info_table([fs.FLIGHT_SQL_SERVER_READ_ONLY]) value_col = table.column("value") assert value_col[0].as_py() is False + def test_current_flight_sql_capability_ids_are_reported(self): + current_capability_ids = [ + fs.FLIGHT_SQL_SERVER_READ_ONLY, + fs.FLIGHT_SQL_SERVER_SQL, + fs.FLIGHT_SQL_SERVER_SUBSTRAIT, + fs.FLIGHT_SQL_SERVER_TRANSACTION, + fs.FLIGHT_SQL_SERVER_CANCEL, + fs.FLIGHT_SQL_SERVER_BULK_INGESTION, + fs.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED, + ] + + table = _build_sql_info_table(current_capability_ids) + + assert set(table.column("info_name").to_pylist()) == set(current_capability_ids) + + def test_transaction_sql_info_ids_are_reported(self): + transaction_info_ids = [ + fs.SQL_DEFAULT_TRANSACTION_ISOLATION, + fs.SQL_TRANSACTIONS_SUPPORTED, + fs.SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS, + fs.SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT, + fs.SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED, + ] + + table = _build_sql_info_table(transaction_info_ids) + values = dict( + zip( + table.column("info_name").to_pylist(), + table.column("value").to_pylist(), + strict=True, + ) + ) + + assert values[fs.SQL_DEFAULT_TRANSACTION_ISOLATION] == fs.SQL_TRANSACTION_SERIALIZABLE + assert values[fs.SQL_TRANSACTIONS_SUPPORTED] is True + assert values[fs.SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS] == ( + 1 << fs.SQL_TRANSACTION_SERIALIZABLE + ) + assert values[fs.SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT] is False + assert values[fs.SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED] is False + def test_empty_filter_returns_all(self): table1 = _build_sql_info_table(None) table2 = _build_sql_info_table([]) From b6531405e84332be7d33fee5f36d24548c9e8a2a Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:25:00 +0300 Subject: [PATCH 2/3] Format ADBC compatibility changes --- src/lakehouse/server.py | 12 +++--------- tests/test_e2e.py | 8 ++------ tests/test_live_azure_backend.py | 4 +--- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/src/lakehouse/server.py b/src/lakehouse/server.py index 1b7f4c2..77bec3c 100644 --- a/src/lakehouse/server.py +++ b/src/lakehouse/server.py @@ -748,15 +748,9 @@ def _build_xdbc_type_info_table( _SQL_IDENTIFIER_QUOTE_CHAR = fs.SQL_IDENTIFIER_QUOTE_CHAR _SQL_DEFAULT_TRANSACTION_ISOLATION = fs.SQL_DEFAULT_TRANSACTION_ISOLATION _SQL_TRANSACTIONS_SUPPORTED = fs.SQL_TRANSACTIONS_SUPPORTED -_SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS = ( - fs.SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS -) -_SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT = ( - fs.SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT -) -_SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED = ( - fs.SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED -) +_SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS = fs.SQL_SUPPORTED_TRANSACTIONS_ISOLATION_LEVELS +_SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT = fs.SQL_DATA_DEFINITION_CAUSES_TRANSACTION_COMMIT +_SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED = fs.SQL_DATA_DEFINITIONS_IN_TRANSACTIONS_IGNORED def _build_sql_info_table( diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 4f05a96..c0a6e75 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -319,9 +319,7 @@ def test_get_objects_db_schemas_filter(self, seeded_conn): ) table = reader.read_all() - memory_catalog = next( - row for row in table.to_pylist() if row["catalog_name"] == "memory" - ) + memory_catalog = next(row for row in table.to_pylist() if row["catalog_name"] == "memory") schemas = memory_catalog["catalog_db_schemas"] assert schemas == [{"db_schema_name": "main", "db_schema_tables": None}] @@ -332,9 +330,7 @@ def test_get_objects_tables_filter(self, seeded_conn): ) table = reader.read_all() - memory_catalog = next( - row for row in table.to_pylist() if row["catalog_name"] == "memory" - ) + memory_catalog = next(row for row in table.to_pylist() if row["catalog_name"] == "memory") tables = memory_catalog["catalog_db_schemas"][0]["db_schema_tables"] assert tables == [ { diff --git a/tests/test_live_azure_backend.py b/tests/test_live_azure_backend.py index 4b4762b..8161cfd 100644 --- a/tests/test_live_azure_backend.py +++ b/tests/test_live_azure_backend.py @@ -355,9 +355,7 @@ def check(conn, endpoint, password): reader_conn = _connect_with_pyarrow_bootstrapped_bearer(endpoint, password) _set_autocommit_if_supported(reader_conn, True) - cursor = reader_conn.execute( - f"SELECT id, label FROM {fq_table_name} ORDER BY id" - ) + cursor = reader_conn.execute(f"SELECT id, label FROM {fq_table_name} ORDER BY id") assert cursor.fetchall() == [(101, "alpha"), (202, "beta")] finally: From 51ff126a2459638e4c5a67a601944ec1a8b8cea5 Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:36:10 +0300 Subject: [PATCH 3/3] Address ADBC review feedback --- src/lakehouse/server.py | 2 +- tests/jdbc/pom.xml | 1 - .../FlightSqlJdbcAzurePersistenceTest.java | 9 ++++----- tests/test_live_azure_backend.py | 2 +- tests/test_server.py | 16 ++++++++++++++++ 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/lakehouse/server.py b/src/lakehouse/server.py index 77bec3c..492c28a 100644 --- a/src/lakehouse/server.py +++ b/src/lakehouse/server.py @@ -936,7 +936,7 @@ def get_schema( return flight.SchemaResult(_SQL_INFO_SCHEMA) if isinstance(command, fs.CommandGetPrimaryKeys): return flight.SchemaResult(_PRIMARY_KEYS_SCHEMA) - if isinstance(command, fs.CommandGetImportedKeys | fs.CommandGetExportedKeys): + if isinstance(command, (fs.CommandGetImportedKeys, fs.CommandGetExportedKeys)): return flight.SchemaResult(_FK_KEYS_SCHEMA) if isinstance(command, fs.CommandGetCrossReference): return flight.SchemaResult(_FK_KEYS_SCHEMA) diff --git a/tests/jdbc/pom.xml b/tests/jdbc/pom.xml index ec6243b..95f7aed 100644 --- a/tests/jdbc/pom.xml +++ b/tests/jdbc/pom.xml @@ -55,7 +55,6 @@ ${flight.url} ${flight.user} - ${flight.password} ${live.azure.jdbc.required} ${tls.ca.cert} diff --git a/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java b/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java index 510ba05..e6ba70c 100644 --- a/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java +++ b/tests/jdbc/src/test/java/lakehouse/FlightSqlJdbcAzurePersistenceTest.java @@ -5,14 +5,13 @@ * * *

Run with: *

  *     mvn test -Dflight.url=grpc+tls://ca-example.region.azurecontainerapps.io:443 \
  *              -Dflight.user=lakehouse \
- *              -Dflight.password=... \
  *              -Dtest=FlightSqlJdbcAzurePersistenceTest
  * 
*/ @@ -88,7 +87,7 @@ private static boolean isConfigured(String value) { private static Connection connect() throws SQLException { String endpoint = System.getProperty("flight.url"); - String password = System.getProperty("flight.password"); + String password = System.getenv("FLIGHT_PASSWORD"); String username = System.getProperty("flight.user", DEFAULT_USER); boolean required = Boolean.getBoolean("live.azure.jdbc.required"); if (!isConfigured(username)) { @@ -97,10 +96,10 @@ private static Connection connect() throws SQLException { if (required) { assertTrue(isConfigured(endpoint), "flight.url is required"); - assertTrue(isConfigured(password), "flight.password is required"); + assertTrue(isConfigured(password), "FLIGHT_PASSWORD is required"); } else { Assumptions.assumeTrue(isConfigured(endpoint), "flight.url is required"); - Assumptions.assumeTrue(isConfigured(password), "flight.password is required"); + Assumptions.assumeTrue(isConfigured(password), "FLIGHT_PASSWORD is required"); } EndpointTarget target = parseEndpoint(endpoint); diff --git a/tests/test_live_azure_backend.py b/tests/test_live_azure_backend.py index 8161cfd..1149873 100644 --- a/tests/test_live_azure_backend.py +++ b/tests/test_live_azure_backend.py @@ -208,12 +208,12 @@ def _run_live_jdbc_test(test_class: str) -> None: "test", f"-Dflight.url={endpoint}", "-Dflight.user=lakehouse", - f"-Dflight.password={password}", "-Dlive.azure.jdbc.required=true", f"-Dtest={test_class}", ], cwd=_JDBC_DIR, capture_output=True, + env={**os.environ, "FLIGHT_PASSWORD": password}, text=True, timeout=240, ) diff --git a/tests/test_server.py b/tests/test_server.py index e2482ab..35a16c1 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -310,6 +310,22 @@ def test_metadata_schema(self, server, ctx): "table_schema", ] + @pytest.mark.parametrize( + "cmd", + [ + fs.CommandGetImportedKeys(table="child"), + fs.CommandGetExportedKeys(table="parent"), + ], + ) + def test_foreign_key_metadata_schema(self, server, ctx, cmd): + descriptor = _make_descriptor(cmd) + + result = server.get_schema(ctx, descriptor) + + assert isinstance(result, flight.SchemaResult) + assert "pk_table_name" in result.schema.names + assert "fk_table_name" in result.schema.names + class TestGetFlightInfoCatalogs: def test_returns_catalogs_schema(self, server, ctx):