
Parquet Incremental Sync#768

Open
sapienza88 wants to merge 129 commits into apache:main from sapienza88:parquet_incr_sync

Conversation

@sapienza88
Contributor

@sapienza88 sapienza88 commented Dec 10, 2025

What is the purpose of the pull request

Adds incremental syncing ability to the ParquetSource

Brief change log

  • Adds a new class, ParquetDataManager.java, that handles fetching data files for the Parquet source
  • Updates the IT to include an incremental source

Verify this pull request

  • new tests added to ITParquetConversionSource

@sapienza88 sapienza88 changed the title from "Parquet Incremental Sync: Given a parquet file return data from a certain modification time" to "Parquet Incremental Sync" on Dec 10, 2025
@rahil-c
Contributor

rahil-c commented Dec 15, 2025

I can do the first review for this @the-other-tim-brown @vinishjail97

@vinishjail97 vinishjail97 self-requested a review December 16, 2025 08:31
Comment thread xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java Outdated
Comment thread xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java Outdated
Comment thread xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileConfig.java Outdated
Comment thread xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java Outdated
Comment on lines +245 to +259
try (ParquetWriter<Group> writer =
    new ParquetWriter<Group>(
        outputFile,
        new GroupWriteSupport(),
        parquetFileConfig.getCodec(),
        (int) parquetFileConfig.getRowGroupSize(),
        pageSize,
        pageSize, // dictionaryPageSize
        true, // enableDictionary
        false, // enableValidation
        ParquetWriter.DEFAULT_WRITER_VERSION,
        conf)) {
  Group currentGroup = null;
  while ((currentGroup = (Group) reader.read()) != null) {
    writer.write(currentGroup);
Contributor

Why are we writing new parquet files again like this through the writer? I think there's some misunderstanding with the parquet incremental sync feature here.

Parquet Incremental Sync Requirements.

  1. You have a target table where parquet files [p1/f1.parquet, p1/f2.parquet, p2/f1.parquet] have been synced to Hudi, Iceberg, and Delta, for example.
  2. In the source, some changes have been made: a new file in partition p1 was added and p2's file was deleted. The incremental sync should now sync these changes incrementally.

@sapienza88 It's better to align on the approach first here before we push PRs. Can you add the approach for parquet incremental sync to the PR description or a Google doc if possible?
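The two requirements above reduce to a pair of set differences between the already-synced file listing and the current one. A minimal stdlib-only sketch (not the PR's code; class and variable names are illustrative, and the paths come from the example above):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class IncrementalDiffSketch {
    public static void main(String[] args) {
        // Files already synced to the target table formats.
        Set<String> synced = new HashSet<>(Arrays.asList(
                "p1/f1.parquet", "p1/f2.parquet", "p2/f1.parquet"));
        // Current source listing: p1/f3.parquet was added, p2/f1.parquet was deleted.
        Set<String> current = new HashSet<>(Arrays.asList(
                "p1/f1.parquet", "p1/f2.parquet", "p1/f3.parquet"));

        // Files needing metadata generated = current minus synced.
        Set<String> added = new TreeSet<>(current);
        added.removeAll(synced);
        // Files whose metadata must be retired = synced minus current.
        Set<String> removed = new TreeSet<>(synced);
        removed.removeAll(current);

        System.out.println(added);   // [p1/f3.parquet]
        System.out.println(removed); // [p2/f1.parquet]
    }
}
```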

Contributor

@vinishjail97 vinishjail97 Dec 22, 2025

@sapienza88 XTable shouldn't be writing any new data or parquet files it operates at a metadata level. Can you see this comment for reference?
#550 (comment)
Fetch the parquet files that have been added since the last syncInstant to retrieve the change log. We can do this via the same list call; filtering files based on their creationTime is the simplest way, but it's expensive.

Contributor Author

@sapienza88 sapienza88 Dec 23, 2025

@vinishjail97 thanks for the suggestion, but that isn't helping me yet. Could you elaborate on that idea and on how to manage metadata only for the task of retrieving data from a particular modification time? At the very least, the current ConversionSource wasn't coded with that in mind.

@sapienza88
Contributor Author

@vinishjail97 I added some comments on the functions so that the approach is clearer. All of the above suggestions were also addressed in my last commit.

@vinishjail97
Contributor

XTable shouldn't be writing any new data or parquet files; it operates at a metadata level. Can you see this comment for reference? I had written a few approaches on how to do incremental parquet sync.
#550 (comment)

@vinishjail97
Contributor

@sapienza88 I'm adding a more detailed design and a class level structure to unblock this PR.

Design Principle
XTable operates at a metadata level only. The current PR approach of writing new Parquet files with filtered data is incorrect. XTable should:

  • Discover existing Parquet files from storage
  • Generate table format metadata (Hudi, Iceberg, Delta) for those files
  • NEVER write new Parquet files or transform data.

Architecture

  ┌────────────────────────────────────────────────────────────┐
  │                  ParquetConversionSource                   │
  │  - Uses ParquetFileDiscovery to find files                 │
  │  - Converts file metadata to InternalDataFile              │
  │  - Returns snapshots and table changes                     │
  └────────────────────────────────────────────────────────────┘
                              │
                              ▼
  ┌────────────────────────────────────────────────────────────┐
  │              ParquetFileDiscovery (new class)              │
  │  - Lists all .parquet files from filesystem                │
  │  - Filters files by modification time                      │
  │  - Returns lightweight file metadata                       │
  └────────────────────────────────────────────────────────────┘
                              │
                              ▼
  ┌────────────────────────────────────────────────────────────┐
  │            FileSystem (HDFS/S3/GCS/Azure)                  │
  │  - fs.listFiles(basePath, recursive=true)                  │
  └────────────────────────────────────────────────────────────┘

Use the file modification time as the commit identifier; with it you can identify which files have been synced and which haven't. The files that haven't been synced need metadata generated for them. Future functionality, like optimizing the listing and handling parquet files deleted from storage, can be added incrementally; I'm hoping to keep the scope low for this PR.
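The ParquetFileDiscovery box above boils down to "list recursively, filter by modification time". A minimal sketch of that filtering, using java.nio.file as a local stand-in for Hadoop's FileSystem.listFiles (the real class would work on FileStatus objects from HDFS/S3/GCS/Azure; all names here are illustrative):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileDiscoverySketch {
    /** Returns .parquet files under basePath modified strictly after lastSyncInstant (epoch millis). */
    static List<Path> filesModifiedAfter(Path basePath, long lastSyncInstant) throws IOException {
        try (Stream<Path> walk = Files.walk(basePath)) { // recursive listing
            return walk.filter(Files::isRegularFile)
                    .filter(p -> p.toString().endsWith(".parquet"))
                    .filter(p -> {
                        try {
                            return Files.getLastModifiedTime(p).toMillis() > lastSyncInstant;
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    })
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("parquet-table");
        Path p1 = Files.createDirectories(base.resolve("p1"));
        Path oldFile = Files.createFile(p1.resolve("f1.parquet"));
        Path newFile = Files.createFile(p1.resolve("f2.parquet"));
        // Simulate an already-synced file by backdating its modification time.
        Files.setLastModifiedTime(oldFile, FileTime.fromMillis(1000L));
        Files.setLastModifiedTime(newFile, FileTime.fromMillis(5000L));

        List<Path> pending = filesModifiedAfter(base, 1000L); // last sync at t=1000
        System.out.println(pending.size()); // 1 — only f2.parquet is newer
    }
}
```

Only the files returned here would have table-format metadata generated; no data is read or rewritten.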

}
}

@Test
Contributor

Line 184 needs to be updated to include INCREMENTAL as well

Contributor Author

done!

Comment thread xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java Outdated
* parquet files and filtering the files based on the modification times.
*/
@Log4j2
@RequiredArgsConstructor
Contributor

Nit — exposing both the Lombok-generated 3-arg ctor (@RequiredArgsConstructor) and an explicit 2-arg ctor creates ambiguity about the public API. Production code calls the 2-arg form; tests call the 3-arg form. Consider dropping @RequiredArgsConstructor and either annotating the 3-arg ctor @VisibleForTesting or using a package-private static factory method for tests.
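Something like the following shape, sketched with stdlib types only (field names and the Clock dependency are placeholders, not the actual ParquetDataManager fields):

```java
import java.time.Clock;

public class DataManagerSketch {
    private final String basePath;
    private final long pageSize;
    private final Clock clock; // test-only dependency, hidden from the public API

    // Production entry point: the only public constructor.
    public DataManagerSketch(String basePath, long pageSize) {
        this(basePath, pageSize, Clock.systemUTC());
    }

    // Package-private factory for tests, replacing the Lombok-generated 3-arg ctor.
    static DataManagerSketch forTesting(String basePath, long pageSize, Clock clock) {
        return new DataManagerSketch(basePath, pageSize, clock);
    }

    private DataManagerSketch(String basePath, long pageSize, Clock clock) {
        this.basePath = basePath;
        this.pageSize = pageSize;
        this.clock = clock;
    }

    String describe() {
        return basePath + ":" + pageSize;
    }

    public static void main(String[] args) {
        DataManagerSketch prod = new DataManagerSketch("/data/table", 1048576L);
        System.out.println(prod.describe()); // /data/table:1048576
    }
}
```

This keeps one obvious construction path for callers while tests still get full control over the injected dependency.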

Contributor Author

I'll leave the rest of your comments here for somebody else to address.

RowFactory.create(103, "BA", 2027, 11));

Dataset<Row> dfInit = sparkSession.createDataFrame(data, schema);
Path fixedPath = Paths.get("target", "fixed-parquet-data", "parquet_table_test_2");
Contributor

Relative path Paths.get("target", "fixed-parquet-data", "parquet_table_test_2") pollutes the workspace across test runs, isn't cleaned up, and makes the test order-dependent when re-run without ./gradlew clean. This class already uses the @TempDir pattern — please use it here too. Also drop the commented-out // String outputPath = fixedPath.toString(); on line 457.
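For reference, a stdlib-only sketch of the isolation the temp-dir pattern gives (JUnit's @TempDir injects and cleans the directory for you; this just shows the idea, with illustrative names):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirSketch {
    public static void main(String[] args) throws IOException {
        // Unique per run, so re-running without a clean build leaves no stale state.
        Path tempDir = Files.createTempDirectory("parquet-table-test");
        Path outputPath = tempDir.resolve("parquet_table_test_2");
        Files.createDirectories(outputPath);
        System.out.println(Files.isDirectory(outputPath)); // true
    }
}
```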

assertNotNull(result);
List<ParquetFileInfo> fileList = result.collect(Collectors.toList());
assertEquals(3, fileList.size());
assertEquals(1000L, fileList.get(0).getModificationTime());
Contributor

Nit — asserting positional ordering here relies on RemoteIterator + Collectors.toList() preserving the mock insertion order. In production, FS listing order is platform-dependent and not guaranteed. Either sort inside getCurrentFilesInfo() (and document the ordering contract) or switch these assertions to Set<Long> comparisons. Same pattern applies to testGetParquetFilesMetadataAfterTime_someMatch and _exactTimeMatch.
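Concretely, the order-independent variant looks like this (values and names are illustrative, not the test's actual data):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrderIndependentAssertSketch {
    public static void main(String[] args) {
        // Listing order from the filesystem is platform-dependent.
        List<Long> modTimes = Arrays.asList(3000L, 1000L, 2000L);

        // Comparing as sets makes the assertion hold regardless of ordering.
        Set<Long> actual = new HashSet<>(modTimes);
        Set<Long> expected = new HashSet<>(Arrays.asList(1000L, 2000L, 3000L));
        System.out.println(actual.equals(expected)); // true
    }
}
```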


4 participants