diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 06e6478f5fd..70549c99a7f 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -849,6 +849,14 @@ task integTestRemote(type: RestIntegTestTask) { systemProperty "user", System.getProperty("user") systemProperty "password", System.getProperty("password") + // Forward the analytics-engine parquet-indices toggle when set on the gradle command + // line. TestUtils.createIndexByRestClient reads this to back every test-created index + // with composite/parquet so RestUnifiedQueryAction.isAnalyticsIndex (post-#5432) routes + // to the analytics-engine planner via index settings. + if (System.getProperty("tests.analytics.parquet_indices") != null) { + systemProperty 'tests.analytics.parquet_indices', System.getProperty("tests.analytics.parquet_indices") + } + // Set default query size limit systemProperty 'defaultQuerySizeLimit', '10000' diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index 16827623058..4deef787910 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -37,6 +37,74 @@ public class TestUtils { private static final String MAPPING_FILE_PATH = "src/test/resources/indexDefinitions/"; + /** + * Forwarding alias for {@link AnalyticsIndexConfig#ENABLED_PROP}. Kept for any external callers + * that referenced the old constant location. + */ + public static final String ANALYTICS_PARQUET_INDICES_PROP = AnalyticsIndexConfig.ENABLED_PROP; + + /** + * Centralizes every analytics-engine-only test-index knob in one place so the parquet-backed + * settings, the bulk-load refresh strategy, and any future analytics-specific config don't drift + * across separate helpers (the spread-out-conditional pattern called out in review, borrowed from + * the OS-Spark repo's AOSS-conditional configs that ended up scattered across its codebase). + * + *
When {@link #ENABLED_PROP} is unset (default), every method here is a no-op or returns the + * standard non-parquet value, so normal CI sees zero behavior change. + */ + public static final class AnalyticsIndexConfig { + + /** + * System property that makes every test-created index parquet-backed (composite primary data + * format = parquet) with a single shard. When set, {@link + * RestUnifiedQueryAction#isAnalyticsIndex} (which routes based on {@code + * index.pluggable.dataformat.enabled} / {@code index.pluggable.dataformat=composite}, see + * #5432) returns {@code true} for every test-created index — exercising the analytics-engine + * route end-to-end without per-test rewiring. + */ + public static final String ENABLED_PROP = "tests.analytics.parquet_indices"; + + public static boolean isEnabled() { + return Boolean.parseBoolean(System.getProperty(ENABLED_PROP, "false")); + } + + /** + * Inject the parquet-backed composite-store index settings into {@code jsonObject}. No-op when + * the config is disabled; idempotent — safe on any index-creation JSON shape. + */ + static void applyIndexCreationSettings(JSONObject jsonObject) { + if (!isEnabled()) { + return; + } + JSONObject settings = + jsonObject.has("settings") ? jsonObject.getJSONObject("settings") : new JSONObject(); + JSONObject indexSettings = + settings.has("index") ? settings.getJSONObject("index") : new JSONObject(); + indexSettings.put("number_of_shards", 1); + indexSettings.put("pluggable.dataformat.enabled", true); + indexSettings.put("pluggable.dataformat", "composite"); + indexSettings.put("composite.primary_data_format", "parquet"); + indexSettings.put("composite.secondary_data_formats", new org.json.JSONArray()); + settings.put("index", indexSettings); + jsonObject.put("settings", settings); + } + + /** + * Returns the {@code _bulk} refresh query string for the current index type. Parquet-backed + * indices in the analytics-backend-lucene composite engine don't yet implement {@code + * LuceneCommitter.getSafeCommitInfo} (UnsupportedOperationException "TODO:: with index + * deleter"), which hangs {@code refresh=wait_for} until the test framework request timeout + * (~60s). Force-refresh sidesteps the safe-commit-info path while still making the bulk-loaded + * docs immediately searchable. Drop this branch once {@code LuceneCommitter.getSafeCommitInfo} + * is implemented. + */ + static String bulkLoadRefreshParam() { + return isEnabled() ? "refresh=true" : "refresh=wait_for&wait_for_active_shards=all"; + } + + private AnalyticsIndexConfig() {} + } + /** * Create test index by REST client. * @@ -48,6 +116,7 @@ public static void createIndexByRestClient(RestClient client, String indexName, Request request = new Request("PUT", "/" + indexName); JSONObject jsonObject = isNullOrEmpty(mapping) ? new JSONObject() : new JSONObject(mapping); setZeroReplicas(jsonObject); + AnalyticsIndexConfig.applyIndexCreationSettings(jsonObject); request.setJsonEntity(jsonObject.toString()); performRequest(client, request); } @@ -117,7 +186,8 @@ public static void loadDataByRestClient( RestClient client, String indexName, String dataSetFilePath) throws IOException { Path path = Paths.get(getResourceFilePath(dataSetFilePath)); Request request = - new Request("POST", "/" + indexName + "/_bulk?refresh=wait_for&wait_for_active_shards=all"); + new Request( + "POST", "/" + indexName + "/_bulk?" + AnalyticsIndexConfig.bulkLoadRefreshParam()); request.setJsonEntity(new String(Files.readAllBytes(path))); performRequest(client, request); } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java index 1a20b42751e..6c25c415af1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java @@ -30,6 +30,7 @@ import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.legacy.SQLIntegTestCase; +import org.opensearch.sql.legacy.TestUtils; import org.opensearch.sql.protocol.response.format.Format; import org.opensearch.sql.util.RetryProcessor; @@ -49,6 +50,19 @@ protected void init() throws Exception { disableCalcite(); // calcite is enabled by default from 3.3.0 } + /** + * Returns {@code true} when the suite was started with {@code + * -Dtests.analytics.parquet_indices=true}. Use this to branch test assertions that depend on the + * execution backend — when this flag is on, every test-created index is composite/parquet, which + * makes {@code RestUnifiedQueryAction.isAnalyticsIndex} (post-#5432) route every query to the + * analytics-engine backend (DataFusion) instead of the Calcite enumerable / DSL-pushdown backend. + * DataFusion follows different ordering and null-bucket semantics than the legacy V2 and + * Calcite-DSL paths. + */ + public static boolean isAnalyticsParquetIndicesEnabled() { + return TestUtils.AnalyticsIndexConfig.isEnabled(); + } + protected JSONObject executeQuery(String query) throws IOException { return jsonify(executeQueryToString(query)); } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java index b339f3f8023..c3166dd6911 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java @@ -309,7 +309,10 @@ public void testStatsWithLimit() throws IOException { verifySchema(response, schema("a", null, "double"), schema("age", null, "int")); // If push down disabled, the final results will no longer be stable. In DSL, the order is // guaranteed because we always sort by bucket field, while we don't add sort in the plan. - if (!isPushdownDisabled()) { + // The analytics-engine backend (DataFusion) is also non-deterministic — its hash-bucket + // order picks a different null-balance row (e.g. (null,36) instead of (null,null)) for + // `head 5`. + if (!isPushdownDisabled() && !isAnalyticsParquetIndicesEnabled()) { verifyDataRows( response, rows(null, null), @@ -327,7 +330,8 @@ public void testStatsWithLimit() throws IOException { "source=%s | stats avg(balance) as a by age | head 5 | head 2 from 1", TEST_INDEX_BANK_WITH_NULL_VALUES)); verifySchema(response, schema("a", null, "double"), schema("age", null, "int")); - if (!isPushdownDisabled()) { + // Same non-deterministic head-window issue as the preceding `head 5` block. + if (!isPushdownDisabled() && !isAnalyticsParquetIndicesEnabled()) { verifyDataRows(response, rows(32838D, 28), rows(39225D, 32)); } else { assert ((Integer) response.get("size") == 2); @@ -508,7 +512,9 @@ public void testSumWithNull() throws IOException { // TODO: Fix -- temporary workaround for the pushdown issue: // The current pushdown implementation will return 0 for sum when getting null values as input. // Returning null should be the expected behavior. - Integer expectedValue = isPushdownDisabled() ? null : 0; + // The analytics-engine backend (DataFusion) follows the SQL spec like Calcite-no-pushdown — + // SUM of all-null is null, not 0. + Integer expectedValue = (isPushdownDisabled() || isAnalyticsParquetIndicesEnabled()) ? null : 0; verifyDataRows(response, rows(expectedValue)); } @@ -729,6 +735,15 @@ public void testDisableLegacyPreferred() throws IOException { throw new RuntimeException(e); } verifySchema(response, schema("a", null, "double"), schema("age", null, "int")); + // Skip on the analytics-engine backend: PPL_SYNTAX_LEGACY_PREFERRED=false is + // expected to give the same shape across backends, but v2 / Calcite-DSL drop + // the null-age bucket (5 rows) while DataFusion keeps it (6 rows). Deciding + // which is correct is a semantic question for the team; until that's resolved, + // exercising this test on the analytics path doesn't tell us anything useful. + org.junit.Assume.assumeFalse( + "PPL_SYNTAX_LEGACY_PREFERRED=false null-bucket semantics not yet aligned" + + " between V2 / Calcite-DSL and analytics-engine backends", + isAnalyticsParquetIndicesEnabled()); verifyDataRows( response, rows(32838D, 28),