diff --git a/.gitignore b/.gitignore index 23963332f..f10ffad13 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,6 @@ aster-nix-profile-*.svg # editor configuration .vscode + +# Python files +__pycache__ diff --git a/Cargo.lock b/Cargo.lock index 7e8841f56..17c0f1d2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,12 +76,6 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" -[[package]] -name = "array-init" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" - [[package]] name = "aster-bigtcp" version = "0.1.0" @@ -224,7 +218,6 @@ dependencies = [ "cfg-if", "component", "controlled", - "core2", "cpio-decoder", "crossbeam-utils", "fixed", @@ -240,12 +233,14 @@ dependencies = [ "log", "lru", "mariposa_data_capture", + "no_std_io2", "osdk-frame-allocator", "osdk-heap-allocator", "ostd", "paste", "rand", "riscv", + "serde", "snafu", "spin", "takeable", @@ -316,6 +311,7 @@ dependencies = [ "component", "log", "ostd", + "serde", "spin", ] @@ -361,7 +357,7 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -385,29 +381,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973" -[[package]] -name = "binary_serde" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4b2fe974c561ed04fb89673dd221bed36fc4794b71ca6ae96ffe19949045686" -dependencies = [ - "array-init", - "binary_serde_macros", - "recursive_array", - "thiserror-no-std", -] - -[[package]] -name = "binary_serde_macros" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"4790b8dbcfa9d3adf2020ebf5fd36c6bc89a77d434131615ea349bc9620e8831" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "bit_field" version = "0.10.2" @@ -467,7 +440,7 @@ checksum = "7ecc273b49b3205b83d648f0690daa588925572cc5063745bfe547fe7ec8e1a1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -537,22 +510,13 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "core2" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" -dependencies = [ - "memchr", -] - [[package]] name = "cpio-decoder" version = "0.1.0" dependencies = [ - "core2", "int-to-c-enum", "lending-iterator", + "no_std_io2", ] [[package]] @@ -670,7 +634,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -775,6 +739,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "font8x8" version = "0.2.7" @@ -817,7 +787,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -889,7 +859,18 @@ checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", ] [[package]] @@ -978,7 +959,7 @@ version = 
"0.1.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -1064,25 +1045,25 @@ checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libflate" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +checksum = "cd96e993e5f3368b0cb8497dae6c860c22af8ff18388c61c6c0b86c58d86b5df" dependencies = [ "adler32", - "core2", "crc32fast", "dary_heap", "libflate_lz77", + "no_std_io2", ] [[package]] name = "libflate_lz77" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +checksum = "ff7a10e427698aef6eef269482776debfef63384d30f13aad39a1a95e0e098fd" dependencies = [ - "core2", - "hashbrown 0.14.5", + "hashbrown 0.16.1", + "no_std_io2", "rle-decode-fast", ] @@ -1107,10 +1088,10 @@ name = "linux-bzimage-setup" version = "0.15.2" dependencies = [ "cfg-if", - "core2", "libflate", "linux-boot-params", "log", + "no_std_io2", "tdx-guest", "uart_16550", "uefi", @@ -1171,10 +1152,12 @@ version = "0.1.0" dependencies = [ "aster-block", "aster-logger", - "binary_serde", "component", "log", + "minicbor", + "minicbor-serde", "ostd", + "serde", "snafu", ] @@ -1193,6 +1176,36 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minicbor" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70eae6d4f18f7d76877fe7b13f0bc21f7c2b7239d2041c338335f7b388d0dd7" +dependencies = [ + "minicbor-derive", +] + +[[package]] +name = "minicbor-derive" +version = "0.19.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "294f0a0c161c510e9746adf546b8b044fbb0b00677d7dfc9a2452f9fdf63439b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "minicbor-serde" +version 
= "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80047f75e28e3b38f6ab2ec3c2c7669f6b411fa6f8424e1a90a3fd784b19a3f4" +dependencies = [ + "minicbor", + "serde", +] + [[package]] name = "multiboot2" version = "0.24.0" @@ -1223,6 +1236,15 @@ version = "6.6.666" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" +[[package]] +name = "no_std_io2" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b51ed7824b6e07d354605f4abb3d9d300350701299da96642ee084f5ce631550" +dependencies = [ + "memchr", +] + [[package]] name = "nougat" version = "0.2.4" @@ -1337,6 +1359,7 @@ dependencies = [ "ostd-test", "riscv", "sbi-rt", + "serde", "slotmap", "smallvec", "snafu", @@ -1357,7 +1380,7 @@ dependencies = [ "proc-macro2", "quote", "rand", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -1463,14 +1486,14 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] name = "proc-macro2" -version = "1.0.95" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" dependencies = [ "unicode-ident", ] @@ -1512,7 +1535,7 @@ checksum = "ca414edb151b4c8d125c12566ab0d74dc9cdba36fb80eb7b848c15f495fd32d1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -1523,9 +1546,9 @@ checksum = "8bb0fd6580eeed0103c054e3fba2c2618ff476943762f28a645b63b8692b21c9" [[package]] name = "quote" -version = "1.0.40" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" 
dependencies = [ "proc-macro2", ] @@ -1575,12 +1598,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "recursive_array" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69bad83913c3b39011ad9d43b7ac0cae139cec5d7e7288fbea5bb4b4e3cc78f6" - [[package]] name = "riscv" version = "0.11.1" @@ -1656,7 +1673,7 @@ checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -1715,7 +1732,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -1764,9 +1781,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.101" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", @@ -1814,27 +1831,7 @@ checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", -] - -[[package]] -name = "thiserror-impl-no-std" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58e6318948b519ba6dc2b442a6d0b904ebfb8d411a3ad3e07843615a72249758" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - -[[package]] -name = "thiserror-no-std" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3ad459d94dd517257cc96add8a43190ee620011bb6e6cdc82dafd97dfafafea" -dependencies = [ - "thiserror-impl-no-std", + "syn 2.0.117", ] [[package]] @@ -1962,7 +1959,7 @@ checksum = "c19ee3a01d435eda42cb9931269b349d28a1762f91ddf01c68d276f74b957cc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] [[package]] @@ -2111,5 +2108,5 @@ checksum = 
"28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.117", ] diff --git a/OSDK.toml b/OSDK.toml index 5caf2ff00..94b813276 100644 --- a/OSDK.toml +++ b/OSDK.toml @@ -64,6 +64,8 @@ qemu.args = """\ -drive if=none,format=raw,id=x1,file=./test/build/exfat.img \ -drive if=none,format=raw,id=r0,file=./test/build/raid1_0.img \ -drive if=none,format=raw,id=r1,file=./test/build/raid1_1.img \ + -drive if=none,format=raw,id=d0,file=./test/build/capture.img \ + -drive if=none,format=raw,id=d1,file=./test/build/capture_legacy.img \ -device virtio-blk-device,drive=x0 \ -device virtio-keyboard-device \ -device virtio-serial-device \ diff --git a/docs/src/kernel/advanced-instructions.md b/docs/src/kernel/advanced-instructions.md index e2e2822b7..79c059e6e 100644 --- a/docs/src/kernel/advanced-instructions.md +++ b/docs/src/kernel/advanced-instructions.md @@ -48,6 +48,16 @@ The following command builds and runs the syscall test binaries on Asterinas. make run AUTO_TEST=syscall ``` +To select a subset of the gVisor tests, pass test blocklist file name to the test runner script. For +example: + +```bash +make run AUTO_TEST=syscall SYSCALL_TEST_SUITE=gvisor INITARGS="futex_test" +``` + +(See comment in the launcher script, `test/syscall_test/gvisor/run_gvisor_test.sh`, for more information.) +(TODO(arthurp): This is not available for LTP tests.) + To run system call tests interactively, start an instance of Asterinas with the system call tests built and installed. ```bash diff --git a/docs/src/ostd/data-capture.md b/docs/src/ostd/data-capture.md new file mode 100644 index 000000000..2d87b2b67 --- /dev/null +++ b/docs/src/ostd/data-capture.md @@ -0,0 +1,15 @@ +# Data OQueues available in OSTD + +## Scheduler + +Scheduling events can placed on an OQueue. Unlike most OQueues, this is disabled by default, because +of the potential for unknown overhead in a very sensitive part of the system. 
It can be enabled with +the `capture_scheduling` feature. You can enable this feature with `--features +ostd/capture_scheduling` on the `cargo osdk` command line. (Or the +`FEATURES=ostd/capture_scheduling` environment variable for OSTDs own makefiles.) + +(NOTE: Once we are confident that the overhead is low enough, `capture_scheduling` will be enabled +by default.) + +To have the the Mariposa kernel capture the scheduling events to a file, add this *and* the kernel +command line argument `scheduler.capture_data=true`. \ No newline at end of file diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index acb89a9d2..e744d673b 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -41,6 +41,7 @@ intrusive-collections = "0.9.5" paste = "1.0" time = { version = "0.3", default-features = false, features = ["alloc"] } snafu = { workspace = true } +serde = { version = "1.0", default-features = false, features = ["derive", "alloc"]} # parse elf file xmas-elf = "0.10.0" @@ -48,8 +49,8 @@ xmas-elf = "0.10.0" bitflags = "1.3" keyable-arc = { path = "libs/keyable-arc" } # unzip initramfs -libflate = { version = "2", default-features = false } -core2 = { version = "0.4", default-features = false, features = ["alloc"] } +libflate = { version = "2.3", default-features = false } +no_std_io2 = { version = "0.9.3", features = ["alloc"] } lending-iterator = "0.1.7" spin = "0.9.4" lru = "0.12.3" diff --git a/kernel/comps/mariposa_data_capture/Cargo.toml b/kernel/comps/mariposa_data_capture/Cargo.toml index 771e3152b..801c2af04 100644 --- a/kernel/comps/mariposa_data_capture/Cargo.toml +++ b/kernel/comps/mariposa_data_capture/Cargo.toml @@ -10,7 +10,10 @@ component = { path = "../../libs/comp-sys/component" } aster-logger = { path = "../logger" } aster-block = { path = "../block" } ostd = { path = "../../../ostd" } -binary_serde = "1.0.25" +serde = { version = "1.0", default-features = false, features = ["derive", "alloc"]} +minicbor = { version = "2.2", default-features = false, 
features = ["derive", "alloc"] } +minicbor-serde = { version = "0.6", default-features = false } + log = "0.4" snafu = { workspace = true } diff --git a/kernel/comps/mariposa_data_capture/python/decode_mariposa_data.py b/kernel/comps/mariposa_data_capture/python/decode_mariposa_data.py new file mode 100644 index 000000000..b2e770ec9 --- /dev/null +++ b/kernel/comps/mariposa_data_capture/python/decode_mariposa_data.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: MPL-2.0 + +# /// script +# dependencies = [ +# "cbor2==5.9" +# ] +# /// + +""" +Decode a Mariposa data-capture device image and write one JSONL file per +capture file in the image. + +Usage: + dump_capture.py [--output-dir DIR] [--preview N] + +Each captured file produces a JSONL output file named: + .jsonl +where has path-separator characters replaced with underscores. + +With --preview, the first N records of every captured file are printed to +stdout instead of writing any files. +""" + +import argparse +import json +import logging +import re +from pathlib import Path + +from mariposa_data_reader import DataCaptureDevice + + +logger = logging.getLogger(__name__) + + +def _output_name(capture_path: str) -> str: + """Build a safe filename from the capture file's path.""" + safe_path = re.sub(r"[^A-Za-z0-9_\-]", "_", capture_path) + return f"{safe_path}.jsonl" + + +def _preview(device: DataCaptureDevice, n: int): + for capture_file in device: + path = capture_file.path + try: + type_name = capture_file.type_name + except Exception as exc: + logger.error(f"[{path}] warning: could not read header: {exc}") + type_name = "" + + print(f"=== {path} (type: {type_name})") + for i, record in enumerate(capture_file): + if i >= n: + print(f" ... 
(showing first {n} records)") + break + try: + print(f" {json.dumps(record)}") + except (ValueError, TypeError) as e: + logger.error(f"{e}") + print() + + +def _dump(device: DataCaptureDevice, output_dir: Path): + for capture_file in device: + path = capture_file.path + out_path = output_dir / _output_name(path) + count = 0 + with open(out_path, "w") as out: + for record in capture_file: + try: + out.write(json.dumps(record)) + out.write("\n") + count += 1 + except (ValueError, TypeError) as e: + logger.error( + f"Failed to write record {count} as JSON: {record}\n {e}\n" + " (If this is the last record, the stream may have been truncated, e.g., by failing to flush or running out of space.)" + ) + print( + f"Wrote {count} records (of type {capture_file.type_name}) to {out_path}." + ) + + +def main(): + parser = argparse.ArgumentParser( + description="Decode a Mariposa data-capture device image into JSONL." + ) + parser.add_argument("device", help="Path to the device image file.") + parser.add_argument( + "--output-dir", + default=".", + metavar="DIR", + help="Directory to write JSONL files into (default: current directory).", + ) + parser.add_argument( + "--preview", + "-p", + type=int, + metavar="N", + help="Print the first N records of each file to stdout instead of writing files. 
This intentionally permits more errors than full decoding.
+ if fp.tell() < end_pos: + yield record + else: + break + except cbor2.CBORDecodeValueError as e: + # Only print an error if the record completed before the end of the file. + if fp.tell() < end_pos: + logger.error( + f"CBOR decoding failed at element {i} (at offset {peek_pos}), assuming end of event stream: {e}\n" + "(The stream may have been truncated, e.g., by failing to flush or running out of space.)" + ) + break + i += 1 + + +class DataCaptureDevice: + """ + A whole device containing data captured in Mariposa using the `mariposa_data_capture` crate. + """ + + def __init__(self, filename: Path): + self._filename = filename + with open(self._filename, "rb") as f: + decoder = cbor2.CBORDecoder(f) + self._file_records = list(_decode_cbor_values(decoder, DIRECTORY_SIZE)) + + def __len__(self): + return len(self._file_records) + + def __iter__(self): + """Iterate over all the files on the device.""" + for record in self._file_records: + yield DataCaptureFile(self._filename, record) + + def __getitem__(self, index): + """Get a specific file by its index.""" + return DataCaptureFile(self._filename, self._file_records[index]) + + +class DataCaptureFile: + """ + A single capture file in a `DataCaptureDevice`. + """ + + def __init__(self, device_filename: Path, record: dict): + self._device_filename = device_filename + try: + if not isinstance(record, dict): + raise KeyError() + self._offset = record["offset"] + self._length = record["length"] + self._path = record["path"] + except KeyError: + raise ValueError(f"Invalid file record: {record}") + self._header = None + + def _new_decoder(self, f) -> cbor2.CBORDecoder: + """ + Construct a new decoder on this file. It starts at the beginning of the file header; + immediately after the magic number. + """ + f.seek(self._offset) + magic = f.read(len(MAGIC)) + if magic != MAGIC: + raise InvalidHeaderError( + f"Expected file magic at offset {self._offset}, got {magic!r}. 
" + f"This generally means the data capture file was not sync()'ed." + ) + return cbor2.CBORDecoder(f) + + def _get_header(self): + """ + Get the header of this file. + """ + if self._header is None: + with open(self._device_filename, "rb") as f: + decoder = self._new_decoder(f) + header = decoder.decode() + self._header = header + return self._header + + @property + def path(self) -> str: + """The path provided when creating the file in the kernel capture code.""" + return self._path + + @property + def type_name(self) -> str: + """The Rust type which was serialized into the file.""" + return self._get_header()["type_name"] + + def __iter__(self): + """ + Iterate over the records in the capture file. + + The end is either when the file length is reached, the decoder encounters `0xff` instead of + a record, or an invalid CBOR record is found. `0xff` is the CBOR "break" command, so it can + never appear as the start of a correct record. An invalid record is treated as a terminator, + since a truncated file should still be readable. Note that a truncated file can produce an + oddly formatted record (or records), if the following bytes happen to be valid CBOR. 
+ """ + end_offset = self._offset + self._length + with open(self._device_filename, "rb") as f: + decoder = self._new_decoder(f) + # skip the header + decoder.decode() + yield from _decode_cbor_values(decoder, end_offset) diff --git a/kernel/comps/mariposa_data_capture/python/test_mariposa_data_reader.py b/kernel/comps/mariposa_data_capture/python/test_mariposa_data_reader.py new file mode 100644 index 000000000..3103ad515 --- /dev/null +++ b/kernel/comps/mariposa_data_capture/python/test_mariposa_data_reader.py @@ -0,0 +1,118 @@ +# SPDX-License-Identifier: MPL-2.0 + +"""Tests for mariposa_data_reader.""" + +from itertools import zip_longest +import tempfile +import unittest + +from contextlib import contextmanager +from pathlib import Path + +import cbor2 + +from mariposa_data_reader import DataCaptureDevice + + +class TempImage: + """Helper class for creating a data capture device image for testing.""" + + def __init__(self): + """Initialize the builder with an empty temporary file.""" + self._file = tempfile.NamedTemporaryFile(delete=False) + + def write_bytes(self, data: bytes) -> None: + """Write raw bytes at the current position in the file.""" + self._file.write(data) + + def write_cbor(self, value) -> None: + """Write CBOR-encoded values at the current position in the file.""" + cbor2.dump(value, self._file) + + def write_padding(self, target_offset: int) -> None: + """Write 0xff bytes until reaching the specified offset.""" + current_offset = self._file.tell() + if target_offset < current_offset: + raise ValueError("Target offset is before current position") + + remaining = target_offset - current_offset + self.write_bytes(bytes([0xFF]) * remaining) + + def finalize(self) -> Path: + """Close the file and return its path.""" + self._file.close() + return Path(self._file.name) + + def __enter__(self): + self._file.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + Path(self._file.name).unlink() + return 
self._file.__exit__(exc_type, exc_val, exc_tb) + + +class TestDataCaptureDevice(unittest.TestCase): + @contextmanager + @staticmethod + def _test_file(): + with TempImage() as img: + img.write_cbor({"offset": 0x1000, "length": 0x1000, "path": "test.name"}) + img.write_cbor({"offset": 0x4000, "length": 0x1000, "path": "test.name[2]"}) + img.write_padding(0x1000) + img.write_bytes(b"MARIPOSALDOSDATA\0") + img.write_cbor({"name": "test.name", "type_name": "Test", "oqueues": []}) + img.write_cbor({"a": 1, "b": 2}) + img.write_cbor({"a": 1, "b": 2}) + img.write_cbor({"a": 1, "b": 2}) + img.write_padding(0x4000) + img.write_bytes(b"MARIPOSALDOSDATA\0") + img.write_cbor( + {"name": "test.name[2]", "type_name": "Test2", "oqueues": []} + ) + img.write_cbor({"x": 1, "y": 2}) + img.write_cbor({"x": 1, "y": 2}) + img.write_cbor({"x": 1, "y": 2}) + img.write_padding(0x5000) + yield img.finalize() + + def test_files(self): + with TestDataCaptureDevice._test_file() as test_filename: + device = DataCaptureDevice(test_filename) + self.assertEqual(len(device), 2) + self.assertEqual(device[0].path, "test.name") + self.assertEqual(device[1].path, "test.name[2]") + + def test_no_files(self): + with TempImage() as img: + img.write_padding(0x1000) + device = DataCaptureDevice(img.finalize()) + self.assertEqual(len(device), 0) + + def test_first_file_records(self): + with TestDataCaptureDevice._test_file() as test_filename: + device = DataCaptureDevice(test_filename) + file = device[0] + self.assertEqual(file.path, "test.name") + self.assertEqual(file.type_name, "Test") + + expected_records = [{"a": 1, "b": 2}, {"a": 1, "b": 2}, {"a": 1, "b": 2}] + + for record, expected_record in zip_longest(file, expected_records): + self.assertDictEqual(record, expected_record) + + def test_second_file_records(self): + with TestDataCaptureDevice._test_file() as test_filename: + device = DataCaptureDevice(test_filename) + file = device[1] + self.assertEqual(file.path, "test.name[2]") + 
self.assertEqual(file.type_name, "Test2") + + expected_records = [{"x": 1, "y": 2}, {"x": 1, "y": 2}, {"x": 1, "y": 2}] + + for record, expected_record in zip_longest(file, expected_records): + self.assertDictEqual(record, expected_record) + + +if __name__ == "__main__": + unittest.main() diff --git a/kernel/comps/mariposa_data_capture/src/data_buffering.rs b/kernel/comps/mariposa_data_capture/src/data_buffering.rs index 3e26b8a69..380d1b0ca 100644 --- a/kernel/comps/mariposa_data_capture/src/data_buffering.rs +++ b/kernel/comps/mariposa_data_capture/src/data_buffering.rs @@ -1,63 +1,70 @@ // SPDX-License-Identifier: MPL-2.0 -use alloc::{boxed::Box, format, sync::Arc, vec::Vec}; -use core::error::Error; +use alloc::{ + string::{String, ToString as _}, + sync::Arc, + vec::Vec, +}; use aster_block::{ BLOCK_SIZE, BlockDevice, bio::{BioDirection, BioSegment}, id::Bid, }; -use binary_serde::{BinarySerde, Endianness}; +use minicbor_serde::Serializer; use ostd::orpc::path::Path; +use serde::Serialize; +use snafu::ensure; + +use crate::{DataCaptureError, InsufficientSpaceSnafu}; + +/// The "magic number" included at the start of files to catch errors and allow finding them in +/// corrupted data. +const MAGIC: &[u8; 17] = b"MARIPOSALDOSDATA\0"; /// A buffer for managing data which will be written bit by bit, but the extracted in larger blocks. struct DataBuf { - data: Vec, + data: Serializer>, } impl DataBuf { /// Creates a new DataBuf with the specified size. pub fn new(size: usize) -> Self { DataBuf { - data: Vec::with_capacity(size), + data: Serializer::new(Vec::with_capacity(size)), } } - /// Writes a [`BinarySerde`] value to the buffer at the current offset. - /// - /// [`BinarySerde`] is derivable. See - /// https://docs.rs/binary_serde/latest/binary_serde/index.html. + /// Writes a [`Serialize`] value to the buffer at the current offset. 
pub fn write_value(&mut self, v: &T) where - T: BinarySerde, + T: Serialize, { - let serialization_start = self.data.len(); - self.data - .resize(serialization_start + T::SERIALIZED_SIZE, 0); - v.binary_serialize(&mut self.data[serialization_start..], Endianness::NATIVE); + v.serialize(&mut self.data).unwrap() } /// Writes raw bytes to the buffer at the current position. pub fn write_bytes(&mut self, v: &[u8]) { - self.data.extend(v); + let data = self.data.encoder_mut().writer_mut(); + data.extend(v); } /// Returns a slice containing all written data in the buffer. pub fn written_data(&self) -> &[u8] { - &self.data + self.data.encoder().writer() } /// Removes the leading n bytes by shifting remaining data to start of buffer. pub fn delete(&mut self, n: usize) { - let remaining = self.data.len() - n; - self.data.drain(0..n); - debug_assert_eq!(self.data.len(), remaining); + let data = self.data.encoder_mut().writer_mut(); + let remaining = data.len() - n; + data.drain(0..n); + debug_assert_eq!(data.len(), remaining); } /// Returns the number of bytes currently in the buffer. pub fn len(&self) -> usize { - self.data.len() + self.written_data().len() } } @@ -66,37 +73,37 @@ pub(crate) struct ChunkingWriteWrapper { data_buf: DataBuf, pub(crate) block_device: Arc, pub(crate) current_bid: Bid, + end_bid: Bid, } impl ChunkingWriteWrapper { /// Create a new wrapper for chunking writes. pub fn new( - size: usize, + buffer_size: usize, block_device: Arc, start_bid: Bid, + end_bid: Bid, ) -> ChunkingWriteWrapper { ChunkingWriteWrapper { - data_buf: DataBuf::new(size), + data_buf: DataBuf::new(buffer_size), block_device, current_bid: start_bid, + end_bid, } } - /// Writes a [`BinarySerde`] value to the output. - /// - /// [`BinarySerde`] is derivable. See - /// https://docs.rs/binary_serde/latest/binary_serde/index.html. + /// Writes a [`Serialize`] value to the output. /// /// See [`DataBuf::write_value`]. 
pub fn write_value(&mut self, v: &T) where - T: BinarySerde, + T: Serialize, { self.data_buf.write_value(v); } - /// Flushes the buffer to storage if it contains more than one block's worth of data. - pub fn flush_if_needed(&mut self) -> Result<(), Box> { + /// Flushes a block to storage if it contains more than one block's worth of data. + pub fn flush_if_needed(&mut self) -> Result<(), DataCaptureError> { if self.data_buf.len() > BLOCK_SIZE { let n_written = self.flush()?; self.current_bid = self.current_bid + 1; @@ -105,49 +112,50 @@ impl ChunkingWriteWrapper { Ok(()) } - /// Flush the buffered data to storage regardless of the state. + /// Flush the buffered data to storage regardless of the state. If there is no data in the + /// buffer, this writes a block of 0xff (the CBOR "break" command). This serves to mark the end + /// of the data. /// - /// The same page may be written again if it was not full when this was called. - pub fn flush(&mut self) -> Result> { + /// The same page may be written again if this is called again and if it was not full when this + /// was called the first time. + pub fn flush(&mut self) -> Result { + ensure!(self.current_bid < self.end_bid, InsufficientSpaceSnafu); + let raw_data = self.data_buf.written_data(); let bio_segment = BioSegment::alloc(1, BioDirection::ToDevice); - let n_written = bio_segment.writer()?.write(&mut raw_data.into()); + let mut writer = bio_segment.writer().expect("segment direction known"); + let n_written = writer.write(&mut raw_data.into()); + writer.fill(0xffu8); let _ = self .block_device .write_blocks_async(self.current_bid, bio_segment)?; Ok(n_written) } - pub fn sync(&mut self) -> Result<(), Box> { + /// Sync all of the data to the block device. + pub fn sync(&mut self) -> Result<(), DataCaptureError> { + self.flush()?; self.block_device.sync()?; Ok(()) } /// Writes a structured header with magic number, type information, and paths. 
- pub fn write_header(&mut self, paths: &[Path]) -> Result<(), Box> { + pub fn write_header(&mut self, name: &str, paths: &[Path]) -> Result<(), DataCaptureError> { // Write magic number - let magic = b"MARIPOSALDOSDATA\0"; - self.data_buf.write_bytes(magic); - - // Create JSON header with type information and optional paths - let mut json_header = format!("{{\"type\":\"{}\"", core::any::type_name::()); + self.data_buf.write_bytes(MAGIC); - json_header.push_str(", \"oqueues\": ["); - for (i, path) in paths.iter().enumerate() { - json_header.push_str(&format!("{}\"{}\"", if i > 0 { "," } else { "" }, path)); + #[derive(Serialize)] + struct Header<'a> { + name: &'a str, + type_name: &'a str, + oqueues: Vec, } - json_header.push(']'); - - json_header.push('}'); - self.data_buf.write_bytes(json_header.as_bytes()); - self.data_buf.write_bytes(&[0]); - - let padding = 64 - (self.data_buf.len() % 64); - if padding != 0 { - // The weird [0; 64] avoids an allocation by referencing a static block of 0s. 
- self.data_buf.write_bytes(&[0; 64][..padding]); - } + self.data_buf.write_value(&Header { + name, + type_name: core::any::type_name::(), + oqueues: paths.iter().map(|p| p.to_string()).collect(), + }); Ok(()) } @@ -170,7 +178,7 @@ mod test { #[ktest] fn test_write_value() { - #[derive(Debug, PartialEq, Eq, BinarySerde)] + #[derive(Debug, PartialEq, Eq, Serialize)] struct TestStruct { a: u32, b: u16, @@ -186,12 +194,12 @@ mod test { // Verify the bytes were written correctly let data = buf.written_data(); - assert_eq!(data.len(), 6); assert_eq!( - u32::from_le_bytes(data[..4].try_into().unwrap()), - 0x12345678 - ); - assert_eq!(u16::from_le_bytes(data[4..6].try_into().unwrap()), 0x9012); + data, + &[ + 0xa2, 0x61, 0x61, 0x1a, 0x12, 0x34, 0x56, 0x78, 0x61, 0x62, 0x19, 0x90, 0x12 + ] + ) } #[ktest] @@ -227,10 +235,10 @@ mod test { data_buf: DataBuf::new(4096 * 2), block_device: block_device.clone(), current_bid: Bid::new(0), + end_bid: Bid::new(4), }; - wrapper.write_header::(&paths).unwrap(); - assert_eq!(wrapper.data_buf.len(), 64); + wrapper.write_header::("test", &paths).unwrap(); assert_eq!(&wrapper.data_buf.written_data()[..16], b"MARIPOSALDOSDATA"); wrapper.flush().unwrap(); @@ -244,6 +252,7 @@ mod test { data_buf: DataBuf::new(4096), block_device: block_device.clone(), current_bid: Bid::new(0), + end_bid: Bid::new(4), }; // Test case 1: Buffer doesn't need flushing (<= BLOCK_SIZE) diff --git a/kernel/comps/mariposa_data_capture/src/data_capture_device.rs b/kernel/comps/mariposa_data_capture/src/data_capture_device.rs index 802caa167..7508f7173 100644 --- a/kernel/comps/mariposa_data_capture/src/data_capture_device.rs +++ b/kernel/comps/mariposa_data_capture/src/data_capture_device.rs @@ -5,18 +5,25 @@ //! This module provides [`DataCaptureDevice`], which manages block storage devices and creates //! [`DataCaptureFile`](crate::data_capture_file::DataCaptureFile) instances. 
-use alloc::sync::Arc; +use alloc::{ + string::{String, ToString as _}, + sync::Arc, +}; use core::sync::atomic::{AtomicUsize, Ordering}; -use aster_block::{self, BlockDevice, SECTOR_SIZE}; +use aster_block::{self, BLOCK_SIZE, BlockDevice, SECTOR_SIZE, id::Bid}; use ostd::{ new_server, - orpc::{errors::RPCError, framework::Server, orpc_impl, orpc_server, orpc_trait, path::Path}, - ostd_error, + orpc::{framework::Server, orpc_impl, orpc_server, orpc_trait, path::Path}, + sync::Mutex, }; -use snafu::{Snafu, ensure}; +use serde::Serialize; +use snafu::ensure; -use crate::DataCaptureFileBuilder; +use crate::{ + DataCaptureError, DataCaptureFileBuilder, InsufficientSpaceSnafu, + data_buffering::ChunkingWriteWrapper, +}; /// Describes a file to be created on the device #[derive(Debug, Clone)] @@ -25,16 +32,9 @@ pub struct FileDescriptor { pub path: Path, } -#[non_exhaustive] -#[ostd_error] -#[derive(Debug, Snafu)] -#[snafu()] -pub enum DataCaptureDeviceError { - #[snafu(transparent)] - RPCError { source: RPCError }, - #[snafu(display("Insufficient space on device"))] - InsufficientSpace {}, -} +/// The number of blocks allocated for the directory. This must match the DIRECTORY_BLOCKS constant +/// in `mariposa_data_reader.py`. +const DIRECTORY_BLOCKS: usize = 1; /// A wrapper around a [`BlockDevice`] which supports creating [`DataCaptureFile`]s. #[orpc_trait] @@ -45,7 +45,7 @@ pub trait DataCaptureDevice { fn new_file( &self, descriptor: FileDescriptor, - ) -> Result; + ) -> Result; } /// An implementation of [`DataCaptureDevice`]. 
@@ -53,13 +53,20 @@ pub trait DataCaptureDevice { pub struct DataCaptureDeviceServer { block_device: Arc, next_block_offset: AtomicUsize, + directory_writer: Mutex, } impl DataCaptureDeviceServer { pub fn new(block_device: Arc) -> Arc { new_server!(|_| DataCaptureDeviceServer { + directory_writer: Mutex::new(ChunkingWriteWrapper::new( + BLOCK_SIZE * 2, + block_device.clone(), + Bid::from_offset(0), + Bid::from_offset(DIRECTORY_BLOCKS * BLOCK_SIZE) + )), block_device, - next_block_offset: AtomicUsize::new(0), + next_block_offset: AtomicUsize::new(DIRECTORY_BLOCKS * BLOCK_SIZE), }) } } @@ -69,15 +76,31 @@ impl DataCaptureDevice for DataCaptureDeviceServer { fn new_file( &self, descriptor: FileDescriptor, - ) -> Result { - let start = self - .next_block_offset - .fetch_add(descriptor.length, Ordering::Relaxed); - let end = start + descriptor.length; + ) -> Result { + let length = descriptor.length.next_multiple_of(BLOCK_SIZE); + let start = self.next_block_offset.fetch_add(length, Ordering::Relaxed); + let end = start + length; ensure!( end <= self.block_device.metadata().nr_sectors * SECTOR_SIZE, InsufficientSpaceSnafu ); + + #[derive(Serialize)] + struct FileRecord { + offset: u64, + length: u64, + path: String, + } + + let mut writer = self.directory_writer.lock(); + writer.write_value(&FileRecord { + offset: start as u64, + length: length as u64, + path: descriptor.path.to_string(), + }); + writer.sync()?; + drop(writer); + Ok(DataCaptureFileBuilder { block_device: self.block_device.clone(), path: descriptor.path, diff --git a/kernel/comps/mariposa_data_capture/src/data_capture_file.rs b/kernel/comps/mariposa_data_capture/src/data_capture_file.rs index 61c653882..e21c8a1a6 100644 --- a/kernel/comps/mariposa_data_capture/src/data_capture_file.rs +++ b/kernel/comps/mariposa_data_capture/src/data_capture_file.rs @@ -18,11 +18,10 @@ //! [`DataCaptureFile`] has a thread which will observe all OQueues attach via //! [`DataCaptureFile::register_observer`]. 
-use alloc::{boxed::Box, sync::Arc, vec::Vec}; -use core::{any::Any, error::Error, sync::atomic::AtomicBool}; +use alloc::{string::ToString as _, sync::Arc, vec::Vec}; +use core::{any::Any, sync::atomic::AtomicBool}; use aster_block::{BLOCK_SIZE, BlockDevice, id::Bid}; -use binary_serde::BinarySerde; use ostd::{ new_server, orpc::{ @@ -37,32 +36,31 @@ use ostd::{ }, path, }; +use serde::Serialize; -use crate::data_buffering::ChunkingWriteWrapper; +use crate::{DataCaptureError, data_buffering::ChunkingWriteWrapper}; /// Registration information for an OQueue observer -pub struct ObserverRegistration { +pub struct ObserverRegistration { /// The path of the OQueue this is observing pub path: Path, /// An observer attachment at the appropriate type. pub observer: StrongObserver, } -impl core::fmt::Debug for ObserverRegistration { +impl core::fmt::Debug for ObserverRegistration { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("OQueueAttachment") + f.debug_struct("ObserverRegistration") .field("path", &self.path) .finish() } } #[orpc_trait] -pub trait DataCaptureFile: Any { +pub trait DataCaptureFile: Any { /// Attach a new OQueue to the output. If output has already started, then the path will not /// appear in the block header. fn register_observer(&self, attachment: ObserverRegistration) -> Result<(), RPCError>; - /// Flush any data remaining in the output buffers to disk. - fn flush(&self) -> Result<(), RPCError>; /// Sync writes to disk. fn sync(&self) -> Result<(), RPCError>; /// Enable capturing to this file. 
@@ -72,18 +70,18 @@ pub trait DataCaptureFile: Any { } /// Command enum for DataCaptureFile operations -enum DataCaptureFileCommand { +enum DataCaptureFileCommand { RegisterObserver(ObserverRegistration), - Flush, Sync, Stop, } -impl core::fmt::Debug for DataCaptureFileCommand { +impl core::fmt::Debug for DataCaptureFileCommand { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { - Self::RegisterObserver(arg0) => f.debug_tuple("AttachOqueue").field(arg0).finish(), - Self::Flush => write!(f, "Flush"), + Self::RegisterObserver(arg0) => { + f.debug_tuple("DataCaptureFileCommand").field(arg0).finish() + } Self::Sync => write!(f, "Sync"), Self::Stop => write!(f, "Stop"), } @@ -91,14 +89,15 @@ impl core::fmt::Debug for DataCaptureFil } #[orpc_server(DataCaptureFile)] -struct DataCaptureFileServer { +struct DataCaptureFileServer { command_oqueue: ConsumableOQueueRef>, command_producer: ValueProducer>, started: AtomicBool, stopped: AtomicBool, } -pub struct DataCaptureFileServerThread { +pub struct DataCaptureFileServerThread { + path: Path, command_consumer: Consumer>, block_device: Arc, start_bid: Bid, @@ -106,10 +105,14 @@ pub struct DataCaptureFileServerThread { server: Arc>, } -impl DataCaptureFileServerThread { - fn run(&self) -> Result<(), Box> { - let mut data_buf_handler = - ChunkingWriteWrapper::new(BLOCK_SIZE * 2, self.block_device.clone(), self.start_bid); +impl DataCaptureFileServerThread { + fn run(&self) -> Result<(), DataCaptureError> { + let mut data_buf_handler = ChunkingWriteWrapper::new( + BLOCK_SIZE * 2, + self.block_device.clone(), + self.start_bid, + self.end_bid, + ); let mut observers: Vec> = Default::default(); // The paths of the attached OQueues. Once the header is written this is set to None and // paths are no longer collected even if more OQueues are attached. 
@@ -134,13 +137,11 @@ impl DataCaptureFileServerThread { paths.push(path); } } - DataCaptureFileCommand::Flush => { - data_buf_handler.flush()?; - } DataCaptureFileCommand::Sync => { data_buf_handler.sync()?; } DataCaptureFileCommand::Stop => { + data_buf_handler.sync()?; self.server .stopped .store(true, core::sync::atomic::Ordering::SeqCst); @@ -160,7 +161,10 @@ impl DataCaptureFileServerThread { while let Ok(Some(v)) = o.try_strong_observe() { if started { if paths.is_some() { - data_buf_handler.write_header::(paths.as_ref().unwrap())?; + data_buf_handler.write_header::( + &self.path.to_string(), + paths.as_ref().unwrap(), + )?; paths = None; } @@ -177,18 +181,13 @@ impl DataCaptureFileServerThread { } #[orpc_impl] -impl DataCaptureFile for DataCaptureFileServer { +impl DataCaptureFile for DataCaptureFileServer { fn register_observer(&self, attachment: ObserverRegistration) -> Result<(), RPCError> { self.command_producer .produce(DataCaptureFileCommand::RegisterObserver(attachment)); Ok(()) } - fn flush(&self) -> Result<(), RPCError> { - self.command_producer.produce(DataCaptureFileCommand::Flush); - Ok(()) - } - fn sync(&self) -> Result<(), RPCError> { self.command_producer.produce(DataCaptureFileCommand::Sync); Ok(()) @@ -225,7 +224,7 @@ impl DataCaptureFileBuilder { /// Construct the [`DataCaptureFile`] for a specific type of data. pub fn build(self) -> Arc> where - T: Copy + Send + BinarySerde + 'static, + T: Copy + Send + Serialize + 'static, { // We manually context switch into the server here. 
This is not something we should // generally do, but this build function is required and can't be on a server (due to the @@ -246,16 +245,17 @@ impl DataCaptureFileBuilder { spawn_thread(server.clone(), { let thread = DataCaptureFileServerThread { + path: self.path.clone(), command_consumer: server .command_oqueue .attach_consumer() .expect("single purpose OQueue failed."), block_device: self.block_device, - start_bid: Bid::new(self.start as u64), - end_bid: Bid::new(self.end as u64), + start_bid: Bid::from_offset(self.start), + end_bid: Bid::from_offset(self.end), server: server.clone(), }; - move || thread.run() + move || Ok(thread.run()?) }); Ok(server) diff --git a/kernel/comps/mariposa_data_capture/src/legacy/data_capture_device.rs b/kernel/comps/mariposa_data_capture/src/legacy/data_capture_device.rs index 37b89c19e..49709aa71 100644 --- a/kernel/comps/mariposa_data_capture/src/legacy/data_capture_device.rs +++ b/kernel/comps/mariposa_data_capture/src/legacy/data_capture_device.rs @@ -4,34 +4,31 @@ //! //! See [`crate::data_capture_device`] for full documentation. -use alloc::sync::Arc; +use alloc::{ + string::{String, ToString as _}, + sync::Arc, +}; use core::sync::atomic::{AtomicUsize, Ordering}; -use aster_block::{self, BlockDevice, SECTOR_SIZE}; +use aster_block::{self, BLOCK_SIZE, BlockDevice, SECTOR_SIZE, id::Bid}; use ostd::{ new_server, - orpc::{errors::RPCError, framework::Server, orpc_impl, orpc_server, orpc_trait}, - ostd_error, + orpc::{framework::Server, orpc_impl, orpc_server, orpc_trait, path::Path}, + sync::Mutex, }; -use snafu::{Snafu, ensure}; +use serde::Serialize; +use snafu::ensure; use super::data_capture_file::DataCaptureFileBuilder; +use crate::{DataCaptureError, InsufficientSpaceSnafu, data_buffering::ChunkingWriteWrapper}; + +const DIRECTORY_BLOCKS: usize = 1; /// TEMPORARY: Describes a file to be created on the device. 
#[derive(Debug, Clone)] pub struct FileDescriptor { pub length: usize, -} - -#[non_exhaustive] -#[ostd_error] -#[derive(Debug, Snafu)] -#[snafu()] -pub enum DataCaptureDeviceError { - #[snafu(transparent)] - RPCError { source: RPCError }, - #[snafu(display("Insufficient space on device"))] - InsufficientSpace {}, + pub path: Path, } /// TEMPORARY: A wrapper around a [`BlockDevice`] which supports creating legacy-OQueue-backed @@ -44,7 +41,7 @@ pub trait DataCaptureDevice { fn new_file( &self, descriptor: FileDescriptor, - ) -> Result; + ) -> Result; } /// TEMPORARY: An implementation of [`DataCaptureDevice`]. @@ -52,13 +49,20 @@ pub trait DataCaptureDevice { pub struct DataCaptureDeviceServer { block_device: Arc, next_block_offset: AtomicUsize, + directory_writer: Mutex, } impl DataCaptureDeviceServer { pub fn new(block_device: Arc) -> Arc { new_server!(|_| DataCaptureDeviceServer { + directory_writer: Mutex::new(ChunkingWriteWrapper::new( + BLOCK_SIZE * 2, + block_device.clone(), + Bid::from_offset(0), + Bid::from_offset(DIRECTORY_BLOCKS * BLOCK_SIZE) + )), block_device, - next_block_offset: AtomicUsize::new(0), + next_block_offset: AtomicUsize::new(DIRECTORY_BLOCKS * BLOCK_SIZE), }) } } @@ -68,17 +72,34 @@ impl DataCaptureDevice for DataCaptureDeviceServer { fn new_file( &self, descriptor: FileDescriptor, - ) -> Result { - let start = self - .next_block_offset - .fetch_add(descriptor.length, Ordering::Relaxed); - let end = start + descriptor.length; + ) -> Result { + let length = descriptor.length.next_multiple_of(BLOCK_SIZE); + let start = self.next_block_offset.fetch_add(length, Ordering::Relaxed); + let end = start + length; ensure!( end <= self.block_device.metadata().nr_sectors * SECTOR_SIZE, InsufficientSpaceSnafu ); + + #[derive(Serialize)] + struct FileRecord { + offset: u64, + length: u64, + path: String, + } + + let mut writer = self.directory_writer.lock(); + writer.write_value(&FileRecord { + offset: start as u64, + length: length as u64, + path: 
descriptor.path.to_string(), + }); + writer.sync()?; + drop(writer); + Ok(DataCaptureFileBuilder { block_device: self.block_device.clone(), + path: descriptor.path, start, end, server: self.orpc_server_base().get_ref().unwrap(), diff --git a/kernel/comps/mariposa_data_capture/src/legacy/data_capture_file.rs b/kernel/comps/mariposa_data_capture/src/legacy/data_capture_file.rs index 6df05f08d..8f6309cca 100644 --- a/kernel/comps/mariposa_data_capture/src/legacy/data_capture_file.rs +++ b/kernel/comps/mariposa_data_capture/src/legacy/data_capture_file.rs @@ -5,11 +5,10 @@ //! This is otherwise identical in structure and purpose to the non-legacy version. See //! [`crate::data_capture_file`] for full documentation. -use alloc::{boxed::Box, sync::Arc, vec::Vec}; -use core::{any::Any, error::Error, sync::atomic::AtomicBool}; +use alloc::{boxed::Box, string::ToString, sync::Arc, vec::Vec}; +use core::{any::Any, sync::atomic::AtomicBool}; use aster_block::{BLOCK_SIZE, BlockDevice, id::Bid}; -use binary_serde::BinarySerde; use ostd::{ new_server, orpc::{ @@ -17,31 +16,31 @@ use ostd::{ framework::{Server, spawn_thread}, legacy_oqueue::{Consumer, OQueue, StrongObserver, locking::LockingQueue}, orpc_impl, orpc_server, orpc_trait, + path::Path, sync::{BlockOnMany, Blocker}, }, }; +use serde::Serialize; -use crate::data_buffering::ChunkingWriteWrapper; +use crate::{DataCaptureError, data_buffering::ChunkingWriteWrapper}; /// TEMPORARY: Registration information for a legacy OQueue observer. -pub struct ObserverRegistration { +pub struct ObserverRegistration { /// TEMPORARY: A strong observer attachment on the legacy OQueue. 
pub observer: Box>, } -impl core::fmt::Debug for ObserverRegistration { +impl core::fmt::Debug for ObserverRegistration { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("ObserverRegistration").finish() } } #[orpc_trait] -pub trait DataCaptureFile: Any { +pub trait DataCaptureFile: Any { /// TEMPORARY: Attach a new legacy OQueue to the output. If output has already started, then the /// path will not appear in the block header. fn register_observer(&self, attachment: ObserverRegistration) -> Result<(), RPCError>; - /// TEMPORARY: Flush any data remaining in the output buffers to disk. - fn flush(&self) -> Result<(), RPCError>; /// TEMPORARY: Sync writes to disk. fn sync(&self) -> Result<(), RPCError>; /// TEMPORARY: Enable capturing to this file. @@ -51,18 +50,18 @@ pub trait DataCaptureFile: Any { } /// TEMPORARY: Command enum for [`DataCaptureFile`] operations. -enum DataCaptureFileCommand { +enum DataCaptureFileCommand { RegisterObserver(ObserverRegistration), - Flush, Sync, Stop, } -impl core::fmt::Debug for DataCaptureFileCommand { +impl core::fmt::Debug for DataCaptureFileCommand { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { - Self::RegisterObserver(reg) => f.debug_tuple("RegisterObserver").field(reg).finish(), - Self::Flush => write!(f, "Flush"), + Self::RegisterObserver(reg) => { + f.debug_tuple("DataCaptureFileCommand").field(reg).finish() + } Self::Sync => write!(f, "Sync"), Self::Stop => write!(f, "Stop"), } @@ -70,24 +69,29 @@ impl core::fmt::Debug for DataCaptureFil } #[orpc_server(DataCaptureFile)] -struct DataCaptureFileServer { +struct DataCaptureFileServer { command_queue: Arc>>, started: AtomicBool, stopped: AtomicBool, } -struct DataCaptureFileServerThread { +struct DataCaptureFileServerThread { command_consumer: Box>>, block_device: Arc, + path: Path, start_bid: Bid, end_bid: Bid, server: Arc>, } -impl DataCaptureFileServerThread { - fn run(&self) -> Result<(), Box> { 
- let mut data_buf_handler = - ChunkingWriteWrapper::new(BLOCK_SIZE * 2, self.block_device.clone(), self.start_bid); +impl DataCaptureFileServerThread { + fn run(&self) -> Result<(), DataCaptureError> { + let mut data_buf_handler = ChunkingWriteWrapper::new( + BLOCK_SIZE * 2, + self.block_device.clone(), + self.start_bid, + self.end_bid, + ); let mut observers: Vec>> = Default::default(); let mut headers_written = false; let mut block_handler = BlockOnMany::new(); @@ -104,13 +108,11 @@ impl DataCaptureFileServerThread { DataCaptureFileCommand::RegisterObserver(ObserverRegistration { observer }) => { observers.push(observer); } - DataCaptureFileCommand::Flush => { - data_buf_handler.flush()?; - } DataCaptureFileCommand::Sync => { data_buf_handler.sync()?; } DataCaptureFileCommand::Stop => { + data_buf_handler.sync()?; self.server .stopped .store(true, core::sync::atomic::Ordering::SeqCst); @@ -130,7 +132,7 @@ impl DataCaptureFileServerThread { while let Some(v) = o.try_strong_observe() { if capturing { if !headers_written { - data_buf_handler.write_header::(&[])?; + data_buf_handler.write_header::(&self.path.to_string(), &[])?; headers_written = true; } @@ -147,7 +149,7 @@ impl DataCaptureFileServerThread { } #[orpc_impl] -impl DataCaptureFile for DataCaptureFileServer { +impl DataCaptureFile for DataCaptureFileServer { fn register_observer(&self, attachment: ObserverRegistration) -> Result<(), RPCError> { self.command_queue .produce(DataCaptureFileCommand::RegisterObserver(attachment)) @@ -155,13 +157,6 @@ impl DataCaptureFile for DataCaptureFileServer< Ok(()) } - fn flush(&self) -> Result<(), RPCError> { - self.command_queue - .produce(DataCaptureFileCommand::Flush) - .unwrap(); - Ok(()) - } - fn sync(&self) -> Result<(), RPCError> { self.command_queue .produce(DataCaptureFileCommand::Sync) @@ -190,6 +185,7 @@ impl DataCaptureFile for DataCaptureFileServer< /// TEMPORARY: A builder for a [`DataCaptureFile`] provided by a [`DataCaptureDevice`]. 
pub struct DataCaptureFileBuilder { pub(crate) block_device: Arc, + pub(crate) path: Path, pub(crate) start: usize, pub(crate) end: usize, pub(crate) server: Arc, @@ -199,7 +195,7 @@ impl DataCaptureFileBuilder { /// TEMPORARY: Construct the [`DataCaptureFile`] for a specific type of data. pub fn build(self) -> Arc> where - T: Copy + Send + BinarySerde + 'static, + T: Copy + Send + Serialize + 'static, { Server::orpc_server_base(self.server.as_ref()) .call_in_context(move || -> Result>, RPCError> { @@ -216,11 +212,12 @@ impl DataCaptureFileBuilder { .attach_consumer() .expect("single purpose OQueue failed."), block_device: self.block_device, - start_bid: Bid::new(self.start as u64), - end_bid: Bid::new(self.end as u64), + path: self.path, + start_bid: Bid::from_offset(self.start), + end_bid: Bid::from_offset(self.end), server: server.clone(), }; - move || thread.run() + move || Ok(thread.run()?) }); Ok(server) diff --git a/kernel/comps/mariposa_data_capture/src/legacy/mod.rs b/kernel/comps/mariposa_data_capture/src/legacy/mod.rs index d955df5a6..e614e1b14 100644 --- a/kernel/comps/mariposa_data_capture/src/legacy/mod.rs +++ b/kernel/comps/mariposa_data_capture/src/legacy/mod.rs @@ -22,7 +22,10 @@ mod legacy_tests { use aster_block::test_utils::MemoryDisk; use ostd::{ assertion::sleep, - orpc::legacy_oqueue::{OQueue as _, locking::ObservableLockingQueue}, + orpc::{ + legacy_oqueue::{OQueue as _, locking::ObservableLockingQueue}, + path::Path, + }, prelude::*, }; @@ -37,7 +40,10 @@ mod legacy_tests { let device = DataCaptureDeviceServer::new(block_device.clone()); let builder = device - .new_file(FileDescriptor { length: 4096 * 2 }) + .new_file(FileDescriptor { + path: Path::test(), + length: 4096 * 2, + }) .unwrap(); let server = builder.build::(); @@ -62,12 +68,18 @@ mod legacy_tests { sleep(Duration::from_millis(10)); // Flush and give time for capture to complete - server.flush().unwrap(); + server.sync().unwrap(); sleep(Duration::from_millis(10)); let 
device_data = block_device.data.lock(); - assert_eq!(device_data[64..64 + 3], [42, 100, 200]); + // The CBOR encoding of 42, 100, 200 + let data_bytes = [0x18, 0x2a, 0x18, 0x64, 0x18, 0xc8]; + assert!( + device_data + .windows(data_bytes.len()) + .any(|window| window == data_bytes) + ); } } diff --git a/kernel/comps/mariposa_data_capture/src/lib.rs b/kernel/comps/mariposa_data_capture/src/lib.rs index e4de7f8c7..af2355e6e 100644 --- a/kernel/comps/mariposa_data_capture/src/lib.rs +++ b/kernel/comps/mariposa_data_capture/src/lib.rs @@ -10,22 +10,24 @@ //! The format is a large raw file with blocks. Each block must be 4k page aligned and is in the //! following format: //! * The magic "number": `MARIPOSALDOSDATA` (null terminated). -//! * A JSON object (null terminated): +//! * A [CBOR](https://cbor.io/) object: //! ``` -//! { "oqueues": ["oqueue.path", ...], "type": "TypeName", "length": } +//! { "name": "file.path", "oqueues": ["oqueue.path", ...], "type_name": "TypeName" } //! ``` //! The `oqueues` and `length` fields are optional. -//! * 0 padding to a 64-byte boundary. -//! * A packed series of [`binary_serde`] records serialized from type `TypeName`. -//! -//! The length makes it easier to find the next block and eliminates the (very very small) risk of -//! randomly occuring magic numbers. However, it is not required. +//! * A series of CBOR objects serialized from type `TypeName` using Serde. The CBOR objects are not +//! part of a list, instead just being concatinated. In Python, they can be read using repeated +//! calls to +//! [`cbor2.CBORDecoder.decode`](https://cbor2.readthedocs.io/en/latest/api.html#cbor2.CBORDecoder.decode). //! //! The set of OQueue paths is only for convenience, so it can be incomplete or missing. This may be //! because of set of OQueues was not known when output started. //! -//! (Note: If you want to memory map the output file and read it directly, you should make sure that -//! 
the serialized length of `TypeName` is a multiple of it's alignment.) +//! Note: The output format is not particularly space efficient because it includes the field names +//! of structs every time they are serialized. If this becomes a problem there are a few options: 1) +//! use [`#[serde(rename=...)`](https://serde.rs/field-attrs.html) to give shorter field names in +//! the output, 2) explore using `serde_cbor`s [packed +//! format](https://docs.rs/serde_cbor/latest/serde_cbor/struct.Serializer.html#method.packed_format). #![no_std] #![deny(unsafe_code)] @@ -46,6 +48,25 @@ pub use data_capture_file::{DataCaptureFile, DataCaptureFileBuilder, ObserverReg extern crate alloc; use component::{ComponentInitError, init_component}; +use ostd::ostd_error; +use snafu::Snafu; + +#[non_exhaustive] +#[ostd_error] +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum DataCaptureError { + #[snafu(transparent)] + RPCError { + source: ostd::orpc::errors::RPCError, + }, + #[snafu(display("Insufficient space on device or in file ({context})"))] + InsufficientSpaceError {}, + #[snafu(transparent)] + IOError { + source: aster_block::bio::BioEnqueueError, + }, +} #[init_component] fn init() -> Result<(), ComponentInitError> { @@ -108,7 +129,7 @@ mod tests { sleep(Duration::from_millis(10)); // Flush and give time for capture to complete - server.flush().unwrap(); + server.sync().unwrap(); sleep(Duration::from_millis(10)); @@ -121,6 +142,12 @@ mod tests { .any(|window| window == path_bytes) ); - assert_eq!(device_data[64..64 + 3], [42, 100, 200]); // First value in first block + // The CBOR encoding of 42, 100, 200 + let data_bytes = [0x18, 0x2a, 0x18, 0x64, 0x18, 0xc8]; + assert!( + device_data + .windows(data_bytes.len()) + .any(|window| window == data_bytes) + ); } } diff --git a/kernel/comps/time/Cargo.toml b/kernel/comps/time/Cargo.toml index ada49bac0..492ddd4aa 100644 --- a/kernel/comps/time/Cargo.toml +++ b/kernel/comps/time/Cargo.toml @@ -11,6 +11,7 @@ 
aster-util = { path = "../../libs/aster-util" } component = { path = "../../libs/comp-sys/component" } log = "0.4" spin = "0.9.4" +serde = { version = "1.0", default-features = false, features = ["derive", "alloc"]} [target.riscv64gc-unknown-none-elf.dependencies] chrono = { version = "0.4.38", default-features = false } diff --git a/kernel/comps/time/src/clocksource.rs b/kernel/comps/time/src/clocksource.rs index 61780bb0d..1edaa1a3a 100644 --- a/kernel/comps/time/src/clocksource.rs +++ b/kernel/comps/time/src/clocksource.rs @@ -15,6 +15,7 @@ use core::{cmp::max, ops::Add, time::Duration}; use aster_util::coeff::Coeff; use ostd::sync::{LocalIrqDisabled, RwLock}; +use serde::Serialize; use crate::NANOS_PER_SECOND; @@ -172,7 +173,7 @@ impl ClockSource { /// elapsed since a reference point (typically the system boot time). /// The [`Instant`] is expressed in seconds and the fractional part is /// expressed in nanoseconds. -#[derive(Debug, Default, Copy, Clone)] +#[derive(Debug, Default, Copy, Clone, Serialize)] pub struct Instant { secs: u64, nanos: u32, diff --git a/kernel/libs/cpio-decoder/Cargo.toml b/kernel/libs/cpio-decoder/Cargo.toml index 2aaf8f5be..b47ff87f8 100644 --- a/kernel/libs/cpio-decoder/Cargo.toml +++ b/kernel/libs/cpio-decoder/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" [dependencies] int-to-c-enum = { path = "../../libs/int-to-c-enum" } -core2 = { version = "0.4", default-features = false, features = ["alloc"] } +no_std_io2 = { version = "0.9.3", features = ["alloc"] } lending-iterator = "0.1.7" [lints] diff --git a/kernel/libs/cpio-decoder/src/error.rs b/kernel/libs/cpio-decoder/src/error.rs index 61147e2bc..1e1542032 100644 --- a/kernel/libs/cpio-decoder/src/error.rs +++ b/kernel/libs/cpio-decoder/src/error.rs @@ -14,10 +14,10 @@ pub enum Error { IoError, } -impl From for Error { +impl From for Error { #[inline] - fn from(err: core2::io::Error) -> Self { - use core2::io::ErrorKind; + fn from(err: no_std_io2::io::Error) -> Self { + use 
no_std_io2::io::ErrorKind; match err.kind() { ErrorKind::UnexpectedEof => Self::BufferShortError, diff --git a/kernel/libs/cpio-decoder/src/lib.rs b/kernel/libs/cpio-decoder/src/lib.rs index f958a3db7..023f1ada8 100644 --- a/kernel/libs/cpio-decoder/src/lib.rs +++ b/kernel/libs/cpio-decoder/src/lib.rs @@ -26,9 +26,9 @@ use alloc::{ }; use core::cmp::min; -use core2::io::{Read, Write}; use int_to_c_enum::TryFromInt; use lending_iterator::prelude::*; +use no_std_io2::io::{Read, Write}; use crate::error::{Error, Result}; diff --git a/kernel/src/benchmarks/oqueue.rs b/kernel/src/benchmarks/oqueue.rs index 6b97d6182..7d65f0937 100644 --- a/kernel/src/benchmarks/oqueue.rs +++ b/kernel/src/benchmarks/oqueue.rs @@ -25,6 +25,10 @@ use ostd::{ Consumer, Cursor, OQueue, OQueueAttachError, Producer, StrongObserver, WeakObserver, ringbuffer::MPMCOQueue, }, + oqueue::{ + ConsumableOQueue, ConsumableOQueueRef, OQueue as OtherOQueue, OQueueRef, + ObservationQuery, + }, sync::Blocker, }, sync::{Waker, WakerKey}, @@ -394,165 +398,374 @@ struct OQueueBenchmarkInput { pub q_type: String, } -fn produce_bench( +fn spawn_on_cpu(cpu_idx: usize, f: F) +where + F: FnOnce() + Send + 'static, +{ + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + cpu_set.add(ostd::cpu::CpuId::try_from(cpu_idx).unwrap()); + ThreadOptions::new(f).cpu_affinity(cpu_set).spawn(); +} + +fn spawn_bench_thread( + cpu_idx: usize, + barrier: Arc, + completed: Arc, + label: &'static str, + timed: bool, + f: F, +) where + F: FnOnce() + Send + 'static, +{ + spawn_on_cpu(cpu_idx, move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 { + core::hint::spin_loop(); + } + + if timed { + let now = time::clocks::RealTimeClock::get().read_time(); + f(); + let end = time::clocks::RealTimeClock::get().read_time(); + println!( + "[{}-{:?}] done in {:?}", + label, + ostd::cpu::CpuId::current_racy(), + end - now + ); + } else { + f(); + } + + completed.fetch_add(1, Ordering::Relaxed); + 
}); +} + +struct BenchThreadConfig { + pub n_threads: usize, + pub n_messages: usize, + pub barrier: Arc, + pub completed: Arc, + pub label: &'static str, + pub cpu_offset: usize, +} + +fn run_bench_threads( + config: BenchThreadConfig, + mut setup: Setup, + on_complete: Done, +) where + Setup: FnMut() -> Work, + Work: FnMut() + Send + 'static, + Done: Fn() + Send + Sync + 'static, +{ + let on_complete = Arc::new(on_complete); + for tid in 0..config.n_threads { + let mut work = setup(); + let barrier = config.barrier.clone(); + let completed = config.completed.clone(); + let on_complete = on_complete.clone(); + let n_messages = config.n_messages; + spawn_bench_thread( + tid + config.cpu_offset, + barrier, + completed, + config.label, + true, + move || { + for _ in 0..n_messages { + work(); + } + on_complete(); + }, + ); + } +} + +fn produce_bench_legacy( input: &OQueueBenchmarkInput, q: &Arc>, completed: &Arc, ) { - println!("Starting producers"); let barrier = Arc::new(AtomicUsize::new(input.n_threads)); - // Start all producers - for tid in 0..input.n_threads { - let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); - cpu_set.add(ostd::cpu::CpuId::try_from(tid + 1).unwrap()); - ThreadOptions::new({ - let barrier = barrier.clone(); - let completed = completed.clone(); + run_bench_threads( + BenchThreadConfig { + n_threads: input.n_threads, + n_messages: N_MESSAGES_PER_THREAD, + barrier, + completed: completed.clone(), + label: "producer", + cpu_offset: 1, + }, + || { let producer = q.attach_producer().unwrap(); - move || { - barrier.fetch_sub(1, Ordering::Acquire); - while barrier.load(Ordering::Relaxed) > 0 {} - let now = time::clocks::RealTimeClock::get().read_time(); - for _ in 0..N_MESSAGES_PER_THREAD { - producer.produce(0); - } - let end = time::clocks::RealTimeClock::get().read_time(); - println!( - "[producer-{}-{:?}] sent msg in {:?}", - tid, - ostd::cpu::CpuId::current_racy(), - end - now - ); - completed.fetch_add(1, Ordering::Relaxed); - } - }) - 
.cpu_affinity(cpu_set) - .spawn(); - } + move || producer.produce(0) + }, + || {}, + ); +} + +fn produce_bench>( + input: &OQueueNewBenchmarkInput, + q: &Arc, + completed: &Arc, +) { + let barrier = Arc::new(AtomicUsize::new(input.n_threads)); + run_bench_threads( + BenchThreadConfig { + n_threads: input.n_threads, + n_messages: N_MESSAGES_PER_THREAD, + barrier, + completed: completed.clone(), + label: "producer", + cpu_offset: 1, + }, + || { + let producer = q.attach_ref_producer().unwrap(); + move || producer.produce_ref(&0u64) + }, + || {}, + ); } -fn consume_bench( +fn consume_bench_legacy( input: &OQueueBenchmarkInput, q: &Arc>, completed: &Arc, ) { + // Attach consumer so the queue knows to retain values + let _consumer = q.attach_consumer().unwrap(); + let produced_completed_wq = Arc::new(ostd::sync::WaitQueue::new()); let produce_completed = Arc::new(AtomicUsize::new(0)); + let barrier = Arc::new(AtomicUsize::new(input.n_threads)); - // Initialize consumer so that the producer knows to retain values - let consumer = q.attach_consumer().unwrap(); - println!("Populating queue"); - for tid in 0..input.n_threads { - let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); - cpu_set.add(ostd::cpu::CpuId::try_from(tid + 1).unwrap()); - ThreadOptions::new({ + run_bench_threads( + BenchThreadConfig { + n_threads: input.n_threads, + n_messages: N_MESSAGES_PER_THREAD, + barrier, + completed: Arc::new(AtomicUsize::new(0)), + label: "producer", + cpu_offset: 1, + }, + || { + let producer = q.attach_producer().unwrap(); + move || { + producer.produce(0); + } + }, + { let produce_completed = produce_completed.clone(); let produced_completed_wq = produced_completed_wq.clone(); - let producer = q.attach_producer().unwrap(); move || { - for _ in 0..N_MESSAGES_PER_THREAD { - producer.produce(0); - } produce_completed.fetch_add(1, Ordering::Relaxed); produced_completed_wq.wake_all(); } - }) - .cpu_affinity(cpu_set) - .spawn(); - } - produced_completed_wq - .wait_until(|| 
(completed.load(Ordering::Relaxed) == input.n_threads).then_some(())); + }, + ); + + // Wait for queue to be full + produced_completed_wq.wait_until(|| { + (produce_completed.load(Ordering::Relaxed) == input.n_threads).then_some(()) + }); + + let consume_barrier = Arc::new(AtomicUsize::new(input.n_threads)); + run_bench_threads( + BenchThreadConfig { + n_threads: input.n_threads, + n_messages: N_MESSAGES_PER_THREAD, + barrier: consume_barrier, + completed: completed.clone(), + label: "consumer", + cpu_offset: 1, + }, + || { + let consumer = q.attach_consumer().unwrap(); + move || { + let _ = consumer.consume(); + } + }, + || {}, + ); +} - let barrier = Arc::new(AtomicUsize::new(input.n_threads)); - // Start all consumers - for tid in 0..input.n_threads { - let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); - cpu_set.add(ostd::cpu::CpuId::try_from(tid + 1).unwrap()); - ThreadOptions::new({ - let completed = completed.clone(); +fn consume_bench>( + input: &OQueueNewBenchmarkInput, + q: &Arc, + completed: &Arc, +) { + // Attach consumers first so the queue knows to retain messages + let _consumers: Vec<_> = (0..input.n_threads) + .map(|_| q.attach_consumer().unwrap()) + .collect(); + + let produced_completed_wq = Arc::new(ostd::sync::WaitQueue::new()); + let produce_completed = Arc::new(AtomicUsize::new(0)); + let produce_barrier = Arc::new(AtomicUsize::new(input.n_threads)); + + println!("Populating queue"); + run_bench_threads( + BenchThreadConfig { + n_threads: input.n_threads, + n_messages: N_MESSAGES_PER_THREAD, + barrier: produce_barrier, + completed: Arc::new(AtomicUsize::new(0)), + label: "producer", + cpu_offset: 1, + }, + || { + let producer = q.attach_value_producer().unwrap(); + move || { + producer.produce(0); + } + }, + { + let produce_completed = produce_completed.clone(); + let produced_completed_wq = produced_completed_wq.clone(); + move || { + produce_completed.fetch_add(1, Ordering::Relaxed); + produced_completed_wq.wake_all(); + } + }, + ); + + 
produced_completed_wq.wait_until(|| { + (produce_completed.load(Ordering::Relaxed) == input.n_threads).then_some(()) + }); + + let consume_barrier = Arc::new(AtomicUsize::new(input.n_threads)); + run_bench_threads( + BenchThreadConfig { + n_threads: input.n_threads, + n_messages: N_MESSAGES_PER_THREAD, + barrier: consume_barrier, + completed: completed.clone(), + label: "consumer", + cpu_offset: 1, + }, + || { let consumer = q.attach_consumer().unwrap(); - let barrier = barrier.clone(); move || { - barrier.fetch_sub(1, Ordering::Acquire); - while barrier.load(Ordering::Relaxed) > 0 {} - let now = time::clocks::RealTimeClock::get().read_time(); - for _ in 0..N_MESSAGES_PER_THREAD { - let _ = consumer.consume(); - } - let end = time::clocks::RealTimeClock::get().read_time(); - println!( - "[consumer-{}-{:?}] recv msg in {:?}", - tid, - ostd::cpu::CpuId::current_racy(), - end - now - ); - completed.fetch_add(1, Ordering::Relaxed); + let _ = consumer.consume(); } - }) - .cpu_affinity(cpu_set) - .spawn(); - } - drop(consumer); + }, + || {}, + ); } -fn mixed_bench( +fn mixed_bench_legacy( input: &OQueueBenchmarkInput, q: &Arc>, completed: &Arc, ) { + // number of threads MUST be even because an equal number of producers and consumers are created + assert!( + input.n_threads % 2 == 0, + "mixed_bench: bench.n_threads must be even (got {})", + input.n_threads + ); let n_threads_per_type: usize = input.n_threads / 2; - let barrier = Arc::new(AtomicUsize::new(input.n_threads)); - // Start all producers - for tid in 0..n_threads_per_type { - let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); - cpu_set.add(ostd::cpu::CpuId::try_from(tid + 1).unwrap()); - ThreadOptions::new({ - let completed = completed.clone(); + let barrier = Arc::new(AtomicUsize::new(input.n_threads)); + run_bench_threads( + BenchThreadConfig { + n_threads: n_threads_per_type, + n_messages: 2 * N_MESSAGES_PER_THREAD, + barrier: barrier.clone(), + completed: completed.clone(), + label: "producer", + 
cpu_offset: 1, + }, + || { let producer = q.attach_producer().unwrap(); - let barrier = barrier.clone(); move || { - barrier.fetch_sub(1, Ordering::Acquire); - while barrier.load(Ordering::Relaxed) > 0 {} - for _ in 0..(2 * N_MESSAGES_PER_THREAD) { - producer.produce(0); - } - crate::prelude::println!("finished producer {}", tid); - completed.fetch_add(1, Ordering::Relaxed); + producer.produce(0); } - }) - .cpu_affinity(cpu_set) - .spawn(); - } + }, + || {}, + ); + + run_bench_threads( + BenchThreadConfig { + n_threads: n_threads_per_type, + n_messages: 2 * N_MESSAGES_PER_THREAD, + barrier: barrier.clone(), + completed: completed.clone(), + label: "consumer", + cpu_offset: n_threads_per_type + 1, + }, + || { + let consumer = q.attach_consumer().unwrap(); + move || { + let _ = consumer.consume(); + } + }, + || {}, + ); +} - // Start all consumers - for tid in 0..n_threads_per_type { - let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); - cpu_set.add(ostd::cpu::CpuId::try_from(n_threads_per_type + tid + 1).unwrap()); - ThreadOptions::new({ - let completed = completed.clone(); +fn mixed_bench_new>( + input: &OQueueNewBenchmarkInput, + q: &Arc, + completed: &Arc, +) { + // number of threads MUST be even because an equal number of producers and consumers are created + assert!( + input.n_threads % 2 == 0, + "mixed_bench: bench.n_threads must be even (got {})", + input.n_threads + ); + let n_threads_per_type: usize = input.n_threads / 2; + let barrier = Arc::new(AtomicUsize::new(input.n_threads)); + + run_bench_threads( + BenchThreadConfig { + n_threads: n_threads_per_type, + n_messages: 2 * N_MESSAGES_PER_THREAD, + barrier: barrier.clone(), + completed: completed.clone(), + label: "producer", + cpu_offset: 1, + }, + || { + let producer = q.attach_value_producer().unwrap(); + move || { + producer.produce(0); + } + }, + || {}, + ); + + run_bench_threads( + BenchThreadConfig { + n_threads: n_threads_per_type, + n_messages: 2 * N_MESSAGES_PER_THREAD, + barrier: 
barrier.clone(), + completed: completed.clone(), + label: "consumer", + cpu_offset: n_threads_per_type + 1, + }, + || { let consumer = q.attach_consumer().unwrap(); - let barrier = barrier.clone(); move || { - barrier.fetch_sub(1, Ordering::Acquire); - while barrier.load(Ordering::Relaxed) > 0 {} - for _ in 0..(2 * N_MESSAGES_PER_THREAD) { - let _ = consumer.consume(); - } - crate::prelude::println!("finished consumer {}", tid); - completed.fetch_add(1, Ordering::Relaxed); + let _ = consumer.consume(); } - }) - .cpu_affinity(cpu_set) - .spawn(); - } + }, + || {}, + ); } -fn weak_obs_bench( +fn weak_obs_bench_legacy( input: &OQueueBenchmarkInput, q: &Arc>, completed: &Arc, ) { + assert!( + input.n_threads % 2 == 0, + "weak_obs_bench: bench.n_threads must be even (got {})", + input.n_threads + ); let n_threads_per_type: usize = input.n_threads / 2; let barrier = Arc::new(AtomicUsize::new(input.n_threads)); @@ -644,11 +857,99 @@ fn weak_obs_bench( } } -fn strong_obs_bench( +fn weak_obs_bench>( + input: &OQueueNewBenchmarkInput, + q: &Arc, + completed: &Arc, +) { + assert!( + input.n_threads % 2 == 0, + "weak_obs_bench: bench.n_threads must be even (got {})", + input.n_threads + ); + let n_threads_per_type: usize = input.n_threads / 2; + let barrier = Arc::new(AtomicUsize::new(input.n_threads)); + + // Start all producers + for tid in 0..n_threads_per_type { + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + cpu_set.add(ostd::cpu::CpuId::try_from(tid + 1).unwrap()); + ThreadOptions::new({ + let completed = completed.clone(); + let producer = q.attach_value_producer().unwrap(); + let barrier = barrier.clone(); + move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 {} + for _ in 0..(2 * N_MESSAGES_PER_THREAD) { + producer.produce(0); + } + completed.fetch_add(1, Ordering::Relaxed); + } + }) + .cpu_affinity(cpu_set) + .spawn(); + } + + // Start all consumers + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + 
cpu_set.add(ostd::cpu::CpuId::try_from(n_threads_per_type + 1).unwrap()); + ThreadOptions::new({ + let completed = completed.clone(); + let consumer = q.attach_consumer().unwrap(); + let barrier = barrier.clone(); + move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 {} + for _ in 0..(2 * N_MESSAGES_PER_THREAD) { + let _ = consumer.consume(); + } + completed.fetch_add(1, Ordering::Relaxed); + } + }) + .cpu_affinity(cpu_set) + .spawn(); + + for tid in 0..(n_threads_per_type.wrapping_sub(1)) { + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + cpu_set.add(ostd::cpu::CpuId::try_from(n_threads_per_type + tid + 2).unwrap()); + ThreadOptions::new({ + let completed = completed.clone(); + let weak_observer = q + .attach_weak_observer(1, ObservationQuery::new(|v: &u64| *v)) + .unwrap(); + let barrier = barrier.clone(); + move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 {} + let mut cnt = 0; + for _ in 0..(2 * N_MESSAGES_PER_THREAD) { + cnt += weak_observer + .weak_observe_recent(1) + .unwrap_or_default() + .into_iter() + .filter(|v| v.is_some()) + .count(); + } + crate::prelude::println!("weak observed {} values", cnt); + completed.fetch_add(1, Ordering::Relaxed); + } + }) + .cpu_affinity(cpu_set) + .spawn(); + } +} + +fn strong_obs_bench_legacy( input: &OQueueBenchmarkInput, q: &Arc>, completed: &Arc, ) { + assert!( + input.n_threads % 2 == 0, + "weak_obs_bench: bench.n_threads must be even (got {})", + input.n_threads + ); let n_threads_per_type: usize = input.n_threads / 2; let barrier = Arc::new(AtomicUsize::new(input.n_threads)); @@ -684,7 +985,6 @@ fn strong_obs_bench( let barrier = barrier.clone(); move || { barrier.fetch_sub(1, Ordering::Acquire); - // ostd::task::Task::yield_now(); while barrier.load(Ordering::Relaxed) > 0 {} crate::prelude::println!("consumer start"); for _ in 0..(2 * N_MESSAGES_PER_THREAD) { @@ -698,6 +998,7 @@ fn strong_obs_bench( .spawn(); if 
input.q_type == "mpmc_oq" || input.q_type == "locking" { + info!("strong obs bench starting"); // Start all consumers for tid in 0..(n_threads_per_type.wrapping_sub(1)) { let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); @@ -712,7 +1013,7 @@ fn strong_obs_bench( crate::prelude::println!("observer start"); let mut cnt = 0; for _ in 0..(2 * N_MESSAGES_PER_THREAD) { - strong_observer.strong_observe(); + let _ = strong_observer.strong_observe(); cnt += 1; } crate::prelude::println!("strong observed {} values", cnt); @@ -747,6 +1048,86 @@ fn strong_obs_bench( } } +fn strong_obs_bench>( + input: &OQueueNewBenchmarkInput, + q: &Arc, + completed: &Arc, +) { + assert!( + input.n_threads % 2 == 0, + "weak_obs_bench: bench.n_threads must be even (got {})", + input.n_threads + ); + let n_threads_per_type: usize = input.n_threads / 2; + let barrier = Arc::new(AtomicUsize::new(input.n_threads)); + + // Start all producers + for tid in 0..n_threads_per_type { + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + cpu_set.add(ostd::cpu::CpuId::try_from(tid + 1).unwrap()); + ThreadOptions::new({ + let completed = completed.clone(); + let producer = q.attach_value_producer().unwrap(); + let barrier = barrier.clone(); + move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 {} + for _ in 0..(2 * N_MESSAGES_PER_THREAD) { + producer.produce(0); + } + completed.fetch_add(1, Ordering::Relaxed); + } + }) + .cpu_affinity(cpu_set) + .spawn(); + } + + // Start all consumers + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + cpu_set.add(ostd::cpu::CpuId::try_from(n_threads_per_type + 1).unwrap()); + ThreadOptions::new({ + let completed = completed.clone(); + let consumer = q.attach_consumer().unwrap(); + let barrier = barrier.clone(); + move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 {} + for _ in 0..(2 * N_MESSAGES_PER_THREAD) { + let _ = consumer.consume(); + } + 
completed.fetch_add(1, Ordering::Relaxed); + } + }) + .cpu_affinity(cpu_set) + .spawn(); + + // Start all strong observers + for tid in 0..(n_threads_per_type.wrapping_sub(1)) { + let mut cpu_set = ostd::cpu::set::CpuSet::new_empty(); + cpu_set.add(ostd::cpu::CpuId::try_from(n_threads_per_type + 2 + tid).unwrap()); + ThreadOptions::new({ + let completed = completed.clone(); + let strong_observer = q + .attach_strong_observer(ObservationQuery::new(|v: &u64| *v)) + .unwrap(); + let barrier = barrier.clone(); + move || { + barrier.fetch_sub(1, Ordering::Acquire); + while barrier.load(Ordering::Relaxed) > 0 {} + let mut cnt = 0; + for _ in 0..(2 * N_MESSAGES_PER_THREAD) { + let _ = strong_observer.strong_observe(); + cnt += 1; + } + crate::prelude::println!("strong observed {} values", cnt); + completed.fetch_add(1, Ordering::Relaxed); + } + }) + .cpu_affinity(cpu_set) + .spawn(); + } +} + type OQueueBenchFn = &'static dyn Fn(&OQueueBenchmarkInput, &Arc>, &Arc); @@ -793,13 +1174,13 @@ impl OQueueBenchmark { impl Benchmark for OQueueBenchmark { fn init(&mut self, n_threads: usize, _n_repeat: usize, _iter: usize) { - let karg = get_kernel_cmd_line().unwrap(); + let karg = get_kernel_cmd_line().expect("no kernel command line"); self.input = Some(OQueueBenchmarkInput { n_threads, n_messages: N_MESSAGES_PER_THREAD * n_threads, q_type: karg .get_module_arg_by_name::("bench", "q_type") - .unwrap(), + .expect("missing bench.q_type=... 
on kernel command line"), }); } @@ -1026,23 +1407,98 @@ impl Benchmark for OQueueScalingBenchmark { } } +struct OQueueNewBenchmark { + name: String, + test_type: NewBenchType, + input: Option, +} + +struct OQueueNewBenchmarkInput { + pub n_threads: usize, +} + +enum NewBenchType { + Produce, + Consume, + Mixed, + WeakObs, + StrongObs, +} + +impl OQueueNewBenchmark { + fn new(name: &str, test_type: NewBenchType) -> Box { + Box::new(Self { + name: name.to_string(), + input: None, + test_type, + }) + } + + fn get_ref_oq(&self) -> OQueueRef { + OQueueRef::new_anonymous(2 << 20) + } + + fn get_consumable_oq(&self) -> ConsumableOQueueRef { + ConsumableOQueueRef::new_anonymous(2 << 20) + } +} + +impl Benchmark for OQueueNewBenchmark { + fn init(&mut self, n_threads: usize, _n_repeat: usize, _iter: usize) { + let _karg = get_kernel_cmd_line().expect("no kernel command line"); + self.input = Some(OQueueNewBenchmarkInput { n_threads }); + } + + fn run(&self, completed: Arc) { + let input = self.input.as_ref().unwrap(); + match self.test_type { + NewBenchType::Produce => { + let q = Arc::new(self.get_ref_oq()); + produce_bench(input, &q, &completed); + } + NewBenchType::Consume => { + let q = Arc::new(self.get_consumable_oq()); + consume_bench(input, &q, &completed); + } + NewBenchType::Mixed => { + let q = Arc::new(self.get_consumable_oq()); + mixed_bench_new(input, &q, &completed); + } + NewBenchType::WeakObs => { + let q = Arc::new(self.get_consumable_oq()); + weak_obs_bench(input, &q, &completed); + } + NewBenchType::StrongObs => { + let q = Arc::new(self.get_consumable_oq()); + strong_obs_bench(input, &q, &completed); + } + } + } + fn name(&self) -> &str { + &self.name + } +} + pub fn register_benchmarks(bc: &mut BenchmarkHarness) { bc.register_benchmark(OQueueBenchmark::new( - &produce_bench, - "oqueue::produce_bench", + &produce_bench_legacy, + "oqueue::produce_bench_legacy", )); bc.register_benchmark(OQueueBenchmark::new( - &consume_bench, - "oqueue::consume_bench", + 
&consume_bench_legacy, + "oqueue::consume_bench_legacy", )); - bc.register_benchmark(OQueueBenchmark::new(&mixed_bench, "oqueue::mixed_bench")); bc.register_benchmark(OQueueBenchmark::new( - &weak_obs_bench, - "oqueue::weak_obs_bench", + &mixed_bench_legacy, + "oqueue::mixed_bench_legacy", )); bc.register_benchmark(OQueueBenchmark::new( - &strong_obs_bench, - "oqueue::strong_obs_bench", + &weak_obs_bench_legacy, + "oqueue::weak_obs_bench_legacy", + )); + bc.register_benchmark(OQueueBenchmark::new( + &strong_obs_bench_legacy, + "oqueue::strong_obs_bench_legacy", )); bc.register_benchmark(OQueueScalingBenchmark::new( @@ -1057,4 +1513,24 @@ pub fn register_benchmarks(bc: &mut BenchmarkHarness) { "oqueue_scaling::weak_obs", OQueueScalingBenchmarkType::WeakObserver, )); + bc.register_benchmark(OQueueNewBenchmark::new( + "oqueue::produce_bench", + NewBenchType::Produce, + )); + bc.register_benchmark(OQueueNewBenchmark::new( + "oqueue::consume_bench", + NewBenchType::Consume, + )); + bc.register_benchmark(OQueueNewBenchmark::new( + "oqueue::mixed_bench", + NewBenchType::Mixed, + )); + bc.register_benchmark(OQueueNewBenchmark::new( + "oqueue::weak_obs_bench", + NewBenchType::WeakObs, + )); + bc.register_benchmark(OQueueNewBenchmark::new( + "oqueue::strong_obs_bench", + NewBenchType::StrongObs, + )); } diff --git a/kernel/src/data_capture.rs b/kernel/src/data_capture.rs new file mode 100644 index 000000000..941fd421d --- /dev/null +++ b/kernel/src/data_capture.rs @@ -0,0 +1,152 @@ +// SPDX-License-Identifier: MPL-2.0 + +//! OQueue data capture utilities. 
+ +use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use core::time::Duration; + +use log::{error, info}; +use ostd::{ + ignore_err, new_server, + orpc::{ + framework::{notifier::Notifier as _, spawn_thread}, + orpc_server, + }, + sync::Mutex, +}; + +use crate::{fs, kcmdline, util::timer::TimerServer}; + +#[orpc_server] +struct DataCaptureManager {} + +impl DataCaptureManager { + pub fn spawn(period: Duration) -> Arc { + let server = new_server!(|_| Self {}); + spawn_thread(server.clone(), { + let server = server.clone(); + move || server.main(period) + }); + server + } + + pub fn main(&self, period: Duration) -> Result<(), Box> { + let notify_server = TimerServer::spawn(period); + + let notify_observer = notify_server + .notification_oqueue() + .attach_strong_observer()?; + loop { + notify_observer.strong_observe(); + for f in DATA_CAPTURE_FILE_SYNCERS.lock().iter() { + f(); + } + } + } +} + +static DATA_CAPTURE_DEVICE_LEGACY: Mutex< + Option>, +> = Mutex::new(None); + +static DATA_CAPTURE_DEVICE: Mutex>> = + Mutex::new(None); + +pub(super) static DATA_CAPTURE_FILE_FINALIZERS: Mutex>> = + Mutex::new(Vec::new()); + +static DATA_CAPTURE_FILE_SYNCERS: Mutex>> = Mutex::new(Vec::new()); + +pub(super) fn start_capture_devices() { + match fs::start_block_device("capture_legacy") { + Ok(capture_block_device) => { + DATA_CAPTURE_DEVICE_LEGACY.lock().replace( + mariposa_data_capture::legacy::DataCaptureDeviceServer::new(capture_block_device), + ); + info!("[kernel] Initialized legacy data capture device (capture_legacy)"); + } + Err(e) => error!( + "[kernel] Failed to initialize legacy data capture device (capture_legacy): {}", + e + ), + } + + match fs::start_block_device("capture") { + Ok(capture_block_device) => { + DATA_CAPTURE_DEVICE.lock().replace( + mariposa_data_capture::DataCaptureDeviceServer::new(capture_block_device), + ); + info!("[kernel] Initialized new data capture device (capture)"); + } + Err(e) => error!( + "[kernel] Failed to initialized new data capture 
device (capture): {}", + e + ), + } + + // Start a server which syncs the data_capture devices every `secs` seconds based on the + // kcmdline arg `data_capture.sync_period`. If `data_capture.sync_period` is not provided or is + // <= 0, then do not sync periodically. + if let Some(secs) = kcmdline::get_kernel_cmd_line() + .and_then(|cl| cl.get_module_arg_by_name("data_capture", "sync_period")) + && secs > 0.0 + { + DataCaptureManager::spawn(Duration::from_secs_f32(secs)); + } +} + +/// Create a new data capture file for legacy OQueues. +pub fn new_legacy_data_capture_file( + descriptor: mariposa_data_capture::legacy::FileDescriptor, +) -> Arc> { + let ret = DATA_CAPTURE_DEVICE_LEGACY + .lock() + .as_ref() + .unwrap() + .new_file(descriptor) + .unwrap() + .build(); + DATA_CAPTURE_FILE_FINALIZERS.lock().push(Box::new({ + let ret = ret.clone(); + move || { + ignore_err!(ret.stop()); + info!("[kernel] Stopped data capture device (capture)"); + } + })); + DATA_CAPTURE_FILE_SYNCERS.lock().push(Box::new({ + let ret = ret.clone(); + move || { + ignore_err!(ret.sync()); + info!("[kernel] Sync'd data capture device (capture)"); + } + })); + ret +} + +/// Create a new data capture file for OQueues. 
+pub fn new_data_capture_file( + descriptor: mariposa_data_capture::FileDescriptor, +) -> Arc> { + let ret = DATA_CAPTURE_DEVICE + .lock() + .as_ref() + .unwrap() + .new_file(descriptor) + .unwrap() + .build(); + DATA_CAPTURE_FILE_FINALIZERS.lock().push(Box::new({ + let ret = ret.clone(); + move || { + ignore_err!(ret.stop()); + info!("[kernel] Stopped legacy data capture device (capture_legacy)"); + } + })); + DATA_CAPTURE_FILE_SYNCERS.lock().push(Box::new({ + let ret = ret.clone(); + move || { + ignore_err!(ret.sync()); + info!("[kernel] Sync'd legacy data capture device (capture_legacy)"); + } + })); + ret +} diff --git a/kernel/src/event.rs b/kernel/src/event.rs new file mode 100644 index 000000000..434dcb956 --- /dev/null +++ b/kernel/src/event.rs @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: MPL-2.0 + +use aster_time::Instant; +use ostd::task::Task; +use serde::Serialize; + +use crate::{process::posix_thread::AsPosixThread as _, thread::Tid}; + +#[derive(Debug, Clone, Copy, Serialize)] +pub enum TaskId { + KernelTask(usize), + PosixThread(Tid), + Unknown, +} + +impl TaskId { + pub fn new(task: &Task) -> Self { + if let Some(t) = task.as_posix_thread() { + Self::PosixThread(t.tid()) + } else { + Self::KernelTask(task.id().into()) + } + } +} + +#[derive(Debug, Clone, Copy, Serialize)] +pub struct EventContext { + pub task: TaskId, + pub timestamp: Instant, +} + +impl EventContext { + /// Creates a new EventContext from the current context + pub fn new() -> Self { + EventContext { + task: Task::current() + .map(|t| TaskId::new(&t)) + .unwrap_or(TaskId::Unknown), + timestamp: aster_time::read_monotonic_time().into(), + } + } +} + +impl Default for EventContext { + fn default() -> Self { + Self::new() + } +} diff --git a/kernel/src/fs/mod.rs b/kernel/src/fs/mod.rs index 46e276d2c..0c87ff2b0 100644 --- a/kernel/src/fs/mod.rs +++ b/kernel/src/fs/mod.rs @@ -37,7 +37,7 @@ use crate::{ /// Start a thread of the block device to pop requests from the block device's /// 
request queue and process them if there are any. If the request queue is empty, /// the thread will wait until there is a request in the queue. -fn start_block_device(device_name: &str) -> Result> { +pub(crate) fn start_block_device(device_name: &str) -> Result> { if let Some(device) = aster_block::get_device(device_name) { let cloned_device = device.clone(); let task_fn = move || { diff --git a/kernel/src/fs/rootfs.rs b/kernel/src/fs/rootfs.rs index 3c0fa41d8..c57de2b0d 100644 --- a/kernel/src/fs/rootfs.rs +++ b/kernel/src/fs/rootfs.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: MPL-2.0 -use core2::io::{Cursor, Read}; use cpio_decoder::{CpioDecoder, FileType}; use lending_iterator::LendingIterator; use libflate::gzip::Decoder as GZipDecoder; +use no_std_io2::io::{Cursor, Read}; use spin::Once; use super::{ @@ -25,7 +25,7 @@ impl<'a> BoxedReader<'a> { } impl Read for BoxedReader<'_> { - fn read(&mut self, buf: &mut [u8]) -> core2::io::Result { + fn read(&mut self, buf: &mut [u8]) -> no_std_io2::io::Result { self.0.read(buf) } } diff --git a/kernel/src/fs/utils/inode.rs b/kernel/src/fs/utils/inode.rs index 2a2ede448..face6ae41 100644 --- a/kernel/src/fs/utils/inode.rs +++ b/kernel/src/fs/utils/inode.rs @@ -5,7 +5,7 @@ use core::{any::TypeId, time::Duration}; use aster_rights::Full; -use core2::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, Write}; +use no_std_io2::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, Write}; use ostd::task::Task; use super::{ diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 85b14c3fb..ba38db9a2 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -24,17 +24,34 @@ #![register_tool(component_access_control)] use aster_framebuffer::FRAMEBUFFER_CONSOLE; +#[cfg(not(baseline_asterinas))] +mod data_capture; +pub mod event; +use aster_time::Instant; +#[cfg(not(baseline_asterinas))] +pub use data_capture::{new_data_capture_file, new_legacy_data_capture_file}; use kcmdline::KCmdlineArg; 
+#[cfg(not(baseline_asterinas))] +use mariposa_data_capture::ObserverRegistration; use ostd::{ arch::qemu::{QemuExitCode, exit_qemu}, boot::boot_info, cpu::{CpuId, CpuSet}, + ignore_err, + task::scheduler::{SchedulingEvent, SchedulingEventKind}, +}; +#[cfg(not(baseline_asterinas))] +use ostd::{ + orpc::oqueue::{OQueueBase as _, ObservationQuery, registry::lookup_by_path}, + path, }; use process::{Process, spawn_init_process}; use sched::SchedPolicy; +use serde::Serialize; use spin::Once; use crate::{ + event::{EventContext, TaskId}, kcmdline::set_kernel_cmd_line, prelude::*, thread::kernel_thread::ThreadOptions, @@ -159,6 +176,11 @@ fn init_thread() { .unwrap_or(false); set_huge_mapping_preserve_on_dontneed(huge_mapping_preserve_on_dontneed); + #[cfg(not(baseline_asterinas))] + { + data_capture::start_capture_devices(); + } + #[cfg(target_arch = "x86_64")] net::lazy_init(); fs::lazy_init(); @@ -216,11 +238,57 @@ fn init_thread() { pmu.start(); } + #[cfg(not(baseline_asterinas))] + if karg + .get_module_arg_by_name::("scheduler", "capture_data") + .unwrap_or(false) + { + if let Some(oqueue) = lookup_by_path::(&path!(scheduler.events)) { + #[derive(Debug, Clone, Copy, Serialize)] + struct KernelSchedulingEvent { + timestamp: Instant, + task: TaskId, + kind: SchedulingEventKind, + } + + let capture_file = new_data_capture_file::( + mariposa_data_capture::FileDescriptor { + path: path!(scheduler.events), + length: 500 * 1024 * 1024, + }, + ); + + ignore_err!( + capture_file.register_observer(ObserverRegistration { + path: path!(scheduler.events), + observer: oqueue + .attach_strong_observer(ObservationQuery::new(|e: &SchedulingEvent| { + let context = EventContext::new(); + KernelSchedulingEvent { + timestamp: context.timestamp, + kind: e.kind, + task: TaskId::new(&e.task), + } + })) + .unwrap(), + }) + ); + ignore_err!(capture_file.start()); + } else { + error!("Could not find scheduler.events OQueue. 
Scheduler events will not be captured.") + } + } + // Wait till initproc become zombie. while !initproc.status().is_zombie() { ostd::task::halt_cpu(); } + #[cfg(not(baseline_asterinas))] + for f in data_capture::DATA_CAPTURE_FILE_FINALIZERS.lock().drain(..) { + f(); + } + // TODO: exit via qemu isa debug device should not be the only way. let exit_code = if initproc.status().exit_code() == 0 { QemuExitCode::Success diff --git a/osdk/Cargo.lock b/osdk/Cargo.lock index 02c7e6cb6..83131e353 100644 --- a/osdk/Cargo.lock +++ b/osdk/Cargo.lock @@ -301,15 +301,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "core2" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" -dependencies = [ - "memchr", -] - [[package]] name = "crc32fast" version = "1.4.2" @@ -417,6 +408,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "getrandom" version = "0.3.3" @@ -434,10 +431,6 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash", - "allocator-api2", -] [[package]] name = "hashbrown" @@ -445,6 +438,17 @@ version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + 
"allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "heck" version = "0.5.0" @@ -592,25 +596,25 @@ checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libflate" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +checksum = "cd96e993e5f3368b0cb8497dae6c860c22af8ff18388c61c6c0b86c58d86b5df" dependencies = [ "adler32", - "core2", "crc32fast", "dary_heap", "libflate_lz77", + "no_std_io2", ] [[package]] name = "libflate_lz77" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +checksum = "ff7a10e427698aef6eef269482776debfef63384d30f13aad39a1a95e0e098fd" dependencies = [ - "core2", - "hashbrown 0.14.5", + "hashbrown 0.16.1", + "no_std_io2", "rle-decode-fast", ] @@ -654,6 +658,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "no_std_io2" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b51ed7824b6e07d354605f4abb3d9d300350701299da96642ee084f5ce631550" +dependencies = [ + "memchr", +] + [[package]] name = "num-format" version = "0.4.4" diff --git a/ostd/Cargo.toml b/ostd/Cargo.toml index 61476eed9..6f516ceb1 100644 --- a/ostd/Cargo.toml +++ b/ostd/Cargo.toml @@ -43,6 +43,7 @@ bitvec = { version = "1.0", default-features = false, features = ["alloc"] } crossbeam-utils = { version = "0.8.21", default-features = false } tinyvec = { version = "1.10"} static_assertions = "*" +serde = { version = "1.0", default-features = false, features = ["derive", "alloc"]} [target.'cfg(target_arch = "x86_64")'.dependencies] x86_64 = "0.14.13" @@ -65,6 +66,7 @@ fdt = { version = 
"0.1.5", features = ["pretty-printing"] } default = ["cvm_guest"] capture_stacks = [] track_mutex = [] +capture_scheduling = [] # The guest OS support for Confidential VMs (CVMs), e.g., Intel TDX cvm_guest = ["dep:tdx-guest", "dep:iced-x86"] diff --git a/ostd/libs/linux-bzimage/builder/Cargo.toml b/ostd/libs/linux-bzimage/builder/Cargo.toml index 9b49bf027..d78ed47a0 100644 --- a/ostd/libs/linux-bzimage/builder/Cargo.toml +++ b/ostd/libs/linux-bzimage/builder/Cargo.toml @@ -12,7 +12,7 @@ repository = "https://github.com/ldos-project/asterinas" align_ext = { version = "0.1.0", path = "../../align_ext" } bitflags = "1.3" bytemuck = { version = "1.17.0", features = ["derive"] } -libflate = "2.1.0" +libflate = "2.3.0" serde = { version = "1.0.192", features = ["derive"] } xmas-elf = "0.10.0" diff --git a/ostd/libs/linux-bzimage/setup/Cargo.toml b/ostd/libs/linux-bzimage/setup/Cargo.toml index b05dfedf7..4bcfd2f1f 100644 --- a/ostd/libs/linux-bzimage/setup/Cargo.toml +++ b/ostd/libs/linux-bzimage/setup/Cargo.toml @@ -13,14 +13,14 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [target.'cfg(target_os = "none")'.dependencies] -core2 = { version = "0.4.0", default-features = false, features = ["nightly"] } +no_std_io2 = { version = "0.9.3", default-features = false } [target.'cfg(not(target_os = "none"))'.dependencies] -core2 = { version = "0.4.0", default-features = false } +no_std_io2 = { version = "0.9.3", default-features = false } [dependencies] cfg-if = "1.0.0" -libflate = { version = "2.1.0", default-features = false } +libflate = { version = "2.3.0", default-features = false } linux-boot-params = { version = "0.15.2", path = "../boot-params" } uart_16550 = "0.3.0" xmas-elf = "0.10.0" diff --git a/ostd/libs/linux-bzimage/setup/src/x86/amd64_efi/decoder.rs b/ostd/libs/linux-bzimage/setup/src/x86/amd64_efi/decoder.rs index 585fb141a..05f564e49 100644 --- 
a/ostd/libs/linux-bzimage/setup/src/x86/amd64_efi/decoder.rs +++ b/ostd/libs/linux-bzimage/setup/src/x86/amd64_efi/decoder.rs @@ -7,8 +7,8 @@ extern crate alloc; use alloc::vec::Vec; use core::convert::TryFrom; -use core2::io::Read; use libflate::{gzip, zlib}; +use no_std_io2::io::Read; enum MagicNumber { Elf, diff --git a/ostd/src/error.rs b/ostd/src/error.rs index ec815b8d8..96391466e 100644 --- a/ostd/src/error.rs +++ b/ostd/src/error.rs @@ -24,22 +24,31 @@ pub trait OstdError: snafu::Error { #[snafu(visibility(pub))] pub enum Error { /// Invalid arguments provided. + #[snafu(display("Invalid arguments ({context})"))] InvalidArgs, /// Insufficient memory available. + #[snafu(display("Insufficient memory ({context})"))] NoMemory, /// Page fault occurred. + #[snafu(display("Page fault ({context})"))] PageFault, /// Access to a resource is denied. + #[snafu(display("Access denied ({context})"))] AccessDenied, /// Input/output error. + #[snafu(display("Input/output error ({context})"))] IoError, /// Insufficient system resources. + #[snafu(display("Insufficient resources ({context})"))] NotEnoughResources, /// Arithmetic Overflow occurred. + #[snafu(display("Arithmetic overflow ({context})"))] Overflow, /// Memory mapping already exists for the given virtual address. + #[snafu(display("Memory mapping already exists for this virtual address ({context})"))] MapAlreadyMappedVaddr, /// Error when allocating kernel virtual memory. 
+ #[snafu(display("Error allocating kernel virtual memory ({context})"))] KVirtAreaAllocError, /// A ORPC error #[snafu(transparent)] diff --git a/ostd/src/lib.rs b/ostd/src/lib.rs index 8cfc75221..8f2f585a1 100644 --- a/ostd/src/lib.rs +++ b/ostd/src/lib.rs @@ -138,6 +138,8 @@ unsafe fn init() { bus::init(); + task::scheduler::init(); + arch::irq::enable_local(); invoke_ffi_init_funcs(); diff --git a/ostd/src/orpc/oqueue/implementation.rs b/ostd/src/orpc/oqueue/implementation.rs index eeffe9acf..dc6cf0114 100644 --- a/ostd/src/orpc/oqueue/implementation.rs +++ b/ostd/src/orpc/oqueue/implementation.rs @@ -37,6 +37,7 @@ use crate::{ Cursor, InlineStrongObserver, OQueueError, ObservationQuery, ResourceUnavailableSnafu, single_thread_ring_buffer::RingBuffer, }, + path::Path, }, sync::{LocalIrqDisabled, SpinLock, WaitQueue, WakerKey}, }; @@ -65,6 +66,7 @@ pub(crate) struct OQueueImplementation { /// The size to use for the consumer and strong-observer ring-buffers. len: usize, supports_consume: bool, + path: Option, pub(super) put_wait_queue: WaitQueue, pub(super) read_wait_queue: WaitQueue, } @@ -74,7 +76,9 @@ impl OQueueImplementation { /// /// * `len` is the ring buffer length used for consumers and strong-observers. /// * `supports_consume` specifies the attachment it allows later. - pub(crate) fn new(mut len: usize, supports_consume: bool) -> Self { + /// * `paths` are the paths associated with this OQueue; pass an empty `Vec` for anonymous + /// queues. + pub(crate) fn new(mut len: usize, supports_consume: bool, path: Option) -> Self { if len < 2 { warn!( "Creating an OQueue with length {len} is automatically increased to 2. Ring buffers smaller than 2 are not supported." @@ -91,11 +95,17 @@ impl OQueueImplementation { }), len, supports_consume, + path, put_wait_queue: WaitQueue::new(), read_wait_queue: WaitQueue::new(), } } + /// Return the path associated with this OQueue, or `None` for anonymous queues. 
+ pub(super) fn path(&self) -> Option<&Path> { + self.path.as_ref() + } + /// Detach a consumer. This will free the consumer ring buffer if there are no consumers left. pub(super) fn detach_consumer(self: &Arc) { let mut inner = self.inner.lock(); diff --git a/ostd/src/orpc/oqueue/mod.rs b/ostd/src/orpc/oqueue/mod.rs index 0b298f3a4..02febcf26 100644 --- a/ostd/src/orpc/oqueue/mod.rs +++ b/ostd/src/orpc/oqueue/mod.rs @@ -92,17 +92,18 @@ pub enum OQueueError { /// The handle has been detached. Once this is returned, all future operations will return /// it. This can happen because access has been revoked (for instance, due to the observer /// being too slow), or the OQueue has been deleted. + #[snafu(display("Handle detached ({context})"))] Detached, - /// The operation is supported by this OQueue but the required resources are missing (e.g., /// observer slots or memory). + #[snafu(display("Resource unavailable ({context})"))] ResourceUnavailable, - /// The operation is not supported or not allowed by this OQueue. An operation may not be /// supported if, for example, the OQueue does not support consumers and a client tries to /// attach one. An operation may also be disallowed to prevent problems such as potential hangs /// or crashes. For example, strong observers may not be allowed on an OQueue to prevent /// `produce` from blocking. + #[snafu(display("Operation unsupported or disallowed ({context})"))] Unsupported, } @@ -138,6 +139,9 @@ pub trait OQueueBase { where U: Copy + Send + 'static; + /// Return the path of this OQueue, or `None` if it is anonymous. + fn path(&self) -> Option<&Path>; + /// Erase the kind of OQueue. This will not allow additional operations to succeed. It /// simply makes the checks dynamic. fn as_any_oqueue(&self) -> AnyOQueueRef; @@ -197,6 +201,10 @@ macro_rules! 
impl_oqueue_base_forward { self.$member.attach_inline_strong_observer(f) } + fn path(&self) -> Option<&Path> { + self.$member.path() + } + fn as_any_oqueue(&self) -> AnyOQueueRef { self.$member.as_any_oqueue() } @@ -285,7 +293,12 @@ impl ConsumableOQueueRef { /// Create a new OQueue with the specified buffer length and support for produce by value and /// consumers. pub fn new(len: usize, path: Path) -> Self { - let ret = Self::new_anonymous(len); + let inner = Arc::new(implementation::OQueueImplementation::new( + len, + true, + Some(path.clone()), + )); + let ret = Self { inner }; registry::register(&path, ret.as_any_oqueue()); ret } @@ -295,7 +308,7 @@ impl ConsumableOQueueRef { /// observation such as for ephemeral queues. pub fn new_anonymous(len: usize) -> Self { Self { - inner: Arc::new(implementation::OQueueImplementation::new(len, true)), + inner: Arc::new(implementation::OQueueImplementation::new(len, true, None)), } } } @@ -315,7 +328,13 @@ clone_without_t!(OQueueRef, : ?Sized + 'static); impl OQueueRef { /// Create a new observation OQueue with the specified buffer length. pub fn new(len: usize, path: Path) -> Self { - let ret = Self::new_anonymous(len); + let ret = Self { + inner: Arc::new(implementation::OQueueImplementation::new( + len, + false, + Some(path.clone()), + )), + }; registry::register(&path, ret.as_any_oqueue()); ret } @@ -324,7 +343,7 @@ impl OQueueRef { /// when the OQueue should never be discovered for observation such as for ephemeral queues. 
pub fn new_anonymous(len: usize) -> Self { Self { - inner: Arc::new(implementation::OQueueImplementation::new(len, false)), + inner: Arc::new(implementation::OQueueImplementation::new(len, false, None)), } } } @@ -1159,4 +1178,13 @@ mod test { generic_test::TestMessage, >::new(4, Path::test())); } + + #[ktest] + fn oqueue_path() { + let named = ConsumableOQueueRef::::new(4, Path::test()); + assert_eq!(named.path(), Some(&Path::test())); + + let anon = ConsumableOQueueRef::::new_anonymous(4); + assert_eq!(anon.path(), None); + } } diff --git a/ostd/src/orpc_common/errors.rs b/ostd/src/orpc_common/errors.rs index 0a2dde29c..a3d70f6d3 100644 --- a/ostd/src/orpc_common/errors.rs +++ b/ostd/src/orpc_common/errors.rs @@ -21,13 +21,13 @@ use crate::prelude::Box; pub enum RPCError { /// A panic occurred in the server during the call. The panic payload will be converted to a string, if possible. If /// it cannot be, then the string will be a generic error message. - #[snafu(display("{message}"))] + #[snafu(display("{message} ({context})"))] Panic { /// message associated with this panic message: String, }, /// The server does not exist or is not running. This can happen when a server already crashed or has been shutdown. - #[snafu(display("Server does not exist or not running"))] + #[snafu(display("Server does not exist or is not running ({context})"))] ServerMissing, } diff --git a/ostd/src/task/scheduler/mod.rs b/ostd/src/task/scheduler/mod.rs index 6106d0978..8c176fe42 100644 --- a/ostd/src/task/scheduler/mod.rs +++ b/ostd/src/task/scheduler/mod.rs @@ -10,9 +10,12 @@ pub mod info; use core::time::Duration; +use serde::Serialize; use spin::Once; use super::{Task, preempt::cpu_local, processor}; +#[cfg(feature = "capture_scheduling")] +use crate::orpc::oqueue::RefProducer; use crate::{ cpu::{CpuId, CpuSet, PinCurrentCpu}, prelude::*, @@ -20,6 +23,35 @@ use crate::{ timer, }; +/// Initialize scheduler globals. 
+/// +/// This should be called before the scheduler runs for the first time, but after the allocator is +/// fully initialized. +pub(crate) fn init() { + #[cfg(feature = "capture_scheduling")] + SCHEDULING_EVENT_PRODUCER.call_once(|| { + use crate::{ + orpc::oqueue::{OQueue, OQueueRef}, + path, + }; + + // TODO(arthurp): This calls the OQueue constructor before the scheduler is running. This is + // probably safe, but we should have documentation on when and why this is allowed. + let oqueue = OQueueRef::new(1024, path!(scheduler.events)); + oqueue.attach_ref_producer().unwrap() + }); +} + +#[cfg(feature = "capture_scheduling")] +static SCHEDULING_EVENT_PRODUCER: Once> = Once::new(); + +/// Get the producer handle for the scheduling event OQueue. This will panic if called before +/// [`init()`]. +#[cfg(feature = "capture_scheduling")] +fn get_scheduling_event_producer() -> &'static crate::orpc::oqueue::RefProducer { + SCHEDULING_EVENT_PRODUCER.get().unwrap() +} + /// Injects a scheduler implementation into framework. /// /// This function can only be called once and must be called during the initialization of kernel. @@ -37,6 +69,24 @@ pub fn inject_scheduler(scheduler: &'static dyn Scheduler) { static SCHEDULER: Once<&'static dyn Scheduler> = Once::new(); +/// An event either or scheduling or descheduling a task. +#[derive(Debug)] +pub struct SchedulingEvent { + /// The task + pub task: Arc, + /// The kind of event + pub kind: SchedulingEventKind, +} + +/// The kind of a [`SchedulingEvent`]. +#[derive(Debug, Clone, Copy, Serialize)] +pub enum SchedulingEventKind { + /// The task is about to start executing. + Schedule, + /// The task has stopped executing. + Deschedule, +} + /// A per-CPU task scheduler. pub trait Scheduler: Sync + Send { /// Enqueues a runnable task. @@ -270,6 +320,26 @@ where }; }; + // This redefines `next_task` with the same value it started with, but moves the value out + // temporarily. This avoids an atomic incr and decr. 
+ #[cfg(feature = "capture_scheduling")] + let next_task = { + let producer = get_scheduling_event_producer(); + if let Some(t) = Task::current() { + producer.produce_ref(&SchedulingEvent { + task: t.cloned(), + kind: SchedulingEventKind::Deschedule, + }); + } + + let scheduling_event = SchedulingEvent { + task: next_task, + kind: SchedulingEventKind::Schedule, + }; + producer.produce_ref(&scheduling_event); + scheduling_event.task + }; + // `switch_to_task` will spin if it finds that the next task is still running on some CPU core, // which guarantees soundness regardless of the scheduler implementation. // diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..9e9c5e227 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +black==21.10b0 +cbor2==5.9 +pytest==9.0 diff --git a/test/Makefile b/test/Makefile index cf0c9d15d..cc4448c3d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -25,6 +25,9 @@ EXT2_IMAGE := $(BUILD_DIR)/ext2.img EXFAT_IMAGE := $(BUILD_DIR)/exfat.img RAID1_IMAGE0 := $(BUILD_DIR)/raid1_0.img RAID1_IMAGE1 := $(BUILD_DIR)/raid1_1.img +CAPTURE_IMAGE := $(BUILD_DIR)/capture.img +CAPTURE_LEGACY_IMAGE := $(BUILD_DIR)/capture_legacy.img +CAPTURE_IMAGE_SIZE := 4G INITRAMFS_EMPTY_DIRS := \ $(INITRAMFS)/root \ @@ -255,8 +258,16 @@ $(RAID1_IMAGE0) $(RAID1_IMAGE1): @mke2fs $(RAID1_IMAGE0) @dd if=$(RAID1_IMAGE0) of=$(RAID1_IMAGE1) +# We use truncate for capture files because sparse allocation will save disk-space in the host, the +# writes will mostly be cached in RAM, and realistic access times are not needed. 
+$(CAPTURE_IMAGE): + truncate -s $(CAPTURE_IMAGE_SIZE) $@ + +$(CAPTURE_LEGACY_IMAGE): + truncate -s $(CAPTURE_IMAGE_SIZE) $@ + .PHONY: build -build: $(INITRAMFS_IMAGE) $(EXT2_IMAGE) $(EXFAT_IMAGE) $(RAID1_IMAGE0) $(RAID1_IMAGE1) +build: $(INITRAMFS_IMAGE) $(EXT2_IMAGE) $(EXFAT_IMAGE) $(RAID1_IMAGE0) $(RAID1_IMAGE1) $(CAPTURE_IMAGE) $(CAPTURE_LEGACY_IMAGE) .PHONY: format format: diff --git a/test/syscall_test/gvisor/run_gvisor_test.sh b/test/syscall_test/gvisor/run_gvisor_test.sh index 244458887..03f119a24 100755 --- a/test/syscall_test/gvisor/run_gvisor_test.sh +++ b/test/syscall_test/gvisor/run_gvisor_test.sh @@ -2,6 +2,22 @@ # SPDX-License-Identifier: MPL-2.0 +# Run gVisor syscall tests. +# +# Usage: run_gvisor_test.sh [TEST_NAME...] +# +# If no arguments are given, all tests are run. If TEST_NAME arguments are given, only those test +# binaries are run (matched exactly, e.g. "futex_test"). They are run with the same configuration as +# this script otherwise would, so the configuration (e.g., +# `test/syscall_test/gvisor/blocklists/futex_test`) will still apply. +# +# To run a subset of tests via the `make run`, pass the test names through INITARGS: +# +# make run AUTO_TEST=syscall SYSCALL_TEST_SUITE=gvisor INITARGS="futex_test" +# make run AUTO_TEST=syscall SYSCALL_TEST_SUITE=gvisor INITARGS="futex_test socket_test" + +# TODO: This should probably allow passing full test selectors as passed to --gtest_filter. + SCRIPT_DIR=$(dirname "$0") TEST_TMP_DIR=${SYSCALL_TEST_WORKDIR:-/tmp} TEST_BIN_DIR=$SCRIPT_DIR/tests @@ -53,7 +69,14 @@ run_one_test(){ rm -f $FAIL_CASES && touch $FAIL_CASES rm -rf $TEST_TMP_DIR/* -for syscall_test in $(find $TEST_BIN_DIR/. -name \*_test) ; do +# Create test list based on arguments +if [ $# -gt 0 ]; then + tests="$*" +else + tests="$(find $TEST_BIN_DIR/. -name \*_test)" +fi + +for syscall_test in "$tests" ; do test_name=$(basename "$syscall_test") run_one_test $test_name if [ $? 
-eq 0 ] && PASSED_TESTS=$((PASSED_TESTS+1));then diff --git a/test/syscall_test/run_syscall_test.sh b/test/syscall_test/run_syscall_test.sh index ec59a3931..43418af88 100755 --- a/test/syscall_test/run_syscall_test.sh +++ b/test/syscall_test/run_syscall_test.sh @@ -10,13 +10,13 @@ GVISOR_DIR=/opt/gvisor if [ "${SYSCALL_TEST_SUITE}" == "ltp" ]; then echo "Running LTP syscall tests..." - if ! "${LTP_DIR}/run_ltp_test.sh"; then + if ! "${LTP_DIR}/run_ltp_test.sh" "$@"; then echo "Error: LTP syscall tests failed." >&2 exit 1 fi elif [ "${SYSCALL_TEST_SUITE}" == "gvisor" ]; then echo "Running gVisor syscall tests..." - if ! "${GVISOR_DIR}/run_gvisor_test.sh"; then + if ! "${GVISOR_DIR}/run_gvisor_test.sh" "$@"; then echo "Error: gVisor syscall tests failed." >&2 exit 2 fi diff --git a/tools/qemu_args.sh b/tools/qemu_args.sh index 3afa64af1..e1ea51733 100755 --- a/tools/qemu_args.sh +++ b/tools/qemu_args.sh @@ -91,6 +91,8 @@ COMMON_QEMU_ARGS="\ -drive if=none,format=raw,id=x1,file=./test/build/exfat.img \ -drive if=none,format=raw,id=r0,file=./test/build/raid1_0.img,cache=directsync \ -drive if=none,format=raw,id=r1,file=./test/build/raid1_1.img,cache=directsync \ + -drive if=none,format=raw,id=d0,file=./test/build/capture.img \ + -drive if=none,format=raw,id=d1,file=./test/build/capture_legacy.img \ " if [ "$1" = "iommu" ]; then @@ -113,6 +115,8 @@ QEMU_ARGS="\ -device virtio-blk-pci,bus=pcie.0,addr=0x7,drive=x1,serial=vexfat,disable-legacy=on,disable-modern=off,queue-size=64,num-queues=1,request-merging=off,backend_defaults=off,discard=off,write-zeroes=off,event_idx=off,indirect_desc=off,queue_reset=off$IOMMU_DEV_EXTRA \ -device virtio-blk-pci,bus=pcie.0,addr=0x8,drive=r0,serial=raid0,disable-legacy=on,disable-modern=off,queue-size=64,num-queues=1,request-merging=off,backend_defaults=off,discard=off,write-zeroes=off,event_idx=off,indirect_desc=off,queue_reset=off$IOMMU_DEV_EXTRA \ -device 
virtio-blk-pci,bus=pcie.0,addr=0x9,drive=r1,serial=raid1,disable-legacy=on,disable-modern=off,queue-size=64,num-queues=1,request-merging=off,backend_defaults=off,discard=off,write-zeroes=off,event_idx=off,indirect_desc=off,queue_reset=off$IOMMU_DEV_EXTRA \ + -device virtio-blk-pci,bus=pcie.0,addr=0xa,drive=d0,serial=capture,disable-legacy=on,disable-modern=off,queue-size=64,num-queues=1,request-merging=off,backend_defaults=off,discard=off,write-zeroes=off,event_idx=off,indirect_desc=off,queue_reset=off$IOMMU_DEV_EXTRA \ + -device virtio-blk-pci,bus=pcie.0,addr=0xb,drive=d1,serial=capture_legacy,disable-legacy=on,disable-modern=off,queue-size=64,num-queues=1,request-merging=off,backend_defaults=off,discard=off,write-zeroes=off,event_idx=off,indirect_desc=off,queue_reset=off$IOMMU_DEV_EXTRA \ -device virtio-net-pci,netdev=net01,disable-legacy=on,disable-modern=off$VIRTIO_NET_FEATURES$IOMMU_DEV_EXTRA \ -device virtio-serial-pci,disable-legacy=on,disable-modern=off$IOMMU_DEV_EXTRA \ -device virtconsole,chardev=mux \ @@ -128,6 +132,8 @@ MICROVM_QEMU_ARGS="\ -device virtio-blk-device,drive=x1,serial=vexfat \ -device virtio-blk-device,drive=r0,serial=raid0 \ -device virtio-blk-device,drive=r1,serial=raid1 \ + -device virtio-blk-device,drive=d0,serial=capture \ + -device virtio-blk-device,drive=d1,serial=capture_legacy \ -device virtio-keyboard-device \ -device virtio-net-device,netdev=net01 \ -device virtio-serial-device \