From 5a952047e4d98d6425c63cec5752340f1f4e29fb Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 16 Apr 2025 08:09:49 +0700 Subject: [PATCH 01/18] feat: support env --- influxdb_client_3/__init__.py | 27 ++++++++++++++++ tests/test_influxdb_client_3.py | 36 ++++++++++++++++++++- tests/test_influxdb_client_3_integration.py | 20 +++++++++++- 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 753a83b..a319a06 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,3 +1,4 @@ +import os import urllib.parse import pyarrow as pa import importlib.util @@ -47,6 +48,32 @@ def file_parser_options(**kwargs): return kwargs +def from_env(**kwargs): + """ + Create an instance of `InfluxDBClient3` using environment variables for configuration. + + This function retrieves the following environment variables: + - `INFLUX_HOST`: The hostname or IP address of the InfluxDB server. + - `INFLUX_TOKEN`: The authentication token used for accessing the server. + - `INFLUX_DATABASE`: The default database for the client operations. + - `INFLUX_ORG`: The organization associated with InfluxDB operations. + + If any of these variables are not set, their respective parameters will + default to `None`. + + :param kwargs: Additional keyword arguments that will be passed to the + `InfluxDBClient3` constructor for customization. + :return: An initialized `InfluxDBClient3` instance. + """ + + host = os.getenv("INFLUX_HOST") + token = os.getenv("INFLUX_TOKEN") + database = os.getenv("INFLUX_DATABASE") + org = os.getenv("INFLUX_ORG") + + return InfluxDBClient3(host=host, token=token, database=database, org=org, **kwargs) + + def _deep_merge(target, source): """ Performs a deep merge of dictionaries or lists, diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 1b52b76..a6a364a 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import InfluxDBClient3, from_env from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData @@ -74,6 +74,40 @@ async def test_query_async(self): assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org'}) + def test_from_env_all_env_vars_set(self): + client = from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._client.url, "https://localhost:443") + self.assertEqual(client._database, "test_db") + self.assertEqual(client._org, "test_org") + self.assertEqual(client._token, "test_token") + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_DATABASE': 'test_db'}) + def test_from_env_partial_env_vars_set(self): + client = from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._client.url, "https://localhost:443") + self.assertEqual(client._database, "test_db") + self.assertEqual(client._org, "default") + self.assertIsNone(client._token) + + @patch.dict('os.environ', {}, clear=True) + def test_from_env_no_env_vars_set(self): + client = from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertIsNotNone(client._client.url) + self.assertIsNone(client._database) + self.assertIsNone(client._token) + self.assertEqual(client._org, "default") + + def test_from_env_with_kargs(self): + client = from_env( + write_client_options = write_client_options(batch_size=10000), + ) + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._write_client_options['batch_size'], 10000) if __name__ == '__main__': unittest.main() diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 48181f7..1d465a4 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -4,12 +4,13 @@ import string import time import unittest +from unittest.mock import patch import pyarrow import pytest from pyarrow._flight import FlightError -from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions +from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions, from_env from tests.util import asyncio_run, lp_to_py_object @@ -274,3 +275,20 @@ async def test_verify_query_async(self): result_list = result.to_pylist() for item in data: assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list" + + @patch.dict('os.environ', {'INFLUX_HOST': 'https://us-east-1-1.aws.cloud2.influxdata.com', + 'INFLUX_TOKEN': 'lDAtMRmhnLp5GjWNVBsieufUb66XZAPxvX3etlmi9wmeq7ispWoL06mwnxmY_BtHKoBhG4lR-c7WfrFgUXy15w==', + 'INFLUX_DATABASE': 'bucket0'}) + def test_from_env(self): + c = from_env() + with c: + id_test = time.time_ns() + c.write(f"integration_test_python,type=used value=123.0,id_test={id_test}i") + + sql = 'SELECT * FROM integration_test_python where type=$type and id_test=$id_test' + data = c.query(sql, mode="pandas", query_parameters={'type': 'used', 'id_test': id_test}) + + self.assertIsNotNone(data) + self.assertEqual(1, len(data)) + self.assertEqual(id_test, data['id_test'][0]) + self.assertEqual(123.0, data['value'][0]) \ No newline at end of file From 60ab02f0e3d26174d2568b67d4f40387eb66c90d Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 16 Apr 2025 08:22:02 +0700 Subject: [PATCH 02/18] feat: support env --- influxdb_client_3/__init__.py | 1 - tests/test_influxdb_client_3.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index a319a06..ceeeff6 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -65,7 +65,6 @@ def from_env(**kwargs): `InfluxDBClient3` constructor for customization. :return: An initialized `InfluxDBClient3` instance. """ - host = os.getenv("INFLUX_HOST") token = os.getenv("INFLUX_TOKEN") database = os.getenv("INFLUX_DATABASE") diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index a6a364a..5f4252d 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from influxdb_client_3 import InfluxDBClient3, from_env +from influxdb_client_3 import InfluxDBClient3, from_env, write_client_options from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData From 924006d459640712032048a1d3d06957cdab52bb Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 16 Apr 2025 08:56:13 +0700 Subject: [PATCH 03/18] feat: support env --- tests/test_influxdb_client_3.py | 3 ++- tests/test_influxdb_client_3_integration.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 5f4252d..b8f5434 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -104,10 +104,11 @@ def test_from_env_no_env_vars_set(self): def test_from_env_with_kargs(self): client = from_env( - write_client_options = write_client_options(batch_size=10000), + write_client_options=write_client_options(batch_size=10000), ) self.assertIsInstance(client, InfluxDBClient3) self.assertEqual(client._write_client_options['batch_size'], 10000) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 1d465a4..b65028a 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -291,4 +291,4 @@ def test_from_env(self): self.assertIsNotNone(data) self.assertEqual(1, len(data)) self.assertEqual(id_test, data['id_test'][0]) - self.assertEqual(123.0, data['value'][0]) \ No newline at end of file + self.assertEqual(123.0, data['value'][0]) From 42519f42f18003e1c6e6eaf0431a73aa9b2158e5 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 21 Apr 2025 09:19:40 +0700 Subject: [PATCH 04/18] feat: support env --- influxdb_client_3/__init__.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index ceeeff6..afb0d6e 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,7 +1,8 @@ +import importlib.util import os import urllib.parse + import pyarrow as pa -import importlib.util from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder from influxdb_client_3.read_file import UploadFile @@ -52,24 +53,44 @@ def from_env(**kwargs): """ Create an instance of `InfluxDBClient3` using environment variables for configuration. - This function retrieves the following environment variables: + This function retrieves and validates the following required environment variables: - `INFLUX_HOST`: The hostname or IP address of the InfluxDB server. - `INFLUX_TOKEN`: The authentication token used for accessing the server. - `INFLUX_DATABASE`: The default database for the client operations. + + And optional environment variable: - `INFLUX_ORG`: The organization associated with InfluxDB operations. + Defaults to "default" if not set. - If any of these variables are not set, their respective parameters will - default to `None`. + If any of the required environment variables are not set, a ValueError will be + raised with details about the missing variables. :param kwargs: Additional keyword arguments that will be passed to the - `InfluxDBClient3` constructor for customization. + `InfluxDBClient3` constructor for customization. This allows for + configuring specific client behaviors like write_client_options, + flight_client_options, SSL settings, etc. :return: An initialized `InfluxDBClient3` instance. + :raises ValueError: If any required environment variables are not set. """ + + invalid_env_vars = [] host = os.getenv("INFLUX_HOST") + if host is None: + invalid_env_vars.append("INFLUX_HOST") + token = os.getenv("INFLUX_TOKEN") + if token is None: + invalid_env_vars.append("INFLUX_TOKEN") + database = os.getenv("INFLUX_DATABASE") + if database is None: + invalid_env_vars.append("INFLUX_DATABASE") + org = os.getenv("INFLUX_ORG") + if len(invalid_env_vars) > 0: + raise ValueError(f"The following environment variables are None or empty: {', '.join(invalid_env_vars)}") + return InfluxDBClient3(host=host, token=token, database=database, org=org, **kwargs) From 2d03d453186bf2e5bc68d7d9e675094985d87e27 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 21 Apr 2025 09:42:47 +0700 Subject: [PATCH 05/18] feat: support env --- influxdb_client_3/__init__.py | 50 +++++++++++---------- tests/test_influxdb_client_3.py | 24 +++------- tests/test_influxdb_client_3_integration.py | 3 -- 3 files changed, 33 insertions(+), 44 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index afb0d6e..329ad53 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,6 +1,7 @@ import importlib.util import os import urllib.parse +from typing import Any import pyarrow as pa @@ -48,8 +49,12 @@ def file_parser_options(**kwargs): """ return kwargs - -def from_env(**kwargs): +# Constants for environment variable names +INFLUX_HOST = "INFLUX_HOST" +INFLUX_TOKEN = "INFLUX_TOKEN" +INFLUX_DATABASE = "INFLUX_DATABASE" +INFLUX_ORG = "INFLUX_ORG" +def from_env(**kwargs: Any) -> 'InfluxDBClient3': """ Create an instance of `InfluxDBClient3` using environment variables for configuration. @@ -57,7 +62,6 @@ def from_env(**kwargs): - `INFLUX_HOST`: The hostname or IP address of the InfluxDB server. - `INFLUX_TOKEN`: The authentication token used for accessing the server. - `INFLUX_DATABASE`: The default database for the client operations. - And optional environment variable: - `INFLUX_ORG`: The organization associated with InfluxDB operations. Defaults to "default" if not set. @@ -72,26 +76,26 @@ def from_env(**kwargs): :return: An initialized `InfluxDBClient3` instance. :raises ValueError: If any required environment variables are not set. """ - - invalid_env_vars = [] - host = os.getenv("INFLUX_HOST") - if host is None: - invalid_env_vars.append("INFLUX_HOST") - - token = os.getenv("INFLUX_TOKEN") - if token is None: - invalid_env_vars.append("INFLUX_TOKEN") - - database = os.getenv("INFLUX_DATABASE") - if database is None: - invalid_env_vars.append("INFLUX_DATABASE") - - org = os.getenv("INFLUX_ORG") - - if len(invalid_env_vars) > 0: - raise ValueError(f"The following environment variables are None or empty: {', '.join(invalid_env_vars)}") - - return InfluxDBClient3(host=host, token=token, database=database, org=org, **kwargs) + required_vars = { + INFLUX_HOST: os.getenv(INFLUX_HOST), + INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), + INFLUX_DATABASE: os.getenv(INFLUX_DATABASE) + } + + missing_vars = [var for var, value in required_vars.items() if value is None or value == ""] + if missing_vars: + raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") + + org = os.getenv(INFLUX_ORG, "default") + + return InfluxDBClient3( + host=required_vars[INFLUX_HOST], + token=required_vars[INFLUX_TOKEN], + database=required_vars[INFLUX_DATABASE], + org=org, + **kwargs + ) + def _deep_merge(target, source): diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index b8f5434..3d51dc6 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -84,24 +84,6 @@ def test_from_env_all_env_vars_set(self): self.assertEqual(client._org, "test_org") self.assertEqual(client._token, "test_token") - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_DATABASE': 'test_db'}) - def test_from_env_partial_env_vars_set(self): - client = from_env() - self.assertIsInstance(client, InfluxDBClient3) - self.assertEqual(client._client.url, "https://localhost:443") - self.assertEqual(client._database, "test_db") - self.assertEqual(client._org, "default") - self.assertIsNone(client._token) - - @patch.dict('os.environ', {}, clear=True) - def test_from_env_no_env_vars_set(self): - client = from_env() - self.assertIsInstance(client, InfluxDBClient3) - self.assertIsNotNone(client._client.url) - self.assertIsNone(client._database) - self.assertIsNone(client._token) - self.assertEqual(client._org, "default") - def test_from_env_with_kargs(self): client = from_env( write_client_options=write_client_options(batch_size=10000), @@ -109,6 +91,12 @@ def test_from_env_with_kargs(self): self.assertIsInstance(client, InfluxDBClient3) self.assertEqual(client._write_client_options['batch_size'], 10000) + @patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "", + 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) + def test_from_env_missing_variables(self): + with self.assertRaises(ValueError) as context: + from_env() + self.assertIn("Missing required environment variables", str(context.exception)) if __name__ == '__main__': unittest.main() diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index b65028a..e815855 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -276,9 +276,6 @@ async def test_verify_query_async(self): for item in data: assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list" - @patch.dict('os.environ', {'INFLUX_HOST': 'https://us-east-1-1.aws.cloud2.influxdata.com', - 'INFLUX_TOKEN': 'lDAtMRmhnLp5GjWNVBsieufUb66XZAPxvX3etlmi9wmeq7ispWoL06mwnxmY_BtHKoBhG4lR-c7WfrFgUXy15w==', - 'INFLUX_DATABASE': 'bucket0'}) def test_from_env(self): c = from_env() with c: From b1aeb69d55dbb4ea6f45967b64fd695fc6cae113 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 21 Apr 2025 09:46:26 +0700 Subject: [PATCH 06/18] feat: support env --- influxdb_client_3/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 329ad53..103a6f4 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -49,11 +49,14 @@ def file_parser_options(**kwargs): """ return kwargs + # Constants for environment variable names INFLUX_HOST = "INFLUX_HOST" INFLUX_TOKEN = "INFLUX_TOKEN" INFLUX_DATABASE = "INFLUX_DATABASE" INFLUX_ORG = "INFLUX_ORG" + + def from_env(**kwargs: Any) -> 'InfluxDBClient3': """ Create an instance of `InfluxDBClient3` using environment variables for configuration. @@ -97,7 +100,6 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': ) - def _deep_merge(target, source): """ Performs a deep merge of dictionaries or lists, From c8075a28403ed17be730904f13cd05de27c4dbcf Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 21 Apr 2025 09:58:27 +0700 Subject: [PATCH 07/18] feat: support env --- tests/test_influxdb_client_3.py | 1 + tests/test_influxdb_client_3_integration.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 3d51dc6..0bc18ba 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -98,5 +98,6 @@ def test_from_env_missing_variables(self): from_env() self.assertIn("Missing required environment variables", str(context.exception)) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index e815855..9aa1cf1 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -4,7 +4,6 @@ import string import time import unittest -from unittest.mock import patch import pyarrow import pytest From b65b3fe37b719e6628ce248312f83b7dc222d7c8 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 22 Apr 2025 14:49:16 +0700 Subject: [PATCH 08/18] feat: support env --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4146ecf..b5c1e0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ ### Features -1. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3 +1. [#127](https://github.com/InfluxCommunity/influxdb3-python/pull/127): Support creating client from environment variables +2. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3 ## 0.12.0 [2025-03-26] From a54ebb1fda9b96ded496ef936511d8f5eb9d54f2 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 23 Apr 2025 14:37:27 +0700 Subject: [PATCH 09/18] feat: support env --- influxdb_client_3/__init__.py | 15 +++++++-- .../write_client/client/write_api.py | 20 +++++++++--- tests/test_influxdb_client_3.py | 31 +++++++++++++------ 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 103a6f4..ac5ccd8 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -10,7 +10,7 @@ from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point from influxdb_client_3.write_client.client.exceptions import InfluxDBError from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ - PointSettings + PointSettings, WriteType from influxdb_client_3.write_client.domain.write_precision import WritePrecision polars = importlib.util.find_spec("polars") is not None @@ -55,9 +55,11 @@ def file_parser_options(**kwargs): INFLUX_TOKEN = "INFLUX_TOKEN" INFLUX_DATABASE = "INFLUX_DATABASE" INFLUX_ORG = "INFLUX_ORG" +INFLUX_PRECISION = "INFLUX_PRECISION" def from_env(**kwargs: Any) -> 'InfluxDBClient3': + """ Create an instance of `InfluxDBClient3` using environment variables for configuration. @@ -78,7 +80,7 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': flight_client_options, SSL settings, etc. :return: An initialized `InfluxDBClient3` instance. :raises ValueError: If any required environment variables are not set. - """ + """ required_vars = { INFLUX_HOST: os.getenv(INFLUX_HOST), INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), @@ -91,10 +93,17 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': org = os.getenv(INFLUX_ORG, "default") + write_client_option = None + if os.getenv(INFLUX_PRECISION) is not None: + write_client_option = default_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=os.getenv(INFLUX_PRECISION)) + ) + return InfluxDBClient3( host=required_vars[INFLUX_HOST], token=required_vars[INFLUX_TOKEN], database=required_vars[INFLUX_DATABASE], + write_client_options=write_client_option, org=org, **kwargs ) @@ -190,7 +199,7 @@ def __init__( self._database = database self._token = token self._write_client_options = write_client_options if write_client_options is not None \ - else default_client_options(write_options=SYNCHRONOUS) + else default_client_options(write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.NS)) # Parse the host input parsed_url = urllib.parse.urlparse(host) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 2bcb612..ab7fe11 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -51,6 +51,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, max_retry_time=180_000, exponential_base=2, max_close_wait=300_000, + write_precision=DEFAULT_WRITE_PRECISION, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -66,7 +67,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_retry_delay: the maximum delay between each retry attempt in milliseconds :param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled :param exponential_base: base for the exponential retry delay - :parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called + :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called + :param write_precision: the time precision for the data written to InfluxDB. :param write_scheduler: """ self.write_type = write_type @@ -80,6 +82,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.exponential_base = exponential_base self.write_scheduler = write_scheduler self.max_close_wait = max_close_wait + self.write_precision = write_precision def to_retry_strategy(self, **kwargs): """ @@ -290,7 +293,7 @@ def write(self, bucket: str, org: str = None, str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass'] ] = None, - write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any: + write_precision: WritePrecision = None, **kwargs) -> Any: """ Write time-series data into InfluxDB. @@ -360,7 +363,10 @@ def write(self, bucket: str, org: str = None, org = get_org_query_param(org=org, client=self._influxdb_client) self._append_default_tags(record) - + + if write_precision is None: + write_precision = self._write_options.write_precision + if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision, **kwargs) @@ -443,8 +449,11 @@ def __del__(self): pass def _write_batching(self, bucket, org, data, - precision=DEFAULT_WRITE_PRECISION, + precision=None, **kwargs): + if precision is None: + precision = self._write_options.write_precision + if isinstance(data, bytes): _key = _BatchItemKey(bucket, org, precision) self._subject.on_next(_BatchItem(key=_key, data=data)) @@ -454,7 +463,8 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif isinstance(data, Point): - self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs) + write_precision = data.write_precision if data.write_precision is not None else precision + self._write_batching(bucket, org, data.to_line_protocol(), write_precision, **kwargs) elif isinstance(data, dict): self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 0bc18ba..f4eecc3 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from influxdb_client_3 import InfluxDBClient3, from_env, write_client_options +from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, WriteType from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData @@ -74,8 +74,27 @@ async def test_query_async(self): assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list + def test_default_client(self): + expected_precision = WritePrecision.NS + expected_write_type = WriteType.synchronous + + def verify_client_write_options(client): + write_options = client._write_client_options.get('write_options') + self.assertEqual(write_options.write_precision, expected_precision) + self.assertEqual(write_options.write_type, expected_write_type) + + self.assertEqual(client._write_api._write_options.write_precision, expected_precision) + self.assertEqual(client._write_api._write_options.write_type, expected_write_type) + + env_client = from_env() + verify_client_write_options(env_client) + + default_client = InfluxDBClient3() + verify_client_write_options(default_client) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org'}) + 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS}) def test_from_env_all_env_vars_set(self): client = from_env() self.assertIsInstance(client, InfluxDBClient3) @@ -83,13 +102,7 @@ def test_from_env_all_env_vars_set(self): self.assertEqual(client._database, "test_db") self.assertEqual(client._org, "test_org") self.assertEqual(client._token, "test_token") - - def test_from_env_with_kargs(self): - client = from_env( - write_client_options=write_client_options(batch_size=10000), - ) - self.assertIsInstance(client, InfluxDBClient3) - self.assertEqual(client._write_client_options['batch_size'], 10000) + self.assertEqual(client._write_client_options.get("write_options").write_precision, WritePrecision.MS) @patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "", 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) From 9c4708f27e9828a2bc3a63a643c7182df53ab092 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 23 Apr 2025 16:48:43 +0700 Subject: [PATCH 10/18] feat: support env --- influxdb_client_3/__init__.py | 4 ++++ tests/test_influxdb_client_3_integration.py | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index ac5ccd8..0f3f668 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -56,6 +56,7 @@ def file_parser_options(**kwargs): INFLUX_DATABASE = "INFLUX_DATABASE" INFLUX_ORG = "INFLUX_ORG" INFLUX_PRECISION = "INFLUX_PRECISION" +INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" def from_env(**kwargs: Any) -> 'InfluxDBClient3': @@ -93,6 +94,9 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': org = os.getenv(INFLUX_ORG, "default") + if os.getenv(INFLUX_AUTH_SCHEME) is not None: + kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) + write_client_option = None if os.getenv(INFLUX_PRECISION) is not None: write_client_option = default_client_options( diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 9aa1cf1..1e807f7 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -4,6 +4,7 @@ import string import time import unittest +from unittest.mock import patch import pyarrow import pytest @@ -276,15 +277,21 @@ async def test_verify_query_async(self): assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list" def test_from_env(self): - c = from_env() - with c: + with from_env() as client: id_test = time.time_ns() - c.write(f"integration_test_python,type=used value=123.0,id_test={id_test}i") + client.write(f"integration_test_python,type=used value=123.0,id_test={id_test}i") sql = 'SELECT * FROM integration_test_python where type=$type and id_test=$id_test' - data = c.query(sql, mode="pandas", query_parameters={'type': 'used', 'id_test': id_test}) + data = client.query(sql, mode="pandas", query_parameters={'type': 'used', 'id_test': id_test}) self.assertIsNotNone(data) self.assertEqual(1, len(data)) self.assertEqual(id_test, data['id_test'][0]) self.assertEqual(123.0, data['value'][0]) + + @patch.dict('os.environ', {'INFLUX_AUTH_SCHEME': 'invalid_schema'}) + def test_from_env_invalid_auth_schema(self): + with from_env() as client: + with self.assertRaises(InfluxDBError) as err: + client.write("integration_test_python,type=used value=123.0") + self.assertEqual('unauthorized access', err.exception.message) From 10322560e9c231580ce636a4f8190cd70689a306 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 24 Apr 2025 08:31:41 +0700 Subject: [PATCH 11/18] feat: support env --- influxdb_client_3/__init__.py | 43 ++++++++++++++----- .../write_client/client/write_api.py | 9 ++++ tests/test_influxdb_client_3.py | 17 +++++--- 3 files changed, 53 insertions(+), 16 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 0f3f668..4f9aef7 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -10,7 +10,7 @@ from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point from influxdb_client_3.write_client.client.exceptions import InfluxDBError from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ - PointSettings, WriteType + PointSettings, WriteType, DefaultWriteOptions from influxdb_client_3.write_client.domain.write_precision import WritePrecision polars = importlib.util.find_spec("polars") is not None @@ -57,6 +57,7 @@ def file_parser_options(**kwargs): INFLUX_ORG = "INFLUX_ORG" INFLUX_PRECISION = "INFLUX_PRECISION" INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" +INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" def from_env(**kwargs: Any) -> 'InfluxDBClient3': @@ -92,16 +93,20 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': if missing_vars: raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") - org = os.getenv(INFLUX_ORG, "default") + write_options = WriteOptions(write_type=WriteType.synchronous) + + if os.getenv(INFLUX_GZIP_THRESHOLD) is not None: + write_options.gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD)) + + if os.getenv(INFLUX_PRECISION) is not None: + write_options.write_precision = os.getenv(INFLUX_PRECISION) + + write_client_option = {'write_options': write_options} if os.getenv(INFLUX_AUTH_SCHEME) is not None: kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) - - write_client_option = None - if os.getenv(INFLUX_PRECISION) is not None: - write_client_option = default_client_options( - write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=os.getenv(INFLUX_PRECISION)) - ) + + org = os.getenv(INFLUX_ORG, "default") return InfluxDBClient3( host=required_vars[INFLUX_HOST], @@ -202,8 +207,26 @@ def __init__( self._org = org if org is not None else "default" self._database = database self._token = token - self._write_client_options = write_client_options if write_client_options is not None \ - else default_client_options(write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.NS)) + + write_type = DefaultWriteOptions['write_type'] + write_precision = DefaultWriteOptions['write_precision'] + gzip_threshold = DefaultWriteOptions['gzip_threshold'] + if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: + write_opts = write_client_options['write_options'] + write_type = getattr(write_opts, 'write_type') + write_precision = getattr(write_opts, 'write_precision') + gzip_threshold = getattr(write_opts, 'gzip_threshold') + + write_options = WriteOptions( + write_type=write_type, + write_precision=write_precision, + gzip_threshold=gzip_threshold, + ) + + self._write_client_options = { + "write_options": write_options, + **(write_client_options or {}) + } # Parse the host input parsed_url = urllib.parse.urlparse(host) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index ab7fe11..995293e 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -39,6 +39,13 @@ class WriteType(Enum): synchronous = 3 +DefaultWriteOptions = { + 'write_type': WriteType.synchronous, + 'write_precision': WritePrecision.NS, + 'gzip_threshold': 1000 +} + + class WriteOptions(object): """Write configuration.""" @@ -52,6 +59,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, exponential_base=2, max_close_wait=300_000, write_precision=DEFAULT_WRITE_PRECISION, + gzip_threshold=1000, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -83,6 +91,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.write_scheduler = write_scheduler self.max_close_wait = max_close_wait self.write_precision = write_precision + self.gzip_threshold = gzip_threshold def to_retry_strategy(self, **kwargs): """ diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index f4eecc3..e086589 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -78,13 +78,13 @@ def test_default_client(self): expected_precision = WritePrecision.NS expected_write_type = WriteType.synchronous - def verify_client_write_options(client): - write_options = client._write_client_options.get('write_options') + def verify_client_write_options(c): + write_options = c._write_client_options.get('write_options') self.assertEqual(write_options.write_precision, expected_precision) self.assertEqual(write_options.write_type, expected_write_type) - self.assertEqual(client._write_api._write_options.write_precision, expected_precision) - self.assertEqual(client._write_api._write_options.write_type, expected_write_type) + self.assertEqual(c._write_api._write_options.write_precision, expected_precision) + self.assertEqual(c._write_api._write_options.write_type, expected_write_type) env_client = from_env() verify_client_write_options(env_client) @@ -94,7 +94,8 @@ def verify_client_write_options(client): @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS}) + 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS, + 'INFLUX_GZIP_THRESHOLD': '2000'}) def test_from_env_all_env_vars_set(self): client = from_env() self.assertIsInstance(client, InfluxDBClient3) @@ -102,7 +103,11 @@ def test_from_env_all_env_vars_set(self): self.assertEqual(client._database, "test_db") self.assertEqual(client._org, "test_org") self.assertEqual(client._token, "test_token") - self.assertEqual(client._write_client_options.get("write_options").write_precision, WritePrecision.MS) + + write_options = client._write_client_options.get("write_options") + self.assertEqual(write_options.write_precision, WritePrecision.MS) + self.assertEqual(write_options.gzip_threshold, 2000) + @patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "", 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) From e86914dc0459713504281680de6c2b3376a0838e Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 24 Apr 2025 16:28:23 +0700 Subject: [PATCH 12/18] feat: support env --- influxdb_client_3/__init__.py | 14 +++--- .../write_client/_sync/api_client.py | 10 +++++ .../write_client/client/_base.py | 9 ++-- .../write_client/client/influxdb_client.py | 4 +- .../write_client/client/write_api.py | 9 ++-- .../write_client/configuration.py | 5 +++ tests/test_api_client.py | 45 +++++++++++++++++++ tests/test_influxdb_client_3.py | 10 +++-- 8 files changed, 90 insertions(+), 16 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 4f9aef7..a15c454 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -96,7 +96,9 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': write_options = WriteOptions(write_type=WriteType.synchronous) if os.getenv(INFLUX_GZIP_THRESHOLD) is not None: - write_options.gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD)) + gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD)) + write_options.enable_gzip = True + write_options.gzip_threshold = gzip_threshold if os.getenv(INFLUX_PRECISION) is not None: write_options.write_precision = os.getenv(INFLUX_PRECISION) @@ -105,7 +107,6 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': if os.getenv(INFLUX_AUTH_SCHEME) is not None: kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) - org = os.getenv(INFLUX_ORG, "default") return InfluxDBClient3( @@ -210,17 +211,18 @@ def __init__( write_type = DefaultWriteOptions['write_type'] write_precision = DefaultWriteOptions['write_precision'] - gzip_threshold = DefaultWriteOptions['gzip_threshold'] + gzip_threshold = None if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: write_opts = write_client_options['write_options'] - write_type = getattr(write_opts, 'write_type') - write_precision = getattr(write_opts, 'write_precision') + write_type = getattr(write_opts, 'write_type', write_type) + write_precision = getattr(write_opts, 'write_precision', write_precision) gzip_threshold = getattr(write_opts, 'gzip_threshold') write_options = WriteOptions( write_type=write_type, write_precision=write_precision, gzip_threshold=gzip_threshold, + enable_gzip=kwargs.get('enable_gzip', False) ) self._write_client_options = { @@ -244,6 +246,8 @@ def __init__( url=f"{scheme}://{hostname}:{port}", token=self._token, org=self._org, + enable_gzip=write_options.enable_gzip, + gzip_threshold=write_options.gzip_threshold, **kwargs) self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options) diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index b72a840..27857e1 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -120,6 +120,10 @@ def __call_api( config = self.configuration self._signin(resource_path=resource_path) + gzip_threshold = config.gzip_threshold + enable_gzip = config.enable_gzip + self.should_compress = self.check_should_compress(body, gzip_threshold, enable_gzip) + # header parameters header_params = header_params or {} config.update_request_header_params(resource_path, header_params) @@ -192,6 +196,12 @@ def __call_api( return (return_data, response_data.status, response_data.getheaders()) + def check_should_compress(self, body: bytearray, gzip_threshold: int, enable_gzip: bool) -> bool: + body_size = len(body) + if enable_gzip is True or (enable_gzip is not False and (gzip_threshold and body_size >= gzip_threshold)): + return True + return False + def sanitize_for_serialization(self, obj): """Build a JSON POST object. diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index 32664d9..eb46ecb 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -34,7 +34,7 @@ # noinspection PyMethodMayBeStatic class _BaseClient(object): - def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None, + def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None, default_tags: dict = None, http_client_logger: str = None, **kwargs) -> None: self.url = url self.org = org @@ -47,6 +47,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or else: self.conf.host = self.url self.conf.enable_gzip = enable_gzip + self.conf.gzip_threshold = gzip_threshold self.conf.verify_ssl = kwargs.get('verify_ssl', True) self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None) self.conf.cert_file = kwargs.get('cert_file', None) @@ -271,12 +272,14 @@ class _Configuration(Configuration): def __init__(self): Configuration.__init__(self) self.enable_gzip = False + self.gzip_threshold = None + self.should_compress = False self.username = None self.password = None def update_request_header_params(self, path: str, params: dict): super().update_request_header_params(path, params) - if self.enable_gzip: + if self.should_compress: # GZIP Request if path == '/api/v2/write': params["Content-Encoding"] = "gzip" @@ -292,7 +295,7 @@ def update_request_header_params(self, path: str, params: dict): def update_request_body(self, path: str, body): _body = super().update_request_body(path, body) - if self.enable_gzip: + if self.should_compress: # GZIP Request if path == '/api/v2/write': import gzip diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index 3dd29af..e2bdbd6 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -13,7 +13,7 @@ class InfluxDBClient(_BaseClient): """InfluxDBClient is client for InfluxDB v2.""" - def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None, + def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None, default_tags: dict = None, **kwargs) -> None: """ Initialize defaults. @@ -50,7 +50,7 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key list[str] profilers: list of enabled Flux profilers """ - super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org, + super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, gzip_threshold=gzip_threshold, org=org, default_tags=default_tags, http_client_logger="urllib3", **kwargs) from influxdb_client_3.write_client._sync.api_client import ApiClient diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 995293e..e03068f 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -38,11 +38,12 @@ class WriteType(Enum): asynchronous = 2 synchronous = 3 +DEFAULT_GZIP_THRESHOLD = 1000 +#todo: convert to enum DefaultWriteOptions = { 'write_type': WriteType.synchronous, - 'write_precision': WritePrecision.NS, - 'gzip_threshold': 1000 + 'write_precision': WritePrecision.NS } @@ -59,7 +60,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, exponential_base=2, max_close_wait=300_000, write_precision=DEFAULT_WRITE_PRECISION, - gzip_threshold=1000, + gzip_threshold=None, + enable_gzip=False, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -92,6 +94,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.max_close_wait = max_close_wait self.write_precision = write_precision self.gzip_threshold = gzip_threshold + self.enable_gzip = enable_gzip def to_retry_strategy(self, **kwargs): """ diff --git a/influxdb_client_3/write_client/configuration.py b/influxdb_client_3/write_client/configuration.py index 5793655..fb8592c 100644 --- a/influxdb_client_3/write_client/configuration.py +++ b/influxdb_client_3/write_client/configuration.py @@ -49,6 +49,7 @@ class Configuration(object, metaclass=TypeWithDefault): Ref: https://openapi-generator.tech Do not edit the class manually. """ + #todo: remove wrong document def __init__(self): """Initialize configuration.""" @@ -118,6 +119,10 @@ def __init__(self): # Safe chars for path_param self.safe_chars_for_path_param = '' + # Compression settings + self.enable_gzip = False + self.gzip_threshold = None + @property def logger_file(self): """Logger file. diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 9976cfb..33cdd5d 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -139,3 +139,48 @@ def test_api_error_headers(self): self.assertEqual(headers['Trace-Sampled'], 'false') self.assertEqual(headers['X-Influxdb-Request-Id'], requestid) self.assertEqual(headers['X-Influxdb-Build'], 'Mock') + + def test_check_should_compress_true(self): + conf = Configuration() + client = ApiClient(conf) + + body = bytearray("12345678901234567890".encode("utf-8")) # len = 20 + tests = [ + { + 'gzip_threshold': 10, + 'enable_gzip': True, + 'expected': True + }, + { + 'gzip_threshold': 30, + 'enable_gzip': True, + 'expected': True + }, + { + 'gzip_threshold': None, + 'enable_gzip': True, + 'expected': True + }, + { + 'gzip_threshold': 30, + 'enable_gzip': None, + 'expected': False + }, + { + 'gzip_threshold': 30, + 'enable_gzip': False, + 'expected': False + }, + { + 'gzip_threshold': 10, + 'enable_gzip': None, + 'expected': True + }, + ] + + for test in tests: + gzip_threshold = test['gzip_threshold'] + enable_gzip = test['enable_gzip'] + expected = test['expected'] + result = client.check_should_compress(body, gzip_threshold, enable_gzip) + self.assertEqual(result, expected) \ No newline at end of file diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index e086589..b1411ba 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, WriteType +from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, DefaultWriteOptions from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData @@ -75,13 +75,17 @@ async def test_query_async(self): assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list def test_default_client(self): - expected_precision = WritePrecision.NS - expected_write_type = WriteType.synchronous + expected_precision = DefaultWriteOptions['write_precision'] + expected_write_type = DefaultWriteOptions['write_type'] + expected_gzip_threshold = None + expected_gzip_enabled = False def verify_client_write_options(c): write_options = c._write_client_options.get('write_options') self.assertEqual(write_options.write_precision, expected_precision) self.assertEqual(write_options.write_type, expected_write_type) + self.assertEqual(write_options.gzip_threshold, expected_gzip_threshold) + self.assertEqual(write_options.enable_gzip, expected_gzip_enabled) self.assertEqual(c._write_api._write_options.write_precision, expected_precision) self.assertEqual(c._write_api._write_options.write_type, expected_write_type) From c9cf3bc2b8ddeb806447c87f0f9860bf5272292f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 24 Apr 2025 16:32:18 +0700 Subject: [PATCH 13/18] feat: remove open aoi documents --- .../write_client/_sync/api_client.py | 20 +----------------- influxdb_client_3/write_client/_sync/rest.py | 20 ------------------ .../write_client/configuration.py | 21 ------------------- .../write_client/domain/write_precision.py | 16 -------------- influxdb_client_3/write_client/rest.py | 14 ------------- .../write_client/service/signin_service.py | 16 -------------- .../write_client/service/signout_service.py | 16 -------------- .../write_client/service/write_service.py | 16 -------------- 8 files changed, 1 insertion(+), 138 deletions(-) diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index 27857e1..98536bf 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -1,12 +1,4 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" from __future__ import absolute_import @@ -28,17 +20,7 @@ class ApiClient(object): - """Generic API client for OpenAPI client library Build. - - OpenAPI generic API client. This client handles the client- - server communication, and is invariant across implementations. Specifics of - the methods and models for each application are generated from the OpenAPI - templates. - - NOTE: This class is auto generated by OpenAPI Generator. - Ref: https://openapi-generator.tech - Do not edit the class manually. - + """ :param configuration: .Configuration object for this client :param header_name: a header to pass when making calls to the API. :param header_value: a header value to pass when making calls to diff --git a/influxdb_client_3/write_client/_sync/rest.py b/influxdb_client_3/write_client/_sync/rest.py index 09da73a..f4d5299 100644 --- a/influxdb_client_3/write_client/_sync/rest.py +++ b/influxdb_client_3/write_client/_sync/rest.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import io @@ -28,11 +18,6 @@ class RESTResponse(io.IOBase): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self, resp): """Initialize with HTTP response.""" @@ -51,11 +36,6 @@ def getheader(self, name, default=None): class RESTClientObject(object): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self, configuration, pools_size=4, maxsize=None, retries=False): """Initialize REST client.""" diff --git a/influxdb_client_3/write_client/configuration.py b/influxdb_client_3/write_client/configuration.py index fb8592c..0e4c54e 100644 --- a/influxdb_client_3/write_client/configuration.py +++ b/influxdb_client_3/write_client/configuration.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import copy @@ -21,11 +11,6 @@ class TypeWithDefault(type): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(cls, name, bases, dct): """Initialize with defaults.""" @@ -44,12 +29,6 @@ def set_default(cls, default): class Configuration(object, metaclass=TypeWithDefault): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ - #todo: remove wrong document def __init__(self): """Initialize configuration.""" diff --git a/influxdb_client_3/write_client/domain/write_precision.py b/influxdb_client_3/write_client/domain/write_precision.py index 41a0db9..4917201 100644 --- a/influxdb_client_3/write_client/domain/write_precision.py +++ b/influxdb_client_3/write_client/domain/write_precision.py @@ -1,26 +1,10 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - import pprint import re # noqa: F401 class WritePrecision(object): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ """ allowed enum values diff --git a/influxdb_client_3/write_client/rest.py b/influxdb_client_3/write_client/rest.py index 54f9e5c..e9a4652 100644 --- a/influxdb_client_3/write_client/rest.py +++ b/influxdb_client_3/write_client/rest.py @@ -1,14 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - from __future__ import absolute_import import logging @@ -21,11 +12,6 @@ class ApiException(InfluxDBError): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self, status=None, reason=None, http_resp=None): """Initialize with HTTP response.""" diff --git a/influxdb_client_3/write_client/service/signin_service.py b/influxdb_client_3/write_client/service/signin_service.py index 97f3fcf..ae64b3b 100644 --- a/influxdb_client_3/write_client/service/signin_service.py +++ b/influxdb_client_3/write_client/service/signin_service.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import re # noqa: F401 @@ -18,12 +8,6 @@ class SigninService(_BaseService): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ def __init__(self, api_client=None): # noqa: E501,D401,D403 """SigninService - a operation defined in OpenAPI.""" diff --git a/influxdb_client_3/write_client/service/signout_service.py b/influxdb_client_3/write_client/service/signout_service.py index fdf8b4d..c7dbdbc 100644 --- a/influxdb_client_3/write_client/service/signout_service.py +++ b/influxdb_client_3/write_client/service/signout_service.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import re # noqa: F401 @@ -18,12 +8,6 @@ class SignoutService(_BaseService): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ def __init__(self, api_client=None): # noqa: E501,D401,D403 """SignoutService - a operation defined in OpenAPI.""" diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index 23dc791..b9e0b0b 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import re # noqa: F401 @@ -18,12 +8,6 @@ class WriteService(_BaseService): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ def __init__(self, api_client=None): # noqa: E501,D401,D403 """WriteService - a operation defined in OpenAPI.""" From 9daa275e8d90b03e22ec9726f04cc80245ac13ac Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 24 Apr 2025 16:47:49 +0700 Subject: [PATCH 14/18] feat: refactor to enum --- influxdb_client_3/__init__.py | 4 ++-- influxdb_client_3/write_client/client/write_api.py | 12 ++++++++---- tests/test_influxdb_client_3.py | 4 ++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index a15c454..ce18f33 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -209,8 +209,8 @@ def __init__( self._database = database self._token = token - write_type = DefaultWriteOptions['write_type'] - write_precision = DefaultWriteOptions['write_precision'] + write_type = DefaultWriteOptions.write_type.value + write_precision = DefaultWriteOptions.write_precision.value gzip_threshold = None if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: write_opts = write_client_options['write_options'] diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index e03068f..ce5f9c7 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -41,10 +41,14 @@ class WriteType(Enum): DEFAULT_GZIP_THRESHOLD = 1000 #todo: convert to enum -DefaultWriteOptions = { - 'write_type': WriteType.synchronous, - 'write_precision': WritePrecision.NS -} +# DefaultWriteOptions = { +# 'write_type': WriteType.synchronous, +# 'write_precision': WritePrecision.NS +# } +class DefaultWriteOptions(Enum): + write_type = WriteType.synchronous + write_precision = WritePrecision.NS + class WriteOptions(object): diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index b1411ba..efee991 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -75,8 +75,8 @@ async def test_query_async(self): assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list def test_default_client(self): - expected_precision = DefaultWriteOptions['write_precision'] - expected_write_type = DefaultWriteOptions['write_type'] + expected_precision = DefaultWriteOptions.write_precision.value + expected_write_type = DefaultWriteOptions.write_type.value expected_gzip_threshold = None expected_gzip_enabled = False From 859264c3eb2171fb57a9002f6818cf7f2a6fa24b Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 24 Apr 2025 17:02:29 +0700 Subject: [PATCH 15/18] feat: refactor to enum --- influxdb_client_3/__init__.py | 5 +++-- influxdb_client_3/write_client/_sync/api_client.py | 2 +- .../write_client/client/influxdb_client.py | 8 ++++++-- influxdb_client_3/write_client/client/write_api.py | 14 +++++--------- tests/test_api_client.py | 5 +++-- tests/test_influxdb_client_3.py | 5 ++--- 6 files changed, 20 insertions(+), 19 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index ce18f33..1061db6 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -61,7 +61,7 @@ def file_parser_options(**kwargs): def from_env(**kwargs: Any) -> 'InfluxDBClient3': - + """ Create an instance of `InfluxDBClient3` using environment variables for configuration. @@ -82,7 +82,8 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3': flight_client_options, SSL settings, etc. :return: An initialized `InfluxDBClient3` instance. :raises ValueError: If any required environment variables are not set. - """ + """ + required_vars = { INFLUX_HOST: os.getenv(INFLUX_HOST), INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index 98536bf..c95dd88 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -105,7 +105,7 @@ def __call_api( gzip_threshold = config.gzip_threshold enable_gzip = config.enable_gzip self.should_compress = self.check_should_compress(body, gzip_threshold, enable_gzip) - + # header parameters header_params = header_params or {} config.update_request_header_params(resource_path, header_params) diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index e2bdbd6..1eecfe0 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -13,7 +13,9 @@ class InfluxDBClient(_BaseClient): """InfluxDBClient is client for InfluxDB v2.""" - def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None, + def __init__(self, url, token: str = None, + debug=None, timeout=10_000, + enable_gzip=False, gzip_threshold=None, org: str = None, default_tags: dict = None, **kwargs) -> None: """ Initialize defaults. @@ -50,7 +52,9 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key list[str] profilers: list of enabled Flux profilers """ - super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, gzip_threshold=gzip_threshold, org=org, + + super().__init__(url=url, token=token, debug=debug, timeout=timeout, + enable_gzip=enable_gzip, gzip_threshold=gzip_threshold, org=org, default_tags=default_tags, http_client_logger="urllib3", **kwargs) from influxdb_client_3.write_client._sync.api_client import ApiClient diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index ce5f9c7..b31489a 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -38,19 +38,15 @@ class WriteType(Enum): asynchronous = 2 synchronous = 3 + DEFAULT_GZIP_THRESHOLD = 1000 -#todo: convert to enum -# DefaultWriteOptions = { -# 'write_type': WriteType.synchronous, -# 'write_precision': WritePrecision.NS -# } + class DefaultWriteOptions(Enum): write_type = WriteType.synchronous write_precision = WritePrecision.NS - class WriteOptions(object): """Write configuration.""" @@ -379,10 +375,10 @@ def write(self, bucket: str, org: str = None, org = get_org_query_param(org=org, client=self._influxdb_client) self._append_default_tags(record) - + if write_precision is None: write_precision = self._write_options.write_precision - + if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision, **kwargs) @@ -469,7 +465,7 @@ def _write_batching(self, bucket, org, data, **kwargs): if precision is None: precision = self._write_options.write_precision - + if isinstance(data, bytes): _key = _BatchItemKey(bucket, org, precision) self._subject.on_next(_BatchItem(key=_key, data=data)) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 33cdd5d..66e1c3e 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -144,7 +144,8 @@ def test_check_should_compress_true(self): conf = Configuration() client = ApiClient(conf) - body = bytearray("12345678901234567890".encode("utf-8")) # len = 20 + # len of body = 20 + body = bytearray("12345678901234567890".encode("utf-8")) tests = [ { 'gzip_threshold': 10, @@ -183,4 +184,4 @@ def test_check_should_compress_true(self): enable_gzip = test['enable_gzip'] expected = test['expected'] result = client.check_should_compress(body, gzip_threshold, enable_gzip) - self.assertEqual(result, expected) \ No newline at end of file + self.assertEqual(result, expected) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index efee991..b6129f0 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -96,9 +96,9 @@ def verify_client_write_options(c): default_client = InfluxDBClient3() verify_client_write_options(default_client) - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS, + 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', + 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_GZIP_THRESHOLD': '2000'}) def test_from_env_all_env_vars_set(self): client = from_env() @@ -112,7 +112,6 @@ def test_from_env_all_env_vars_set(self): self.assertEqual(write_options.write_precision, WritePrecision.MS) self.assertEqual(write_options.gzip_threshold, 2000) - @patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "", 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) def test_from_env_missing_variables(self): From e7ccf9a59d42218eb914d12770803f66d3fe1e5f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 29 Apr 2025 08:54:48 +0700 Subject: [PATCH 16/18] wip --- influxdb_client_3/__init__.py | 170 ++++++++++-------- .../write_client/client/influxdb_client.py | 3 + tests/test_influxdb_client_3.py | 40 ++++- tests/test_influxdb_client_3_integration.py | 6 +- 4 files changed, 142 insertions(+), 77 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 1061db6..7ddb07e 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -15,6 +15,13 @@ polars = importlib.util.find_spec("polars") is not None +INFLUX_HOST = "INFLUX_HOST" +INFLUX_TOKEN = "INFLUX_TOKEN" +INFLUX_DATABASE = "INFLUX_DATABASE" +INFLUX_ORG = "INFLUX_ORG" +INFLUX_PRECISION = "INFLUX_PRECISION" +INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" +INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" def write_client_options(**kwargs): """ @@ -50,76 +57,6 @@ def file_parser_options(**kwargs): return kwargs -# Constants for environment variable names -INFLUX_HOST = "INFLUX_HOST" -INFLUX_TOKEN = "INFLUX_TOKEN" -INFLUX_DATABASE = "INFLUX_DATABASE" -INFLUX_ORG = "INFLUX_ORG" -INFLUX_PRECISION = "INFLUX_PRECISION" -INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" -INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" - - -def from_env(**kwargs: Any) -> 'InfluxDBClient3': - - """ - Create an instance of `InfluxDBClient3` using environment variables for configuration. - - This function retrieves and validates the following required environment variables: - - `INFLUX_HOST`: The hostname or IP address of the InfluxDB server. - - `INFLUX_TOKEN`: The authentication token used for accessing the server. - - `INFLUX_DATABASE`: The default database for the client operations. - And optional environment variable: - - `INFLUX_ORG`: The organization associated with InfluxDB operations. - Defaults to "default" if not set. - - If any of the required environment variables are not set, a ValueError will be - raised with details about the missing variables. - - :param kwargs: Additional keyword arguments that will be passed to the - `InfluxDBClient3` constructor for customization. This allows for - configuring specific client behaviors like write_client_options, - flight_client_options, SSL settings, etc. - :return: An initialized `InfluxDBClient3` instance. - :raises ValueError: If any required environment variables are not set. - """ - - required_vars = { - INFLUX_HOST: os.getenv(INFLUX_HOST), - INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), - INFLUX_DATABASE: os.getenv(INFLUX_DATABASE) - } - - missing_vars = [var for var, value in required_vars.items() if value is None or value == ""] - if missing_vars: - raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") - - write_options = WriteOptions(write_type=WriteType.synchronous) - - if os.getenv(INFLUX_GZIP_THRESHOLD) is not None: - gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD)) - write_options.enable_gzip = True - write_options.gzip_threshold = gzip_threshold - - if os.getenv(INFLUX_PRECISION) is not None: - write_options.write_precision = os.getenv(INFLUX_PRECISION) - - write_client_option = {'write_options': write_options} - - if os.getenv(INFLUX_AUTH_SCHEME) is not None: - kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) - org = os.getenv(INFLUX_ORG, "default") - - return InfluxDBClient3( - host=required_vars[INFLUX_HOST], - token=required_vars[INFLUX_TOKEN], - database=required_vars[INFLUX_DATABASE], - write_client_options=write_client_option, - org=org, - **kwargs - ) - - def _deep_merge(target, source): """ Performs a deep merge of dictionaries or lists, @@ -156,6 +93,36 @@ def _merge_options(defaults, exclude_keys=None, custom=None): return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys}) +def _parse_precision(precision): + """ + Parse and validate precision value. + + :param precision: Precision value to validate + :return: Validated precision value + :raises ValueError: If precision is invalid + """ + if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]: + raise ValueError(f"Invalid precision value: {precision}") + return precision + + +def _parse_gzip_threshold(threshold): + """ + Parse and validate gzip threshold value. + + :param threshold: Threshold value to validate + :return: Validated threshold value + :raises ValueError: If threshold is invalid + """ + try: + threshold = int(threshold) + except (TypeError, ValueError): + raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.") + if threshold < 0: + raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.") + return threshold + + class InfluxDBClient3: def __init__( self, @@ -272,6 +239,69 @@ def __init__( flight_client_options=flight_client_options, proxy=kwargs.get("proxy", None), options=q_opts_builder.build()) + + @classmethod + def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': + + """ + Create an instance of `InfluxDBClient3` using environment variables for configuration. + + This function retrieves and validates the following required environment variables: + - `INFLUX_HOST`: The hostname or IP address of the InfluxDB server. + - `INFLUX_TOKEN`: The authentication token used for accessing the server. + - `INFLUX_DATABASE`: The default database for the client operations. + And optional environment variable: + - `INFLUX_ORG`: The organization associated with InfluxDB operations. + Defaults to "default" if not set. + + If any of the required environment variables are not set, a ValueError will be + raised with details about the missing variables. + + :param kwargs: Additional keyword arguments that will be passed to the + `InfluxDBClient3` constructor for customization. This allows for + configuring specific client behaviors like write_client_options, + flight_client_options, SSL settings, etc. + :return: An initialized `InfluxDBClient3` instance. + :raises ValueError: If any required environment variables are not set. + """ + + required_vars = { + INFLUX_HOST: os.getenv(INFLUX_HOST), + INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), + INFLUX_DATABASE: os.getenv(INFLUX_DATABASE) + } + + missing_vars = [var for var, value in required_vars.items() if value is None or value == ""] + if missing_vars: + raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") + + write_options = WriteOptions(write_type=WriteType.synchronous) + + gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD) + if gzip_threshold is not None: + write_options.gzip_threshold = _parse_gzip_threshold(gzip_threshold) + write_options.enable_gzip = True + + precision = os.getenv(INFLUX_PRECISION) + if precision is not None: + write_options.write_precision = _parse_precision(precision) + + write_client_option = {'write_options': write_options} + + if os.getenv(INFLUX_AUTH_SCHEME) is not None: + kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) + org = os.getenv(INFLUX_ORG, "default") + + return InfluxDBClient3( + host=required_vars[INFLUX_HOST], + token=required_vars[INFLUX_TOKEN], + database=required_vars[INFLUX_DATABASE], + write_client_options=write_client_option, + org=org, + **kwargs + ) + + def write(self, record=None, database=None, **kwargs): """ Write data to InfluxDB. diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index 1eecfe0..6dab859 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -4,6 +4,8 @@ import logging +from typing_extensions import deprecated + from influxdb_client_3.write_client.client._base import _BaseClient from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions, PointSettings @@ -171,6 +173,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz return InfluxDBClient._from_config_file(config_file=config_file, debug=debug, enable_gzip=enable_gzip, **kwargs) @classmethod + @deprecated("Use up to date Env Properties") def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs): """ Configure client via environment properties. diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index b6129f0..02a9623 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, DefaultWriteOptions +from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData @@ -90,7 +90,7 @@ def verify_client_write_options(c): self.assertEqual(c._write_api._write_options.write_precision, expected_precision) self.assertEqual(c._write_api._write_options.write_type, expected_write_type) - env_client = from_env() + env_client = InfluxDBClient3.from_env() verify_client_write_options(env_client) default_client = InfluxDBClient3() @@ -101,7 +101,7 @@ def verify_client_write_options(c): 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_GZIP_THRESHOLD': '2000'}) def test_from_env_all_env_vars_set(self): - client = from_env() + client = InfluxDBClient3.from_env() self.assertIsInstance(client, InfluxDBClient3) self.assertEqual(client._client.url, "https://localhost:443") self.assertEqual(client._database, "test_db") @@ -116,9 +116,41 @@ def test_from_env_all_env_vars_set(self): 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) def test_from_env_missing_variables(self): with self.assertRaises(ValueError) as context: - from_env() + InfluxDBClient3.from_env() self.assertIn("Missing required environment variables", str(context.exception)) + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': WritePrecision.MS}) + def test_parse_valid_write_precision(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._write_client_options.get('write_options').write_precision, WritePrecision.MS) + + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid'}) + def test_parse_invalid_write_precision(self): + with self.assertRaises(ValueError) as context: + InfluxDBClient3.from_env() + self.assertIn("Invalid precision value: invalid", str(context.exception)) + + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_GZIP_THRESHOLD': '2000'}) + def test_parse_valid_gzip_threshold(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._write_client_options.get('write_options').gzip_threshold, 2000) + + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_GZIP_THRESHOLD': 'invalid'}) + def test_parse_invalid_gzip_threshold(self): + with self.assertRaises(ValueError) as context: + InfluxDBClient3.from_env() + self.assertIn("Must be integer", str(context.exception)) + + if __name__ == '__main__': unittest.main() diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 1e807f7..a13b3a1 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -10,7 +10,7 @@ import pytest from pyarrow._flight import FlightError -from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions, from_env +from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions from tests.util import asyncio_run, lp_to_py_object @@ -277,7 +277,7 @@ async def test_verify_query_async(self): assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list" def test_from_env(self): - with from_env() as client: + with InfluxDBClient3.from_env() as client: id_test = time.time_ns() client.write(f"integration_test_python,type=used value=123.0,id_test={id_test}i") @@ -291,7 +291,7 @@ def test_from_env(self): @patch.dict('os.environ', {'INFLUX_AUTH_SCHEME': 'invalid_schema'}) def test_from_env_invalid_auth_schema(self): - with from_env() as client: + with InfluxDBClient3.from_env() as client: with self.assertRaises(InfluxDBError) as err: client.write("integration_test_python,type=used value=123.0") self.assertEqual('unauthorized access', err.exception.message) From f05355d94dc4695d5ef7c84888333fed50a420cb Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 29 Apr 2025 16:25:34 +0700 Subject: [PATCH 17/18] wip --- influxdb_client_3/__init__.py | 47 +++++++++++++++++++++++++++++++-- tests/test_influxdb_client_3.py | 34 +++++++++++++++++------- tests/test_polars.py | 2 ++ 3 files changed, 72 insertions(+), 11 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 7ddb07e..9ce9b44 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -22,6 +22,15 @@ INFLUX_PRECISION = "INFLUX_PRECISION" INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" +INFLUX_TIMEOUT = "INFLUX_TIMEOUT" +INFLUX_VERIFY_SSL = "INFLUX_VERIFY_SSL" +INFLUX_SSL_CA_CERT = "INFLUX_SSL_CA_CERT" +INFLUX_CERT_FILE = "INFLUX_CERT_FILE" +INFLUX_CERT_KEY_FILE = "INFLUX_CERT_KEY_FILE" +INFLUX_CERT_KEY_PASSWORD = "INFLUX_CERT_KEY_PASSWORD" +INFLUX_CONNECTION_POOL_MAXSIZE = "INFLUX_CONNECTION_POOL_MAXSIZE" +INFLUX_PROFILERS = "INFLUX_PROFILERS" +INFLUX_TAG = "INFLUX_TAG" def write_client_options(**kwargs): """ @@ -270,7 +279,6 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), INFLUX_DATABASE: os.getenv(INFLUX_DATABASE) } - missing_vars = [var for var, value in required_vars.items() if value is None or value == ""] if missing_vars: raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") @@ -290,8 +298,43 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': if os.getenv(INFLUX_AUTH_SCHEME) is not None: kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) - org = os.getenv(INFLUX_ORG, "default") + timeout = os.getenv(INFLUX_TIMEOUT) + if timeout is not None: + kwargs['timeout'] = int(timeout) + + ssl_ca_cert = os.getenv(INFLUX_SSL_CA_CERT) + if ssl_ca_cert is not None: + kwargs['ssl_ca_cert'] = ssl_ca_cert + + cert_file = os.getenv(INFLUX_CERT_FILE) + if cert_file is not None: + kwargs['cert_file'] = cert_file + + cert_key_file = os.getenv(INFLUX_CERT_KEY_FILE) + if cert_key_file is not None: + kwargs['cert_key_file'] = cert_key_file + + cert_key_password = os.getenv(INFLUX_CERT_KEY_PASSWORD) + if cert_key_password is not None: + kwargs['cert_key_password'] = cert_key_password + + connection_pool_maxsize = os.getenv(INFLUX_CONNECTION_POOL_MAXSIZE) + if connection_pool_maxsize is not None: + kwargs['connection_pool_maxsize'] = int(connection_pool_maxsize) + + profilers = os.getenv(INFLUX_PROFILERS) + if profilers is not None: + kwargs['profilers'] = [x.strip() for x in profilers.split(',')] + + default_tags = dict() + for key, value in os.environ.items(): + if key.startswith("{0}_".format(INFLUX_TAG)): + default_tags[key[11:].lower()] = value + kwargs['default_tags'] = default_tags + + kwargs['verify_ssl'] = bool(os.getenv(INFLUX_VERIFY_SSL, 'True').lower() in ['True', 'true']) + org = os.getenv(INFLUX_ORG, "default") return InfluxDBClient3( host=required_vars[INFLUX_HOST], token=required_vars[INFLUX_TOKEN], diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 02a9623..9892d8b 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -98,8 +98,12 @@ def verify_client_write_options(c): @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', - 'INFLUX_PRECISION': WritePrecision.MS, - 'INFLUX_GZIP_THRESHOLD': '2000'}) + 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_GZIP_THRESHOLD': '2000', + 'INFLUX_TIMEOUT': '6000', 'INFLUX_VERIFY_SSL': 'False', + 'INFLUX_CERT_FILE': 'path_to_cert', 'INFLUX_CERT_KEY_FILE': 'path_to_cert_key', + 'INFLUX_CERT_KEY_PASSWORD': 'cert_key_password', 'INFLUX_CONNECTION_POOL_MAXSIZE': '200', + 'INFLUX_PROFILERS': 'prof1,prof2, prof3', 'INFLUX_TAG_TAG1': 'Tag1', + 'INFLUX_TAG_TAG2': 'Tag2'}) def test_from_env_all_env_vars_set(self): client = InfluxDBClient3.from_env() self.assertIsInstance(client, InfluxDBClient3) @@ -107,10 +111,26 @@ def test_from_env_all_env_vars_set(self): self.assertEqual(client._database, "test_db") self.assertEqual(client._org, "test_org") self.assertEqual(client._token, "test_token") - write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.write_precision, WritePrecision.MS) self.assertEqual(write_options.gzip_threshold, 2000) + self.assertEqual(client._client.conf.verify_ssl, False) + self.assertEqual(client._client.conf.cert_file, 'path_to_cert') + self.assertEqual(client._client.conf.cert_key_file, 'path_to_cert_key') + self.assertEqual(client._client.conf.cert_key_password, 'cert_key_password') + self.assertEqual(client._client.conf.connection_pool_maxsize, 200) + self.assertEqual(client._client.conf.timeout, 6000) + self.assertEqual(client._client.profilers, ['prof1', 'prof2', 'prof3']) + self.assertEqual(client._client.default_tags['tag1'], 'Tag1') + self.assertEqual(client._client.default_tags['tag2'], 'Tag2') + client._write_api._point_settings = {} + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_SSL_CA_CERT': 'invalid/path'}) + def test_from_env_invalid_ssl_cert(self): + with self.assertRaises(FileNotFoundError) as context: + InfluxDBClient3.from_env() + self.assertIn("No such file or directory: 'invalid/path'", str(context.exception)) @patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "", 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) @@ -119,7 +139,6 @@ def test_from_env_missing_variables(self): InfluxDBClient3.from_env() self.assertIn("Missing required environment variables", str(context.exception)) - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': WritePrecision.MS}) def test_parse_valid_write_precision(self): @@ -127,14 +146,12 @@ def test_parse_valid_write_precision(self): self.assertIsInstance(client, InfluxDBClient3) self.assertEqual(client._write_client_options.get('write_options').write_precision, WritePrecision.MS) - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid'}) + 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid_value'}) def test_parse_invalid_write_precision(self): with self.assertRaises(ValueError) as context: InfluxDBClient3.from_env() - self.assertIn("Invalid precision value: invalid", str(context.exception)) - + self.assertIn("Invalid precision value: invalid_value", str(context.exception)) @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_GZIP_THRESHOLD': '2000'}) @@ -143,7 +160,6 @@ def test_parse_valid_gzip_threshold(self): self.assertIsInstance(client, InfluxDBClient3) self.assertEqual(client._write_client_options.get('write_options').gzip_threshold, 2000) - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_GZIP_THRESHOLD': 'invalid'}) def test_parse_invalid_gzip_threshold(self): diff --git a/tests/test_polars.py b/tests/test_polars.py index 435b4a2..dfd7c34 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -43,6 +43,7 @@ def setUp(self): database="my_db", token="my_token" ) + self.client._write_api._point_settings = PointSettings() def test_write_polars(self): import polars as pl @@ -77,6 +78,7 @@ def test_write_polars_batching(self): write_options=WriteOptions(batch_size=2) ) ) + self.client._write_api._point_settings = PointSettings() self.client._write_api._write_options = WriteOptions(batch_size=2) self.client._write_api._write_service = Mock(spec=WriteService) From ec8813cb57e1fef2e03c80c43cfd71972865f9bf Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 29 Apr 2025 17:04:14 +0700 Subject: [PATCH 18/18] wip --- influxdb_client_3/__init__.py | 79 +++++++++++-------- .../write_client/_sync/api_client.py | 21 +++++ .../write_client/client/_base.py | 44 +++++++++++ .../write_client/client/influxdb_client.py | 4 +- .../write_client/client/write_api.py | 2 + 5 files changed, 118 insertions(+), 32 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 9ce9b44..dbbc582 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -32,6 +32,7 @@ INFLUX_PROFILERS = "INFLUX_PROFILERS" INFLUX_TAG = "INFLUX_TAG" + def write_client_options(**kwargs): """ Function for providing additional arguments for the WriteApi client. @@ -104,11 +105,19 @@ def _merge_options(defaults, exclude_keys=None, custom=None): def _parse_precision(precision): """ - Parse and validate precision value. - - :param precision: Precision value to validate - :return: Validated precision value - :raises ValueError: If precision is invalid + Parses the precision value and ensures it is valid. + + This function checks that the given `precision` is one of the allowed + values defined in `WritePrecision`. If the precision is invalid, it + raises a `ValueError`. The function returns the valid precision value + if it passes validation. + + :param precision: The precision value to be validated. + Must be one of WritePrecision.NS, WritePrecision.MS, + WritePrecision.S, or WritePrecision.US. + :return: The valid precision value. + :rtype: WritePrecision + :raises ValueError: If the provided precision is not valid. """ if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]: raise ValueError(f"Invalid precision value: {precision}") @@ -117,11 +126,18 @@ def _parse_precision(precision): def _parse_gzip_threshold(threshold): """ - Parse and validate gzip threshold value. - - :param threshold: Threshold value to validate - :return: Validated threshold value - :raises ValueError: If threshold is invalid + Parses and validates the provided threshold value. + + This function ensures that the given threshold is a valid integer value, + and it raises an appropriate error if the threshold is not valid. It also + enforces that the threshold value is non-negative. + + :param threshold: The input threshold value to be parsed and validated. + :type threshold: Any + :return: The validated threshold value as an integer. + :rtype: int + :raises ValueError: If the provided threshold is not an integer or if it is + negative. """ try: threshold = int(threshold) @@ -248,30 +264,32 @@ def __init__( flight_client_options=flight_client_options, proxy=kwargs.get("proxy", None), options=q_opts_builder.build()) - @classmethod def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': """ - Create an instance of `InfluxDBClient3` using environment variables for configuration. - - This function retrieves and validates the following required environment variables: - - `INFLUX_HOST`: The hostname or IP address of the InfluxDB server. - - `INFLUX_TOKEN`: The authentication token used for accessing the server. - - `INFLUX_DATABASE`: The default database for the client operations. - And optional environment variable: - - `INFLUX_ORG`: The organization associated with InfluxDB operations. - Defaults to "default" if not set. - - If any of the required environment variables are not set, a ValueError will be - raised with details about the missing variables. - - :param kwargs: Additional keyword arguments that will be passed to the - `InfluxDBClient3` constructor for customization. This allows for - configuring specific client behaviors like write_client_options, - flight_client_options, SSL settings, etc. - :return: An initialized `InfluxDBClient3` instance. - :raises ValueError: If any required environment variables are not set. + Creates an instance of InfluxDBClient3 configured by specific environment + variables. This method automatically loads configuration settings, + such as connection details, security parameters, and performance + options, from environment variables and initializes the client + accordingly. + + :param cls: + The class used to create the client instance. + :param kwargs: + Additional optional parameters that can be passed to customize the + configuration or override specific settings derived from the + environment variables. + + :raises ValueError: + If any required environment variables are missing or have empty + values. + + :return: + An initialized instance of the `InfluxDBClient3` class with all the + configuration settings applied. + :rtype: + InfluxDBClient3 """ required_vars = { @@ -344,7 +362,6 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': **kwargs ) - def write(self, record=None, database=None, **kwargs): """ Write data to InfluxDB. diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index c95dd88..724f1c1 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -179,6 +179,27 @@ def __call_api( response_data.getheaders()) def check_should_compress(self, body: bytearray, gzip_threshold: int, enable_gzip: bool) -> bool: + """ + Determines whether the given body should be compressed based on its size, + a defined threshold for compression, and a flag indicating whether + compression is enabled. + + This function evaluates whether the body meets the required criteria for + compression. Compression may be enabled explicitly or conditionally + based on the body size exceeding the provided threshold. + + :param body: The content to be evaluated for compression. + :type body: bytearray + :param gzip_threshold: The minimum size threshold for compression to be applied. + :type gzip_threshold: int + :param enable_gzip: A flag indicating whether gzip compression is enabled. + It can explicitly enable or disable compression, or conditionally + allow compression if the body size exceeds the threshold. + :type enable_gzip: bool + :return: Returns True if the body meets the criteria for compression; + otherwise, returns False. + :rtype: bool + """ body_size = len(body) if enable_gzip is True or (enable_gzip is not False and (gzip_threshold and body_size >= gzip_threshold)): return True diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index eb46ecb..3b4bc1d 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -36,6 +36,50 @@ class _BaseClient(object): def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None, default_tags: dict = None, http_client_logger: str = None, **kwargs) -> None: + """ + Initializes the configuration for an HTTP client with support for customizable settings + such as authentication token, debugging, timeout, and gzip compression. This class + encapsulates the client configuration, logging setup, and authentication mechanisms + to allow seamless interaction with an HTTP backend. + + :param url: The base URL for the HTTP client. + :type url: str + :param token: The authentication token to be used in requests. + :type token: str + :param debug: Enables debug mode for logging client operations. Defaults to None. + :type debug: bool, optional + :param timeout: The timeout duration for HTTP requests in milliseconds. Defaults to 10,000. + :type timeout: int + :param enable_gzip: Flag to enable or disable gzip compression. Defaults to False. + :type enable_gzip: bool + :param gzip_threshold: The threshold size for enabling gzip compression, if applicable. + :type gzip_threshold: int, optional + :param org: The organization identifier to be associated with the client. + :type org: str, optional + :param default_tags: A dictionary of default tags to add to outgoing requests for metadata purposes. + :type default_tags: dict, optional + :param http_client_logger: The logger name to use for HTTP client-specific logging. + :type http_client_logger: str, optional + :param kwargs: Additional optional parameters to customize the HTTP client: + + - verify_ssl: Flag to enable or disable SSL certificate verification. Defaults to True. + - ssl_ca_cert: Path to the CA certificate file for verifying SSL. Defaults to None. + - cert_file: Path to a client SSL certificate file for authentication. Defaults to None. + - cert_key_file: Path to the client’s SSL key file, if separate. Defaults to None. + - cert_key_password: Password for the client’s SSL key file, if applicable. Defaults to None. + - ssl_context: SSLContext object to configure custom SSL parameters. Defaults to None. + - proxy: Proxy server URL, if a proxy is to be used. Defaults to None. + - proxy_headers: A dictionary containing custom headers for the proxy server. Defaults to None. + - connection_pool_maxsize: Defines the maximum pool size for connections. + Default inherits from the configuration. + - retries: Determines if request retrying is enabled. Defaults to None. + - profilers: Profilers for performance tracking. Defaults to None. + - username: Username for basic authentication. Defaults to None. + - password: Password for basic authentication. Defaults to None. + - auth_scheme: Custom authentication scheme to use with the token. Defaults to "Token". + - auth_basic: Boolean flag to enable HTTP Basic Authentication. Defaults to False. + + """ self.url = url self.org = org diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index 6dab859..715c10f 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -49,7 +49,9 @@ def __init__(self, url, token: str = None, except batching writes. As a default there is no one retry strategy. :key bool auth_basic: Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. - (defaults to false, don't set to true when talking to InfluxDB 2) + (defaults to false, don't set to true when talking to InfluxDB 2). + :key int gzip_threshold: If the payload size is larger than this gzip_threshold, then + the payload will be zipped. :key str username: ``username`` to authenticate via username and password credentials to the InfluxDB 2.x :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key list[str] profilers: list of enabled Flux profilers diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index b31489a..2d1c15f 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -80,6 +80,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called :param write_precision: the time precision for the data written to InfluxDB. :param write_scheduler: + :param gzip_threshold: if the payload size is larger than the gzip_threshold, the payload will be zipped. + :param enable_gzip: set true to enable to zip the payload. """ self.write_type = write_type self.batch_size = batch_size