diff --git a/java/tools/src/java/org/apache/orc/tools/Driver.java b/java/tools/src/java/org/apache/orc/tools/Driver.java index b134a1bb80..48acf340ff 100644 --- a/java/tools/src/java/org/apache/orc/tools/Driver.java +++ b/java/tools/src/java/org/apache/orc/tools/Driver.java @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception { System.err.println(" data - print the data from the ORC file"); System.err.println(" json-schema - scan JSON files to determine their schema"); System.err.println(" key - print information about the keys"); - System.err.println(" merge - merge multiple ORC files into a single ORC file"); + System.err.println(" merge - merge multiple ORC files into one or more ORC files"); System.err.println(" meta - print the metadata about the ORC file"); System.err.println(" scan - scan the ORC file"); System.err.println(" sizes - list size on disk of each column"); diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java index 39130f82d5..10bb53d694 100644 --- a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java +++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java @@ -36,10 +36,14 @@ import java.util.Set; /** - * Merge multiple ORC files that all have the same schema into a single ORC file. + * Merge multiple ORC files that all have the same schema into one or more ORC files. + * When {@code --maxSize} is specified, the tool splits output into multiple part files + * under the given output directory, each not exceeding the specified size threshold. */ public class MergeFiles { + static final String PART_FILE_FORMAT = "part-%05d.orc"; + public static void main(Configuration conf, String[] args) throws Exception { Options opts = createOptions(); CommandLine cli = new DefaultParser().parse(opts, args); @@ -56,9 +60,16 @@ public static void main(Configuration conf, String[] args) throws Exception { } boolean ignoreExtension = cli.hasOption("ignoreExtension"); - List inputFiles = new ArrayList<>(); - OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + long maxSizeBytes = 0; + if (cli.hasOption("maxSize")) { + maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize")); + if (maxSizeBytes <= 0) { + System.err.println("--maxSize must be a positive number of bytes."); + System.exit(1); + } + } + List inputStatuses = new ArrayList<>(); String[] files = cli.getArgs(); for (String root : files) { Path rootPath = new Path(root); @@ -66,17 +77,38 @@ public static void main(Configuration conf, String[] args) throws Exception { for (RemoteIterator itr = fs.listFiles(rootPath, true); itr.hasNext(); ) { LocatedFileStatus status = itr.next(); if (status.isFile() && (ignoreExtension || status.getPath().getName().endsWith(".orc"))) { - inputFiles.add(status.getPath()); + inputStatuses.add(status); } } } - if (inputFiles.isEmpty()) { + if (inputStatuses.isEmpty()) { System.err.println("No files found."); System.exit(1); } - List mergedFiles = OrcFile.mergeFiles( - new Path(outputFilename), writerOptions, inputFiles); + List inputFiles = new ArrayList<>(inputStatuses.size()); + for (LocatedFileStatus s : inputStatuses) { + inputFiles.add(s.getPath()); + } + + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf); + + if (maxSizeBytes > 0) { + mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles, + new Path(outputFilename), maxSizeBytes); + } else { + mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename), outputFilename); + } + } + + /** + * Original single-output behavior (no --maxSize). + */ + private static void mergeIntoSingleFile(OrcFile.WriterOptions writerOptions, + List inputFiles, + Path outputPath, + String outputFilename) throws Exception { + List mergedFiles = OrcFile.mergeFiles(outputPath, writerOptions, inputFiles); List unSuccessMergedFiles = new ArrayList<>(); if (mergedFiles.size() != inputFiles.size()) { @@ -100,11 +132,77 @@ public static void main(Configuration conf, String[] args) throws Exception { } } + /** + * Multi-output behavior when --maxSize is set. + * Input files are grouped by cumulative raw file size; each group is merged into + * a separate part file (part-00000.orc, part-00001.orc, ...) under outputDir. + * A single file whose size already exceeds maxSizeBytes is placed in its own part. + */ + private static void mergeIntoMultipleFiles(Configuration conf, + OrcFile.WriterOptions writerOptions, + List inputStatuses, + List inputFiles, + Path outputDir, + long maxSizeBytes) throws Exception { + FileSystem outFs = outputDir.getFileSystem(conf); + outFs.mkdirs(outputDir); + + // Group input files into batches where each batch's total size <= maxSizeBytes. + List> batches = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + long currentBatchSize = 0; + + for (LocatedFileStatus status : inputStatuses) { + long fileSize = status.getLen(); + if (!currentBatch.isEmpty() && currentBatchSize + fileSize > maxSizeBytes) { + batches.add(currentBatch); + currentBatch = new ArrayList<>(); + currentBatchSize = 0; + } + currentBatch.add(status.getPath()); + currentBatchSize += fileSize; + } + if (!currentBatch.isEmpty()) { + batches.add(currentBatch); + } + + int totalMerged = 0; + List allUnmerged = new ArrayList<>(); + + for (int i = 0; i < batches.size(); i++) { + List batch = batches.get(i); + Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, i)); + List merged = OrcFile.mergeFiles(partOutput, OrcFile.writerOptions(conf), batch); + totalMerged += merged.size(); + + if (merged.size() != batch.size()) { + Set mergedSet = new HashSet<>(merged); + for (Path p : batch) { + if (!mergedSet.contains(p)) { + allUnmerged.add(p); + } + } + } + } + + if (!allUnmerged.isEmpty()) { + System.err.println("List of files that could not be merged:"); + allUnmerged.forEach(path -> System.err.println(path.toString())); + } + + System.out.printf( + "Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n", + outputDir, inputFiles.size(), totalMerged, batches.size()); + if (!allUnmerged.isEmpty()) { + System.exit(1); + } + } + private static Options createOptions() { Options result = new Options(); result.addOption(Option.builder("o") .longOpt("output") - .desc("Output filename") + .desc("Output filename (single-file mode) or output directory (multi-file mode)") .hasArg() .build()); @@ -113,6 +211,15 @@ private static Options createOptions() { .desc("Ignore ORC file extension") .build()); + result.addOption(Option.builder("m") + .longOpt("maxSize") + .desc("Maximum size in bytes for each output ORC file. When set, --output is treated as " + + "an output directory and merged files are written as part-00000.orc, " + + "part-00001.orc, etc. Files are grouped at file boundaries so an individual " + + "file larger than this threshold will still be placed in its own part.") + .hasArg() + .build()); + result.addOption(Option.builder("h") .longOpt("help") .desc("Print help message") diff --git a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java index 3fdfeba0c4..2901a337ce 100644 --- a/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java +++ b/java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java @@ -53,6 +53,7 @@ public class TestMergeFiles implements TestConf { @BeforeEach public void openFileSystem() throws Exception { fs = FileSystem.getLocal(conf); + fs.delete(workDir, true); fs.mkdirs(workDir); fs.deleteOnExit(workDir); testFilePath = new Path(workDir + File.separator + "TestMergeFiles.testMerge.orc"); @@ -107,4 +108,82 @@ public void testMerge() throws Exception { assertEquals(10000 + 20000, reader.getNumberOfRows()); } } + + /** + * Verifies that --maxSize splits input files into multiple part files under the output + * directory. Three source files are created; a tight size threshold forces them to be + * written into at least two part files. + */ + @Test + public void testMergeWithMaxSize() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + + // Create 3 source ORC files with different row counts. + String[] sourceNames = { + workDir + File.separator + "ms-1.orc", + workDir + File.separator + "ms-2.orc", + workDir + File.separator + "ms-3.orc" + }; + int[] rowCounts = {5000, 5000, 5000}; + for (int f = 0; f < sourceNames.length; f++) { + Writer writer = OrcFile.createWriter(new Path(sourceNames[f]), + OrcFile.writerOptions(conf).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector x = (LongColumnVector) batch.cols[0]; + BytesColumnVector y = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < rowCounts[f]; ++r) { + int row = batch.size++; + x.vector[row] = r; + byte[] buffer = ("val-" + r).getBytes(); + y.setRef(row, buffer, 0, buffer.length); + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } + + // Measure the size of the first source file to compute a threshold that forces a split. + long singleFileSize = fs.getFileStatus(new Path(sourceNames[0])).getLen(); + // Threshold: slightly larger than one file so at most one file fits per part. + long maxSize = singleFileSize + 1; + + Path outputDir = new Path(workDir + File.separator + "merge-multi-out"); + fs.delete(outputDir, true); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8)); + MergeFiles.main(conf, new String[]{workDir.toString(), + "--output", outputDir.toString(), + "--maxSize", String.valueOf(maxSize)}); + System.out.flush(); + System.setOut(origOut); + String output = myOut.toString(StandardCharsets.UTF_8); + System.out.println(output); + + assertTrue(output.contains("Input files size: 3"), "Should report 3 input files"); + assertTrue(output.contains("Merge files size: 3"), "All 3 files should be merged"); + assertTrue(fs.isDirectory(outputDir), "Output directory should be created"); + + // Verify that multiple part files were created and total row count is correct. + long totalRows = 0; + int partCount = 0; + for (int i = 0; ; i++) { + Path part = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, i)); + if (!fs.exists(part)) { + break; + } + partCount++; + try (Reader reader = OrcFile.createReader(part, OrcFile.readerOptions(conf))) { + totalRows += reader.getNumberOfRows(); + } + } + assertTrue(partCount > 1, "Expected more than one output part file, got: " + partCount); + assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all parts should match"); + } } diff --git a/site/_docs/java-tools.md b/site/_docs/java-tools.md index a3d546e007..21a1fec1d1 100644 --- a/site/_docs/java-tools.md +++ b/site/_docs/java-tools.md @@ -17,7 +17,7 @@ The subcommands for the tools are: * data - print the data of an ORC file * json-schema (since ORC 1.4) - determine the schema of JSON documents * key (since ORC 1.5) - print information about the encryption keys - * merge (since ORC 2.0.1) - merge multiple ORC files into a single ORC file + * merge (since ORC 2.0.1) - merge multiple ORC files into one or more ORC files * meta - print the metadata of an ORC file * scan (since ORC 1.3) - scan the data for benchmarking * sizes (since ORC 1.7.2) - list size on disk of each column @@ -356,13 +356,39 @@ ______________________________________________________________________ ## Java Merge -The merge command can merge multiple ORC files that all have the same schema into a single ORC file. +The merge command can merge multiple ORC files that all have the same schema. By default +it writes a single output file. If `--maxSize` is set, `--output` is treated as a directory +and the tool writes multiple part files (`part-00000.orc`, `part-00001.orc`, …) under it. +Input files are grouped using their on-disk sizes so that each part’s total input size +does not exceed the given threshold (a single input file larger than the threshold is still +merged into its own part). + +`-h,--help` + : Print help + +`-i,--ignoreExtension` + : Include files that do not end in `.orc` + +`-m,--maxSize ` + : Maximum size in bytes for each output part; enables multi-file output under `--output` + +`-o,--output ` + : Output ORC filename (single-file mode) or output directory (when `--maxSize` is set) + +Merge into one ORC file: ~~~ shell % java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/merged.orc /path/to/input_orc/ ______________________________________________________________________ ~~~ +Merge into multiple ORC files under a directory (each part bounded by size): + +~~~ shell +% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/out_dir/ --maxSize 1073741824 /path/to/input_orc/ +______________________________________________________________________ +~~~ + ## Java Version The version command prints the version of this ORC tool.