diff --git a/CHANGELOG.md b/CHANGELOG.md
index df16256c..8e224172 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@
1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client.
1. [#363](https://github.com/InfluxCommunity/influxdb3-java/pull/363): Support custom tag order via `tagOrder` write option.
See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more.
+1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Support partial writes via `acceptPartial` write option.
+ See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more.
## 1.8.0 [2026-02-19]
diff --git a/README.md b/README.md
index 7c7c6c1e..ae66ad49 100644
--- a/README.md
+++ b/README.md
@@ -73,6 +73,7 @@ import java.util.List;
import java.util.stream.Stream;
import com.influxdb.v3.client.InfluxDBClient;
+import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.write.WriteOptions;
@@ -113,6 +114,23 @@ client.writePoint(
orderedTagWrite
);
+//
+// Write with partial acceptance
+//
+WriteOptions partialWrite = new WriteOptions.Builder()
+ .acceptPartial(true)
+ .build();
+try {
+ client.writeRecords(List.of(
+ "temperature,region=west value=20.0",
+ "temperature,region=west value=\"bad\""
+ ), partialWrite);
+} catch (InfluxDBPartialWriteException e) {
+ // Inspect failed line details.
+ e.lineErrors().forEach(line ->
+ System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine()));
+}
+
//
// Write by LineProtocol
//
diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java
index 4ea1ea20..35653c02 100644
--- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java
+++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java
@@ -536,6 +536,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
*
precision - timestamp precision when writing data
* gzipThreshold - payload size size for gzipping data
* writeNoSync - skip waiting for WAL persistence on write
+ * writeAcceptPartial - accept partial writes
*
*
* @param connectionString connection string
@@ -569,6 +570,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* INFLUX_PRECISION - timestamp precision when writing data
* INFLUX_GZIP_THRESHOLD - payload size size for gzipping data
* INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write
+ * INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes
*
* Supported system properties:
*
@@ -580,6 +582,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* - influx.precision - timestamp precision when writing data
* - influx.gzipThreshold - payload size size for gzipping data
* - influx.writeNoSync - skip waiting for WAL persistence on write
+ * - influx.writeAcceptPartial - accept partial writes
*
*
* @return instance of {@link InfluxDBClient}
diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java
new file mode 100644
index 00000000..9e7a2494
--- /dev/null
+++ b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java
@@ -0,0 +1,110 @@
+/*
+ * The MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package com.influxdb.v3.client;
+
+import java.net.http.HttpHeaders;
+import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * HTTP exception for partial write errors returned by InfluxDB 3 write endpoint.
+ * Contains parsed line-level write errors so callers can decide how to handle failed lines.
+ */
+public class InfluxDBPartialWriteException extends InfluxDBApiHttpException {
+
+ private final List lineErrors;
+
+ /**
+ * Construct a new InfluxDBPartialWriteException.
+ *
+ * @param message detail message
+ * @param headers response headers
+ * @param statusCode response status code
+ * @param lineErrors line-level errors parsed from response body
+ */
+ public InfluxDBPartialWriteException(
+ @Nullable final String message,
+ @Nullable final HttpHeaders headers,
+ final int statusCode,
+ @Nonnull final List lineErrors) {
+ super(message, headers, statusCode);
+ this.lineErrors = List.copyOf(lineErrors);
+ }
+
+ /**
+ * Line-level write errors.
+ *
+ * @return immutable list of line errors
+ */
+ @Nonnull
+ public List lineErrors() {
+ return lineErrors;
+ }
+
+ /**
+ * Represents one failed line from a partial write response.
+ */
+ public static final class LineError {
+
+ private final Integer lineNumber;
+ private final String errorMessage;
+ private final String originalLine;
+
+ /**
+ * @param lineNumber line number in the write payload; may be null if not provided by server
+ * @param errorMessage line-level error message
+ * @param originalLine original line protocol row; may be null if not provided by server
+ */
+ public LineError(@Nullable final Integer lineNumber,
+ @Nonnull final String errorMessage,
+ @Nullable final String originalLine) {
+ this.lineNumber = lineNumber;
+ this.errorMessage = errorMessage;
+ this.originalLine = originalLine;
+ }
+
+ /**
+ * @return line number or null if server didn't provide it
+ */
+ @Nullable
+ public Integer lineNumber() {
+ return lineNumber;
+ }
+
+ /**
+ * @return line-level error message
+ */
+ @Nonnull
+ public String errorMessage() {
+ return errorMessage;
+ }
+
+ /**
+ * @return original line protocol row or null if server didn't provide it
+ */
+ @Nullable
+ public String originalLine() {
+ return originalLine;
+ }
+ }
+}
diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java
index 4326f3a5..df413fb3 100644
--- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java
+++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java
@@ -57,6 +57,7 @@
* defaultTags - defaultTags added when writing points to InfluxDB
* gzipThreshold - threshold when gzip compression is used for writing points to InfluxDB
* writeNoSync - skip waiting for WAL persistence on write
+ * writeAcceptPartial - accept partial writes
* timeout - deprecated in 1.4.0 timeout when connecting to InfluxDB,
* please use more informative properties writeTimeout and queryTimeout
* writeTimeout - timeout when writing data to InfluxDB
@@ -107,6 +108,7 @@ public final class ClientConfig {
private final WritePrecision writePrecision;
private final Integer gzipThreshold;
private final Boolean writeNoSync;
+ private final Boolean writeAcceptPartial;
private final Map defaultTags;
@Deprecated
private final Duration timeout;
@@ -208,6 +210,16 @@ public Boolean getWriteNoSync() {
return writeNoSync;
}
+ /**
+ * Accept partial writes?
+ *
+ * @return accept partial writes
+ */
+ @Nonnull
+ public Boolean getWriteAcceptPartial() {
+ return writeAcceptPartial;
+ }
+
/**
* Gets default tags used when writing points.
* @return default tags
@@ -370,6 +382,7 @@ public boolean equals(final Object o) {
&& writePrecision == that.writePrecision
&& Objects.equals(gzipThreshold, that.gzipThreshold)
&& Objects.equals(writeNoSync, that.writeNoSync)
+ && Objects.equals(writeAcceptPartial, that.writeAcceptPartial)
&& Objects.equals(defaultTags, that.defaultTags)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(writeTimeout, that.writeTimeout)
@@ -388,7 +401,7 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
- database, writePrecision, gzipThreshold, writeNoSync,
+ database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial,
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
proxy, proxyUrl, authenticator, headers,
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
@@ -403,6 +416,7 @@ public String toString() {
.add("writePrecision=" + writePrecision)
.add("gzipThreshold=" + gzipThreshold)
.add("writeNoSync=" + writeNoSync)
+ .add("writeAcceptPartial=" + writeAcceptPartial)
.add("timeout=" + timeout)
.add("writeTimeout=" + writeTimeout)
.add("queryTimeout=" + queryTimeout)
@@ -432,6 +446,7 @@ public static final class Builder {
private WritePrecision writePrecision;
private Integer gzipThreshold;
private Boolean writeNoSync;
+ private Boolean writeAcceptPartial;
private Map defaultTags;
@Deprecated
private Duration timeout;
@@ -554,6 +569,19 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) {
return this;
}
+ /**
+ * Sets whether to accept partial writes.
+ *
+ * @param writeAcceptPartial accept partial writes
+ * @return this
+ */
+ @Nonnull
+ public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) {
+
+ this.writeAcceptPartial = writeAcceptPartial;
+ return this;
+ }
+
/**
* Sets default tags to be written with points.
*
@@ -800,6 +828,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
if (parameters.containsKey("writeNoSync")) {
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
}
+ if (parameters.containsKey("writeAcceptPartial")) {
+ this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial")));
+ }
if (parameters.containsKey("disableGRPCCompression")) {
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
}
@@ -855,6 +886,10 @@ public ClientConfig build(@Nonnull final Map env, final Properti
if (writeNoSync != null) {
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
}
+ final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial");
+ if (writeAcceptPartial != null) {
+ this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial));
+ }
final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout");
if (writeTimeout != null) {
long to = Long.parseLong(writeTimeout);
@@ -911,6 +946,9 @@ private ClientConfig(@Nonnull final Builder builder) {
writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION;
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
+ writeAcceptPartial = builder.writeAcceptPartial != null
+ ? builder.writeAcceptPartial
+ : WriteOptions.DEFAULT_ACCEPT_PARTIAL;
defaultTags = builder.defaultTags;
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
writeTimeout = builder.writeTimeout != null
diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java
index 8c2ce466..0e5e8dd0 100644
--- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java
+++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java
@@ -309,15 +309,22 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti
String path;
Map queryParams;
boolean noSync = options.noSyncSafe(config);
- if (noSync) {
- // Setting no_sync=true is supported only in the v3 API.
+ boolean acceptPartial = options.acceptPartialSafe(config);
+ boolean useV3Write = noSync || acceptPartial;
+ if (useV3Write) {
+ // no_sync=true and accept_partial=true are supported only in the v3 API.
path = "api/v3/write_lp";
queryParams = new HashMap<>() {{
put("org", config.getOrganization());
put("db", database);
put("precision", WritePrecisionConverter.toV3ApiString(precision));
- put("no_sync", "true");
}};
+ if (noSync) {
+ queryParams.put("no_sync", "true");
+ }
+ if (acceptPartial) {
+ queryParams.put("accept_partial", "true");
+ }
} else {
// By default, use the v2 API.
path = "api/v2/write";
@@ -373,10 +380,12 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti
try {
restClient.request(path, HttpMethod.POST, body, queryParams, headers);
} catch (InfluxDBApiHttpException e) {
- if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
- // Server does not support the v3 write API, can't use the NoSync option.
+ if (useV3Write && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
+ // Server does not support the v3 write API, can't use v3-only write options.
throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true "
- + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode());
+ + "or AcceptPartial=true (supported by InfluxDB 3 Core/Enterprise servers only).",
+ e.headers(),
+ e.statusCode());
}
throw e;
}
diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java
index 6fb0e05b..a9afbfa3 100644
--- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java
+++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java
@@ -46,6 +46,7 @@
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -57,6 +58,7 @@
import com.influxdb.v3.client.InfluxDBApiException;
import com.influxdb.v3.client.InfluxDBApiHttpException;
+import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.config.ClientConfig;
final class RestClient implements AutoCloseable {
@@ -219,7 +221,8 @@ HttpResponse request(@Nonnull final String path,
if (statusCode < 200 || statusCode >= 300) {
String reason;
String body = response.body();
- reason = formatErrorMessage(body, response.headers().firstValue("Content-Type").orElse(null));
+ String contentType = response.headers().firstValue("Content-Type").orElse(null);
+ reason = formatErrorMessage(body, contentType);
if (reason == null) {
reason = "";
@@ -241,6 +244,11 @@ HttpResponse request(@Nonnull final String path,
}
String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason);
+ List lineErrors =
+ parsePartialWriteLineErrors(path, body, contentType);
+ if (!lineErrors.isEmpty()) {
+ throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors);
+ }
throw new InfluxDBApiHttpException(message, response.headers(), response.statusCode());
}
@@ -278,11 +286,7 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St
if (error != null && dataNode != null && dataNode.isArray()) {
final StringBuilder message = new StringBuilder(error);
boolean hasDetails = false;
- for (JsonNode item : dataNode) {
- final String detail = errFormatDataArrayDetail(item);
- if (detail == null) {
- continue;
- }
+ for (String detail : errFormatDataArrayDetails(dataNode)) {
if (!hasDetails) {
message.append(':');
hasDetails = true;
@@ -306,12 +310,89 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St
}
}
+ @Nonnull
+ private List parsePartialWriteLineErrors(
+ @Nonnull final String path,
+ @Nonnull final String body,
+ @Nullable final String contentType) {
+ if (!isWriteEndpoint(path)) {
+ return List.of();
+ }
+
+ if (body.isEmpty()) {
+ return List.of();
+ }
+
+ if (contentType != null
+ && !contentType.isEmpty()
+ && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) {
+ return List.of();
+ }
+
+ try {
+ final JsonNode root = objectMapper.readTree(body);
+ if (!root.isObject()) {
+ return List.of();
+ }
+
+ final String error = errNonEmptyField(root, "error");
+ final JsonNode dataNode = root.get("data");
+ if (error == null || dataNode == null) {
+ return List.of();
+ }
+
+ if (dataNode.isArray()) {
+ final ErrDataArrayItem[] parsed = errReadDataArray(dataNode);
+ if (parsed == null) {
+ return List.of();
+ }
+
+ final List lineErrors = new ArrayList<>();
+ for (ErrDataArrayItem item : parsed) {
+ final InfluxDBPartialWriteException.LineError lineError = errToLineError(item);
+ if (lineError != null) {
+ lineErrors.add(lineError);
+ }
+ }
+ return lineErrors;
+ }
+
+ if (dataNode.isObject()) {
+ try {
+ final ErrDataArrayItem item = objectMapper.treeToValue(dataNode, ErrDataArrayItem.class);
+ final InfluxDBPartialWriteException.LineError lineError = errToLineError(item);
+ return lineError == null ? List.of() : List.of(lineError);
+ } catch (JsonProcessingException e) {
+ return List.of();
+ }
+ }
+
+ return List.of();
+ } catch (JsonProcessingException e) {
+ LOG.debug("Can't parse line errors from response body {}", body, e);
+ return List.of();
+ }
+ }
+
+ private boolean isWriteEndpoint(@Nonnull final String path) {
+ return "api/v2/write".equals(path) || "api/v3/write_lp".equals(path);
+ }
+
@Nullable
private String errNonEmptyText(@Nullable final JsonNode node) {
if (node == null || node.isNull()) {
return null;
}
- final String value = node.asText();
+
+ final String value;
+ if (node.isTextual()) {
+ value = node.asText();
+ } else if (node.isNumber() || node.isBoolean()) {
+ value = node.asText();
+ } else {
+ value = node.toString();
+ }
+
return value.isEmpty() ? null : value;
}
@@ -323,25 +404,68 @@ private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final
return errNonEmptyText(object.get(fieldName));
}
+ @Nonnull
+ private List errFormatDataArrayDetails(@Nonnull final JsonNode dataNode) {
+ final ErrDataArrayItem[] parsed = errReadDataArray(dataNode);
+ if (parsed != null) {
+ final List details = new ArrayList<>();
+ for (ErrDataArrayItem item : parsed) {
+ final InfluxDBPartialWriteException.LineError lineError = errToLineError(item);
+ if (lineError == null) {
+ continue;
+ }
+
+ if (lineError.lineNumber() != null
+ && lineError.originalLine() != null
+ && !lineError.originalLine().isEmpty()) {
+ details.add("line " + lineError.lineNumber() + ": "
+ + lineError.errorMessage() + " (" + lineError.originalLine() + ")");
+ } else {
+ details.add(lineError.errorMessage());
+ }
+ }
+ return details;
+ }
+
+ final List details = new ArrayList<>();
+ for (JsonNode item : dataNode) {
+ final String raw = errNonEmptyText(item);
+ if (raw != null) {
+ details.add(raw);
+ }
+ }
+ return details;
+ }
+
@Nullable
- private String errFormatDataArrayDetail(@Nullable final JsonNode item) {
- if (!item.isObject()) {
+ private ErrDataArrayItem[] errReadDataArray(@Nonnull final JsonNode dataNode) {
+ try {
+ return objectMapper.treeToValue(dataNode, ErrDataArrayItem[].class);
+ } catch (JsonProcessingException e) {
return null;
}
+ }
- final String errorMessage = errNonEmptyField(item, "error_message");
- if (errorMessage == null) {
+ @Nullable
+ private InfluxDBPartialWriteException.LineError errToLineError(@Nullable final ErrDataArrayItem item) {
+ if (item == null || item.errorMessage == null || item.errorMessage.isEmpty()) {
return null;
}
- if (item.hasNonNull("line_number")) {
- final String originalLine = errNonEmptyField(item, "original_line");
- if (originalLine != null) {
- final String lineNumber = item.get("line_number").asText();
- return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")";
- }
- }
- return errorMessage;
+ final String originalLine =
+ (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine;
+ return new InfluxDBPartialWriteException.LineError(item.lineNumber, item.errorMessage, originalLine);
+ }
+
+ private static final class ErrDataArrayItem {
+ @JsonProperty("error_message")
+ private String errorMessage;
+
+ @JsonProperty("line_number")
+ private Integer lineNumber;
+
+ @JsonProperty("original_line")
+ private String originalLine;
}
private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) {
diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java
index e9b4ca1b..8fddccd6 100644
--- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java
+++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java
@@ -68,6 +68,10 @@ public final class WriteOptions {
* Default NoSync.
*/
public static final boolean DEFAULT_NO_SYNC = false;
+ /**
+ * Default AcceptPartial.
+ */
+ public static final boolean DEFAULT_ACCEPT_PARTIAL = false;
/**
* Default timeout for writes in seconds. Set to {@value}
@@ -81,12 +85,14 @@ public final class WriteOptions {
@Deprecated(forRemoval = true)
public static final WriteOptions DEFAULTS = new WriteOptions(
- null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null, null);
+ null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, DEFAULT_ACCEPT_PARTIAL,
+ null, null, null);
private final String database;
private final WritePrecision precision;
private final Integer gzipThreshold;
private final Boolean noSync;
+ private final Boolean acceptPartial;
private final Map defaultTags;
private final List tagOrder;
private final Map headers;
@@ -99,6 +105,7 @@ public final class WriteOptions {
*/
public static WriteOptions defaultWriteOptions() {
return new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC,
+ DEFAULT_ACCEPT_PARTIAL,
null, null, null);
}
@@ -152,7 +159,7 @@ public WriteOptions(@Nullable final String database,
@Nullable final WritePrecision precision,
@Nullable final Integer gzipThreshold,
@Nullable final Boolean noSync) {
- this(database, precision, gzipThreshold, noSync, null, null);
+ this(database, precision, gzipThreshold, noSync, null, null, null);
}
/**
@@ -184,7 +191,7 @@ public WriteOptions(@Nullable final String database,
@Nullable final Integer gzipThreshold,
@Nullable final Map defaultTags,
@Nullable final Map headers) {
- this(database, precision, gzipThreshold, null, defaultTags, headers);
+ this(database, precision, gzipThreshold, null, null, defaultTags, headers, null);
}
/**
@@ -209,7 +216,7 @@ public WriteOptions(@Nullable final String database,
@Nullable final Boolean noSync,
@Nullable final Map defaultTags,
@Nullable final Map headers) {
- this(database, precision, gzipThreshold, noSync, defaultTags, headers, null);
+ this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, null);
}
/**
@@ -223,6 +230,8 @@ public WriteOptions(@Nullable final String database,
* If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}.
* @param noSync Skip waiting for WAL persistence on write.
* If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}.
+ * @param acceptPartial Request partial write acceptance.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}.
* @param defaultTags Default tags to be added when writing points.
* @param headers The headers to be added to write request.
* The headers specified here are preferred over the headers
@@ -234,6 +243,7 @@ public WriteOptions(@Nullable final String database,
@Nullable final WritePrecision precision,
@Nullable final Integer gzipThreshold,
@Nullable final Boolean noSync,
+ @Nullable final Boolean acceptPartial,
@Nullable final Map defaultTags,
@Nullable final Map headers,
@Nullable final List tagOrder) {
@@ -241,11 +251,40 @@ public WriteOptions(@Nullable final String database,
this.precision = precision;
this.gzipThreshold = gzipThreshold;
this.noSync = noSync;
+ this.acceptPartial = acceptPartial;
this.defaultTags = defaultTags == null ? Map.of() : defaultTags;
this.tagOrder = sanitizeTagOrder(tagOrder);
this.headers = headers == null ? Map.of() : headers;
}
+ /**
+ * Construct WriteAPI options.
+ *
+ * @param database The database to be used for InfluxDB operations.
+ * If it is not specified then use {@link ClientConfig#getDatabase()}.
+ * @param precision The precision to use for the timestamp of points.
+ * If it is not specified then use {@link ClientConfig#getWritePrecision()}.
+ * @param gzipThreshold The threshold for compressing request body.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}.
+ * @param noSync Skip waiting for WAL persistence on write.
+ * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}.
+ * @param defaultTags Default tags to be added when writing points.
+ * @param headers The headers to be added to write request.
+ * The headers specified here are preferred over the headers
+ * specified in the client configuration.
+ * @param tagOrder Preferred order of tags in line protocol serialization.
+ * Null or empty tag names are ignored.
+ */
+ public WriteOptions(@Nullable final String database,
+ @Nullable final WritePrecision precision,
+ @Nullable final Integer gzipThreshold,
+ @Nullable final Boolean noSync,
+ @Nullable final Map defaultTags,
+ @Nullable final Map headers,
+ @Nullable final List tagOrder) {
+ this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, tagOrder);
+ }
+
/**
* @param config with default value
* @return The destination database for writes.
@@ -299,8 +338,16 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) {
*/
public boolean noSyncSafe(@Nonnull final ClientConfig config) {
Arguments.checkNotNull(config, "config");
- return noSync != null ? noSync
- : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC);
+ return noSync != null ? noSync : config.getWriteNoSync();
+ }
+
+ /**
+ * @param config with default value
+ * @return Accept partial write.
+ */
+ public boolean acceptPartialSafe(@Nonnull final ClientConfig config) {
+ Arguments.checkNotNull(config, "config");
+ return acceptPartial != null ? acceptPartial : config.getWriteAcceptPartial();
}
/**
@@ -332,6 +379,7 @@ public boolean equals(final Object o) {
&& precision == that.precision
&& Objects.equals(gzipThreshold, that.gzipThreshold)
&& Objects.equals(noSync, that.noSync)
+ && Objects.equals(acceptPartial, that.acceptPartial)
&& defaultTags.equals(that.defaultTags)
&& tagOrder.equals(that.tagOrder)
&& headers.equals(that.headers);
@@ -339,7 +387,8 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
- return Objects.hash(database, precision, gzipThreshold, noSync, defaultTags, tagOrder, headers);
+ return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, defaultTags, tagOrder,
+ headers);
}
private boolean isNotDefined(final String option) {
@@ -368,6 +417,7 @@ public static final class Builder {
private WritePrecision precision;
private Integer gzipThreshold;
private Boolean noSync;
+ private Boolean acceptPartial;
private Map defaultTags = new HashMap<>();
private List tagOrder = List.of();
private Map headers = new HashMap<>();
@@ -424,6 +474,19 @@ public Builder noSync(@Nonnull final Boolean noSync) {
return this;
}
+ /**
+ * Sets whether to request partial write acceptance.
+ *
+ * @param acceptPartial request partial write acceptance
+ * @return this
+ */
+ @Nonnull
+ public Builder acceptPartial(@Nonnull final Boolean acceptPartial) {
+
+ this.acceptPartial = acceptPartial;
+ return this;
+ }
+
/**
* Sets defaultTags.
*
@@ -473,7 +536,7 @@ public WriteOptions build() {
}
private WriteOptions(@Nonnull final Builder builder) {
- this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags,
- builder.headers, builder.tagOrder);
+ this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.acceptPartial,
+ builder.defaultTags, builder.headers, builder.tagOrder);
}
}
diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
index a7d31240..23569283 100644
--- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
+++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java
@@ -224,6 +224,23 @@ void writeNoSyncTrueUsesV3API() throws InterruptedException {
assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond");
}
+ @Test
+ void writeAcceptPartialTrueUsesV3API() throws InterruptedException {
+ mockServer.enqueue(createResponse(200));
+
+ client.writeRecord("mem,tag=one value=1.0",
+ new WriteOptions.Builder().precision(WritePrecision.NS).acceptPartial(true).build());
+
+ assertThat(mockServer.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = mockServer.takeRequest();
+ assertThat(request).isNotNull();
+ assertThat(request.getUrl()).isNotNull();
+ assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp");
+ assertThat(request.getUrl().queryParameter("no_sync")).isNull();
+ assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true");
+ assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond");
+ }
+
@Test
void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code()));
@@ -242,7 +259,29 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException {
assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond");
assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code());
- assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true"
+ assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true"
+ + " (supported by InfluxDB 3 Core/Enterprise servers only).");
+ }
+
+ @Test
+ void writeAcceptPartialTrueOnV2ServerThrowsException() throws InterruptedException {
+ mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code()));
+
+ InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class,
+ () -> client.writeRecord("mem,tag=one value=1.0",
+ new WriteOptions.Builder().precision(WritePrecision.MS).acceptPartial(true).build())
+ );
+
+ assertThat(mockServer.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = mockServer.takeRequest();
+ assertThat(request).isNotNull();
+ assertThat(request.getUrl()).isNotNull();
+ assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp");
+ assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true");
+ assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond");
+
+ assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code());
+ assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true"
+ " (supported by InfluxDB 3 Core/Enterprise servers only).");
}
@@ -256,7 +295,7 @@ void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writeRecord("mem,tag=one value=1.0");
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false);
}
@Test
@@ -272,7 +311,21 @@ void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writeRecord("mem,tag=one value=1.0");
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true);
+ }
+
+ @Test
+ void writeRecordWithDefaultWriteOptionsAcceptPartialConfig() throws Exception {
+ mockServer.enqueue(createResponse(200));
+
+ ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB")
+ .writeAcceptPartial(true)
+ .build();
+ try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) {
+ client.writeRecord("mem,tag=one value=1.0");
+ }
+
+ checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", false, true, false);
}
@Test
@@ -285,7 +338,7 @@ void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writeRecords(List.of("mem,tag=one value=1.0"));
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false);
}
@Test
@@ -301,7 +354,7 @@ void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writeRecords(List.of("mem,tag=one value=1.0"));
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true);
}
@Test
@@ -317,7 +370,7 @@ void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writePoint(point);
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false);
}
@Test
@@ -336,7 +389,7 @@ void writePointWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writePoint(point);
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true);
}
@Test
@@ -352,7 +405,7 @@ void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception {
client.writePoints(List.of(point));
}
- checkWriteCalled("/api/v2/write", "DB", "ns", false, false);
+ checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false);
}
@Test
@@ -371,17 +424,19 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception {
client.writePoints(List.of(point));
}
- checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true);
+ checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true);
}
private void checkWriteCalled(final String expectedPath, final String expectedDB,
final String expectedPrecision, final boolean expectedNoSync,
+ final boolean expectedAcceptPartial,
final boolean expectedGzip) throws InterruptedException {
RecordedRequest request = assertThatServerRequested();
HttpUrl requestUrl = request.getUrl();
assertThat(requestUrl).isNotNull();
assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath);
- if (expectedNoSync) {
+ boolean expectedV3 = expectedNoSync || expectedAcceptPartial;
+ if (expectedV3) {
assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB);
} else {
assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB);
@@ -392,6 +447,11 @@ private void checkWriteCalled(final String expectedPath, final String expectedDB
} else {
assertThat(requestUrl.queryParameter("no_sync")).isNull();
}
+ if (expectedAcceptPartial) {
+ assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo("true");
+ } else {
+ assertThat(requestUrl.queryParameter("accept_partial")).isNull();
+ }
if (expectedGzip) {
assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip");
} else {
diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java
index f5ce2e8f..f940afd7 100644
--- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java
+++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java
@@ -56,6 +56,21 @@ void equalConfig() {
Assertions.assertThat(config).isEqualTo(config);
Assertions.assertThat(config).isEqualTo(configBuilder.build());
Assertions.assertThat(config).isNotEqualTo(configBuilder);
+ Assertions.assertThat(config).isNotEqualTo(new ClientConfig.Builder()
+ .host("http://localhost:9999")
+ .token("my-token".toCharArray())
+ .organization("my-org")
+ .database("my-db")
+ .writePrecision(WritePrecision.NS)
+ .writeAcceptPartial(true)
+ .timeout(Duration.ofSeconds(30))
+ .writeTimeout(Duration.ofSeconds(35))
+ .queryTimeout(Duration.ofSeconds(120))
+ .allowHttpRedirects(true)
+ .disableServerCertificateValidation(true)
+ .headers(Map.of("X-device", "ab-01"))
+ .disableGRPCCompression(true)
+ .build());
Assertions.assertThat(config).isNotEqualTo(configBuilder.database("database").build());
}
@@ -79,6 +94,7 @@ void toStringConfig() {
Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true);
Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true);
Assertions.assertThat(configString).contains("writeNoSync=false");
+ Assertions.assertThat(configString).contains("writeAcceptPartial=false");
Assertions.assertThat(configString).contains("timeout=PT30S");
Assertions.assertThat(configString).contains("writeTimeout=PT35S");
Assertions.assertThat(configString).contains("queryTimeout=PT2M");
@@ -90,7 +106,8 @@ void toStringConfig() {
void fromConnectionString() throws MalformedURLException {
ClientConfig cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
- + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128&writeNoSync=true");
+ + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128"
+ + "&writeNoSync=true&writeAcceptPartial=true");
Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/");
Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray());
Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org");
@@ -98,6 +115,7 @@ void fromConnectionString() throws MalformedURLException {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); // default
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true);
cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
@@ -109,6 +127,7 @@ void fromConnectionString() throws MalformedURLException {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
cfg = new ClientConfig.Builder()
.build("http://localhost:9999/"
@@ -208,6 +227,7 @@ void fromEnv() {
"INFLUX_PRECISION", "ms",
"INFLUX_GZIP_THRESHOLD", "64",
"INFLUX_WRITE_NO_SYNC", "true",
+ "INFLUX_WRITE_ACCEPT_PARTIAL", "true",
"INFLUX_DISABLE_GRPC_COMPRESSION", "true"
);
@@ -220,6 +240,7 @@ void fromEnv() {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true);
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
}
@@ -287,6 +308,7 @@ void fromSystemProperties() {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
// basic
properties = new Properties();
@@ -324,6 +346,7 @@ void fromSystemProperties() {
properties.put("influx.precision", "ms");
properties.put("influx.gzipThreshold", "64");
properties.put("influx.writeNoSync", "true");
+ properties.put("influx.writeAcceptPartial", "true");
properties.put("influx.disableGRPCCompression", "true");
cfg = new ClientConfig.Builder()
.build(new HashMap<>(), properties);
@@ -334,6 +357,7 @@ void fromSystemProperties() {
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
+ Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true);
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
}
diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
index 6ec5ee72..1df052b0 100644
--- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
+++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java
@@ -41,7 +41,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import com.influxdb.v3.client.InfluxDBApiHttpException;
import com.influxdb.v3.client.InfluxDBClient;
+import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;
import com.influxdb.v3.client.config.ClientConfig;
@@ -186,6 +188,78 @@ public void testQuery() throws Exception {
}
}
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
+ @Test
+ public void testAcceptPartialWriteError() throws Exception {
+ try (InfluxDBClient client = InfluxDBClient.getInstance(
+ System.getenv("TESTING_INFLUXDB_URL"),
+ System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
+ System.getenv("TESTING_INFLUXDB_DATABASE"),
+ null)) {
+
+ String points = "temperature,room=room1 value=18.94647\n"
+ + "temperatureroom=room2value=20.268019\n"
+ + "temperature,room=room3 value=24.064857\n"
+ + "temperature,room=room4 value=43i";
+
+ WriteOptions options = new WriteOptions.Builder()
+ .acceptPartial(true)
+ .build();
+
+ Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options));
+ Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class);
+
+ String expectedMessage = "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: Expected at least one space character, got end of input (temperatureroom=room)\n"
+ + "\tline 4: invalid column type for column 'value', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::integer (temperature,room=roo)";
+ Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage);
+
+ InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialError.lineErrors()).hasSize(2);
+ Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2);
+ Assertions.assertThat(partialError.lineErrors().get(0).errorMessage())
+ .isEqualTo("Expected at least one space character, got end of input");
+ Assertions.assertThat(partialError.lineErrors().get(0).originalLine())
+ .isEqualTo("temperatureroom=room");
+
+ Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(4);
+ Assertions.assertThat(partialError.lineErrors().get(1).errorMessage())
+ .isEqualTo("invalid column type for column 'value', expected iox::column_type::field::float, "
+ + "got iox::column_type::field::integer");
+ Assertions.assertThat(partialError.lineErrors().get(1).originalLine())
+ .isEqualTo("temperature,room=roo");
+
+ Assertions.assertThat(partialError).isInstanceOf(InfluxDBApiHttpException.class);
+ Assertions.assertThat(partialError.statusCode()).isEqualTo(400);
+ }
+ }
+
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
+ @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
+ @Test
+ public void testWriteErrorWithoutAcceptPartial() throws Exception {
+ try (InfluxDBClient client = InfluxDBClient.getInstance(
+ System.getenv("TESTING_INFLUXDB_URL"),
+ System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
+ System.getenv("TESTING_INFLUXDB_DATABASE"),
+ null)) {
+
+ String points = "temperature,room=room1 value=18.94647\n"
+ + "temperatureroom=room2value=20.268019\n"
+ + "temperature,room=room3 value=24.064857\n"
+ + "temperature,room=room4 value=43i";
+
+ Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points));
+ Assertions.assertThat(thrown.getMessage())
+ .isEqualTo("HTTP status code: 400; Message: write buffer error: "
+ + "line protocol parse failed: Expected at least one space character, got end of input");
+ }
+ }
+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
index b6d4a26b..fb481423 100644
--- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
+++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java
@@ -51,6 +51,7 @@
import com.influxdb.v3.client.InfluxDBApiException;
import com.influxdb.v3.client.InfluxDBApiHttpException;
import com.influxdb.v3.client.InfluxDBClient;
+import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.config.ClientConfig;
import com.influxdb.v3.client.write.WriteOptions;
@@ -546,11 +547,19 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format
.host(baseURL)
.build());
- Assertions.assertThatThrownBy(
- () -> restClient.request("ping", HttpMethod.GET, null, null, null)
- )
- .isInstanceOf(InfluxDBApiException.class)
+ Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(InfluxDBPartialWriteException.class)
+ .isInstanceOf(InfluxDBApiHttpException.class)
.hasMessage("HTTP status code: 400; Message: invalid field value");
+
+ InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialWriteException.statusCode()).isEqualTo(400);
+ Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1);
+ InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0);
+ Assertions.assertThat(lineError.lineNumber()).isNull();
+ Assertions.assertThat(lineError.errorMessage()).isEqualTo("invalid field value");
+ Assertions.assertThat(lineError.originalLine()).isNull();
}
@Test
@@ -567,13 +576,43 @@ public void errorFromBodyV3WithDataArray() {
.host(baseURL)
.build());
- Assertions.assertThatThrownBy(
- () -> restClient.request("ping", HttpMethod.GET, null, null, null)
- )
- .isInstanceOf(InfluxDBApiException.class)
+ Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(InfluxDBPartialWriteException.class)
+ .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer,"
+ + " got iox::column_type::field::float (testa6a3ad v=1 17702)");
+
+ InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown;
+ Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1);
+ InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0);
+ Assertions.assertThat(lineError.lineNumber()).isEqualTo(2);
+ Assertions.assertThat(lineError.errorMessage())
+ .isEqualTo("invalid column type for column 'v', expected iox::column_type::field::integer,"
+ + " got iox::column_type::field::float");
+ Assertions.assertThat(lineError.originalLine()).isEqualTo("testa6a3ad v=1 17702");
+ }
+
+ @Test
+ public void errorFromBodyV3WithDataArrayAnyInvalidItemFallsBackToHttpException() {
+ mockServer.enqueue(createResponse(400,
+ "application/json",
+ null,
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"},"
+ + "{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}]}"));
+
+ restClient = new RestClient(new ClientConfig.Builder()
+ .host(baseURL)
+ .build());
+
+ Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(InfluxDBApiHttpException.class)
+ .isNotInstanceOf(InfluxDBPartialWriteException.class)
.hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n"
- + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer,"
- + " got iox::column_type::field::float (testa6a3ad v=1 17702)");
+ + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n"
+ + "\t{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}");
}
@ParameterizedTest(name = "{0}")
@@ -630,7 +669,8 @@ private static Stream errorFromBodyV3WithDataArrayCases() {
"{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":"
+ "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
"HTTP status code: 400; Message: partial write of line protocol occurred:\n"
- + "\tline 2: bad line (bad lp)"
+ + "\t1\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}"
),
Arguments.of(
"null error_message skipped",
@@ -652,6 +692,47 @@ private static Stream errorFromBodyV3WithDataArrayCases() {
"HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ "\tline 2: bad line (bad lp)\n"
+ "\tsecond issue"
+ ),
+ Arguments.of(
+ "array of strings fallback",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[\"bad line 1\",\"bad line 2\"]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tbad line 1\n"
+ + "\tbad line 2"
+ ),
+ Arguments.of(
+ "textual numeric line_number",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":\"2\",\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\tline 2: bad line (bad lp)"
+ ),
+ Arguments.of(
+ "textual non-numeric line_number",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}"
+ ),
+ Arguments.of(
+ "empty textual line_number with empty original_line",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"only error message\",\"line_number\":\"\",\"original_line\":\"\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message"
+ ),
+ Arguments.of(
+ "non-textual line_number",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}"
+ ),
+ Arguments.of(
+ "object line_number preserved as text",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":"
+ + "\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}]}",
+ "HTTP status code: 400; Message: partial write of line protocol occurred:\n"
+ + "\t{\"error_message\":\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}"
)
);
}
@@ -659,11 +740,14 @@ private static Stream errorFromBodyV3WithDataArrayCases() {
@ParameterizedTest(name = "{0}")
@MethodSource("errorFromBodyV3FallbackCases")
public void errorFromBodyV3FallbackCase(final String testName,
+ final String requestPath,
+ final String contentType,
final String body,
+ final Class extends InfluxDBApiException> expectedClass,
final String expectedMessage) {
mockServer.enqueue(createResponse(400,
- "application/json",
+ contentType,
null,
body));
@@ -671,48 +755,125 @@ public void errorFromBodyV3FallbackCase(final String testName,
.host(baseURL)
.build());
- Assertions.assertThatThrownBy(
- () -> restClient.request("ping", HttpMethod.GET, null, null, null)
- )
- .isInstanceOf(InfluxDBApiException.class)
- .hasMessage(expectedMessage);
+ Throwable thrown = catchThrowable(() -> restClient.request(requestPath, HttpMethod.GET, null, null, null));
+ Assertions.assertThat(thrown)
+ .isInstanceOf(expectedClass)
+ .hasMessage(expectedMessage);
}
private static Stream errorFromBodyV3FallbackCases() {
return Stream.of(
Arguments.of(
"missing error with data array falls back to body",
+ "ping",
+ "application/json",
"{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: "
+ "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}"
),
Arguments.of(
"empty error with data array falls back to body",
+ "ping",
+ "application/json",
"{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":"
+ "\"bad lp\"}]}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: "
+ "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":"
+ "\"bad lp\"}]}"
),
Arguments.of(
"data object without error_message falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":{}}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
),
Arguments.of(
"data object with empty error_message falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
),
Arguments.of(
"data string falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
),
Arguments.of(
"data number falls back to error",
+ "ping",
+ "application/json",
"{\"error\":\"parsing failed\",\"data\":123}",
+ InfluxDBApiHttpException.class,
"HTTP status code: 400; Message: parsing failed"
+ ),
+ Arguments.of(
+ "partial-write with invalid data string falls back to error",
+ "ping",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":\"invalid\"}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "partial-write with empty data object falls back to error",
+ "ping",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":{}}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "write endpoint ignores line-error parsing for non-json content type",
+ "api/v3/write_lp",
+ "text/plain",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\","
+ + "\"line_number\":2,\"original_line\":\"bad lp\"}]}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: "
+ + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\","
+ + "\"line_number\":2,\"original_line\":\"bad lp\"}]}"
+ ),
+ Arguments.of(
+ "write endpoint with non-object root falls back to body",
+ "api/v3/write_lp",
+ "application/json",
+ "[]",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: []"
+ ),
+ Arguments.of(
+ "write endpoint with invalid line-error object type falls back to http exception",
+ "api/v3/write_lp",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":{\"error_message\":\"bad line\","
+ + "\"line_number\":{\"x\":2},\"original_line\":\"bad lp\"}}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: bad line"
+ ),
+ Arguments.of(
+ "write endpoint with scalar data falls back to error",
+ "api/v3/write_lp",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\",\"data\":123}",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: partial write of line protocol occurred"
+ ),
+ Arguments.of(
+ "write endpoint invalid json body falls back to raw body",
+ "api/v3/write_lp",
+ "application/json",
+ "{\"error\":\"partial write of line protocol occurred\"",
+ InfluxDBApiHttpException.class,
+ "HTTP status code: 400; Message: {\"error\":\"partial write of line protocol occurred\""
)
);
}
diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java
index a1b7974c..a2432da8 100644
--- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java
+++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java
@@ -57,9 +57,10 @@ void optionsBasics() {
@Test
void optionsEqualAll() {
- WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true);
+ WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true, true, null, null, null);
WriteOptions optionsViaBuilder = new WriteOptions.Builder()
- .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true).build();
+ .database("my-database").precision(WritePrecision.S).gzipThreshold(512)
+ .noSync(true).acceptPartial(true).build();
Assertions.assertThat(options).isEqualTo(optionsViaBuilder);
@@ -67,6 +68,9 @@ void optionsEqualAll() {
.database("my-database").precision(WritePrecision.S).gzipThreshold(1024).noSync(true).build();
WriteOptions noSyncMismatch = new WriteOptions.Builder()
.database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(false).build();
+ WriteOptions acceptPartialMismatch = new WriteOptions.Builder()
+ .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true)
+ .acceptPartial(false).build();
WriteOptions defaultTagsMismatch = new WriteOptions.Builder()
.database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true)
.defaultTags(Map.of("region", "west")).build();
@@ -79,6 +83,7 @@ void optionsEqualAll() {
Assertions.assertThat(options).isNotEqualTo(gzipMismatch);
Assertions.assertThat(options).isNotEqualTo(noSyncMismatch);
+ Assertions.assertThat(options).isNotEqualTo(acceptPartialMismatch);
Assertions.assertThat(options).isNotEqualTo(defaultTagsMismatch);
Assertions.assertThat(options).isNotEqualTo(tagOrderMismatch);
Assertions.assertThat(options).isNotEqualTo(headersMismatch);
@@ -147,12 +152,14 @@ void optionsEmpty() {
Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database");
Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WriteOptions.DEFAULT_WRITE_PRECISION);
Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(WriteOptions.DEFAULT_GZIP_THRESHOLD);
+ Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
Assertions.assertThat(options.tagOrderSafe()).isEmpty();
WriteOptions builderOptions = new WriteOptions.Builder().build();
Assertions.assertThat(builderOptions.databaseSafe(config)).isEqualTo("my-database");
Assertions.assertThat(builderOptions.precisionSafe(config)).isEqualTo(WritePrecision.S);
Assertions.assertThat(builderOptions.gzipThresholdSafe(config)).isEqualTo(512);
+ Assertions.assertThat(builderOptions.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL);
}
@Test
@@ -234,6 +241,19 @@ void optionsOverrideWriteNoSync() {
Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(false);
}
+ @Test
+ void optionsOverrideWriteAcceptPartial() {
+ ClientConfig config = configBuilder
+ .database("my-database")
+ .organization("my-org")
+ .writeAcceptPartial(true)
+ .build();
+
+ WriteOptions options = new WriteOptions.Builder().acceptPartial(false).build();
+
+ Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(false);
+ }
+
@Test
void optionsOverridesDefaultTags() {
Map defaultTagsBase = new HashMap<>() {{
@@ -303,6 +323,8 @@ void optionsHashCode() {
.isNotEqualTo(builder.database("my-database").build().hashCode());
Assertions.assertThat(baseOptions.hashCode())
.isNotEqualTo(builder.defaultTags(defaultTags).build().hashCode());
+ Assertions.assertThat(baseOptions.hashCode())
+ .isNotEqualTo(builder.acceptPartial(true).build().hashCode());
Assertions.assertThat(baseOptions.hashCode())
.isNotEqualTo(builder.tagOrder(List.of("region", "host")).build().hashCode());
}