From 9a7127907dcf47ccad8decb7ed2736390eab17e6 Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Mon, 20 Apr 2026 17:06:37 +0200 Subject: [PATCH 1/4] fix: normalize PEM line endings in certificate reading --- influxdb_client_3/query/query_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index 7631795..ad50293 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -61,7 +61,9 @@ def __init__(self, root_certs_path: str, def _read_certs(self, path: str) -> bytes: with open(path, "rb") as certs_file: - return certs_file.read() + certs = certs_file.read() + # Normalize PEM line endings so behavior is stable across platforms. + return certs.replace(b"\r\n", b"\n").replace(b"\r", b"\n") class QueryApiOptionsBuilder(object): From 85a4a4fd8baede0caf8250f1780f2a145e7a84d6 Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Mon, 20 Apr 2026 17:07:13 +0200 Subject: [PATCH 2/4] feat: add CLI for querying InfluxDB 3 Add a new query CLI to support quick read/debug workflows from terminal and AI agents. - add influx3 query with json, jsonl, csv, and pretty output - add module execution path via python -m influxdb_client_3 - wire console entry point in setup - add CLI tests --- CHANGELOG.md | 5 + README.md | 38 +++++++ influxdb_client_3/__main__.py | 5 + influxdb_client_3/cli.py | 180 ++++++++++++++++++++++++++++++++++ setup.py | 3 + tests/test_cli.py | 140 ++++++++++++++++++++++++++ 6 files changed, 371 insertions(+) create mode 100644 influxdb_client_3/__main__.py create mode 100644 influxdb_client_3/cli.py create mode 100644 tests/test_cli.py diff --git a/CHANGELOG.md b/CHANGELOG.md index db9b01d..4d99f92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ 1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option. See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. 1. [#202](https://github.com/InfluxCommunity/influxdb3-python/pull/202): Add escape for field keys when serializing to line protocol in `PolarsDataframeSerializer`. +1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. + +### Bug Fixes + +1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Normalize PEM certificate line endings when loading Flight query root certificates to ensure consistent SSL option behavior on Windows. ## 0.18.0 [2026-02-19] diff --git a/README.md b/README.md index b1f4734..cbe0c21 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,44 @@ Note: This does not include Pandas support. If you would like to use key feature *Note: Please make sure you are using 3.9 or above. For the best performance use 3.11+* +## CLI (Agent-Friendly Query Tool) + +This package includes an `influx3` CLI for read/query workflows. + +### Run a query + +```bash +influx3 query -d my_database "SELECT * FROM cpu LIMIT 5" +``` + +By default, output is JSON to stdout. + +### Supported formats + +- `json` (default) +- `jsonl` +- `csv` +- `pretty` + +```bash +influx3 query -d my_database --format csv "SELECT * FROM cpu LIMIT 5" +``` + +### Config precedence + +Configuration values are resolved in this order: + +1. CLI flags +2. `INFLUXDB3_*` environment variables +3. legacy `INFLUX_*` environment variables +4. built-in defaults (host defaults to `http://127.0.0.1:8181`) + +Relevant environment variables: + +- `INFLUXDB3_HOST_URL` (legacy fallback: `INFLUX_HOST`) +- `INFLUXDB3_DATABASE_NAME` (legacy fallback: `INFLUX_DATABASE`) +- `INFLUXDB3_AUTH_TOKEN` (legacy fallback: `INFLUX_TOKEN`) + # Usage One of the easiest ways to get started is to check out the ["Influxdb3 Python Basic Usage"](https://github.com/InfluxCommunity/influxdb3-python/blob/main/examples/jupyter/basic-write-query.ipynb) notebook. This scenario takes you through the core write and read APIs of the client library. diff --git a/influxdb_client_3/__main__.py b/influxdb_client_3/__main__.py new file mode 100644 index 0000000..f36f60c --- /dev/null +++ b/influxdb_client_3/__main__.py @@ -0,0 +1,5 @@ +from influxdb_client_3.cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/influxdb_client_3/cli.py b/influxdb_client_3/cli.py new file mode 100644 index 0000000..1c0b672 --- /dev/null +++ b/influxdb_client_3/cli.py @@ -0,0 +1,180 @@ +import argparse +import csv +import io +import json +import os +import sys +from typing import Mapping, Optional + +import pyarrow as pa + +from influxdb_client_3 import ( + INFLUX_DATABASE, + INFLUX_HOST, + INFLUX_TOKEN, + InfluxDBClient3, +) +from influxdb_client_3.exceptions import InfluxDB3ClientQueryError, InfluxDBError + + +def _resolve_option( + cli_value: Optional[str], + env: Mapping[str, str], + primary_env: str, + secondary_env: Optional[str] = None, + default: Optional[str] = None, +) -> Optional[str]: + if cli_value is not None: + return cli_value + + for var in (primary_env, secondary_env): + if not var: + continue + value = env.get(var) + if value not in (None, ""): + return value + + return default + + +def _rows_to_csv(rows, fieldnames): + buff = io.StringIO() + writer = csv.DictWriter(buff, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow(row) + return buff.getvalue() + + +def _rows_to_pretty(rows, fieldnames): + if not rows: + return "(0 rows)" + + widths = {name: len(name) for name in fieldnames} + for row in rows: + for name in fieldnames: + widths[name] = max(widths[name], len(str(row.get(name, "")))) + + header = " | ".join(name.ljust(widths[name]) for name in fieldnames) + sep = "-+-".join("-" * widths[name] for name in fieldnames) + lines = [header, sep] + for row in rows: + lines.append(" | ".join(str(row.get(name, "")).ljust(widths[name]) for name in fieldnames)) + return "\n".join(lines) + + +def _rows_to_json(rows, fieldnames): + return json.dumps(rows, default=str) + + +def _rows_to_jsonl(rows, fieldnames): + return "\n".join(json.dumps(row, default=str) for row in rows) + + +_FORMATTERS = { + "json": _rows_to_json, + "jsonl": _rows_to_jsonl, + "csv": _rows_to_csv, + "pretty": _rows_to_pretty, +} + + +def _format_table(table: pa.Table, output_format: str) -> str: + rows = table.to_pylist() + fieldnames = table.schema.names + return _FORMATTERS[output_format](rows, fieldnames) + + +def _ensure_trailing_nl(text: str) -> str: + return text if text.endswith("\n") else text + "\n" + + +def _write_error(stderr, message: str): + stderr.write(json.dumps({"error": str(message)}) + "\n") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="influx3", description="InfluxDB 3 query CLI") + subparsers = parser.add_subparsers(dest="command", required=True) + + query_parser = subparsers.add_parser("query", aliases=["q"], help="Run a SQL or InfluxQL query") + query_parser.add_argument("query", nargs="?", help="The query string to execute") + query_parser.add_argument("-f", "--file", dest="file_path", help="File containing the query") + query_parser.add_argument("-H", "--host", dest="host", help="InfluxDB host URL") + query_parser.add_argument("-d", "--database", dest="database", help="Database name") + query_parser.add_argument("--token", dest="token", help="Authentication token") + query_parser.add_argument( + "-l", + "--language", + dest="language", + choices=["sql", "influxql"], + default="sql", + help="Query language", + ) + query_parser.add_argument( + "--format", + dest="output_format", + choices=list(_FORMATTERS), + default="json", + help="Output format", + ) + query_parser.add_argument("-o", "--output", dest="output_file_path", help="Write output to file") + query_parser.add_argument("--query-timeout", dest="query_timeout", type=int, help="Query timeout in ms") + query_parser.set_defaults(func=_run_query) + return parser + + +def _run_query(args, stdout, stderr, env: Optional[Mapping[str, str]] = None) -> int: + if env is None: + env = os.environ + + host = _resolve_option(args.host, env, "INFLUXDB3_HOST_URL", INFLUX_HOST, "http://127.0.0.1:8181") + database = _resolve_option(args.database, env, "INFLUXDB3_DATABASE_NAME", INFLUX_DATABASE) + token = _resolve_option(args.token, env, "INFLUXDB3_AUTH_TOKEN", INFLUX_TOKEN) + + if (args.query is None) == (args.file_path is None): + _write_error(stderr, "Provide exactly one of query or --file.") + return 1 + + if not database: + _write_error(stderr, "Database is required. Set --database or INFLUXDB3_DATABASE_NAME.") + return 1 + + try: + query = args.query + if args.file_path: + with open(args.file_path, "r", encoding="utf-8") as file_handle: + query = file_handle.read() + + query_kwargs = {} + if args.query_timeout is not None: + query_kwargs["query_timeout"] = args.query_timeout + + with InfluxDBClient3(host=host, database=database, token=token, **query_kwargs) as client: + table = client.query( + query=query, + language=args.language, + mode="all", + database=database, + ) + + payload = _ensure_trailing_nl(_format_table(table, args.output_format)) + if args.output_file_path: + with open(args.output_file_path, "w", encoding="utf-8", newline="") as file_handle: + file_handle.write(payload) + else: + stdout.write(payload) + return 0 + except (InfluxDB3ClientQueryError, InfluxDBError, OSError, pa.ArrowException) as error: + _write_error(stderr, str(error)) + return 1 + + +def main(argv=None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + return args.func(args, sys.stdout, sys.stderr) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/setup.py b/setup.py index 2920869..5883e43 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,9 @@ def get_version(): ] }, install_requires=requires, + entry_points={ + 'console_scripts': ['influx3 = influxdb_client_3.cli:main'], + }, python_requires='>=3.9', classifiers=[ 'Development Status :: 4 - Beta', diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..4422f3f --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,140 @@ +import io +import json +from argparse import Namespace +from unittest.mock import MagicMock, patch + +import pyarrow as pa + +from influxdb_client_3.cli import _run_query, build_parser, main +from influxdb_client_3.exceptions import InfluxDB3ClientQueryError + + +def _args(**overrides): + base = dict( + command="query", + query=None, + file_path=None, + host=None, + database=None, + token=None, + language="sql", + output_format="json", + output_file_path=None, + query_timeout=None, + ) + return Namespace(**{**base, **overrides}) + + +def _mock_client(return_value=None, side_effect=None): + client = MagicMock() + client.__enter__.return_value = client + if side_effect is not None: + client.query.side_effect = side_effect + else: + client.query.return_value = return_value + return client + + +def test_build_parser_query_alias(): + parser = build_parser() + args = parser.parse_args(["q", "SELECT 1"]) + assert args.command == "q" + assert args.query == "SELECT 1" + + +def test_run_query_uses_env_precedence(): + args = _args(query="SELECT 1") + env = { + "INFLUXDB3_HOST_URL": "http://from-primary:8181", + "INFLUX_HOST": "http://from-legacy:8181", + "INFLUXDB3_DATABASE_NAME": "db_primary", + "INFLUX_DATABASE": "db_legacy", + "INFLUXDB3_AUTH_TOKEN": "token_primary", + "INFLUX_TOKEN": "token_legacy", + } + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"value": 1}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client) as client_ctor: + rc = _run_query(args, stdout, stderr, env=env) + + assert rc == 0 + client_ctor.assert_called_once_with( + host="http://from-primary:8181", + database="db_primary", + token="token_primary", + ) + mock_client.query.assert_called_once_with( + query="SELECT 1", + language="sql", + mode="all", + database="db_primary", + ) + + +def test_run_query_formats_jsonl(): + args = _args( + query="SELECT * FROM m", + host="http://localhost:8181", + database="db1", + output_format="jsonl", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"a": 1}, {"a": 2}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + assert stdout.getvalue() == '{"a": 1}\n{"a": 2}\n' + assert stderr.getvalue() == "" + + +def test_run_query_reads_query_from_file(tmp_path): + query_file = tmp_path / "query.sql" + query_file.write_text("SELECT * FROM cpu LIMIT 1", encoding="utf-8") + + args = _args( + file_path=str(query_file), + host="http://localhost:8181", + database="db1", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"ok": True}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + mock_client.query.assert_called_once_with( + query="SELECT * FROM cpu LIMIT 1", + language="sql", + mode="all", + database="db1", + ) + + +def test_run_query_writes_json_error_for_query_exception(): + args = _args(query="SELECT bad", host="http://localhost:8181", database="db1") + mock_client = _mock_client(side_effect=InfluxDB3ClientQueryError("bad query")) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 1 + assert json.loads(stderr.getvalue()) == {"error": "bad query"} + + +def test_main_returns_1_when_database_missing(): + with patch("influxdb_client_3.cli.sys.stdout", new=io.StringIO()), patch( + "influxdb_client_3.cli.sys.stderr", new=io.StringIO() + ) as stderr: + rc = main(["query", "SELECT 1"]) + + assert rc == 1 + assert "Database is required" in stderr.getvalue() From b3ff2002b9c38c6a9b23da46a587ed990db86e1c Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Wed, 22 Apr 2026 18:25:18 +0200 Subject: [PATCH 3/4] feat: enhance CLI output formatting and add error handling for query timeout --- CHANGELOG.md | 13 +++-- influxdb_client_3/cli.py | 8 +++ tests/test_cli.py | 107 +++++++++++++++++++++++++++++++++++++-- tests/test_query.py | 16 ++++++ 4 files changed, 135 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d99f92..95854c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,19 +2,22 @@ ## 0.20.0 [unreleased] -## 0.19.0 [2026-04-23] - ### Features -1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option. - See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. -1. [#202](https://github.com/InfluxCommunity/influxdb3-python/pull/202): Add escape for field keys when serializing to line protocol in `PolarsDataframeSerializer`. 1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. ### Bug Fixes 1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Normalize PEM certificate line endings when loading Flight query root certificates to ensure consistent SSL option behavior on Windows. +## 0.19.0 [2026-04-23] + +### Features + +1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option. + See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. +1. [#202](https://github.com/InfluxCommunity/influxdb3-python/pull/202): Add escape for field keys when serializing to line protocol in `PolarsDataframeSerializer`. + ## 0.18.0 [2026-02-19] ### Features diff --git a/influxdb_client_3/cli.py b/influxdb_client_3/cli.py index 1c0b672..c6c4b98 100644 --- a/influxdb_client_3/cli.py +++ b/influxdb_client_3/cli.py @@ -68,6 +68,8 @@ def _rows_to_json(rows, fieldnames): def _rows_to_jsonl(rows, fieldnames): + if not rows: + return "" return "\n".join(json.dumps(row, default=str) for row in rows) @@ -86,6 +88,8 @@ def _format_table(table: pa.Table, output_format: str) -> str: def _ensure_trailing_nl(text: str) -> str: + if not text: + return "" return text if text.endswith("\n") else text + "\n" @@ -140,6 +144,10 @@ def _run_query(args, stdout, stderr, env: Optional[Mapping[str, str]] = None) -> _write_error(stderr, "Database is required. Set --database or INFLUXDB3_DATABASE_NAME.") return 1 + if args.query_timeout is not None and args.query_timeout < 0: + _write_error(stderr, "--query-timeout must be non-negative.") + return 1 + try: query = args.query if args.file_path: diff --git a/tests/test_cli.py b/tests/test_cli.py index 4422f3f..a26d7f3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -130,11 +130,110 @@ def test_run_query_writes_json_error_for_query_exception(): assert json.loads(stderr.getvalue()) == {"error": "bad query"} +def test_run_query_formats_csv(): + args = _args( + query="SELECT * FROM m", + host="http://localhost:8181", + database="db1", + output_format="csv", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + lines = stdout.getvalue().strip().splitlines() + assert lines[0] == "a,b" + assert lines[1] == "1,x" + assert lines[2] == "2,y" + assert stderr.getvalue() == "" + + +def test_run_query_formats_pretty(): + args = _args( + query="SELECT * FROM m", + host="http://localhost:8181", + database="db1", + output_format="pretty", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"a": 1, "b": "hi"}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + lines = stdout.getvalue().splitlines() + assert lines[0] == "a | b " + assert lines[1] == "--+---" + assert lines[2] == "1 | hi" + assert stderr.getvalue() == "" + + +def test_run_query_writes_output_to_file(tmp_path): + out_file = tmp_path / "result.json" + args = _args( + query="SELECT 1 AS v", + host="http://localhost:8181", + database="db1", + output_file_path=str(out_file), + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"v": 1}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + assert stdout.getvalue() == "" + content = out_file.read_text(encoding="utf-8") + assert json.loads(content) == [{"v": 1}] + + +def test_run_query_empty_jsonl_no_output(): + args = _args( + query="SELECT * FROM m", + host="http://localhost:8181", + database="db1", + output_format="jsonl", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([], schema=pa.schema([("a", pa.int64())]))) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + assert stdout.getvalue() == "" + + +def test_run_query_rejects_negative_timeout(): + args = _args( + query="SELECT 1", + host="http://localhost:8181", + database="db1", + query_timeout=-5, + ) + stdout, stderr = io.StringIO(), io.StringIO() + + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 1 + assert "non-negative" in stderr.getvalue() + + def test_main_returns_1_when_database_missing(): - with patch("influxdb_client_3.cli.sys.stdout", new=io.StringIO()), patch( - "influxdb_client_3.cli.sys.stderr", new=io.StringIO() - ) as stderr: - rc = main(["query", "SELECT 1"]) + with patch.dict("os.environ", {}, clear=True): + with patch("influxdb_client_3.cli.sys.stdout", new=io.StringIO()), patch( + "influxdb_client_3.cli.sys.stderr", new=io.StringIO() + ) as stderr: + rc = main(["query", "SELECT 1"]) assert rc == 1 assert "Database is required" in stderr.getvalue() diff --git a/tests/test_query.py b/tests/test_query.py index 2d623d0..a3d93f8 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -167,6 +167,22 @@ def test_query_api_options_builder(self): finally: self.remove_cert_file(cert_file) + def test_pem_crlf_normalized_to_lf(self): + cert_file = "cert_crlf_test.pem" + crlf_content = self.sample_cert.replace("\n", "\r\n") + with open(cert_file, "wb") as f: + f.write(crlf_content.encode("utf-8")) + + try: + options = QueryApiOptionsBuilder()\ + .root_certs(cert_file)\ + .build() + loaded = options.tls_root_certs.decode("utf-8") + assert "\r" not in loaded + assert loaded == self.sample_cert + finally: + self.remove_cert_file(cert_file) + def test_query_client_with_options(self): connection = "grpc+tls://localhost:9999" token = "my_token" From c90beeb3aee5b7e0e340fb2c8951a5a424db043f Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Tue, 5 May 2026 20:27:36 +0200 Subject: [PATCH 4/4] fix: handle nanosecond timestamps in CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ns timestamps are now truncated to µs. This enables querying DBs with ns precision. If ns precision is needed as output, the direct Python client is the better option. Co-authored-by: Copilot --- influxdb_client_3/cli.py | 27 +++++++++++++++++++++-- tests/test_cli.py | 46 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/influxdb_client_3/cli.py b/influxdb_client_3/cli.py index c6c4b98..b51e4c6 100644 --- a/influxdb_client_3/cli.py +++ b/influxdb_client_3/cli.py @@ -81,7 +81,30 @@ def _rows_to_jsonl(rows, fieldnames): } -def _format_table(table: pa.Table, output_format: str) -> str: +def _is_ns_timestamp(field_type) -> bool: + return pa.types.is_timestamp(field_type) and field_type.unit == "ns" + + +def _coerce_timestamps(table: pa.Table) -> tuple[pa.Table, bool]: + # Python datetime only supports microsecond precision, so to_pylist truncates ns + # timestamps anyway; cast explicitly with safe=False to acknowledge the precision loss. + if not any(_is_ns_timestamp(field.type) for field in table.schema): + return table, False + new_fields = [ + pa.field(field.name, pa.timestamp("us", tz=field.type.tz)) + if _is_ns_timestamp(field.type) + else field + for field in table.schema + ] + return table.cast(pa.schema(new_fields), safe=False), True + + +def _format_table(table: pa.Table, output_format: str, stderr=None) -> str: + table, coerced = _coerce_timestamps(table) + if coerced and stderr is not None: + stderr.write( + "Warning: nanosecond precision timestamps truncated to microseconds for display.\n" + ) rows = table.to_pylist() fieldnames = table.schema.names return _FORMATTERS[output_format](rows, fieldnames) @@ -166,7 +189,7 @@ def _run_query(args, stdout, stderr, env: Optional[Mapping[str, str]] = None) -> database=database, ) - payload = _ensure_trailing_nl(_format_table(table, args.output_format)) + payload = _ensure_trailing_nl(_format_table(table, args.output_format, stderr=stderr)) if args.output_file_path: with open(args.output_file_path, "w", encoding="utf-8", newline="") as file_handle: file_handle.write(payload) diff --git a/tests/test_cli.py b/tests/test_cli.py index a26d7f3..c07e071 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,7 +5,7 @@ import pyarrow as pa -from influxdb_client_3.cli import _run_query, build_parser, main +from influxdb_client_3.cli import _coerce_timestamps, _format_table, _run_query, build_parser, main from influxdb_client_3.exceptions import InfluxDB3ClientQueryError @@ -237,3 +237,47 @@ def test_main_returns_1_when_database_missing(): assert rc == 1 assert "Database is required" in stderr.getvalue() + + +def test_coerce_timestamps_converts_nanoseconds(): + ns_value = 1777981256454646723 + table = pa.table({"time": pa.array([ns_value], type=pa.timestamp("ns")), "val": [1]}) + result, coerced = _coerce_timestamps(table) + assert coerced is True + assert result.schema.field("time").type == pa.timestamp("us") + rows = result.to_pylist() + assert rows[0]["val"] == 1 + # Verify no ValueError is raised and datetime is returned + assert rows[0]["time"].year == 2026 + + +def test_coerce_timestamps_preserves_timezone(): + ns_value = 1777981256454646723 + table = pa.table({"time": pa.array([ns_value], type=pa.timestamp("ns", tz="UTC")), "val": [1]}) + result, coerced = _coerce_timestamps(table) + assert coerced is True + assert result.schema.field("time").type == pa.timestamp("us", tz="UTC") + + +def test_coerce_timestamps_ignores_non_nanosecond(): + table = pa.table({"time": pa.array([1000000], type=pa.timestamp("us")), "val": [1]}) + result, coerced = _coerce_timestamps(table) + assert coerced is False + # Should be unchanged + assert result.schema.field("time").type == pa.timestamp("us") + assert result.to_pylist() == table.to_pylist() + + +def test_format_table_emits_warning_for_nanoseconds(): + ns_value = 1777981256454646723 + table = pa.table({"time": pa.array([ns_value], type=pa.timestamp("ns")), "val": [1]}) + stderr = io.StringIO() + _format_table(table, "json", stderr=stderr) + assert "nanosecond" in stderr.getvalue().lower() + + +def test_format_table_no_warning_for_microseconds(): + table = pa.table({"time": pa.array([1000000], type=pa.timestamp("us")), "val": [1]}) + stderr = io.StringIO() + _format_table(table, "json", stderr=stderr) + assert stderr.getvalue() == ""