Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,14 @@ task integTestRemote(type: RestIntegTestTask) {
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

// Forward the analytics-engine parquet-indices toggle when set on the gradle command
// line. TestUtils.createIndexByRestClient reads this to back every test-created index
// with composite/parquet so RestUnifiedQueryAction.isAnalyticsIndex (post-#5432) routes
// to the analytics-engine planner via index settings.
if (System.getProperty("tests.analytics.parquet_indices") != null) {
systemProperty 'tests.analytics.parquet_indices', System.getProperty("tests.analytics.parquet_indices")
}

// Set default query size limit
systemProperty 'defaultQuerySizeLimit', '10000'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,74 @@ public class TestUtils {

private static final String MAPPING_FILE_PATH = "src/test/resources/indexDefinitions/";

/**
* Forwarding alias for {@link AnalyticsIndexConfig#ENABLED_PROP}. Kept for any external callers
* that referenced the old constant location.
*/
public static final String ANALYTICS_PARQUET_INDICES_PROP = AnalyticsIndexConfig.ENABLED_PROP;

/**
* Centralizes every analytics-engine-only test-index knob in one place so the parquet-backed
* settings, the bulk-load refresh strategy, and any future analytics-specific config don't drift
* across separate helpers (the spread-out-conditional pattern called out in review, borrowed from
* the OS-Spark repo's AOSS-conditional configs that ended up scattered across its codebase).
*
* <p>When {@link #ENABLED_PROP} is unset (default), every method here is a no-op or returns the
* standard non-parquet value, so normal CI sees zero behavior change.
*/
/**
 * Single holder for every analytics-engine-only test-index knob: the parquet-backed index
 * settings, the bulk-load refresh strategy, and any future analytics-specific config live here
 * together so they cannot drift across separate helpers (the spread-out-conditional pattern
 * called out in review, borrowed from the OS-Spark repo's AOSS-conditional configs that ended up
 * scattered across its codebase).
 *
 * <p>When {@link #ENABLED_PROP} is unset (the default), every method in this class is a no-op or
 * returns the standard non-parquet value, so normal CI sees zero behavior change.
 */
public static final class AnalyticsIndexConfig {

  /**
   * System property that makes every test-created index parquet-backed (composite primary data
   * format = parquet) with a single shard. When set, {@link
   * RestUnifiedQueryAction#isAnalyticsIndex} (which routes based on {@code
   * index.pluggable.dataformat.enabled} / {@code index.pluggable.dataformat=composite}, see
   * #5432) returns {@code true} for every test-created index — exercising the analytics-engine
   * route end-to-end without per-test rewiring.
   */
  public static final String ENABLED_PROP = "tests.analytics.parquet_indices";

  /** Reports whether the parquet-backed analytics-index mode was requested for this run. */
  public static boolean isEnabled() {
    String flag = System.getProperty(ENABLED_PROP, "false");
    return Boolean.parseBoolean(flag);
  }

  /**
   * Injects the parquet-backed composite-store index settings into {@code jsonObject}. No-op
   * when the config is disabled; idempotent — safe on any index-creation JSON shape.
   */
  static void applyIndexCreationSettings(JSONObject jsonObject) {
    if (!isEnabled()) {
      return;
    }
    JSONObject settings = childOrEmpty(jsonObject, "settings");
    JSONObject indexSettings = childOrEmpty(settings, "index");
    // org.json's put() returns the receiver, so the settings chain reads as one statement.
    indexSettings
        .put("number_of_shards", 1)
        .put("pluggable.dataformat.enabled", true)
        .put("pluggable.dataformat", "composite")
        .put("composite.primary_data_format", "parquet")
        .put("composite.secondary_data_formats", new org.json.JSONArray());
    settings.put("index", indexSettings);
    jsonObject.put("settings", settings);
  }

  /** Returns {@code parent}'s JSON-object child named {@code key}, or a fresh empty object. */
  private static JSONObject childOrEmpty(JSONObject parent, String key) {
    return parent.has(key) ? parent.getJSONObject(key) : new JSONObject();
  }

  /**
   * Returns the {@code _bulk} refresh query string for the current index type. Parquet-backed
   * indices in the analytics-backend-lucene composite engine don't yet implement {@code
   * LuceneCommitter.getSafeCommitInfo} (UnsupportedOperationException "TODO:: with index
   * deleter"), which hangs {@code refresh=wait_for} until the test framework request timeout
   * (~60s). Force-refresh sidesteps the safe-commit-info path while still making the bulk-loaded
   * docs immediately searchable. Drop this branch once {@code LuceneCommitter.getSafeCommitInfo}
   * is implemented.
   */
  static String bulkLoadRefreshParam() {
    if (isEnabled()) {
      return "refresh=true";
    }
    return "refresh=wait_for&wait_for_active_shards=all";
  }

  private AnalyticsIndexConfig() {}
}

/**
* Create test index by REST client.
*
Expand All @@ -48,6 +116,7 @@ public static void createIndexByRestClient(RestClient client, String indexName,
Request request = new Request("PUT", "/" + indexName);
JSONObject jsonObject = isNullOrEmpty(mapping) ? new JSONObject() : new JSONObject(mapping);
setZeroReplicas(jsonObject);
AnalyticsIndexConfig.applyIndexCreationSettings(jsonObject);
request.setJsonEntity(jsonObject.toString());
performRequest(client, request);
}
Expand Down Expand Up @@ -117,7 +186,8 @@ public static void loadDataByRestClient(
RestClient client, String indexName, String dataSetFilePath) throws IOException {
Path path = Paths.get(getResourceFilePath(dataSetFilePath));
Request request =
new Request("POST", "/" + indexName + "/_bulk?refresh=wait_for&wait_for_active_shards=all");
new Request(
"POST", "/" + indexName + "/_bulk?" + AnalyticsIndexConfig.bulkLoadRefreshParam());
request.setJsonEntity(new String(Files.readAllBytes(path)));
performRequest(client, request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.common.setting.Settings.Key;
import org.opensearch.sql.legacy.SQLIntegTestCase;
import org.opensearch.sql.legacy.TestUtils;
import org.opensearch.sql.protocol.response.format.Format;
import org.opensearch.sql.util.RetryProcessor;

Expand All @@ -49,6 +50,19 @@ protected void init() throws Exception {
disableCalcite(); // calcite is enabled by default from 3.3.0
}

/**
 * Reports whether the suite was launched with {@code -Dtests.analytics.parquet_indices=true}.
 * Branch test assertions on this when they depend on the execution backend: with the flag on,
 * every test-created index is composite/parquet, so {@code RestUnifiedQueryAction.isAnalyticsIndex}
 * (post-#5432) routes every query to the analytics-engine backend (DataFusion) rather than the
 * Calcite enumerable / DSL-pushdown backend. DataFusion follows different ordering and
 * null-bucket semantics than the legacy V2 and Calcite-DSL paths.
 */
public static boolean isAnalyticsParquetIndicesEnabled() {
  boolean parquetBacked = TestUtils.AnalyticsIndexConfig.isEnabled();
  return parquetBacked;
}

protected JSONObject executeQuery(String query) throws IOException {
return jsonify(executeQueryToString(query));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,10 @@ public void testStatsWithLimit() throws IOException {
verifySchema(response, schema("a", null, "double"), schema("age", null, "int"));
// If push down disabled, the final results will no longer be stable. In DSL, the order is
// guaranteed because we always sort by bucket field, while we don't add sort in the plan.
if (!isPushdownDisabled()) {
// The analytics-engine backend (DataFusion) is also non-deterministic — its hash-bucket
// order picks a different null-balance row (e.g. (null,36) instead of (null,null)) for
// `head 5`.
if (!isPushdownDisabled() && !isAnalyticsParquetIndicesEnabled()) {
verifyDataRows(
response,
rows(null, null),
Expand All @@ -327,7 +330,8 @@ public void testStatsWithLimit() throws IOException {
"source=%s | stats avg(balance) as a by age | head 5 | head 2 from 1",
TEST_INDEX_BANK_WITH_NULL_VALUES));
verifySchema(response, schema("a", null, "double"), schema("age", null, "int"));
if (!isPushdownDisabled()) {
// Same non-deterministic head-window issue as the preceding `head 5` block.
if (!isPushdownDisabled() && !isAnalyticsParquetIndicesEnabled()) {
verifyDataRows(response, rows(32838D, 28), rows(39225D, 32));
} else {
assert ((Integer) response.get("size") == 2);
Expand Down Expand Up @@ -508,7 +512,9 @@ public void testSumWithNull() throws IOException {
// TODO: Fix -- temporary workaround for the pushdown issue:
// The current pushdown implementation will return 0 for sum when getting null values as input.
// Returning null should be the expected behavior.
Integer expectedValue = isPushdownDisabled() ? null : 0;
// The analytics-engine backend (DataFusion) follows the SQL spec like Calcite-no-pushdown —
// SUM of all-null is null, not 0.
Integer expectedValue = (isPushdownDisabled() || isAnalyticsParquetIndicesEnabled()) ? null : 0;
Comment on lines +516 to +517
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking — do all these assertion changes mean a semantic/behavior change? Or can we simply fix the assertions this way?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends on the test:

  • Existing pattern (isPushdownDisabled() ? null : 0) — extend it (testSumWithNull, testStatsWithLimit).
  • Semantic divergence — use Assume.assumeFalse (testDisableLegacyPreferred).
  • Percentile tests (testStatsPercentileBySpan and friends) — since #21584 no longer wires percentile_approx on the analytics path, those queries fail at execution before the assertion. Reverted those branches in efa97de — they were doing nothing.

verifyDataRows(response, rows(expectedValue));
}

Expand Down Expand Up @@ -729,6 +735,15 @@ public void testDisableLegacyPreferred() throws IOException {
throw new RuntimeException(e);
}
verifySchema(response, schema("a", null, "double"), schema("age", null, "int"));
// Skip on the analytics-engine backend: PPL_SYNTAX_LEGACY_PREFERRED=false is
// expected to give the same shape across backends, but v2 / Calcite-DSL drop
// the null-age bucket (5 rows) while DataFusion keeps it (6 rows). Deciding
// which is correct is a semantic question for the team; until that's resolved,
// exercising this test on the analytics path doesn't tell us anything useful.
org.junit.Assume.assumeFalse(
"PPL_SYNTAX_LEGACY_PREFERRED=false null-bucket semantics not yet aligned"
+ " between V2 / Calcite-DSL and analytics-engine backends",
isAnalyticsParquetIndicesEnabled());
verifyDataRows(
response,
rows(32838D, 28),
Expand Down
Loading