diff --git a/CHANGELOG.md b/CHANGELOG.md index df16256c..8e224172 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ 1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client. 1. [#363](https://github.com/InfluxCommunity/influxdb3-java/pull/363): Support custom tag order via `tagOrder` write option. See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. +1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Support partial writes via `acceptPartial` write option. + See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. ## 1.8.0 [2026-02-19] diff --git a/README.md b/README.md index 7c7c6c1e..ae66ad49 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ import java.util.List; import java.util.stream.Stream; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.write.WriteOptions; @@ -113,6 +114,23 @@ client.writePoint( orderedTagWrite ); +// +// Write with partial acceptance +// +WriteOptions partialWrite = new WriteOptions.Builder() + .acceptPartial(true) + .build(); +try { + client.writeRecords(List.of( + "temperature,region=west value=20.0", + "temperature,region=west value=\"bad\"" + ), partialWrite); +} catch (InfluxDBPartialWriteException e) { + // Inspect failed line details. + e.lineErrors().forEach(line -> + System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine())); +} + // // Write by LineProtocol // diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index 4ea1ea20..35653c02 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -536,6 +536,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) { *
  • precision - timestamp precision when writing data
  • *
  • gzipThreshold - payload size size for gzipping data
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • + *
  • writeAcceptPartial - accept partial writes
  • * * * @param connectionString connection string @@ -569,6 +570,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) { *
  • INFLUX_PRECISION - timestamp precision when writing data
  • *
  • INFLUX_GZIP_THRESHOLD - payload size size for gzipping data
  • *
  • INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write
  • + *
  • INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes
  • * * Supported system properties: * * * @return instance of {@link InfluxDBClient} diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java new file mode 100644 index 00000000..9e7a2494 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/InfluxDBPartialWriteException.java @@ -0,0 +1,110 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.net.http.HttpHeaders; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * HTTP exception for partial write errors returned by InfluxDB 3 write endpoint. + * Contains parsed line-level write errors so callers can decide how to handle failed lines. + */ +public class InfluxDBPartialWriteException extends InfluxDBApiHttpException { + + private final List lineErrors; + + /** + * Construct a new InfluxDBPartialWriteException. + * + * @param message detail message + * @param headers response headers + * @param statusCode response status code + * @param lineErrors line-level errors parsed from response body + */ + public InfluxDBPartialWriteException( + @Nullable final String message, + @Nullable final HttpHeaders headers, + final int statusCode, + @Nonnull final List lineErrors) { + super(message, headers, statusCode); + this.lineErrors = List.copyOf(lineErrors); + } + + /** + * Line-level write errors. + * + * @return immutable list of line errors + */ + @Nonnull + public List lineErrors() { + return lineErrors; + } + + /** + * Represents one failed line from a partial write response. + */ + public static final class LineError { + + private final Integer lineNumber; + private final String errorMessage; + private final String originalLine; + + /** + * @param lineNumber line number in the write payload; may be null if not provided by server + * @param errorMessage line-level error message + * @param originalLine original line protocol row; may be null if not provided by server + */ + public LineError(@Nullable final Integer lineNumber, + @Nonnull final String errorMessage, + @Nullable final String originalLine) { + this.lineNumber = lineNumber; + this.errorMessage = errorMessage; + this.originalLine = originalLine; + } + + /** + * @return line number or null if server didn't provide it + */ + @Nullable + public Integer lineNumber() { + return lineNumber; + } + + /** + * @return line-level error message + */ + @Nonnull + public String errorMessage() { + return errorMessage; + } + + /** + * @return original line protocol row or null if server didn't provide it + */ + @Nullable + public String originalLine() { + return originalLine; + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index 4326f3a5..df413fb3 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -57,6 +57,7 @@ *
  • defaultTags - defaultTags added when writing points to InfluxDB
  • *
  • gzipThreshold - threshold when gzip compression is used for writing points to InfluxDB
  • *
  • writeNoSync - skip waiting for WAL persistence on write
  • + *
  • writeAcceptPartial - accept partial writes
  • *
  • timeout - deprecated in 1.4.0 timeout when connecting to InfluxDB, * please use more informative properties writeTimeout and queryTimeout
  • *
  • writeTimeout - timeout when writing data to InfluxDB
  • @@ -107,6 +108,7 @@ public final class ClientConfig { private final WritePrecision writePrecision; private final Integer gzipThreshold; private final Boolean writeNoSync; + private final Boolean writeAcceptPartial; private final Map defaultTags; @Deprecated private final Duration timeout; @@ -208,6 +210,16 @@ public Boolean getWriteNoSync() { return writeNoSync; } + /** + * Accept partial writes? + * + * @return accept partial writes + */ + @Nonnull + public Boolean getWriteAcceptPartial() { + return writeAcceptPartial; + } + /** * Gets default tags used when writing points. * @return default tags @@ -370,6 +382,7 @@ public boolean equals(final Object o) { && writePrecision == that.writePrecision && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(writeNoSync, that.writeNoSync) + && Objects.equals(writeAcceptPartial, that.writeAcceptPartial) && Objects.equals(defaultTags, that.defaultTags) && Objects.equals(timeout, that.timeout) && Objects.equals(writeTimeout, that.writeTimeout) @@ -388,7 +401,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { return Objects.hash(host, Arrays.hashCode(token), authScheme, organization, - database, writePrecision, gzipThreshold, writeNoSync, + database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors); @@ -403,6 +416,7 @@ public String toString() { .add("writePrecision=" + writePrecision) .add("gzipThreshold=" + gzipThreshold) .add("writeNoSync=" + writeNoSync) + .add("writeAcceptPartial=" + writeAcceptPartial) .add("timeout=" + timeout) .add("writeTimeout=" + writeTimeout) .add("queryTimeout=" + queryTimeout) @@ -432,6 +446,7 @@ public static final class Builder { private WritePrecision writePrecision; private Integer gzipThreshold; private Boolean writeNoSync; + private Boolean writeAcceptPartial; private Map defaultTags; @Deprecated private Duration timeout; @@ -554,6 +569,19 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) { return this; } + /** + * Sets whether to accept partial writes. + * + * @param writeAcceptPartial accept partial writes + * @return this + */ + @Nonnull + public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) { + + this.writeAcceptPartial = writeAcceptPartial; + return this; + } + /** * Sets default tags to be written with points. * @@ -800,6 +828,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform if (parameters.containsKey("writeNoSync")) { this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync"))); } + if (parameters.containsKey("writeAcceptPartial")) { + this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial"))); + } if (parameters.containsKey("disableGRPCCompression")) { this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression"))); } @@ -855,6 +886,10 @@ public ClientConfig build(@Nonnull final Map env, final Properti if (writeNoSync != null) { this.writeNoSync(Boolean.parseBoolean(writeNoSync)); } + final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial"); + if (writeAcceptPartial != null) { + this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial)); + } final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout"); if (writeTimeout != null) { long to = Long.parseLong(writeTimeout); @@ -911,6 +946,9 @@ private ClientConfig(@Nonnull final Builder builder) { writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION; gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD; writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC; + writeAcceptPartial = builder.writeAcceptPartial != null + ? builder.writeAcceptPartial + : WriteOptions.DEFAULT_ACCEPT_PARTIAL; defaultTags = builder.defaultTags; timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT); writeTimeout = builder.writeTimeout != null diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 8c2ce466..0e5e8dd0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -309,15 +309,22 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti String path; Map queryParams; boolean noSync = options.noSyncSafe(config); - if (noSync) { - // Setting no_sync=true is supported only in the v3 API. + boolean acceptPartial = options.acceptPartialSafe(config); + boolean useV3Write = noSync || acceptPartial; + if (useV3Write) { + // no_sync=true and accept_partial=true are supported only in the v3 API. path = "api/v3/write_lp"; queryParams = new HashMap<>() {{ put("org", config.getOrganization()); put("db", database); put("precision", WritePrecisionConverter.toV3ApiString(precision)); - put("no_sync", "true"); }}; + if (noSync) { + queryParams.put("no_sync", "true"); + } + if (acceptPartial) { + queryParams.put("accept_partial", "true"); + } } else { // By default, use the v2 API. path = "api/v2/write"; @@ -373,10 +380,12 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti try { restClient.request(path, HttpMethod.POST, body, queryParams, headers); } catch (InfluxDBApiHttpException e) { - if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { - // Server does not support the v3 write API, can't use the NoSync option. + if (useV3Write && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) { + // Server does not support the v3 write API, can't use v3-only write options. throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true " - + "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode()); + + "or AcceptPartial=true (supported by InfluxDB 3 Core/Enterprise servers only).", + e.headers(), + e.statusCode()); } throw e; } diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 6fb0e05b..a9afbfa3 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -46,6 +46,7 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -57,6 +58,7 @@ import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBApiHttpException; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.config.ClientConfig; final class RestClient implements AutoCloseable { @@ -219,7 +221,8 @@ HttpResponse request(@Nonnull final String path, if (statusCode < 200 || statusCode >= 300) { String reason; String body = response.body(); - reason = formatErrorMessage(body, response.headers().firstValue("Content-Type").orElse(null)); + String contentType = response.headers().firstValue("Content-Type").orElse(null); + reason = formatErrorMessage(body, contentType); if (reason == null) { reason = ""; @@ -241,6 +244,11 @@ HttpResponse request(@Nonnull final String path, } String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason); + List lineErrors = + parsePartialWriteLineErrors(path, body, contentType); + if (!lineErrors.isEmpty()) { + throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors); + } throw new InfluxDBApiHttpException(message, response.headers(), response.statusCode()); } @@ -278,11 +286,7 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St if (error != null && dataNode != null && dataNode.isArray()) { final StringBuilder message = new StringBuilder(error); boolean hasDetails = false; - for (JsonNode item : dataNode) { - final String detail = errFormatDataArrayDetail(item); - if (detail == null) { - continue; - } + for (String detail : errFormatDataArrayDetails(dataNode)) { if (!hasDetails) { message.append(':'); hasDetails = true; @@ -306,12 +310,89 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St } } + @Nonnull + private List parsePartialWriteLineErrors( + @Nonnull final String path, + @Nonnull final String body, + @Nullable final String contentType) { + if (!isWriteEndpoint(path)) { + return List.of(); + } + + if (body.isEmpty()) { + return List.of(); + } + + if (contentType != null + && !contentType.isEmpty() + && !contentType.regionMatches(true, 0, "application/json", 0, "application/json".length())) { + return List.of(); + } + + try { + final JsonNode root = objectMapper.readTree(body); + if (!root.isObject()) { + return List.of(); + } + + final String error = errNonEmptyField(root, "error"); + final JsonNode dataNode = root.get("data"); + if (error == null || dataNode == null) { + return List.of(); + } + + if (dataNode.isArray()) { + final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); + if (parsed == null) { + return List.of(); + } + + final List lineErrors = new ArrayList<>(); + for (ErrDataArrayItem item : parsed) { + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + if (lineError != null) { + lineErrors.add(lineError); + } + } + return lineErrors; + } + + if (dataNode.isObject()) { + try { + final ErrDataArrayItem item = objectMapper.treeToValue(dataNode, ErrDataArrayItem.class); + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + return lineError == null ? List.of() : List.of(lineError); + } catch (JsonProcessingException e) { + return List.of(); + } + } + + return List.of(); + } catch (JsonProcessingException e) { + LOG.debug("Can't parse line errors from response body {}", body, e); + return List.of(); + } + } + + private boolean isWriteEndpoint(@Nonnull final String path) { + return "api/v2/write".equals(path) || "api/v3/write_lp".equals(path); + } + @Nullable private String errNonEmptyText(@Nullable final JsonNode node) { if (node == null || node.isNull()) { return null; } - final String value = node.asText(); + + final String value; + if (node.isTextual()) { + value = node.asText(); + } else if (node.isNumber() || node.isBoolean()) { + value = node.asText(); + } else { + value = node.toString(); + } + return value.isEmpty() ? null : value; } @@ -323,25 +404,68 @@ private String errNonEmptyField(@Nullable final JsonNode object, @Nonnull final return errNonEmptyText(object.get(fieldName)); } + @Nonnull + private List errFormatDataArrayDetails(@Nonnull final JsonNode dataNode) { + final ErrDataArrayItem[] parsed = errReadDataArray(dataNode); + if (parsed != null) { + final List details = new ArrayList<>(); + for (ErrDataArrayItem item : parsed) { + final InfluxDBPartialWriteException.LineError lineError = errToLineError(item); + if (lineError == null) { + continue; + } + + if (lineError.lineNumber() != null + && lineError.originalLine() != null + && !lineError.originalLine().isEmpty()) { + details.add("line " + lineError.lineNumber() + ": " + + lineError.errorMessage() + " (" + lineError.originalLine() + ")"); + } else { + details.add(lineError.errorMessage()); + } + } + return details; + } + + final List details = new ArrayList<>(); + for (JsonNode item : dataNode) { + final String raw = errNonEmptyText(item); + if (raw != null) { + details.add(raw); + } + } + return details; + } + @Nullable - private String errFormatDataArrayDetail(@Nullable final JsonNode item) { - if (!item.isObject()) { + private ErrDataArrayItem[] errReadDataArray(@Nonnull final JsonNode dataNode) { + try { + return objectMapper.treeToValue(dataNode, ErrDataArrayItem[].class); + } catch (JsonProcessingException e) { return null; } + } - final String errorMessage = errNonEmptyField(item, "error_message"); - if (errorMessage == null) { + @Nullable + private InfluxDBPartialWriteException.LineError errToLineError(@Nullable final ErrDataArrayItem item) { + if (item == null || item.errorMessage == null || item.errorMessage.isEmpty()) { return null; } - if (item.hasNonNull("line_number")) { - final String originalLine = errNonEmptyField(item, "original_line"); - if (originalLine != null) { - final String lineNumber = item.get("line_number").asText(); - return "line " + lineNumber + ": " + errorMessage + " (" + originalLine + ")"; - } - } - return errorMessage; + final String originalLine = + (item.originalLine == null || item.originalLine.isEmpty()) ? null : item.originalLine; + return new InfluxDBPartialWriteException.LineError(item.lineNumber, item.errorMessage, originalLine); + } + + private static final class ErrDataArrayItem { + @JsonProperty("error_message") + private String errorMessage; + + @JsonProperty("line_number") + private Integer lineNumber; + + @JsonProperty("original_line") + private String originalLine; } private X509TrustManager getX509TrustManagerFromFile(@Nonnull final String filePath) { diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index e9b4ca1b..8fddccd6 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -68,6 +68,10 @@ public final class WriteOptions { * Default NoSync. */ public static final boolean DEFAULT_NO_SYNC = false; + /** + * Default AcceptPartial. + */ + public static final boolean DEFAULT_ACCEPT_PARTIAL = false; /** * Default timeout for writes in seconds. Set to {@value} @@ -81,12 +85,14 @@ public final class WriteOptions { @Deprecated(forRemoval = true) public static final WriteOptions DEFAULTS = new WriteOptions( - null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, null, null, null); + null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, DEFAULT_ACCEPT_PARTIAL, + null, null, null); private final String database; private final WritePrecision precision; private final Integer gzipThreshold; private final Boolean noSync; + private final Boolean acceptPartial; private final Map defaultTags; private final List tagOrder; private final Map headers; @@ -99,6 +105,7 @@ public final class WriteOptions { */ public static WriteOptions defaultWriteOptions() { return new WriteOptions(null, DEFAULT_WRITE_PRECISION, DEFAULT_GZIP_THRESHOLD, DEFAULT_NO_SYNC, + DEFAULT_ACCEPT_PARTIAL, null, null, null); } @@ -152,7 +159,7 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync) { - this(database, precision, gzipThreshold, noSync, null, null); + this(database, precision, gzipThreshold, noSync, null, null, null); } /** @@ -184,7 +191,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Integer gzipThreshold, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, null, defaultTags, headers); + this(database, precision, gzipThreshold, null, null, defaultTags, headers, null); } /** @@ -209,7 +216,7 @@ public WriteOptions(@Nullable final String database, @Nullable final Boolean noSync, @Nullable final Map defaultTags, @Nullable final Map headers) { - this(database, precision, gzipThreshold, noSync, defaultTags, headers, null); + this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, null); } /** @@ -223,6 +230,8 @@ public WriteOptions(@Nullable final String database, * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. * @param noSync Skip waiting for WAL persistence on write. * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param acceptPartial Request partial write acceptance. + * If it is not specified then use {@link WriteOptions#DEFAULT_ACCEPT_PARTIAL}. * @param defaultTags Default tags to be added when writing points. * @param headers The headers to be added to write request. * The headers specified here are preferred over the headers @@ -234,6 +243,7 @@ public WriteOptions(@Nullable final String database, @Nullable final WritePrecision precision, @Nullable final Integer gzipThreshold, @Nullable final Boolean noSync, + @Nullable final Boolean acceptPartial, @Nullable final Map defaultTags, @Nullable final Map headers, @Nullable final List tagOrder) { @@ -241,11 +251,40 @@ public WriteOptions(@Nullable final String database, this.precision = precision; this.gzipThreshold = gzipThreshold; this.noSync = noSync; + this.acceptPartial = acceptPartial; this.defaultTags = defaultTags == null ? Map.of() : defaultTags; this.tagOrder = sanitizeTagOrder(tagOrder); this.headers = headers == null ? Map.of() : headers; } + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param noSync Skip waiting for WAL persistence on write. + * If it is not specified then use {@link WriteOptions#DEFAULT_NO_SYNC}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + * @param tagOrder Preferred order of tags in line protocol serialization. + * Null or empty tag names are ignored. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Boolean noSync, + @Nullable final Map defaultTags, + @Nullable final Map headers, + @Nullable final List tagOrder) { + this(database, precision, gzipThreshold, noSync, null, defaultTags, headers, tagOrder); + } + /** * @param config with default value * @return The destination database for writes. @@ -299,8 +338,16 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) { */ public boolean noSyncSafe(@Nonnull final ClientConfig config) { Arguments.checkNotNull(config, "config"); - return noSync != null ? noSync - : (config.getWriteNoSync() != null ? config.getWriteNoSync() : DEFAULT_NO_SYNC); + return noSync != null ? noSync : config.getWriteNoSync(); + } + + /** + * @param config with default value + * @return Accept partial write. + */ + public boolean acceptPartialSafe(@Nonnull final ClientConfig config) { + Arguments.checkNotNull(config, "config"); + return acceptPartial != null ? acceptPartial : config.getWriteAcceptPartial(); } /** @@ -332,6 +379,7 @@ public boolean equals(final Object o) { && precision == that.precision && Objects.equals(gzipThreshold, that.gzipThreshold) && Objects.equals(noSync, that.noSync) + && Objects.equals(acceptPartial, that.acceptPartial) && defaultTags.equals(that.defaultTags) && tagOrder.equals(that.tagOrder) && headers.equals(that.headers); @@ -339,7 +387,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(database, precision, gzipThreshold, noSync, defaultTags, tagOrder, headers); + return Objects.hash(database, precision, gzipThreshold, noSync, acceptPartial, defaultTags, tagOrder, + headers); } private boolean isNotDefined(final String option) { @@ -368,6 +417,7 @@ public static final class Builder { private WritePrecision precision; private Integer gzipThreshold; private Boolean noSync; + private Boolean acceptPartial; private Map defaultTags = new HashMap<>(); private List tagOrder = List.of(); private Map headers = new HashMap<>(); @@ -424,6 +474,19 @@ public Builder noSync(@Nonnull final Boolean noSync) { return this; } + /** + * Sets whether to request partial write acceptance. + * + * @param acceptPartial request partial write acceptance + * @return this + */ + @Nonnull + public Builder acceptPartial(@Nonnull final Boolean acceptPartial) { + + this.acceptPartial = acceptPartial; + return this; + } + /** * Sets defaultTags. * @@ -473,7 +536,7 @@ public WriteOptions build() { } private WriteOptions(@Nonnull final Builder builder) { - this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.defaultTags, - builder.headers, builder.tagOrder); + this(builder.database, builder.precision, builder.gzipThreshold, builder.noSync, builder.acceptPartial, + builder.defaultTags, builder.headers, builder.tagOrder); } } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index a7d31240..23569283 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -224,6 +224,23 @@ void writeNoSyncTrueUsesV3API() throws InterruptedException { assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); } + @Test + void writeAcceptPartialTrueUsesV3API() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.NS).acceptPartial(true).build()); + + assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + assertThat(request).isNotNull(); + assertThat(request.getUrl()).isNotNull(); + assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + assertThat(request.getUrl().queryParameter("no_sync")).isNull(); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("nanosecond"); + } + @Test void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); @@ -242,7 +259,29 @@ void writeNoSyncTrueOnV2ServerThrowsException() throws InterruptedException { assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); - assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true" + assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true" + + " (supported by InfluxDB 3 Core/Enterprise servers only)."); + } + + @Test + void writeAcceptPartialTrueOnV2ServerThrowsException() throws InterruptedException { + mockServer.enqueue(createEmptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED.code())); + + InfluxDBApiHttpException ae = org.junit.jupiter.api.Assertions.assertThrows(InfluxDBApiHttpException.class, + () -> client.writeRecord("mem,tag=one value=1.0", + new WriteOptions.Builder().precision(WritePrecision.MS).acceptPartial(true).build()) + ); + + assertThat(mockServer.getRequestCount()).isEqualTo(1); + RecordedRequest request = mockServer.takeRequest(); + assertThat(request).isNotNull(); + assertThat(request.getUrl()).isNotNull(); + assertThat(request.getUrl().encodedPath()).isEqualTo("/api/v3/write_lp"); + assertThat(request.getUrl().queryParameter("accept_partial")).isEqualTo("true"); + assertThat(request.getUrl().queryParameter("precision")).isEqualTo("millisecond"); + + assertThat(ae.statusCode()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED.code()); + assertThat(ae.getMessage()).contains("Server doesn't support write with NoSync=true or AcceptPartial=true" + " (supported by InfluxDB 3 Core/Enterprise servers only)."); } @@ -256,7 +295,7 @@ void writeRecordWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -272,7 +311,21 @@ void writeRecordWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecord("mem,tag=one value=1.0"); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); + } + + @Test + void writeRecordWithDefaultWriteOptionsAcceptPartialConfig() throws Exception { + mockServer.enqueue(createResponse(200)); + + ClientConfig cfg = new ClientConfig.Builder().host(baseURL).token("TOKEN".toCharArray()).database("DB") + .writeAcceptPartial(true) + .build(); + try (InfluxDBClient client = InfluxDBClient.getInstance(cfg)) { + client.writeRecord("mem,tag=one value=1.0"); + } + + checkWriteCalled("/api/v3/write_lp", "DB", "nanosecond", false, true, false); } @Test @@ -285,7 +338,7 @@ void writeRecordsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -301,7 +354,7 @@ void writeRecordsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writeRecords(List.of("mem,tag=one value=1.0")); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); } @Test @@ -317,7 +370,7 @@ void writePointWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -336,7 +389,7 @@ void writePointWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoint(point); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); } @Test @@ -352,7 +405,7 @@ void writePointsWithDefaultWriteOptionsDefaultConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v2/write", "DB", "ns", false, false); + checkWriteCalled("/api/v2/write", "DB", "ns", false, false, false); } @Test @@ -371,17 +424,19 @@ void writePointsWithDefaultWriteOptionsCustomConfig() throws Exception { client.writePoints(List.of(point)); } - checkWriteCalled("/api/v3/write_lp", "DB", "second", true, true); + checkWriteCalled("/api/v3/write_lp", "DB", "second", true, false, true); } private void checkWriteCalled(final String expectedPath, final String expectedDB, final String expectedPrecision, final boolean expectedNoSync, + final boolean expectedAcceptPartial, final boolean expectedGzip) throws InterruptedException { RecordedRequest request = assertThatServerRequested(); HttpUrl requestUrl = request.getUrl(); assertThat(requestUrl).isNotNull(); assertThat(requestUrl.encodedPath()).isEqualTo(expectedPath); - if (expectedNoSync) { + boolean expectedV3 = expectedNoSync || expectedAcceptPartial; + if (expectedV3) { assertThat(requestUrl.queryParameter("db")).isEqualTo(expectedDB); } else { assertThat(requestUrl.queryParameter("bucket")).isEqualTo(expectedDB); @@ -392,6 +447,11 @@ private void checkWriteCalled(final String expectedPath, final String expectedDB } else { assertThat(requestUrl.queryParameter("no_sync")).isNull(); } + if (expectedAcceptPartial) { + assertThat(requestUrl.queryParameter("accept_partial")).isEqualTo("true"); + } else { + assertThat(requestUrl.queryParameter("accept_partial")).isNull(); + } if (expectedGzip) { assertThat(request.getHeaders().get("Content-Encoding")).isEqualTo("gzip"); } else { diff --git a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java index f5ce2e8f..f940afd7 100644 --- a/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java +++ b/src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java @@ -56,6 +56,21 @@ void equalConfig() { Assertions.assertThat(config).isEqualTo(config); Assertions.assertThat(config).isEqualTo(configBuilder.build()); Assertions.assertThat(config).isNotEqualTo(configBuilder); + Assertions.assertThat(config).isNotEqualTo(new ClientConfig.Builder() + .host("http://localhost:9999") + .token("my-token".toCharArray()) + .organization("my-org") + .database("my-db") + .writePrecision(WritePrecision.NS) + .writeAcceptPartial(true) + .timeout(Duration.ofSeconds(30)) + .writeTimeout(Duration.ofSeconds(35)) + .queryTimeout(Duration.ofSeconds(120)) + .allowHttpRedirects(true) + .disableServerCertificateValidation(true) + .headers(Map.of("X-device", "ab-01")) + .disableGRPCCompression(true) + .build()); Assertions.assertThat(config).isNotEqualTo(configBuilder.database("database").build()); } @@ -79,6 +94,7 @@ void toStringConfig() { Assertions.assertThat(configString.contains("database='my-db'")).isEqualTo(true); Assertions.assertThat(configString.contains("gzipThreshold=1000")).isEqualTo(true); Assertions.assertThat(configString).contains("writeNoSync=false"); + Assertions.assertThat(configString).contains("writeAcceptPartial=false"); Assertions.assertThat(configString).contains("timeout=PT30S"); Assertions.assertThat(configString).contains("writeTimeout=PT35S"); Assertions.assertThat(configString).contains("queryTimeout=PT2M"); @@ -90,7 +106,8 @@ void toStringConfig() { void fromConnectionString() throws MalformedURLException { ClientConfig cfg = new ClientConfig.Builder() .build("http://localhost:9999/" - + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128&writeNoSync=true"); + + "?token=my-token&org=my-org&database=my-db&gzipThreshold=128" + + "&writeNoSync=true&writeAcceptPartial=true"); Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/"); Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray()); Assertions.assertThat(cfg.getOrganization()).isEqualTo("my-org"); @@ -98,6 +115,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); // default Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(128); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -109,6 +127,7 @@ void fromConnectionString() throws MalformedURLException { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.US); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); // default Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); cfg = new ClientConfig.Builder() .build("http://localhost:9999/" @@ -208,6 +227,7 @@ void fromEnv() { "INFLUX_PRECISION", "ms", "INFLUX_GZIP_THRESHOLD", "64", "INFLUX_WRITE_NO_SYNC", "true", + "INFLUX_WRITE_ACCEPT_PARTIAL", "true", "INFLUX_DISABLE_GRPC_COMPRESSION", "true" ); @@ -220,6 +240,7 @@ void fromEnv() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } @@ -287,6 +308,7 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.NS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(1000); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(WriteOptions.DEFAULT_NO_SYNC); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); // basic properties = new Properties(); @@ -324,6 +346,7 @@ void fromSystemProperties() { properties.put("influx.precision", "ms"); properties.put("influx.gzipThreshold", "64"); properties.put("influx.writeNoSync", "true"); + properties.put("influx.writeAcceptPartial", "true"); properties.put("influx.disableGRPCCompression", "true"); cfg = new ClientConfig.Builder() .build(new HashMap<>(), properties); @@ -334,6 +357,7 @@ void fromSystemProperties() { Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS); Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64); Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true); + Assertions.assertThat(cfg.getWriteAcceptPartial()).isEqualTo(true); Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue(); } diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 6ec5ee72..1df052b0 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -41,7 +41,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import com.influxdb.v3.client.InfluxDBApiHttpException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; @@ -186,6 +188,78 @@ public void testQuery() throws Exception { } } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testAcceptPartialWriteError() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "temperature,room=room1 value=18.94647\n" + + "temperatureroom=room2value=20.268019\n" + + "temperature,room=room3 value=24.064857\n" + + "temperature,room=room4 value=43i"; + + WriteOptions options = new WriteOptions.Builder() + .acceptPartial(true) + .build(); + + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points, options)); + Assertions.assertThat(thrown).isInstanceOf(InfluxDBPartialWriteException.class); + + String expectedMessage = "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: Expected at least one space character, got end of input (temperatureroom=room)\n" + + "\tline 4: invalid column type for column 'value', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer (temperature,room=roo)"; + Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage); + + InfluxDBPartialWriteException partialError = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialError.lineErrors()).hasSize(2); + Assertions.assertThat(partialError.lineErrors().get(0).lineNumber()).isEqualTo(2); + Assertions.assertThat(partialError.lineErrors().get(0).errorMessage()) + .isEqualTo("Expected at least one space character, got end of input"); + Assertions.assertThat(partialError.lineErrors().get(0).originalLine()) + .isEqualTo("temperatureroom=room"); + + Assertions.assertThat(partialError.lineErrors().get(1).lineNumber()).isEqualTo(4); + Assertions.assertThat(partialError.lineErrors().get(1).errorMessage()) + .isEqualTo("invalid column type for column 'value', expected iox::column_type::field::float, " + + "got iox::column_type::field::integer"); + Assertions.assertThat(partialError.lineErrors().get(1).originalLine()) + .isEqualTo("temperature,room=roo"); + + Assertions.assertThat(partialError).isInstanceOf(InfluxDBApiHttpException.class); + Assertions.assertThat(partialError.statusCode()).isEqualTo(400); + } + } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testWriteErrorWithoutAcceptPartial() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + + String points = "temperature,room=room1 value=18.94647\n" + + "temperatureroom=room2value=20.268019\n" + + "temperature,room=room3 value=24.064857\n" + + "temperature,room=room4 value=43i"; + + Throwable thrown = Assertions.catchThrowable(() -> client.writeRecord(points)); + Assertions.assertThat(thrown.getMessage()) + .isEqualTo("HTTP status code: 400; Message: write buffer error: " + + "line protocol parse failed: Expected at least one space character, got end of input"); + } + } + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index b6d4a26b..fb481423 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -51,6 +51,7 @@ import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBApiHttpException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.InfluxDBPartialWriteException; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.write.WriteOptions; @@ -546,11 +547,19 @@ public void errorFromBodyV3WithDataObject() { // Core/Enterprise object format .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBPartialWriteException.class) + .isInstanceOf(InfluxDBApiHttpException.class) .hasMessage("HTTP status code: 400; Message: invalid field value"); + + InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialWriteException.statusCode()).isEqualTo(400); + Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1); + InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0); + Assertions.assertThat(lineError.lineNumber()).isNull(); + Assertions.assertThat(lineError.errorMessage()).isEqualTo("invalid field value"); + Assertions.assertThat(lineError.originalLine()).isNull(); } @Test @@ -567,13 +576,43 @@ public void errorFromBodyV3WithDataArray() { .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBPartialWriteException.class) + .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + + InfluxDBPartialWriteException partialWriteException = (InfluxDBPartialWriteException) thrown; + Assertions.assertThat(partialWriteException.lineErrors()).hasSize(1); + InfluxDBPartialWriteException.LineError lineError = partialWriteException.lineErrors().get(0); + Assertions.assertThat(lineError.lineNumber()).isEqualTo(2); + Assertions.assertThat(lineError.errorMessage()) + .isEqualTo("invalid column type for column 'v', expected iox::column_type::field::integer," + + " got iox::column_type::field::float"); + Assertions.assertThat(lineError.originalLine()).isEqualTo("testa6a3ad v=1 17702"); + } + + @Test + public void errorFromBodyV3WithDataArrayAnyInvalidItemFallsBackToHttpException() { + mockServer.enqueue(createResponse(400, + "application/json", + null, + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}," + + "{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}]}")); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .build()); + + Throwable thrown = catchThrowable(() -> restClient.request("api/v3/write_lp", HttpMethod.POST, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(InfluxDBApiHttpException.class) + .isNotInstanceOf(InfluxDBPartialWriteException.class) .hasMessage("HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::integer," - + " got iox::column_type::field::float (testa6a3ad v=1 17702)"); + + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n" + + "\t{\"error_message\":\"bad line 2\",\"line_number\":\"x\",\"original_line\":\"bad lp 2\"}"); } @ParameterizedTest(name = "{0}") @@ -630,7 +669,8 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "{\"error\":\"partial write of line protocol occurred\",\"data\":[1,{\"error_message\":" + "\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", "HTTP status code: 400; Message: partial write of line protocol occurred:\n" - + "\tline 2: bad line (bad lp)" + + "\t1\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}" ), Arguments.of( "null error_message skipped", @@ -652,6 +692,47 @@ private static Stream errorFromBodyV3WithDataArrayCases() { "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + "\tline 2: bad line (bad lp)\n" + "\tsecond issue" + ), + Arguments.of( + "array of strings fallback", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[\"bad line 1\",\"bad line 2\"]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tbad line 1\n" + + "\tbad line 2" + ), + Arguments.of( + "textual numeric line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":\"2\",\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\tline 2: bad line (bad lp)" + ), + Arguments.of( + "textual non-numeric line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}" + ), + Arguments.of( + "empty textual line_number with empty original_line", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"only error message\",\"line_number\":\"\",\"original_line\":\"\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n\tonly error message" + ), + Arguments.of( + "non-textual line_number", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":true,\"original_line\":\"bad lp\"}" + ), + Arguments.of( + "object line_number preserved as text", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":" + + "\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}]}", + "HTTP status code: 400; Message: partial write of line protocol occurred:\n" + + "\t{\"error_message\":\"bad line\",\"line_number\":{\"index\":2},\"original_line\":\"bad lp\"}" ) ); } @@ -659,11 +740,14 @@ private static Stream errorFromBodyV3WithDataArrayCases() { @ParameterizedTest(name = "{0}") @MethodSource("errorFromBodyV3FallbackCases") public void errorFromBodyV3FallbackCase(final String testName, + final String requestPath, + final String contentType, final String body, + final Class expectedClass, final String expectedMessage) { mockServer.enqueue(createResponse(400, - "application/json", + contentType, null, body)); @@ -671,48 +755,125 @@ public void errorFromBodyV3FallbackCase(final String testName, .host(baseURL) .build()); - Assertions.assertThatThrownBy( - () -> restClient.request("ping", HttpMethod.GET, null, null, null) - ) - .isInstanceOf(InfluxDBApiException.class) - .hasMessage(expectedMessage); + Throwable thrown = catchThrowable(() -> restClient.request(requestPath, HttpMethod.GET, null, null, null)); + Assertions.assertThat(thrown) + .isInstanceOf(expectedClass) + .hasMessage(expectedMessage); } private static Stream errorFromBodyV3FallbackCases() { return Stream.of( Arguments.of( "missing error with data array falls back to body", + "ping", + "application/json", "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " + "{\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}]}" ), Arguments.of( "empty error with data array falls back to body", + "ping", + "application/json", "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: " + "{\"error\":\"\",\"data\":[{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":" + "\"bad lp\"}]}" ), Arguments.of( "data object without error_message falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":{}}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data object with empty error_message falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":{\"error_message\":\"\"}}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data string falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":\"not-an-object\"}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" ), Arguments.of( "data number falls back to error", + "ping", + "application/json", "{\"error\":\"parsing failed\",\"data\":123}", + InfluxDBApiHttpException.class, "HTTP status code: 400; Message: parsing failed" + ), + Arguments.of( + "partial-write with invalid data string falls back to error", + "ping", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":\"invalid\"}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "partial-write with empty data object falls back to error", + "ping", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":{}}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "write endpoint ignores line-error parsing for non-json content type", + "api/v3/write_lp", + "text/plain", + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\"," + + "\"line_number\":2,\"original_line\":\"bad lp\"}]}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: " + + "{\"error\":\"partial write of line protocol occurred\",\"data\":[{\"error_message\":\"bad line\"," + + "\"line_number\":2,\"original_line\":\"bad lp\"}]}" + ), + Arguments.of( + "write endpoint with non-object root falls back to body", + "api/v3/write_lp", + "application/json", + "[]", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: []" + ), + Arguments.of( + "write endpoint with invalid line-error object type falls back to http exception", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":{\"error_message\":\"bad line\"," + + "\"line_number\":{\"x\":2},\"original_line\":\"bad lp\"}}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: bad line" + ), + Arguments.of( + "write endpoint with scalar data falls back to error", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\",\"data\":123}", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: partial write of line protocol occurred" + ), + Arguments.of( + "write endpoint invalid json body falls back to raw body", + "api/v3/write_lp", + "application/json", + "{\"error\":\"partial write of line protocol occurred\"", + InfluxDBApiHttpException.class, + "HTTP status code: 400; Message: {\"error\":\"partial write of line protocol occurred\"" ) ); } diff --git a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java index a1b7974c..a2432da8 100644 --- a/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/write/WriteOptionsTest.java @@ -57,9 +57,10 @@ void optionsBasics() { @Test void optionsEqualAll() { - WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true); + WriteOptions options = new WriteOptions("my-database", WritePrecision.S, 512, true, true, null, null, null); WriteOptions optionsViaBuilder = new WriteOptions.Builder() - .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true).build(); + .database("my-database").precision(WritePrecision.S).gzipThreshold(512) + .noSync(true).acceptPartial(true).build(); Assertions.assertThat(options).isEqualTo(optionsViaBuilder); @@ -67,6 +68,9 @@ void optionsEqualAll() { .database("my-database").precision(WritePrecision.S).gzipThreshold(1024).noSync(true).build(); WriteOptions noSyncMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(false).build(); + WriteOptions acceptPartialMismatch = new WriteOptions.Builder() + .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) + .acceptPartial(false).build(); WriteOptions defaultTagsMismatch = new WriteOptions.Builder() .database("my-database").precision(WritePrecision.S).gzipThreshold(512).noSync(true) .defaultTags(Map.of("region", "west")).build(); @@ -79,6 +83,7 @@ void optionsEqualAll() { Assertions.assertThat(options).isNotEqualTo(gzipMismatch); Assertions.assertThat(options).isNotEqualTo(noSyncMismatch); + Assertions.assertThat(options).isNotEqualTo(acceptPartialMismatch); Assertions.assertThat(options).isNotEqualTo(defaultTagsMismatch); Assertions.assertThat(options).isNotEqualTo(tagOrderMismatch); Assertions.assertThat(options).isNotEqualTo(headersMismatch); @@ -147,12 +152,14 @@ void optionsEmpty() { Assertions.assertThat(options.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(options.precisionSafe(config)).isEqualTo(WriteOptions.DEFAULT_WRITE_PRECISION); Assertions.assertThat(options.gzipThresholdSafe(config)).isEqualTo(WriteOptions.DEFAULT_GZIP_THRESHOLD); + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); Assertions.assertThat(options.tagOrderSafe()).isEmpty(); WriteOptions builderOptions = new WriteOptions.Builder().build(); Assertions.assertThat(builderOptions.databaseSafe(config)).isEqualTo("my-database"); Assertions.assertThat(builderOptions.precisionSafe(config)).isEqualTo(WritePrecision.S); Assertions.assertThat(builderOptions.gzipThresholdSafe(config)).isEqualTo(512); + Assertions.assertThat(builderOptions.acceptPartialSafe(config)).isEqualTo(WriteOptions.DEFAULT_ACCEPT_PARTIAL); } @Test @@ -234,6 +241,19 @@ void optionsOverrideWriteNoSync() { Assertions.assertThat(options.noSyncSafe(config)).isEqualTo(false); } + @Test + void optionsOverrideWriteAcceptPartial() { + ClientConfig config = configBuilder + .database("my-database") + .organization("my-org") + .writeAcceptPartial(true) + .build(); + + WriteOptions options = new WriteOptions.Builder().acceptPartial(false).build(); + + Assertions.assertThat(options.acceptPartialSafe(config)).isEqualTo(false); + } + @Test void optionsOverridesDefaultTags() { Map defaultTagsBase = new HashMap<>() {{ @@ -303,6 +323,8 @@ void optionsHashCode() { .isNotEqualTo(builder.database("my-database").build().hashCode()); Assertions.assertThat(baseOptions.hashCode()) .isNotEqualTo(builder.defaultTags(defaultTags).build().hashCode()); + Assertions.assertThat(baseOptions.hashCode()) + .isNotEqualTo(builder.acceptPartial(true).build().hashCode()); Assertions.assertThat(baseOptions.hashCode()) .isNotEqualTo(builder.tagOrder(List.of("region", "host")).build().hashCode()); }