-
Notifications
You must be signed in to change notification settings - Fork 506
ORC-2149: Supports merging multiple ORC files with the same schema into multiple ORC files. #2601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -36,10 +36,14 @@ | |||||||||||||||||||||||||||
| import java.util.Set; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Merge multiple ORC files that all have the same schema into a single ORC file. | ||||||||||||||||||||||||||||
| * Merge multiple ORC files that all have the same schema into one or more ORC files. | ||||||||||||||||||||||||||||
| * When {@code --maxSize} is specified, the tool splits output into multiple part files | ||||||||||||||||||||||||||||
| * under the given output directory, each not exceeding the specified size threshold. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| public class MergeFiles { | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| static final String PART_FILE_FORMAT = "part-%05d.orc"; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| public static void main(Configuration conf, String[] args) throws Exception { | ||||||||||||||||||||||||||||
| Options opts = createOptions(); | ||||||||||||||||||||||||||||
| CommandLine cli = new DefaultParser().parse(opts, args); | ||||||||||||||||||||||||||||
|
|
@@ -56,27 +60,55 @@ public static void main(Configuration conf, String[] args) throws Exception { | |||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| boolean ignoreExtension = cli.hasOption("ignoreExtension"); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| List<Path> inputFiles = new ArrayList<>(); | ||||||||||||||||||||||||||||
| OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); | ||||||||||||||||||||||||||||
| long maxSizeBytes = 0; | ||||||||||||||||||||||||||||
| if (cli.hasOption("maxSize")) { | ||||||||||||||||||||||||||||
| maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize")); | ||||||||||||||||||||||||||||
| if (maxSizeBytes <= 0) { | ||||||||||||||||||||||||||||
| System.err.println("--maxSize must be a positive number of bytes."); | ||||||||||||||||||||||||||||
| System.exit(1); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| List<LocatedFileStatus> inputStatuses = new ArrayList<>(); | ||||||||||||||||||||||||||||
| String[] files = cli.getArgs(); | ||||||||||||||||||||||||||||
| for (String root : files) { | ||||||||||||||||||||||||||||
| Path rootPath = new Path(root); | ||||||||||||||||||||||||||||
| FileSystem fs = rootPath.getFileSystem(conf); | ||||||||||||||||||||||||||||
| for (RemoteIterator<LocatedFileStatus> itr = fs.listFiles(rootPath, true); itr.hasNext(); ) { | ||||||||||||||||||||||||||||
| LocatedFileStatus status = itr.next(); | ||||||||||||||||||||||||||||
| if (status.isFile() && (ignoreExtension || status.getPath().getName().endsWith(".orc"))) { | ||||||||||||||||||||||||||||
| inputFiles.add(status.getPath()); | ||||||||||||||||||||||||||||
| inputStatuses.add(status); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| if (inputFiles.isEmpty()) { | ||||||||||||||||||||||||||||
| if (inputStatuses.isEmpty()) { | ||||||||||||||||||||||||||||
| System.err.println("No files found."); | ||||||||||||||||||||||||||||
| System.exit(1); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| List<Path> mergedFiles = OrcFile.mergeFiles( | ||||||||||||||||||||||||||||
| new Path(outputFilename), writerOptions, inputFiles); | ||||||||||||||||||||||||||||
| List<Path> inputFiles = new ArrayList<>(inputStatuses.size()); | ||||||||||||||||||||||||||||
| for (LocatedFileStatus s : inputStatuses) { | ||||||||||||||||||||||||||||
| inputFiles.add(s.getPath()); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| if (maxSizeBytes > 0) { | ||||||||||||||||||||||||||||
| mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles, | ||||||||||||||||||||||||||||
| new Path(outputFilename), maxSizeBytes); | ||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||
| mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename), outputFilename); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Original single-output behavior (no --maxSize). | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| private static void mergeIntoSingleFile(OrcFile.WriterOptions writerOptions, | ||||||||||||||||||||||||||||
| List<Path> inputFiles, | ||||||||||||||||||||||||||||
| Path outputPath, | ||||||||||||||||||||||||||||
| String outputFilename) throws Exception { | ||||||||||||||||||||||||||||
| List<Path> mergedFiles = OrcFile.mergeFiles(outputPath, writerOptions, inputFiles); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| List<Path> unSuccessMergedFiles = new ArrayList<>(); | ||||||||||||||||||||||||||||
| if (mergedFiles.size() != inputFiles.size()) { | ||||||||||||||||||||||||||||
|
|
@@ -100,11 +132,77 @@ public static void main(Configuration conf, String[] args) throws Exception { | |||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Multi-output behavior when --maxSize is set. | ||||||||||||||||||||||||||||
| * Input files are grouped by cumulative raw file size; each group is merged into | ||||||||||||||||||||||||||||
| * a separate part file (part-00000.orc, part-00001.orc, ...) under outputDir. | ||||||||||||||||||||||||||||
| * A single file whose size already exceeds maxSizeBytes is placed in its own part. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| private static void mergeIntoMultipleFiles(Configuration conf, | ||||||||||||||||||||||||||||
| OrcFile.WriterOptions writerOptions, | ||||||||||||||||||||||||||||
| List<LocatedFileStatus> inputStatuses, | ||||||||||||||||||||||||||||
| List<Path> inputFiles, | ||||||||||||||||||||||||||||
| Path outputDir, | ||||||||||||||||||||||||||||
| long maxSizeBytes) throws Exception { | ||||||||||||||||||||||||||||
| FileSystem outFs = outputDir.getFileSystem(conf); | ||||||||||||||||||||||||||||
| outFs.mkdirs(outputDir); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| outFs.mkdirs(outputDir); | |
| if (outFs.exists(outputDir)) { | |
| if (!outFs.getFileStatus(outputDir).isDirectory()) { | |
| throw new IllegalArgumentException( | |
| "Output path already exists and is not a directory: " + outputDir); | |
| } | |
| if (outFs.listStatus(outputDir).length > 0) { | |
| throw new IllegalArgumentException( | |
| "Output directory must be empty for multi-file merge: " + outputDir); | |
| } | |
| } else if (!outFs.mkdirs(outputDir)) { | |
| throw new IllegalStateException("Failed to create output directory: " + outputDir); | |
| } |
Copilot
AI
Apr 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mergeIntoMultipleFiles takes writerOptions but ignores it and instead creates new OrcFile.writerOptions(conf) for each batch. This is confusing and can also drop caller-specified options (eg overwrite/key provider) if they are added later. Consider either removing the unused parameter, or (preferably) cloning the passed writerOptions for each batch (WriterOptions supports clone()) and passing the clone into OrcFile.mergeFiles.
| List<Path> merged = OrcFile.mergeFiles(partOutput, OrcFile.writerOptions(conf), batch); | |
| OrcFile.WriterOptions batchWriterOptions = writerOptions.clone(); | |
| List<Path> merged = OrcFile.mergeFiles(partOutput, batchWriterOptions, batch); |
Copilot
AI
Apr 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
--maxSize is enforced using the cumulative input file sizes (LocatedFileStatus#getLen()), not the actual size of the generated output ORC part files. The option description currently reads as if it caps the output file size; consider rewording to avoid implying a strict output-size limit.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -53,6 +53,7 @@ public class TestMergeFiles implements TestConf { | |||||||||||||||||||||||||||
| @BeforeEach | ||||||||||||||||||||||||||||
| public void openFileSystem() throws Exception { | ||||||||||||||||||||||||||||
| fs = FileSystem.getLocal(conf); | ||||||||||||||||||||||||||||
| fs.delete(workDir, true); | ||||||||||||||||||||||||||||
| fs.mkdirs(workDir); | ||||||||||||||||||||||||||||
| fs.deleteOnExit(workDir); | ||||||||||||||||||||||||||||
| testFilePath = new Path(workDir + File.separator + "TestMergeFiles.testMerge.orc"); | ||||||||||||||||||||||||||||
|
|
@@ -107,4 +108,82 @@ public void testMerge() throws Exception { | |||||||||||||||||||||||||||
| assertEquals(10000 + 20000, reader.getNumberOfRows()); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Verifies that --maxSize splits input files into multiple part files under the output | ||||||||||||||||||||||||||||
| * directory. Three source files are created; a tight size threshold forces them to be | ||||||||||||||||||||||||||||
| * written into at least two part files. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||
| public void testMergeWithMaxSize() throws Exception { | ||||||||||||||||||||||||||||
| TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>"); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Create 3 source ORC files with different row counts. | ||||||||||||||||||||||||||||
| String[] sourceNames = { | ||||||||||||||||||||||||||||
| workDir + File.separator + "ms-1.orc", | ||||||||||||||||||||||||||||
| workDir + File.separator + "ms-2.orc", | ||||||||||||||||||||||||||||
| workDir + File.separator + "ms-3.orc" | ||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||
| int[] rowCounts = {5000, 5000, 5000}; | ||||||||||||||||||||||||||||
| for (int f = 0; f < sourceNames.length; f++) { | ||||||||||||||||||||||||||||
|
Comment on lines
+121
to
+128
|
||||||||||||||||||||||||||||
| Writer writer = OrcFile.createWriter(new Path(sourceNames[f]), | ||||||||||||||||||||||||||||
| OrcFile.writerOptions(conf).setSchema(schema)); | ||||||||||||||||||||||||||||
| VectorizedRowBatch batch = schema.createRowBatch(); | ||||||||||||||||||||||||||||
| LongColumnVector x = (LongColumnVector) batch.cols[0]; | ||||||||||||||||||||||||||||
| BytesColumnVector y = (BytesColumnVector) batch.cols[1]; | ||||||||||||||||||||||||||||
| for (int r = 0; r < rowCounts[f]; ++r) { | ||||||||||||||||||||||||||||
| int row = batch.size++; | ||||||||||||||||||||||||||||
| x.vector[row] = r; | ||||||||||||||||||||||||||||
| byte[] buffer = ("val-" + r).getBytes(); | ||||||||||||||||||||||||||||
| y.setRef(row, buffer, 0, buffer.length); | ||||||||||||||||||||||||||||
| if (batch.size == batch.getMaxSize()) { | ||||||||||||||||||||||||||||
| writer.addRowBatch(batch); | ||||||||||||||||||||||||||||
| batch.reset(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| if (batch.size != 0) { | ||||||||||||||||||||||||||||
| writer.addRowBatch(batch); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| writer.close(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Measure the size of the first source file to compute a threshold that forces a split. | ||||||||||||||||||||||||||||
| long singleFileSize = fs.getFileStatus(new Path(sourceNames[0])).getLen(); | ||||||||||||||||||||||||||||
| // Threshold: slightly larger than one file so at most one file fits per part. | ||||||||||||||||||||||||||||
| long maxSize = singleFileSize + 1; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Path outputDir = new Path(workDir + File.separator + "merge-multi-out"); | ||||||||||||||||||||||||||||
| fs.delete(outputDir, true); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| PrintStream origOut = System.out; | ||||||||||||||||||||||||||||
| ByteArrayOutputStream myOut = new ByteArrayOutputStream(); | ||||||||||||||||||||||||||||
| System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); | ||||||||||||||||||||||||||||
| MergeFiles.main(conf, new String[]{workDir.toString(), | ||||||||||||||||||||||||||||
| "--output", outputDir.toString(), | ||||||||||||||||||||||||||||
| "--maxSize", String.valueOf(maxSize)}); | ||||||||||||||||||||||||||||
| System.out.flush(); | ||||||||||||||||||||||||||||
| System.setOut(origOut); | ||||||||||||||||||||||||||||
|
Comment on lines
+161
to
+165
|
||||||||||||||||||||||||||||
| MergeFiles.main(conf, new String[]{workDir.toString(), | |
| "--output", outputDir.toString(), | |
| "--maxSize", String.valueOf(maxSize)}); | |
| System.out.flush(); | |
| System.setOut(origOut); | |
| try { | |
| MergeFiles.main(conf, new String[]{workDir.toString(), | |
| "--output", outputDir.toString(), | |
| "--maxSize", String.valueOf(maxSize)}); | |
| System.out.flush(); | |
| } finally { | |
| System.setOut(origOut); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long.parseLong(cli.getOptionValue("maxSize"))will throwNumberFormatExceptionfor non-numeric inputs and currently results in an uncaught exception/stack trace. Consider catching the parse error, printing a clear message (and help text if appropriate), and exiting with a non-zero status.