diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java index a9ccf97765..6cb3b41389 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java @@ -130,4 +130,47 @@ public static synchronized CassandraOptions instance() { positiveInt(), 12 * 60 * 60 ); + + public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY = + new ConfigOption<>( + "cassandra.reconnect_base_delay", + "The base delay in milliseconds used by the driver's " + + "exponential reconnection policy when a Cassandra host " + + "becomes unreachable.", + rangeInt(100L, Long.MAX_VALUE), + 1000L + ); + + public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY = + new ConfigOption<>( + "cassandra.reconnect_max_delay", + "The maximum delay in milliseconds used by the driver's " + + "exponential reconnection policy when a Cassandra host " + + "becomes unreachable.", + rangeInt(1000L, Long.MAX_VALUE), + 10_000L + ); + + public static final ConfigOption<Integer> CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS = + new ConfigOption<>( + "cassandra.query_retry_max_attempts", + "The maximum number of retry attempts applied at query-time when " + + "a Cassandra host is temporarily unreachable. " + + "OperationTimedOutException is retried only for " + + "idempotent statements. " + + "Set to 0 to disable query-time retries.", + rangeInt(0, Integer.MAX_VALUE), + 3 + ); + + public static final ConfigOption<Long> CASSANDRA_QUERY_RETRY_INTERVAL = + new ConfigOption<>( + "cassandra.query_retry_interval", + "The interval in milliseconds between query-time retries " + + "when a Cassandra host is temporarily unreachable. 
The " + + "actual wait grows with exponential backoff, capped at " + + "cassandra.reconnect_max_delay.", + rangeInt(100L, Long.MAX_VALUE), + 1000L + ); } diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java index 7a9ffa2b91..a217d53444 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java @@ -20,12 +20,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession; import org.apache.hugegraph.backend.store.BackendSessionPool; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Cluster; @@ -34,22 +37,67 @@ import com.datastax.driver.core.ProtocolOptions.Compression; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.DriverException; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.OperationTimedOutException; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; public class CassandraSessionPool extends 
BackendSessionPool { + private static final Logger LOG = Log.logger(CassandraSessionPool.class); + private static final int SECOND = 1000; + private static final String HEALTH_CHECK_CQL = + "SELECT now() FROM system.local"; + + /** + * Guards the one-time JVM-wide warning about {@code commitAsync()} not + * being covered by query-time retries. {@link CassandraSessionPool} is + * instantiated once per backend store per graph, so without this guard + * the warning would fire many times on startup for a structural + * limitation that does not change between instances. + */ + private static final AtomicBoolean ASYNC_RETRY_WARNING_LOGGED = + new AtomicBoolean(false); private Cluster cluster; private final String keyspace; + private final int maxRetries; + private final long retryInterval; + private final long retryBaseDelay; + private final long retryMaxDelay; public CassandraSessionPool(HugeConfig config, String keyspace, String store) { super(config, keyspace + "/" + store); this.cluster = null; this.keyspace = keyspace; + this.maxRetries = config.get( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS); + this.retryInterval = config.get( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL); + long reconnectBase = config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); + long reconnectMax = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); + E.checkArgument(reconnectMax >= reconnectBase, + "'%s' (%s) must be >= '%s' (%s)", + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), + reconnectMax, + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + reconnectBase); + this.retryBaseDelay = reconnectBase; + this.retryMaxDelay = reconnectMax; + + if (this.maxRetries > 0 && + ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) { + LOG.warn("cassandra.query_retry_max_attempts={} applies to sync commit()" + + " only. 
commitAsync() has no retry protection.", this.maxRetries); + } } @Override @@ -86,6 +134,12 @@ public synchronized void open() { builder.withSocketOptions(socketOptions); + + // Reconnection policy: let driver keep retrying nodes in background + // with exponential backoff after they go down (see issue #2740). + builder.withReconnectionPolicy( + new ExponentialReconnectionPolicy(this.retryBaseDelay, + this.retryMaxDelay)); + + // Credential options String username = config.get(CassandraOptions.CASSANDRA_USERNAME); String password = config.get(CassandraOptions.CASSANDRA_PASSWORD); @@ -161,7 +215,7 @@ public void rollback() { @Override public ResultSet commit() { - ResultSet rs = this.session.execute(this.batch); + ResultSet rs = this.executeWithRetry(this.batch); // Clear batch if execute() successfully (retained if failed) this.batch.clear(); return rs; @@ -169,12 +223,25 @@ public ResultSet commit() { public void commitAsync() { Collection<Statement> statements = this.batch.getStatements(); + if (statements.isEmpty()) { + this.batch.clear(); + return; + } int count = 0; int processors = Math.min(statements.size(), 1023); List<ResultSetFuture> results = new ArrayList<>(processors + 1); + com.datastax.driver.core.Session driverSession = + this.sessionForAsyncCommit(); for (Statement s : statements) { - ResultSetFuture future = this.session.executeAsync(s); + // TODO: track async retry support in a follow-up issue. + // commitAsync() bypasses executeWithRetry(). + // During a Cassandra restart, async writes may fail with + // NoHostAvailableException even when maxRetries > 0. Callers + // must handle ResultSetFuture failures surfaced by + // getUninterruptibly(). A follow-up issue should wrap each + // future with retry semantics. 
+ ResultSetFuture future = driverSession.executeAsync(s); results.add(future); if (++count > processors) { @@ -197,15 +264,99 @@ public ResultSet query(Statement statement) { } public ResultSet execute(Statement statement) { - return this.session.execute(statement); + return this.executeWithRetry(statement); } public ResultSet execute(String statement) { - return this.session.execute(statement); + return this.executeWithRetry(new SimpleStatement(statement)); } public ResultSet execute(String statement, Object... args) { - return this.session.execute(statement, args); + return this.executeWithRetry(new SimpleStatement(statement, args)); + } + + /** + * Execute a statement, retrying on transient connectivity failures + * (NoHostAvailableException / OperationTimedOutException). The driver + * itself keeps retrying connections in the background via the + * reconnection policy, so once Cassandra comes back online, a + * subsequent attempt here will succeed without restarting the server. + * + *
<p>
OperationTimedOutException is only retried for statements marked + * idempotent; otherwise a timed-out mutation might be applied once by + * Cassandra and then duplicated by a client-side retry. + * + *
<p>
If the driver session has been discarded (e.g. by + * {@link #reconnectIfNeeded()} after a failed health-check) it is + * lazily reopened at the start of each attempt. + * + *
<p>
Blocking note: retries block the calling thread via + * {@link Thread#sleep(long)}. Worst-case a single call blocks for + * {@code maxRetries * retryMaxDelay} ms. Under high-throughput + * workloads concurrent threads may pile up in {@code sleep()} during + * a Cassandra outage. For such deployments lower + * {@code cassandra.query_retry_max_attempts} (default 3) and + * {@code cassandra.reconnect_max_delay} (default 10000ms) so the + * request fails fast and pressure is released back to the caller. + */ + private ResultSet executeWithRetry(Statement statement) { + int retries = CassandraSessionPool.this.maxRetries; + long interval = CassandraSessionPool.this.retryInterval; + long maxDelay = CassandraSessionPool.this.retryMaxDelay; + DriverException lastError = null; + for (int attempt = 0; attempt <= retries; attempt++) { + try { + if (this.session == null || this.session.isClosed()) { + // Lazy reopen: may itself throw NHAE while + // Cassandra is still unreachable; the catch below + // treats that as a transient failure. + this.session = null; + this.open(); + } + return this.session.execute(statement); + } catch (NoHostAvailableException | OperationTimedOutException e) { + lastError = e; + if (e instanceof OperationTimedOutException && + !Boolean.TRUE.equals(statement.isIdempotent())) { + throw new BackendException( + "Cassandra query timed out and won't be " + + "retried because the statement is not " + + "marked idempotent", e); + } + if (attempt >= retries) { + break; + } + long cap = maxDelay > 0 ? maxDelay : interval; + long shift = 1L << Math.min(attempt, 20); + long delay; + try { + // Guard against Long overflow when retryInterval is huge. 
+ delay = Math.min(Math.multiplyExact(interval, shift), cap); + } catch (ArithmeticException overflow) { + delay = cap; + } + LOG.warn("Cassandra temporarily unavailable ({}), " + + "retry {}/{} in {} ms", + e.getClass().getSimpleName(), attempt + 1, + retries, delay); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BackendException("Interrupted while " + + "waiting to retry " + + "Cassandra query", ie); + } + } + } + // Preserve original exception as cause (stack trace + type) by + // pre-formatting the message and using the (String, Throwable) + // constructor explicitly to avoid ambiguity with varargs overloads. + String msg = String.format( + "Failed to execute Cassandra query after %s retries: %s", + retries, + lastError == null ? "" : lastError.getMessage()); + throw new BackendException(msg, lastError); } private void tryOpen() { @@ -217,6 +368,24 @@ private void tryOpen() { } } + private com.datastax.driver.core.Session sessionForAsyncCommit() { + if (this.session == null || this.session.isClosed()) { + this.session = null; + try { + this.open(); + } catch (DriverException e) { + throw new BackendException( + "Failed to open Cassandra session for async commit", + e); + } + } + if (this.session == null) { + throw new BackendException( + "Cassandra session is unavailable for async commit"); + } + return this.session; + } + @Override public void open() { this.opened = true; @@ -255,6 +424,56 @@ public boolean hasChanges() { return this.batch.size() > 0; } + /** + * Periodic liveness probe invoked by {@link BackendSessionPool} to + * recover thread-local sessions after Cassandra has been restarted. + * Reopens the driver session if it was closed and pings the cluster + * with a lightweight query. 
On failure the session is discarded via + * {@link #reset()} so the next call to + * {@link #executeWithRetry(Statement)} reopens it; any exception + * here is swallowed so the caller can still issue the real query. + */ + @Override + public void reconnectIfNeeded() { + if (!this.opened) { + return; + } + try { + if (this.session == null || this.session.isClosed()) { + this.session = null; + this.tryOpen(); + } + if (this.session != null) { + this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); + } + } catch (NoHostAvailableException | OperationTimedOutException e) { + LOG.debug("Cassandra health-check failed, resetting session: {}", + e.getMessage()); + this.reset(); + } + } + + /** + * Force-close the driver session so it is re-opened on the next + * {@link #opened()} call. Used when a failure is observed and we + * want to start fresh on the next attempt. + */ + @Override + public void reset() { + if (this.session == null) { + return; + } + try { + this.session.close(); + } catch (Exception e) { + // Do not swallow Error (OOM / StackOverflow); only log + // ordinary exceptions raised by the driver on close. 
+ LOG.warn("Failed to reset Cassandra session", e); + } finally { + this.session = null; + } + } + public Collection statements() { return this.batch.getStatements(); } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java index ef5a8e896b..6445fc38bd 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java @@ -17,11 +17,15 @@ package org.apache.hugegraph.unit.cassandra; +import java.net.InetSocketAddress; +import java.util.Collections; import java.util.Map; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.store.cassandra.CassandraOptions; +import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool; import org.apache.hugegraph.backend.store.cassandra.CassandraStore; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.config.OptionSpace; @@ -30,7 +34,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; - +import org.mockito.Mockito; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.OperationTimedOutException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -192,4 +203,235 @@ public void testParseReplicaWithNetworkTopologyStrategyAndDoubleReplica() { Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config); }); } + 
+ @Test + public void testReconnectOptionsHaveSensibleDefaults() { + // Runtime-reconnection options must exist with non-zero defaults so + // HugeGraph keeps running when Cassandra restarts (issue #2740). + Assert.assertEquals(1000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_BASE_DELAY.defaultValue()); + Assert.assertEquals(10_000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_MAX_DELAY.defaultValue()); + Assert.assertEquals(3, (int) CassandraOptions + .CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.defaultValue()); + Assert.assertEquals(1000L, (long) CassandraOptions + .CASSANDRA_QUERY_RETRY_INTERVAL.defaultValue()); + } + + @Test + public void testReconnectOptionsAreOverridable() { + String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(); + String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(); + String retries = CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS + .name(); + String interval = CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(); + + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(base, 500L); + conf.setProperty(max, 30_000L); + conf.setProperty(retries, 3); + conf.setProperty(interval, 1000L); + HugeConfig config = new HugeConfig(conf); + + Assert.assertEquals(500L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY)); + Assert.assertEquals(30_000L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY)); + Assert.assertEquals(3, (int) config.get( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS)); + Assert.assertEquals(1000L, (long) config.get( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL)); + } + + @Test + public void testQueryRetryAttemptsCanBeDisabled() { + String retries = CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS + .name(); + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(retries, 0); + HugeConfig config = new HugeConfig(conf); + Assert.assertEquals(0, (int) config.get( + 
CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS)); + } + + @Test + public void testExecuteWithRetrySucceedsAfterTransientFailures() { + // Configure retry knobs via config so the pool reads them through + // the normal path (no Whitebox overrides on retry fields). Keep the + // values within the validators' lower bounds (base >= 100, max >= + // base, interval >= 100). + Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L); + HugeConfig config = new HugeConfig(conf); + CassandraSessionPool pool = new CassandraSessionPool(config, + "ks", "store"); + + com.datastax.driver.core.Session driverSession = Mockito.mock( + com.datastax.driver.core.Session.class); + ResultSet rs = Mockito.mock(ResultSet.class); + NoHostAvailableException transientFailure = + new NoHostAvailableException(Collections.emptyMap()); + Mockito.when(driverSession.execute(Mockito.any(Statement.class))) + .thenThrow(transientFailure) + .thenThrow(transientFailure) + .thenReturn(rs); + + CassandraSessionPool.Session session = pool.new Session(); + Whitebox.setInternalState(session, "session", driverSession); + + ResultSet result = session.execute("SELECT now() FROM system.local"); + Assert.assertSame(rs, result); + Mockito.verify(driverSession, Mockito.times(3)) + .execute(Mockito.any(Statement.class)); + Mockito.verify(driverSession, Mockito.never()).close(); + } + + @Test + public void testExecuteWithRetrySkipsNonIdempotentTimeoutRetry() { + Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L); + 
conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L); + HugeConfig config = new HugeConfig(conf); + CassandraSessionPool pool = new CassandraSessionPool(config, + "ks", "store"); + + com.datastax.driver.core.Session driverSession = Mockito.mock( + com.datastax.driver.core.Session.class); + OperationTimedOutException timeout = new OperationTimedOutException( + new InetSocketAddress("127.0.0.1", 9042)); + Mockito.when(driverSession.execute(Mockito.any(Statement.class))) + .thenThrow(timeout); + + CassandraSessionPool.Session session = pool.new Session(); + Whitebox.setInternalState(session, "session", driverSession); + + Assert.assertThrows(BackendException.class, () -> + session.execute("UPDATE counter SET value = value + 1")); + Mockito.verify(driverSession, Mockito.times(1)) + .execute(Mockito.any(Statement.class)); + } + + @Test + public void testExecuteWithRetryAllowsIdempotentTimeoutRetry() { + Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L); + HugeConfig config = new HugeConfig(conf); + CassandraSessionPool pool = new CassandraSessionPool(config, + "ks", "store"); + + com.datastax.driver.core.Session driverSession = Mockito.mock( + com.datastax.driver.core.Session.class); + ResultSet rs = Mockito.mock(ResultSet.class); + OperationTimedOutException timeout = new OperationTimedOutException( + new InetSocketAddress("127.0.0.1", 9042)); + SimpleStatement statement = new SimpleStatement( + "SELECT now() FROM system.local"); + statement.setIdempotent(true); + Mockito.when(driverSession.execute(statement)) + 
.thenThrow(timeout) + .thenReturn(rs); + + CassandraSessionPool.Session session = pool.new Session(); + Whitebox.setInternalState(session, "session", driverSession); + + ResultSet result = session.execute(statement); + Assert.assertSame(rs, result); + Mockito.verify(driverSession, Mockito.times(2)).execute(statement); + } + + @Test + public void testCommitAsyncOpensSessionBeforeExecuteAsync() { + Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 100L); + HugeConfig config = new HugeConfig(conf); + CassandraSessionPool pool = new CassandraSessionPool(config, + "ks", "store"); + + com.datastax.driver.core.Cluster mockCluster = Mockito.mock( + com.datastax.driver.core.Cluster.class); + com.datastax.driver.core.Session driverSession = Mockito.mock( + com.datastax.driver.core.Session.class); + ResultSetFuture future = Mockito.mock(ResultSetFuture.class); + Mockito.when(mockCluster.isClosed()).thenReturn(false); + Mockito.when(mockCluster.connect(Mockito.anyString())) + .thenReturn(driverSession); + Mockito.when(driverSession.executeAsync(Mockito.any(Statement.class))) + .thenReturn(future); + Whitebox.setInternalState(pool, "cluster", mockCluster); + + CassandraSessionPool.Session session = pool.new Session(); + Statement statement = new SimpleStatement( + "INSERT INTO system.local(key) VALUES ('test')"); + session.add(statement); + + session.commitAsync(); + + Mockito.verify(mockCluster, Mockito.times(1)).connect("ks"); + Mockito.verify(driverSession, Mockito.times(1)).executeAsync(statement); + Mockito.verify(future, Mockito.times(1)).getUninterruptibly(); + Assert.assertFalse(session.hasChanges()); + } + + @Test + public void 
testReconnectBaseDelayBelowMinimumRejected() { + // The validator on CASSANDRA_RECONNECT_BASE_DELAY is + // rangeInt(100L, Long.MAX_VALUE); values below 100 must be rejected + // at parse time. Setting the property as a String forces HugeConfig + // to run parseConvert() which invokes the range check. + Configuration conf = new PropertiesConfiguration(); + Assert.assertThrows(Exception.class, () -> { + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + "50"); + new HugeConfig(conf); + }); + } + + @Test + public void testReconnectMaxDelayLessThanBaseRejected() { + // Both values must pass their individual range validators with margin + // (base >= 100, max >= 1000), so the only thing that can throw is the + // E.checkArgument(max >= base) cross-check inside the pool ctor. Set + // all four retry/reconnect properties explicitly so the test does not + // depend on default values changing in CassandraOptions. + Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 10_000L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 2_000L); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_MAX_ATTEMPTS.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_QUERY_RETRY_INTERVAL.name(), 1_000L); + HugeConfig config = new HugeConfig(conf); + Assert.assertThrows(IllegalArgumentException.class, () -> + new CassandraSessionPool(config, "ks", "store")); + } }