Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client.
1. [#363](https://github.com/InfluxCommunity/influxdb3-java/pull/363): Support custom tag order via `tagOrder` write option.
See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more.
1. [#365](https://github.com/InfluxCommunity/influxdb3-java/pull/365): Support partial writes via `acceptPartial` write option.
See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more.

## 1.8.0 [2026-02-19]

Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import java.util.List;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.InfluxDBPartialWriteException;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.write.WriteOptions;
Expand Down Expand Up @@ -113,6 +114,23 @@ client.writePoint(
orderedTagWrite
);

//
// Write with partial acceptance
//
WriteOptions partialWrite = new WriteOptions.Builder()
.acceptPartial(true)
.build();
try {
client.writeRecords(List.of(
"temperature,region=west value=20.0",
"temperature,region=west value=\"bad\""
), partialWrite);
} catch (InfluxDBPartialWriteException e) {
// Inspect failed line details.
e.lineErrors().forEach(line ->
System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine()));
}

//
// Write by LineProtocol
//
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
* <li>precision - timestamp precision when writing data</li>
* <li>gzipThreshold - payload size size for gzipping data</li>
* <li>writeNoSync - skip waiting for WAL persistence on write</li>
* <li>writeAcceptPartial - accept partial writes</li>
* </ul>
*
* @param connectionString connection string
Expand Down Expand Up @@ -569,6 +570,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* <li>INFLUX_PRECISION - timestamp precision when writing data</li>
* <li>INFLUX_GZIP_THRESHOLD - payload size size for gzipping data</li>
* <li>INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write</li>
* <li>INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes</li>
* </ul>
* Supported system properties:
* <ul>
Expand All @@ -580,6 +582,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
* <li>influx.precision - timestamp precision when writing data</li>
* <li>influx.gzipThreshold - payload size size for gzipping data</li>
* <li>influx.writeNoSync - skip waiting for WAL persistence on write</li>
* <li>influx.writeAcceptPartial - accept partial writes</li>
* </ul>
*
* @return instance of {@link InfluxDBClient}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.influxdb.v3.client;

import java.net.http.HttpHeaders;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* HTTP exception for partial write errors returned by InfluxDB 3 write endpoint.
* Contains parsed line-level write errors so callers can decide how to handle failed lines.
*/
public class InfluxDBPartialWriteException extends InfluxDBApiHttpException {

private final List<LineError> lineErrors;

/**
* Construct a new InfluxDBPartialWriteException.
*
* @param message detail message
* @param headers response headers
* @param statusCode response status code
* @param lineErrors line-level errors parsed from response body
*/
public InfluxDBPartialWriteException(
@Nullable final String message,
@Nullable final HttpHeaders headers,
final int statusCode,
@Nonnull final List<LineError> lineErrors) {
super(message, headers, statusCode);
this.lineErrors = List.copyOf(lineErrors);
}

/**
* Line-level write errors.
*
* @return immutable list of line errors
*/
@Nonnull
public List<LineError> lineErrors() {
return lineErrors;
}

/**
* Represents one failed line from a partial write response.
*/
public static final class LineError {

private final Integer lineNumber;
private final String errorMessage;
private final String originalLine;

/**
* @param lineNumber line number in the write payload; may be null if not provided by server
* @param errorMessage line-level error message
* @param originalLine original line protocol row; may be null if not provided by server
*/
public LineError(@Nullable final Integer lineNumber,
@Nonnull final String errorMessage,
@Nullable final String originalLine) {
this.lineNumber = lineNumber;
this.errorMessage = errorMessage;
this.originalLine = originalLine;
}

/**
* @return line number or null if server didn't provide it
*/
@Nullable
public Integer lineNumber() {
return lineNumber;
}

/**
* @return line-level error message
*/
@Nonnull
public String errorMessage() {
return errorMessage;
}

/**
* @return original line protocol row or null if server didn't provide it
*/
@Nullable
public String originalLine() {
return originalLine;
}
}
}
40 changes: 39 additions & 1 deletion src/main/java/com/influxdb/v3/client/config/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
* <li><code>writeAcceptPartial</code> - accept partial writes</li>
* <li><code>timeout</code> - <i>deprecated in 1.4.0</i> timeout when connecting to InfluxDB,
* please use more informative properties <code>writeTimeout</code> and <code>queryTimeout</code></li>
* <li><code>writeTimeout</code> - timeout when writing data to InfluxDB</li>
Expand Down Expand Up @@ -107,6 +108,7 @@ public final class ClientConfig {
private final WritePrecision writePrecision;
private final Integer gzipThreshold;
private final Boolean writeNoSync;
private final Boolean writeAcceptPartial;
private final Map<String, String> defaultTags;
@Deprecated
private final Duration timeout;
Expand Down Expand Up @@ -208,6 +210,16 @@ public Boolean getWriteNoSync() {
return writeNoSync;
}

/**
* Accept partial writes?
*
* @return accept partial writes
*/
@Nonnull
public Boolean getWriteAcceptPartial() {
return writeAcceptPartial;
}

/**
* Gets default tags used when writing points.
* @return default tags
Expand Down Expand Up @@ -370,6 +382,7 @@ public boolean equals(final Object o) {
&& writePrecision == that.writePrecision
&& Objects.equals(gzipThreshold, that.gzipThreshold)
&& Objects.equals(writeNoSync, that.writeNoSync)
&& Objects.equals(writeAcceptPartial, that.writeAcceptPartial)
&& Objects.equals(defaultTags, that.defaultTags)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(writeTimeout, that.writeTimeout)
Expand All @@ -388,7 +401,7 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
database, writePrecision, gzipThreshold, writeNoSync,
database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial,
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
proxy, proxyUrl, authenticator, headers,
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
Expand All @@ -403,6 +416,7 @@ public String toString() {
.add("writePrecision=" + writePrecision)
.add("gzipThreshold=" + gzipThreshold)
.add("writeNoSync=" + writeNoSync)
.add("writeAcceptPartial=" + writeAcceptPartial)
.add("timeout=" + timeout)
.add("writeTimeout=" + writeTimeout)
.add("queryTimeout=" + queryTimeout)
Expand Down Expand Up @@ -432,6 +446,7 @@ public static final class Builder {
private WritePrecision writePrecision;
private Integer gzipThreshold;
private Boolean writeNoSync;
private Boolean writeAcceptPartial;
private Map<String, String> defaultTags;
@Deprecated
private Duration timeout;
Expand Down Expand Up @@ -554,6 +569,19 @@ public Builder writeNoSync(@Nullable final Boolean writeNoSync) {
return this;
}

/**
* Sets whether to accept partial writes.
*
* @param writeAcceptPartial accept partial writes
* @return this
*/
@Nonnull
public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) {

this.writeAcceptPartial = writeAcceptPartial;
return this;
}

/**
* Sets default tags to be written with points.
*
Expand Down Expand Up @@ -800,6 +828,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
if (parameters.containsKey("writeNoSync")) {
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
}
if (parameters.containsKey("writeAcceptPartial")) {
this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial")));
}
if (parameters.containsKey("disableGRPCCompression")) {
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
}
Expand Down Expand Up @@ -855,6 +886,10 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
if (writeNoSync != null) {
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
}
final String writeAcceptPartial = get.apply("INFLUX_WRITE_ACCEPT_PARTIAL", "influx.writeAcceptPartial");
if (writeAcceptPartial != null) {
this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial));
}
final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout");
if (writeTimeout != null) {
long to = Long.parseLong(writeTimeout);
Expand Down Expand Up @@ -911,6 +946,9 @@ private ClientConfig(@Nonnull final Builder builder) {
writePrecision = builder.writePrecision != null ? builder.writePrecision : WriteOptions.DEFAULT_WRITE_PRECISION;
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
writeAcceptPartial = builder.writeAcceptPartial != null
? builder.writeAcceptPartial
: WriteOptions.DEFAULT_ACCEPT_PARTIAL;
defaultTags = builder.defaultTags;
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
writeTimeout = builder.writeTimeout != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,15 +309,22 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
String path;
Map<String, String> queryParams;
boolean noSync = options.noSyncSafe(config);
if (noSync) {
// Setting no_sync=true is supported only in the v3 API.
boolean acceptPartial = options.acceptPartialSafe(config);
boolean useV3Write = noSync || acceptPartial;
if (useV3Write) {
// no_sync=true and accept_partial=true are supported only in the v3 API.
path = "api/v3/write_lp";
queryParams = new HashMap<>() {{
put("org", config.getOrganization());
put("db", database);
put("precision", WritePrecisionConverter.toV3ApiString(precision));
put("no_sync", "true");
}};
if (noSync) {
queryParams.put("no_sync", "true");
}
if (acceptPartial) {
queryParams.put("accept_partial", "true");
}
} else {
// By default, use the v2 API.
path = "api/v2/write";
Expand Down Expand Up @@ -373,10 +380,12 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
try {
restClient.request(path, HttpMethod.POST, body, queryParams, headers);
} catch (InfluxDBApiHttpException e) {
if (noSync && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
// Server does not support the v3 write API, can't use the NoSync option.
if (useV3Write && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
// Server does not support the v3 write API, can't use v3-only write options.
throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true "
+ "(supported by InfluxDB 3 Core/Enterprise servers only).", e.headers(), e.statusCode());
+ "or AcceptPartial=true (supported by InfluxDB 3 Core/Enterprise servers only).",
e.headers(),
e.statusCode());
}
throw e;
}
Expand Down
Loading
Loading