diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5434e4a..c106191 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ All notable changes to the **AIMBAT** project will be documented in this file.
- Fix importlib error for Python 3.7
- Fix typo on readme
- Listing snapshots when there were non causes error
+- Debug flag from cli commands didn't do anything
### π Documentation
@@ -83,6 +84,9 @@ All notable changes to the **AIMBAT** project will be documented in this file.
- Make data reading more modular
- Move aimbat source to src directory
- Use pandas Timestamp and Timedelta
+- Improve docstrings, io DI pattern, and data source terminology
+- **(core)** Re-arange core, move set_default_event and friends out of core.
+- Active event -> default event
### π New Features
@@ -114,6 +118,9 @@ All notable changes to the **AIMBAT** project will be documented in this file.
- Add bandpass filtering ([#214](https://github.com/pysmo/aimbat/issues/214))
- Add mccc
- Add in-memory seismogram data cache
+- Add JSON datasource
+- Add TUI and supporting changes
+- Implement interactive shell and major documentation update for v2
### π§ͺ Testing
diff --git a/README.md b/README.md
index 2778f8b..3268d12 100644
--- a/README.md
+++ b/README.md
@@ -22,61 +22,76 @@
-Documentation: https://aimbat.readthedocs.io
+Documentation: https://aimbat.pysmo.org
Source Code: https://github.com/pysmo/aimbat
-
---
AIMBAT (Automated and Interactive Measurement of Body wave Arrival Times) is an
-open-source software package for efficiently measuring teleseismic body wave arrival
-times for large seismic arrays [[1]](#1). It is based on a widely used method called
-MCCC (Multi-Channel Cross-Correlation) [[2]](#2). The package is automated in the sense
-of initially aligning seismograms for MCCC, which is achieved by an ICCS (Iterative Cross
-Correlation and Stack) algorithm. Meanwhile, a GUI (graphical user interface) is built to
-perform seismogram quality control interactively. Therefore, user processing time is
-reduced while valuable input from a user's expertise is retained. As a byproduct, SAC
-[[3]](#3) plotting and phase picking functionalities are replicated and enhanced.
-
-Modules and scripts included in the AIMBAT package were developed using
-[Python](http://www.python.org/) and its open-source modules on the Mac OS X platform
-since 2009. The original MCCC [[2]](#2) code was transcribed into Python.
-The GUI of AIMBAT was inspired and initiated at the
-[2009 EarthScope USArray Data Processing and Analysis Short Course](https://www.iris.edu/hq/es_course/content/2009.html).
-AIMBAT runs on Mac OS X, Linux/Unix and Windows thanks to the platform-independent
-feature of Python.
-
-For more information visit the
-[project website](http://www.earth.northwestern.edu/~xlou/aimbat.html) or the
-[pysmo repositories](https://github.com/pysmo).
-
+open-source tool for measuring teleseismic body wave arrival times. Seismograms
+are automatically aligned using the ICCS [Iterative Cross-Correlation and Stack][^1]
+algorithm; picks are then reviewed and refined interactively before a final
+MCCC (Multi-Channel Cross-Correlation) [^2] pass computes the definitive
+arrival times.
+
+## Version 2
+
+AIMBAT v2 is a complete rewrite. It shares the same goal as v1 but none of the
+code.
+
+- **Complete rewrite.** The algorithms are optimised and projects are stored in
+ a SQLite database (via [SQLModel](https://sqlmodel.tiangolo.com)), making them
+ persistent, portable, and inspectable.
+- **Focused scope.** Much of the underlying code has moved into the
+ [pysmo](https://github.com/pysmo/pysmo) library, leaving AIMBAT to focus on
+ the user-facing ICCS β quality-control β MCCC workflow rather than
+ reimplementing general seismogram utilities.
+- **Flexible data storage.** A single project can hold any number of seismic
+ events. Files from different events can live anywhere on disk β no need to
+ keep them in separate directories or follow a particular layout.
+- **Maintainable.** v2 is built on modern, typed Python with a comprehensive
+ test suite and strict dependency management, so it keeps working as the
+ ecosystem evolves.
+- **Multiple interfaces.** AIMBAT can be used via a CLI, an interactive shell,
+ a terminal UI, or directly as a Python library.
+
+## Quick Start
+
+```bash
+pip install aimbat
+
+# Create a project in the current directory
+aimbat project create
+
+# Import SAC files β events and stations are detected automatically
+aimbat data add *.sac
+
+# List events to find their IDs, then set one as the default
+aimbat event list
+aimbat event default
+
+# Open the terminal UI to run ICCS, review picks, and run MCCC
+aimbat tui
+
+# Or work interactively from the shell (tab-completion, command history)
+aimbat shell
+```
## Authors' Contacts
-* [Xiaoting Lou](http://geophysics.earth.northwestern.edu/people/xlou/aimbat.html) Email: xlou at u.northwestern.edu
-
-* [Suzan van der Lee](http://geophysics.earth.northwestern.edu/seismology/suzan/) Email: suzan at northwestern.edu
-
-* [Simon Lloyd](https://www.slloyd.net/) Email: simon at pysmo.org
-
-## References
-
-[1]
-Xiaoting Lou, Suzan van der Lee, and Simon Lloyd (2013),
-AIMBAT: A Python/Matplotlib Tool for Measuring Teleseismic Arrival Times.
-*Seismol. Res. Lett.*, 84(1), 85-93, doi:10.1785/0220120033.
+- Xiaoting Lou β xlou at u.northwestern.edu
+- Suzan van der Lee β suzan at northwestern.edu
+- Simon Lloyd β simon at pysmo.org
-[2]
-VanDecar, J. C., and R. S. Crosson (1990),
-Determination of teleseismic relative phase arrival times using multi-channel
-cross-correlation and
-least squares.
-*Bulletin of the Seismological Society of America*, 80(1), 150β169.
+[^1]: Xiaoting Lou, Suzan van der Lee, and Simon Lloyd, βAIMBAT: A Python/Matplotlib
+ Tool for Measuring Teleseismic Arrival Times.β Seismological Research Letters,
+ vol. 84, no. 1, Jan. 2013, pp. 85β93, .
-[3]
-Goldstein, P., D. Dodge, M. Firpo, and L. Minner (2003),
-SAC2000: Signal processing and analysis tools for seismologists and engineers,
-*International Geophysics*, 81, 1613β1614.
+[^2]: VanDecar, J. C., and R. S. Crosson. βDetermination of Teleseismic
+ Relative Phase Arrival Times Using Multi-Channel Cross-Correlation and
+ Least Squares.β Bulletin of the Seismological Society of America,
+ vol. 80, no. 1, Feb. 1990, pp. 150β69,
+ .
diff --git a/docs/snippets/api_alignment.py b/docs/snippets/api_alignment.py
new file mode 100644
index 0000000..2911948
--- /dev/null
+++ b/docs/snippets/api_alignment.py
@@ -0,0 +1,21 @@
+from sqlmodel import Session
+from aimbat.db import engine
+from aimbat.core import (
+ create_iccs_instance,
+ create_snapshot,
+ get_default_event,
+ run_iccs,
+ run_mccc,
+)
+
+with Session(engine) as session:
+ event = get_default_event(session)
+ assert event is not None
+
+ bound = create_iccs_instance(session, event)
+
+ run_iccs(session, bound.iccs, autoflip=True, autoselect=True)
+ create_snapshot(session, event, comment="after ICCS")
+
+ run_mccc(session, event, bound.iccs, all_seismograms=False)
+ create_snapshot(session, event, comment="after MCCC")
diff --git a/docs/snippets/api_deduplicate.py b/docs/snippets/api_deduplicate.py
new file mode 100644
index 0000000..3f026af
--- /dev/null
+++ b/docs/snippets/api_deduplicate.py
@@ -0,0 +1,96 @@
+"""
+Deduplicate events that were imported from sources reporting slightly different
+origin times for the same earthquake.
+
+Background
+----------
+``add_data_to_project`` deduplicates stations by SEED code
+``(network, name, location, channel)`` β so importing the same station twice,
+even with different coordinates, always reuses the existing record. Station
+duplicates therefore cannot arise through the normal import path.
+
+Events are deduplicated by exact origin time. When two data sources report
+the same earthquake with origin times that differ by a second or two, they are
+stored as *separate* ``AimbatEvent`` records. This script finds such
+near-duplicate events, merges their seismograms into the canonical record
+(the one with the most seismograms), averages the location and depth, then
+removes the duplicates.
+
+Run this script *before* starting any processing, and take a snapshot
+afterwards so the clean state is recoverable.
+"""
+
+from pandas import Timedelta
+from sqlmodel import Session, select
+
+from aimbat.db import engine
+from aimbat.models import AimbatEvent
+
+# Merge events whose origin times differ by less than this value.
+TIME_TOLERANCE = Timedelta(seconds=10)
+
+
+def _mean(values: list[float]) -> float:
+ return sum(values) / len(values)
+
+
+def _mean_opt(values: list[float | None]) -> float | None:
+ clean = [v for v in values if v is not None]
+ return sum(clean) / len(clean) if clean else None
+
+
+def deduplicate_events(session: Session, tolerance: Timedelta = TIME_TOLERANCE) -> int:
+ """Merge event records whose origin times are within *tolerance*.
+
+ Events are sorted by time and clustered greedily: a new cluster begins
+ whenever the gap to the previous event exceeds *tolerance*.
+
+ For each cluster the record with the most seismograms is kept as the
+ canonical entry; its location and depth are updated to the group mean.
+
+ Returns the number of duplicate records removed.
+ """
+ events = sorted(
+ session.exec(select(AimbatEvent)).all(),
+ key=lambda e: e.time,
+ )
+
+ # Build clusters of near-simultaneous events.
+ clusters: list[list[AimbatEvent]] = []
+ for event in events:
+ if clusters and event.time - clusters[-1][-1].time <= tolerance:
+ clusters[-1].append(event)
+ else:
+ clusters.append([event])
+
+ removed = 0
+ for cluster in clusters:
+ if len(cluster) < 2:
+ continue
+
+ canonical = max(cluster, key=lambda e: len(e.seismograms))
+ duplicates = [e for e in cluster if e.id != canonical.id]
+
+ # Set location / depth to the group mean.
+ canonical.latitude = _mean([e.latitude for e in cluster])
+ canonical.longitude = _mean([e.longitude for e in cluster])
+ canonical.depth = _mean_opt([e.depth for e in cluster])
+
+ for dup in duplicates:
+ for seis in list(dup.seismograms):
+ seis.event_id = canonical.id
+ session.add(seis)
+ session.flush() # apply FK changes before deleting the row
+ session.delete(dup)
+ removed += 1
+
+ session.add(canonical)
+
+ session.commit()
+ return removed
+
+
+with Session(engine) as session:
+ n = deduplicate_events(session)
+
+print(f"Removed {n} duplicate event(s).")
diff --git a/docs/snippets/api_load_project.py b/docs/snippets/api_load_project.py
new file mode 100644
index 0000000..b95553b
--- /dev/null
+++ b/docs/snippets/api_load_project.py
@@ -0,0 +1,229 @@
+"""
+Load a project from SAC files that carry no event/station headers.
+
+Layout:
+ - 3 events
+ - 10 broadband stations
+ - 20 seismograms: events 1 and 2 recorded at 7 stations each,
+ event 3 recorded at 6 stations
+"""
+
+import json
+from pathlib import Path
+from typing import Any
+
+from sqlmodel import Session, select
+
+from aimbat.db import engine
+from aimbat.core import (
+ add_data_to_project,
+ create_project,
+ create_snapshot,
+ set_default_event,
+)
+from aimbat.io import DataType
+from aimbat.models import AimbatEvent, AimbatStation
+
+# ------------------------------------------------------------------ #
+# Metadata #
+# ------------------------------------------------------------------ #
+
+EVENTS: list[dict[str, Any]] = [
+ {
+ "time": "2024-01-12T08:14:33Z",
+ "latitude": 37.52,
+ "longitude": 143.04,
+ "depth": 35.0,
+ },
+ {
+ "time": "2024-02-28T21:07:55Z",
+ "latitude": -23.11,
+ "longitude": -67.89,
+ "depth": 120.0,
+ },
+ {
+ "time": "2024-03-09T03:51:20Z",
+ "latitude": 51.72,
+ "longitude": 178.35,
+ "depth": 55.0,
+ },
+]
+
+STATIONS: list[dict[str, Any]] = [
+ {
+ "name": "ANMO",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 34.946,
+ "longitude": -106.457,
+ "elevation": 1820.0,
+ },
+ {
+ "name": "COLA",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 64.874,
+ "longitude": -147.861,
+ "elevation": 84.0,
+ },
+ {
+ "name": "GUMO",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 13.589,
+ "longitude": 144.868,
+ "elevation": 74.0,
+ },
+ {
+ "name": "HRV",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 42.506,
+ "longitude": -71.558,
+ "elevation": 200.0,
+ },
+ {
+ "name": "MAJO",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 36.536,
+ "longitude": 138.204,
+ "elevation": 399.0,
+ },
+ {
+ "name": "MIDW",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 28.216,
+ "longitude": -177.370,
+ "elevation": 150.0,
+ },
+ {
+ "name": "POHA",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 19.757,
+ "longitude": -155.531,
+ "elevation": 1936.0,
+ },
+ {
+ "name": "SSPA",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 40.636,
+ "longitude": -77.888,
+ "elevation": 270.0,
+ },
+ {
+ "name": "TATO",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 24.975,
+ "longitude": 121.498,
+ "elevation": 75.0,
+ },
+ {
+ "name": "YSS",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 46.958,
+ "longitude": 142.760,
+ "elevation": 89.0,
+ },
+]
+
+# Which stations recorded each event (indices into STATIONS list).
+# 7 + 7 + 6 = 20 seismograms total.
+EVENT_STATION_MAP = {
+ 0: [0, 1, 2, 3, 4, 5, 6], # event 1 β 7 seismograms
+ 1: [0, 1, 2, 3, 4, 5, 6], # event 2 β 7 seismograms
+ 2: [0, 1, 2, 3, 4, 5], # event 3 β 6 seismograms
+}
+
+# ------------------------------------------------------------------ #
+# Helpers #
+# ------------------------------------------------------------------ #
+
+
+def write_json(data: dict, path: Path) -> Path:
+ path.write_text(json.dumps(data))
+ return path
+
+
+def sac_path(event_idx: int, station_idx: int) -> Path:
+ """Return the path to the SAC file for a given event/station pair."""
+ return Path(f"data/ev{event_idx + 1:02d}_st{station_idx + 1:02d}.sac")
+
+
+# ------------------------------------------------------------------ #
+# Main #
+# ------------------------------------------------------------------ #
+
+workdir = Path("json_metadata")
+workdir.mkdir(exist_ok=True)
+
+# 1. Initialise project
+create_project(engine)
+
+with Session(engine) as session:
+
+ # 2. Register events from JSON
+ event_files = [
+ write_json(ev, workdir / f"event_{i:02d}.json") for i, ev in enumerate(EVENTS)
+ ]
+ add_data_to_project(session, event_files, DataType.JSON_EVENT)
+
+ # 3. Register stations from JSON
+ station_files = [
+ write_json(st, workdir / f"station_{i:02d}.json")
+ for i, st in enumerate(STATIONS)
+ ]
+ add_data_to_project(session, station_files, DataType.JSON_STATION)
+
+ # 4. Retrieve the newly created records
+ events = session.exec(select(AimbatEvent)).all()
+ stations = session.exec(select(AimbatStation)).all()
+
+ # Build lookup maps by (time, network+name) so insertion order doesn't matter
+ event_by_time = {str(e.time)[:19]: e for e in events}
+ station_by_key = {(s.network, s.name): s for s in stations}
+
+ # 5. Add SAC files, linking each to its pre-registered event and station
+ for ev_idx, st_indices in EVENT_STATION_MAP.items():
+ ev_time = EVENTS[ev_idx]["time"][:19]
+ db_event = event_by_time[ev_time]
+
+ for st_idx in st_indices:
+ st_meta = STATIONS[st_idx]
+ db_station = station_by_key[(st_meta["network"], st_meta["name"])]
+
+ add_data_to_project(
+ session,
+ [sac_path(ev_idx, st_idx)],
+ DataType.SAC,
+ event_id=db_event.id,
+ station_id=db_station.id,
+ )
+
+ # 6. Set the event with the most seismograms as the default
+ events = session.exec(select(AimbatEvent)).all()
+ default = max(events, key=lambda e: len(e.seismograms))
+ set_default_event(session, default)
+
+ # 7. Snapshot the initial state before any processing
+ create_snapshot(session, default, comment="initial import")
+
+print("Project ready.")
+print(f" Events: {len(events)}")
+print(f" Stations: {len(stations)}")
+print(f" Default event: {default.id} ({len(default.seismograms)} seismograms)")
diff --git a/docs/snippets/api_query.py b/docs/snippets/api_query.py
new file mode 100644
index 0000000..ce09560
--- /dev/null
+++ b/docs/snippets/api_query.py
@@ -0,0 +1,15 @@
+from sqlmodel import Session, select
+from aimbat.db import engine
+from aimbat.models import AimbatEvent, AimbatSeismogram
+
+with Session(engine) as session:
+ events = session.exec(select(AimbatEvent)).all()
+ for event in events:
+ print(f"{event.time} {len(event.seismograms)} seismograms")
+
+ # Filter β seismograms marked as selected
+ selected = session.exec(
+ select(AimbatSeismogram).where(
+ AimbatSeismogram.parameters.has(select=True) # type: ignore[attr-defined]
+ )
+ ).all()
diff --git a/docs/usage/api.md b/docs/usage/api.md
index a3394e0..8189341 100644
--- a/docs/usage/api.md
+++ b/docs/usage/api.md
@@ -2,7 +2,8 @@
If none of the above interfaces suit your needs, or you want to write custom
scripts, you can use the AIMBAT Python API. This is the most powerful way to
-interact with your projects.
+interact with your projects. View the full [API reference](../api/aimbat.md)
+here.
## Core Concepts
@@ -14,3 +15,159 @@ The API is built on three main components:
(`aimbat.core`).
3. **Database Session**: A SQLAlchemy session used to track changes and
interact with the project database.
+
+## Project Location
+
+By default AIMBAT reads and writes `aimbat.db` in the current directory. Set
+`AIMBAT_PROJECT` to use a different path:
+
+```bash
+export AIMBAT_PROJECT=/path/to/my/project.db
+```
+
+The `aimbat.db.engine` singleton picks this up automatically, so scripts that
+import it will use the same database as the CLI.
+
+## Session Management
+
+Every database operation requires a `Session`. Use it as a context manager so
+it is always closed cleanly:
+
+```python
+from sqlmodel import Session
+from aimbat.db import engine
+
+with Session(engine) as session:
+ # query or modify data here
+ pass
+```
+
+Changes accumulate in the session and are written to disk only when
+`session.commit()` is called (or when you call a core function that commits
+internally). If an exception is raised before committing, the session is rolled
+back automatically.
+
+## Creating a Project
+
+```python
+from aimbat.db import engine
+from aimbat.core import create_project
+
+create_project(engine)
+```
+
+This is a one-time operation that creates the schema and the SQLite triggers
+that enforce the single-default-event constraint and track modification times.
+It raises `RuntimeError` if the schema already exists.
+
+## Adding Data
+
+The central function is `add_data_to_project`:
+
+```python
+from sqlmodel import Session
+from aimbat.db import engine
+from aimbat.core import add_data_to_project
+from aimbat.io import DataType
+
+with Session(engine) as session:
+ add_data_to_project(session, paths, DataType.SAC)
+```
+
+The `DataType` enum controls what is read from each source:
+
+| `DataType` | What is created |
+|------------------|------------------------------------------|
+| `SAC` | Event + Station + Seismogram |
+| `JSON_EVENT` | Event only (no seismogram) |
+| `JSON_STATION` | Station only (no seismogram) |
+
+### JSON formats
+
+**Event** (`DataType.JSON_EVENT`):
+
+```json
+{
+ "time": "2024-03-15T14:22:11Z",
+ "latitude": 37.5,
+ "longitude": 143.0,
+ "depth": 35.0
+}
+```
+
+**Station** (`DataType.JSON_STATION`):
+
+```json
+{
+ "name": "ANMO",
+ "network": "IU",
+ "location": "00",
+ "channel": "BHZ",
+ "latitude": 34.946,
+ "longitude": -106.457,
+ "elevation": 1820.0
+}
+```
+
+### Providing event or station metadata externally
+
+SAC files from some sources omit event or station headers. In that case, add
+the metadata separately first and then link the SAC files to the resulting
+database records using `event_id` and `station_id`:
+
+```python
+with Session(engine) as session:
+ add_data_to_project(session, [event_json], DataType.JSON_EVENT)
+ add_data_to_project(session, [station_json], DataType.JSON_STATION)
+
+ event = session.exec(select(AimbatEvent)).one()
+ station = session.exec(select(AimbatStation)).one()
+
+ add_data_to_project(
+ session,
+ sac_files,
+ DataType.SAC,
+ event_id=event.id,
+ station_id=station.id,
+ )
+```
+
+## Worked Example
+
+The script below builds a complete project from scratch. It loads **3 events**,
+**10 stations**, and **20 seismograms** where the SAC files carry waveform data
+but no event or station headers β all metadata is provided via JSON.
+
+```python
+--8<-- "docs/snippets/api_load_project.py"
+```
+
+## Querying the Database
+
+Models can be queried directly using SQLModel's `select`:
+
+```python
+--8<-- "docs/snippets/api_query.py"
+```
+
+## Deduplicating Events
+
+`add_data_to_project` deduplicates stations automatically by SEED code
+`(network, name, location, channel)`, so importing the same station from
+multiple sources never creates duplicate records.
+
+Events are a different story: they are deduplicated by exact origin time.
+When two data sources report the same earthquake with times that differ by a
+second or two, they are stored as separate `AimbatEvent` records. The script
+below detects such near-duplicates, merges their seismograms into the record
+with the most data, averages the location and depth, and removes the extras.
+
+```python
+--8<-- "docs/snippets/api_deduplicate.py"
+```
+
+## Running Alignment
+
+```python
+--8<-- "docs/snippets/api_alignment.py"
+```
diff --git a/docs/usage/cli.md b/docs/usage/cli.md
index c2894a1..9cc2955 100644
--- a/docs/usage/cli.md
+++ b/docs/usage/cli.md
@@ -3,11 +3,11 @@
The CLI is the primary tool for project administration, data import, and batch
processing. Every command has a `--help` flag that prints its full option list.
-!!! warning "Parameter validation"
- The CLI writes parameter values directly to the database without
- data-aware validation. For example, it will not prevent you from setting a
- time window that extends beyond the available data. Use the
- [TUI](tui.md) or [Shell](shell.md) when validation matters.
+!!! note "Parameter validation"
+ Event parameters (e.g. time window, bandpass settings) are validated against
+ the seismogram data before being written to the database β invalid values are
+ rejected with an error message. Other fields (e.g. raw seismogram or station
+ attributes) are written directly without data-aware checks.
## Project location
diff --git a/docs/usage/shell.md b/docs/usage/shell.md
index 5b0c43c..42b3a30 100644
--- a/docs/usage/shell.md
+++ b/docs/usage/shell.md
@@ -1 +1,79 @@
-# AIMBAT Shell
+# Interactive Shell
+
+The AIMBAT shell is a persistent, interactive session that wraps all CLI commands
+with tab-completion, command history, and live ICCS feedback. It is the recommended
+interface when working interactively from the terminal.
+
+## Starting the shell
+
+```bash
+aimbat shell # start in the context of the default event
+aimbat shell --event # start in the context of a specific event
+```
+
+The `--event` flag accepts a full UUID or any unique prefix. It sets the shell's
+initial event context without changing the database default event.
+
+## Event context
+
+The shell maintains a local **event context** β the event that all commands
+operate on. This is independent of the database default event and is never
+written to the database.
+
+The prompt reflects the current context:
+
+```
+aimbat> # using the database default event
+aimbat [6a4a1b2c]> # using a specific event (first 8 chars of ID)
+```
+
+### Switching events
+
+```
+event switch # switch to a specific event
+event switch # reset to the database default event
+```
+
+`event switch` accepts a full UUID or any unique prefix. Switching immediately
+reports the ICCS status for the new event.
+
+## Commands
+
+All CLI commands are available in the shell, without the leading `aimbat`. For
+example, `aimbat event list` becomes simply `event list`.
+
+## Tab completion and history
+
+Press **Tab** at any point to complete commands, subcommands, and flags.
+Command history is saved to `~/.aimbat_history` and persists across sessions.
+Use the up/down arrow keys to navigate it.
+
+## ICCS status
+
+After every command, the shell checks whether the ICCS instance for the current
+event is still valid and prints a status line when something changes:
+
+```
+ICCS ready (event 6a4a1b2c)
+ICCS not ready β
+```
+
+The status is also printed on startup. A warm ICCS cache is reused across
+commands in the same session, so repeated operations on the same event avoid
+redundant data loading.
+
+## Parameter validation
+
+Setting a parameter that would produce an invalid ICCS configuration is
+rejected before anything is written to the database:
+
+```
+aimbat> event parameter set window_pre 999
+ValueError: ICCS rejected window_pre=999:
+```
+
+The database is left unchanged on rejection.
+
+## Exiting
+
+Type `exit`, `quit`, or `q`, or press **Ctrl+D**.
diff --git a/src/aimbat/_cli/common.py b/src/aimbat/_cli/common.py
index 9132b0d..843a2d9 100644
--- a/src/aimbat/_cli/common.py
+++ b/src/aimbat/_cli/common.py
@@ -127,27 +127,25 @@ class TableParameters:
"Shorten UUIDs and format data."
-# ------------------------------------------------
-# Hints for error messages
-# ------------------------------------------------
+# -------------------------------------------------
+# Decorators
+# -------------------------------------------------
-@dataclass(frozen=True)
-class CliHints:
- """Hints for error messages."""
+def print_error_panel(e: Exception) -> None:
+ """Print an exception to the console in a red panel."""
+ from rich.console import Console
+ from rich.panel import Panel
- SET_DEFAULT_EVENT = (
- "Hint: set a default event with `aimbat event default `."
+ console = Console(stderr=True)
+ panel = Panel(
+ f"{e}",
+ title="Error",
+ title_align="left",
+ border_style="red",
+ expand=True,
)
- LIST_EVENTS = "Hint: view available events with `aimbat event list`."
-
-
-HINTS = CliHints()
-
-
-# -------------------------------------------------
-# Decorators
-# -------------------------------------------------
+ console.print(panel)
def simple_exception[F: Callable[..., Any]](func: F) -> F:
@@ -158,8 +156,6 @@ def simple_exception[F: Callable[..., Any]](func: F) -> F:
callable unchanged.
"""
from functools import wraps
- from rich.console import Console
- from rich.panel import Panel
import sys
@wraps(func)
@@ -169,15 +165,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return func(*args, **kwargs)
except Exception as e:
- console = Console()
- panel = Panel(
- f"{e}",
- title="Error",
- title_align="left",
- border_style="red",
- expand=True,
- )
- console.print(panel)
+ print_error_panel(e)
sys.exit(1)
return wrapper # type: ignore
diff --git a/src/aimbat/_cli/event.py b/src/aimbat/_cli/event.py
index ec969c0..84010db 100644
--- a/src/aimbat/_cli/event.py
+++ b/src/aimbat/_cli/event.py
@@ -108,7 +108,7 @@ def cli_event_parameter_set(
with Session(engine) as session:
event = resolve_event(session, global_parameters.event_id)
- set_event_parameter(session, event, name, parsed_value)
+ set_event_parameter(session, event, name, parsed_value, validate_iccs=True)
@parameter.command(name="dump")
diff --git a/src/aimbat/_cli/shell.py b/src/aimbat/_cli/shell.py
new file mode 100644
index 0000000..fce073e
--- /dev/null
+++ b/src/aimbat/_cli/shell.py
@@ -0,0 +1,257 @@
+"""Interactive AIMBAT shell with tab-completion and command history.
+
+All CLI commands are available. Press Tab to complete commands, Ctrl+D
+or type `exit` to leave.
+
+Shell-only commands:
+ event switch [ID] Switch the shell's event context without changing the
+ database default. Omit the ID to reset to the default event.
+"""
+
+import uuid
+from cyclopts import App, Parameter
+from typing import Annotated, TYPE_CHECKING
+from .common import simple_exception, _event_id_converter, print_error_panel
+
+if TYPE_CHECKING:
+ from rich.console import Console
+ from aimbat.core import BoundICCS
+
+app = App(name="shell", help=__doc__, help_format="markdown")
+
+
+def _build_completion_dict(cyclopts_app: App) -> dict[str, dict | None]:
+ """Recursively build a NestedCompleter dict from a cyclopts app tree."""
+ skip: set[str] = set(cyclopts_app.help_flags)
+ if hasattr(cyclopts_app, "version_flags"):
+ skip.update(cyclopts_app.version_flags)
+ result: dict[str, dict | None] = {}
+
+ # Flags from this app's own default command.
+ if cyclopts_app.default_command is not None:
+ for arg in cyclopts_app.assemble_argument_collection():
+ if arg.show:
+ for flag in arg.names:
+ if flag.startswith("-") and flag not in skip:
+ result[flag] = None
+
+ # Subcommands (recurse).
+ for name in cyclopts_app:
+ if name in skip:
+ continue
+ sub = cyclopts_app[name]
+ nested = _build_completion_dict(sub)
+ result[name] = nested if nested else None
+ return result
+
+
+def _extract_event_flag(tokens: list[str]) -> str | None:
+ """Return the value of --event / --event-id from a token list, or None."""
+ flags = {"--event", "--event-id"}
+ for i, tok in enumerate(tokens):
+ if tok in flags and i + 1 < len(tokens):
+ return tokens[i + 1]
+ for flag in flags:
+ if tok.startswith(f"{flag}="):
+ return tok.split("=", 1)[1]
+ return None
+
+
+def _inject_event(tokens: list[str], event_id: uuid.UUID) -> list[str]:
+ """Append --event to tokens unless an event flag is already present."""
+ if _extract_event_flag(tokens) is None:
+ return tokens + ["--event", str(event_id)]
+ return tokens
+
+
+def _parse_event_id(value: str) -> uuid.UUID:
+ """Parse a full UUID or unique prefix into a UUID.
+
+ Mirrors the --event converter used by CLI commands so that `event switch`
+ accepts the same shortened prefixes.
+ """
+ try:
+ return uuid.UUID(value)
+ except ValueError:
+ from aimbat.db import engine
+ from aimbat.utils import string_to_uuid
+ from aimbat.models import AimbatEvent
+ from sqlmodel import Session
+
+ with Session(engine) as session:
+ return string_to_uuid(session, value, AimbatEvent)
+
+
+def _check_iccs(
+ console: "Console",
+ prev: "BoundICCS | None",
+ *,
+ startup: bool = False,
+ event_id: uuid.UUID | None = None,
+) -> "BoundICCS | None":
+ """Query ICCS status and print a line only when something changes.
+
+ On startup (`startup=True`) always prints. On subsequent calls prints only
+ when status changes: stale cache rebuilt, event changed, ICCS became invalid,
+ or recovered.
+
+ Args:
+ console: Rich console for output.
+ prev: The BoundICCS from the previous check, or None.
+ startup: If True, always print regardless of previous state.
+ event_id: Event to check ICCS for, or None to use the default event.
+
+ Returns:
+ The current BoundICCS, or None if unavailable.
+ """
+ from aimbat.db import engine
+ from aimbat.core import resolve_event, create_iccs_instance
+ from sqlmodel import Session
+
+ try:
+ with Session(engine) as session:
+ event = resolve_event(session, event_id)
+ bound = create_iccs_instance(session, event)
+ changed = (
+ prev is None
+ or prev.event_id != bound.event_id
+ or bound.created_at != prev.created_at
+ )
+ if startup or changed:
+ console.print(f"[green]ICCS ready[/green] (event {str(event.id)[:8]})")
+ return bound
+ except Exception as exc:
+ if startup or prev is not None:
+ # startup: always report; post-command: only report on transition
+ console.print(f"[yellow]ICCS not ready[/yellow] β {exc}")
+ return None
+
+
+@app.default
+@simple_exception
+def cli_shell(
+ *,
+ event_id: Annotated[
+ uuid.UUID | None,
+ Parameter(
+ name=["--event", "--event-id"],
+ help="Start the shell in the context of a specific event. "
+ "Full UUID or any unique prefix as shown in the table. "
+ "Does not change the database default event.",
+ converter=_event_id_converter,
+ ),
+ ] = None,
+) -> None:
+ """Start an interactive AIMBAT shell."""
+ from aimbat.app import app as aimbat_app
+ from prompt_toolkit import PromptSession
+ from prompt_toolkit.completion import NestedCompleter
+ from prompt_toolkit.history import FileHistory
+ from cyclopts import CycloptsError
+ from rich.console import Console
+ from pathlib import Path
+ import shlex
+
+ console = Console()
+
+ # Shell-local event context β None means "use DB default event".
+ # Modified by `event switch`; never written to the database.
+ shell_event_id: uuid.UUID | None = event_id
+
+ completion_dict = _build_completion_dict(aimbat_app)
+ completion_dict.pop("shell", None)
+ completion_dict.pop("tui", None)
+
+ # Inject the shell-only `event switch` subcommand into tab completion.
+ event_completions = completion_dict.get("event")
+ if isinstance(event_completions, dict):
+ event_completions["switch"] = None
+
+ import sys
+
+ pt_session: PromptSession[str] | None = None
+ if sys.stdin.isatty():
+ pt_session = PromptSession(
+ history=FileHistory(str(Path.home() / ".aimbat_history")),
+ completer=NestedCompleter.from_nested_dict(completion_dict),
+ complete_while_typing=True,
+ )
+
+ console.print(
+ "[bold]AIMBAT shell[/bold] (Tab to complete, Ctrl+D or [bold]exit[/bold] to quit)"
+ )
+
+ current_bound = _check_iccs(console, None, startup=True, event_id=shell_event_id)
+
+ def _prompt() -> str:
+ if shell_event_id is not None:
+ return f"aimbat [{str(shell_event_id)[:8]}]> "
+ return "aimbat> "
+
+ while True:
+ try:
+ if pt_session is not None:
+ text = pt_session.prompt(_prompt).strip()
+ else:
+ raw = sys.stdin.readline()
+ if not raw:
+ break
+ text = raw.strip()
+ except KeyboardInterrupt:
+ continue
+ except EOFError:
+ break
+
+ if not text:
+ continue
+ if text in ("exit", "quit", "q"):
+ break
+
+ try:
+ tokens = shlex.split(text)
+ except ValueError as exc:
+ console.print(f"[red]Parse error:[/red] {exc}")
+ continue
+
+ # Strip a leading "aimbat" token typed out of habit and remind the user.
+ if tokens and tokens[0] == "aimbat":
+ tokens = tokens[1:]
+ console.print("[dim]Tip: no need to type 'aimbat' in the shell.[/dim]")
+ if not tokens:
+ continue
+
+ # Shell-only: event switch [id] β changes context without touching the DB.
+ if tokens[:2] == ["event", "switch"]:
+ if len(tokens) < 3:
+ # No argument: reset to DB default event.
+ shell_event_id = None
+ current_bound = _check_iccs(console, None, startup=True, event_id=None)
+ else:
+ try:
+ new_event_id = _parse_event_id(tokens[2])
+ shell_event_id = new_event_id
+ current_bound = _check_iccs(
+ console, None, startup=True, event_id=shell_event_id
+ )
+ except Exception as exc:
+ print_error_panel(exc)
+ continue
+
+ # Inject the shell event context into commands that don't override it.
+ if shell_event_id is not None:
+ tokens = _inject_event(tokens, shell_event_id)
+
+ try:
+ aimbat_app(tokens, exit_on_error=False)
+
+ # Check ICCS for whichever event was actually targeted by the command.
+ effective_flag = _extract_event_flag(tokens)
+ check_event_id = (
+ _parse_event_id(effective_flag) if effective_flag else shell_event_id
+ )
+ current_bound = _check_iccs(console, current_bound, event_id=check_event_id)
+ except (CycloptsError, SystemExit):
+ # Errors already printed by Cyclopts or subcommand
+ pass
+ except Exception as exc:
+ print_error_panel(exc)
diff --git a/src/aimbat/_tui/app.py b/src/aimbat/_tui/app.py
index 43b639d..108bd01 100644
--- a/src/aimbat/_tui/app.py
+++ b/src/aimbat/_tui/app.py
@@ -4,6 +4,7 @@
import uuid
from collections.abc import Callable
+from contextlib import suppress
from pathlib import Path
from pydantic import ValidationError
@@ -39,10 +40,10 @@
create_snapshot,
delete_seismogram_by_id,
reset_seismogram_parameters_by_id,
- sync_iccs_parameters,
delete_snapshot_by_id,
delete_station_by_id,
get_default_event,
+ resolve_event,
rollback_to_snapshot_by_id,
run_iccs,
run_mccc,
@@ -173,13 +174,11 @@ def on_mount(self) -> None:
# Prime _last_known_default_id so the first poll doesn't fire a
# spurious refresh_all().
- try:
- with Session(engine) as session:
- self._last_known_default_id: uuid.UUID | None = get_default_event(
- session
- ).id
- except (NoResultFound, RuntimeError):
- self._last_known_default_id = None
+ with Session(engine) as session:
+ _default = get_default_event(session)
+ self._last_known_default_id: uuid.UUID | None = (
+ _default.id if _default is not None else None
+ )
self.set_interval(5, self._check_iccs_staleness)
self._create_iccs()
@@ -191,10 +190,8 @@ def on_tab_activated(self, event: TabbedContent.TabActivated) -> None:
self._active_tab = event.pane.id
self.refresh_bindings()
if not isinstance(self.focused, Tabs):
- try:
+ with suppress(Exception):
event.pane.query_one(DataTable).focus()
- except Exception:
- pass
def check_action(self, action: str, parameters: tuple[object, ...]) -> bool | None:
tab = getattr(self, "_active_tab", "")
@@ -217,7 +214,7 @@ def _get_current_event(self, session: Session) -> AimbatEvent:
if event is not None:
return event
self._current_event_id = None
- return get_default_event(session)
+ return resolve_event(session)
# ------------------------------------------------------------------
# ICCS lifecycle
@@ -309,10 +306,10 @@ def _check_iccs_staleness(self) -> None:
try:
with Session(engine) as session:
event = self._get_current_event(session)
- try:
- default_id: uuid.UUID | None = get_default_event(session).id
- except NoResultFound:
- default_id = None
+ _default = get_default_event(session)
+ default_id: uuid.UUID | None = (
+ _default.id if _default is not None else None
+ )
changed = False
if default_id != self._last_known_default_id:
self._last_known_default_id = default_id
@@ -336,10 +333,8 @@ def _refresh_event_bar(self) -> None:
try:
with Session(engine) as session:
event = self._get_current_event(session)
- try:
- default_id = get_default_event(session).id
- except NoResultFound:
- default_id = None
+ _default = get_default_event(session)
+ default_id = _default.id if _default is not None else None
marker = "β" if event.id == default_id else "βΆ"
iccs_status = (
" β ICCS ready" if self._bound_iccs is not None else " β no ICCS"
@@ -368,15 +363,13 @@ def _refresh_seismograms(self) -> None:
ccnorm_map: dict[uuid.UUID, float] = {}
if self._bound_iccs is not None:
- try:
+ with suppress(Exception):
for iccs_seis, ccnorm in zip(
self._bound_iccs.iccs.seismograms, self._bound_iccs.iccs.ccnorms
):
ccnorm_map[iccs_seis.extra["id"]] = float(ccnorm)
- except Exception:
- pass
- try:
+ with suppress(NoResultFound, RuntimeError):
with Session(engine) as session:
event = self._get_current_event(session)
seismograms = sorted(
@@ -410,8 +403,6 @@ def _refresh_seismograms(self) -> None:
cc,
key=str(seis.id),
)
- except (NoResultFound, RuntimeError):
- pass
if table.row_count > 0:
table.move_cursor(row=min(saved_row, table.row_count - 1))
@@ -419,7 +410,7 @@ def _refresh_parameters(self) -> None:
table = self.query_one("#parameter-table", DataTable)
saved_row = table.cursor_row
table.clear()
- try:
+ with suppress(NoResultFound, RuntimeError):
with Session(engine) as session:
event = self._get_current_event(session)
p = event.parameters
@@ -434,8 +425,6 @@ def _refresh_parameters(self) -> None:
label = field_info.title or attr
desc = field_info.description or ""
table.add_row(label, display, desc, key=attr)
- except (NoResultFound, RuntimeError):
- pass
if table.row_count > 0:
table.move_cursor(row=min(saved_row, table.row_count - 1))
@@ -443,7 +432,7 @@ def _refresh_stations(self) -> None:
table = self.query_one("#station-table", DataTable)
saved_row = table.cursor_row
table.clear()
- try:
+ with suppress(NoResultFound, RuntimeError):
with Session(engine) as session:
event = self._get_current_event(session)
seen: set[uuid.UUID] = set()
@@ -468,8 +457,6 @@ def _refresh_stations(self) -> None:
elev,
key=str(st.id),
)
- except (NoResultFound, RuntimeError):
- pass
if table.row_count > 0:
table.move_cursor(row=min(saved_row, table.row_count - 1))
@@ -477,7 +464,7 @@ def _refresh_snapshots(self) -> None:
table = self.query_one("#snapshot-table", DataTable)
saved_row = table.cursor_row
table.clear()
- try:
+ with suppress(NoResultFound, RuntimeError):
with Session(engine) as session:
event = self._get_current_event(session)
for snap in event.snapshots:
@@ -496,8 +483,6 @@ def _refresh_snapshots(self) -> None:
flipped_count,
key=str(snap.id),
)
- except (NoResultFound, RuntimeError):
- pass
if table.row_count > 0:
table.move_cursor(row=min(saved_row, table.row_count - 1))
@@ -577,24 +562,12 @@ def on_input(raw: str | None) -> None:
self.push_screen(ParameterInputModal(label, current_str, unit), on_input)
def _apply_parameter(self, attr: str, value: object) -> None:
- """Write a parameter to the DB and sync to the in-memory ICCS object."""
- iccs = self._bound_iccs.iccs if self._bound_iccs is not None else None
-
- # Validate with ICCS first β before touching the DB β so invalid values
- # are rejected without being persisted.
- if iccs is not None and hasattr(iccs, attr):
- try:
- setattr(iccs, attr, value)
- iccs.clear_cache()
- except ValueError as exc:
- self.notify(str(exc), severity="error")
- return
-
+ """Validate, write a parameter to the DB, and rebuild ICCS."""
try:
with Session(engine) as session:
event = self._get_current_event(session)
if attr in {p.value for p in EventParameter}:
- set_event_parameter(session, event, EventParameter(attr), value) # type: ignore[call-overload]
+ set_event_parameter(session, event, EventParameter(attr), value, validate_iccs=True) # type: ignore[call-overload]
else:
# mccc_damp / mccc_min_ccnorm β not in EventParameter enum
validated = AimbatEventParametersBase.model_validate(
@@ -608,20 +581,12 @@ def _apply_parameter(self, attr: str, value: object) -> None:
e["msg"].removeprefix("Value error, ") for e in exc.errors()
)
self.notify(msgs, severity="error")
- self._create_iccs() # revert ICCS to DB state
return
except Exception as exc:
self.notify(str(exc), severity="error")
- self._create_iccs() # revert ICCS to DB state
return
- if self._bound_iccs is None:
- # Parameter change may have fixed previously invalid ranges.
- self._create_iccs()
- else:
- # Acknowledge our own write so staleness check doesn't recreate.
- self._bound_iccs.created_at = Timestamp.now("UTC")
-
+ self._create_iccs()
self._refresh_parameters()
self._refresh_seismograms()
self._refresh_event_bar()
@@ -752,12 +717,7 @@ def on_confirm(confirmed: bool | None) -> None:
try:
with Session(engine) as session:
rollback_to_snapshot_by_id(session, uuid.UUID(snap_id))
- if self._bound_iccs is not None:
- event = self._get_current_event(session)
- sync_iccs_parameters(session, event, self._bound_iccs.iccs)
- self._bound_iccs.created_at = Timestamp.now("UTC")
- if self._bound_iccs is None:
- self._create_iccs()
+ self._create_iccs()
self.refresh_all()
self.notify("Rolled back to snapshot", timeout=3)
except Exception as exc:
@@ -827,12 +787,8 @@ def _require_iccs(self) -> bool:
if self._current_event_id is not None:
has_event = True
else:
- try:
- with Session(engine) as session:
- get_default_event(session)
- has_event = True
- except (NoResultFound, RuntimeError):
- has_event = False
+ with Session(engine) as session:
+ has_event = get_default_event(session) is not None
if has_event:
self.notify(
"ICCS not ready β check event parameters (Parameters tab)",
diff --git a/src/aimbat/_tui/modals.py b/src/aimbat/_tui/modals.py
index 60b0519..e3a8658 100644
--- a/src/aimbat/_tui/modals.py
+++ b/src/aimbat/_tui/modals.py
@@ -5,7 +5,6 @@
import uuid
from enum import StrEnum
-from sqlalchemy.exc import NoResultFound
from sqlmodel import Session, select
from textual import on
from textual.app import ComposeResult
@@ -108,11 +107,10 @@ def _populate(self, table: DataTable) -> None:
try:
with Session(engine) as session:
events = session.exec(select(AimbatEvent)).all()
- default_id: uuid.UUID | None = None
- try:
- default_id = get_default_event(session).id
- except NoResultFound:
- pass
+ _default = get_default_event(session)
+ default_id: uuid.UUID | None = (
+ _default.id if _default is not None else None
+ )
for event in events:
is_default = event.id == default_id
diff --git a/src/aimbat/_utils.py b/src/aimbat/_utils.py
index dfdde26..e009f25 100644
--- a/src/aimbat/_utils.py
+++ b/src/aimbat/_utils.py
@@ -1,3 +1,6 @@
+from contextlib import suppress
+
+
def export_module_names(globals_dict: dict, module_name: str) -> None:
"""
Updates the __module__ attribute of all objects in __all__ to match
@@ -12,9 +15,5 @@ def export_module_names(globals_dict: dict, module_name: str) -> None:
for name in all_names:
obj = globals_dict.get(name)
if obj is not None and hasattr(obj, "__module__"):
- try:
- # Attempt to write the module name
+ with suppress(AttributeError, TypeError):
obj.__module__ = module_name
- except (AttributeError, TypeError):
- # Safely ignore objects with read-only __module__ attributes
- pass
diff --git a/src/aimbat/app.py b/src/aimbat/app.py
index 51142ba..d40ed74 100644
--- a/src/aimbat/app.py
+++ b/src/aimbat/app.py
@@ -38,6 +38,7 @@
app.command("aimbat._cli.snapshot:app", name="snapshot")
app.command("aimbat._cli.utils:app", name="utils")
app.command("aimbat._tui.app:main", name="tui")
+app.command("aimbat._cli.shell:app", name="shell")
if __name__ == "__main__":
diff --git a/src/aimbat/core/__init__.py b/src/aimbat/core/__init__.py
index d7c18b2..051aaea 100644
--- a/src/aimbat/core/__init__.py
+++ b/src/aimbat/core/__init__.py
@@ -5,8 +5,7 @@
`aimbat.models`. The main areas covered are:
- **Default event** β get and set the default event (`get_default_event`,
- `set_default_event`). Only one event is set as default at a time; switching
- clears the seismogram data cache.
+ `set_default_event`).
- **Data** β add data to the project, linking each source to its station,
event, and seismogram records (`add_data_to_project`).
- **Events, seismograms, stations** β query, update, and delete records; read
diff --git a/src/aimbat/core/_default_event.py b/src/aimbat/core/_default_event.py
index d0ac008..80ff180 100644
--- a/src/aimbat/core/_default_event.py
+++ b/src/aimbat/core/_default_event.py
@@ -2,11 +2,9 @@
from sqlmodel import Session, select
from sqlalchemy.exc import NoResultFound
-from contextlib import suppress
from uuid import UUID
from aimbat.logger import logger
from aimbat.models import AimbatEvent
-from aimbat._cli.common import HINTS
__all__ = [
"get_default_event",
@@ -16,33 +14,23 @@
]
-def get_default_event(session: Session) -> AimbatEvent:
+def get_default_event(session: Session) -> AimbatEvent | None:
"""
- Return the currently default event (i.e. the one being processed by default).
+ Return the currently default event, or None if no event is set as default.
Args:
session: SQL session.
Returns:
- Default Event
-
- Raises
- NoResultFound: When no event is set as default.
+ Default Event, or None.
"""
logger.debug("Attempting to determine default event.")
statement = select(AimbatEvent).where(AimbatEvent.is_default == 1)
+ default_event = session.exec(statement).one_or_none()
- # NOTE: While there technically can be no default event in the database,
- # we typically don't really want to go beyond this point when that is the
- # case. Hence we call `one` rather than `one_or_none`.
- try:
- default_event = session.exec(statement).one()
- except NoResultFound:
- raise NoResultFound(f"No default event found. {HINTS.SET_DEFAULT_EVENT}")
-
- logger.debug(f"Default event: {default_event.id}")
+ logger.debug(f"Default event: {default_event.id if default_event else None}")
return default_event
@@ -57,16 +45,21 @@ def resolve_event(session: Session, event_id: UUID | None = None) -> AimbatEvent
Returns:
The specified event or the default event.
+
+ Raises:
+ ValueError: If an explicit event_id is given but not found.
+ NoResultFound: If no event_id is given and no default event is set.
"""
if event_id:
logger.debug(f"Resolving event by explicit ID: {event_id}")
event = session.get(AimbatEvent, event_id)
if event is None:
- raise ValueError(
- f"No AimbatEvent found with id: {event_id}. {HINTS.LIST_EVENTS}"
- )
+ raise ValueError(f"No AimbatEvent found with id: {event_id}.")
return event
- return get_default_event(session)
+ event = get_default_event(session)
+ if event is None:
+ raise NoResultFound("No default event found.")
+ return event
def set_default_event_by_id(session: Session, event_id: UUID) -> None:
@@ -83,9 +76,7 @@ def set_default_event_by_id(session: Session, event_id: UUID) -> None:
logger.info(f"Setting default event to event with id={event_id}.")
if event_id not in session.exec(select(AimbatEvent.id)).all():
- raise ValueError(
- f"No AimbatEvent found with id: {event_id}. {HINTS.LIST_EVENTS}"
- )
+ raise ValueError(f"No AimbatEvent found with id: {event_id}.")
aimbat_event = session.exec(
select(AimbatEvent).where(AimbatEvent.id == event_id)
@@ -104,9 +95,9 @@ def set_default_event(session: Session, event: AimbatEvent) -> None:
logger.info(f"Setting default {event=}")
- with suppress(NoResultFound):
- if event.id == get_default_event(session).id:
- return
+ current = get_default_event(session)
+ if current is not None and event.id == current.id:
+ return
event.is_default = True
session.add(event)
diff --git a/src/aimbat/core/_event.py b/src/aimbat/core/_event.py
index c516435..a5904f0 100644
--- a/src/aimbat/core/_event.py
+++ b/src/aimbat/core/_event.py
@@ -8,7 +8,6 @@
from collections.abc import Sequence
from uuid import UUID
from aimbat.logger import logger
-from aimbat._cli.common import HINTS
from aimbat.models import (
AimbatEvent,
AimbatEventParameters,
@@ -51,9 +50,7 @@ def delete_event_by_id(session: Session, event_id: UUID) -> None:
event = session.get(AimbatEvent, event_id)
if event is None:
- raise NoResultFound(
- f"Unable to find event using id: {event_id}. {HINTS.LIST_EVENTS}"
- )
+ raise NoResultFound(f"Unable to find event using id: {event_id}.")
delete_event(session, event)
@@ -161,18 +158,30 @@ def set_event_parameter(
event: AimbatEvent,
name: EventParameterTimedelta,
value: Timedelta,
+ *,
+ validate_iccs: bool = ...,
) -> None: ...
@overload
def set_event_parameter(
- session: Session, event: AimbatEvent, name: EventParameterFloat, value: float
+ session: Session,
+ event: AimbatEvent,
+ name: EventParameterFloat,
+ value: float,
+ *,
+ validate_iccs: bool = ...,
) -> None: ...
@overload
def set_event_parameter(
- session: Session, event: AimbatEvent, name: EventParameterBool, value: bool | str
+ session: Session,
+ event: AimbatEvent,
+ name: EventParameterBool,
+ value: bool | str,
+ *,
+ validate_iccs: bool = ...,
) -> None: ...
@@ -182,6 +191,8 @@ def set_event_parameter(
event: AimbatEvent,
name: EventParameter,
value: Timedelta | bool | float | str,
+ *,
+ validate_iccs: bool = ...,
) -> None: ...
@@ -190,6 +201,8 @@ def set_event_parameter(
event: AimbatEvent,
name: EventParameter,
value: Timedelta | bool | float | str,
+ *,
+ validate_iccs: bool = False,
) -> None:
"""Set event parameter value for the given event.
@@ -198,6 +211,8 @@ def set_event_parameter(
event: AimbatEvent.
name: Name of the parameter.
value: Value to set.
+ validate_iccs: If True, attempt ICCS construction with the new value
+ before committing. Raises and leaves the database unchanged on failure.
"""
logger.info(f"Setting {name=} to {value} for {event=}.")
@@ -205,7 +220,24 @@ def set_event_parameter(
parameters = AimbatEventParametersBase.model_validate(
event.parameters, update={name: value}
)
- setattr(event.parameters, name, getattr(parameters, name))
+ new_value = getattr(parameters, name)
+
+ if validate_iccs:
+ from aimbat.core._iccs import validate_iccs_construction
+
+ # Temporarily apply the new value in-memory with autoflush suppressed so
+ # the session never writes to the DB during the validation query.
+ old_value = getattr(event.parameters, name)
+ with session.no_autoflush:
+ setattr(event.parameters, name, new_value)
+ try:
+ validate_iccs_construction(event)
+ except Exception as exc:
+ setattr(event.parameters, name, old_value)
+ raise ValueError(f"ICCS rejected {name}={value}: {exc}") from exc
+ setattr(event.parameters, name, old_value)
+
+ setattr(event.parameters, name, new_value)
session.add(event)
session.commit()
diff --git a/src/aimbat/core/_iccs.py b/src/aimbat/core/_iccs.py
index 6d9a537..99c658b 100644
--- a/src/aimbat/core/_iccs.py
+++ b/src/aimbat/core/_iccs.py
@@ -5,7 +5,6 @@
from pandas import Timestamp
from sqlmodel import Session
-from pysmo.functions import clone_to_mini
from pysmo.tools.iccs import (
ICCS,
MiniICCSSeismogram,
@@ -23,9 +22,16 @@
AimbatSeismogramParametersBase,
)
+_RETURN_FIG_WARNING = (
+ "Returning figure and axes objects instead of showing the plot. "
+ "This is intended for testing purposes; in normal usage, return_fig should be False."
+)
+
__all__ = [
"BoundICCS",
"create_iccs_instance",
+ "clear_iccs_cache",
+ "validate_iccs_construction",
"sync_iccs_parameters",
"run_iccs",
"run_mccc",
@@ -62,30 +68,41 @@ def is_stale(self, event: AimbatEvent) -> bool:
return event.last_modified > self.created_at
-def create_iccs_instance(session: Session, event: AimbatEvent) -> BoundICCS:
- """Create a BoundICCS instance for the given event.
+# Process-level ICCS cache. In normal CLI use this is always cold (one command
+# per process). In the shell a warm entry is reused across commands, avoiding
+# redundant data loading and ICCS computation.
+_iccs_cache: dict[UUID, BoundICCS] = {}
+
+
+def clear_iccs_cache() -> None:
+ """Clear the process-level ICCS cache."""
+ _iccs_cache.clear()
- Seismogram data is copied into MiniICCSSeismogram objects so the session
- does not need to remain open after this call.
+
+def _build_iccs(event: AimbatEvent) -> ICCS:
+ """Build an ICCS instance from an event's current parameters and seismograms.
Args:
- session: Database session.
event: AimbatEvent.
Returns:
- BoundICCS instance tied to the given event.
+ A freshly constructed ICCS instance.
"""
-
- logger.info(f"Creating ICCS instance for event {event.id}.")
-
p = event.parameters
-
seismograms = [
- clone_to_mini(MiniICCSSeismogram, seis, update={"extra": {"id": seis.id}})
+ MiniICCSSeismogram(
+ begin_time=seis.begin_time,
+ delta=seis.delta,
+ data=seis.data,
+ t0=seis.t0,
+ t1=seis.t1,
+ flip=seis.flip,
+ select=seis.select,
+ extra={"id": seis.id},
+ )
for seis in event.seismograms
]
-
- iccs = ICCS(
+ return ICCS(
seismograms=seismograms,
window_pre=p.window_pre,
window_post=p.window_post,
@@ -95,7 +112,55 @@ def create_iccs_instance(session: Session, event: AimbatEvent) -> BoundICCS:
min_ccnorm=p.min_ccnorm,
context_width=settings.context_width,
)
- return BoundICCS(iccs=iccs, event_id=event.id, created_at=Timestamp.now("UTC"))
+
+
+def create_iccs_instance(session: Session, event: AimbatEvent) -> BoundICCS:
+ """Return a BoundICCS instance for the given event.
+
+ Returns the cached instance when it is still fresh (i.e. `event.last_modified`
+ has not advanced since the instance was created). Otherwise builds a new one
+ and updates the cache.
+
+ `MiniICCSSeismogram` instances are constructed directly from each
+ `AimbatSeismogram`, passing `data` by reference to the read-only io cache.
+ No waveform data is copied. The session does not need to remain open after
+ this call.
+
+ Args:
+ session: Database session.
+ event: AimbatEvent.
+
+ Returns:
+ BoundICCS instance tied to the given event.
+ """
+ cached = _iccs_cache.get(event.id)
+ if cached is not None and not cached.is_stale(event):
+ logger.debug(f"Returning cached BoundICCS for event {event.id}.")
+ return cached
+
+ logger.info(f"Creating ICCS instance for event {event.id}.")
+ bound = BoundICCS(
+ iccs=_build_iccs(event),
+ event_id=event.id,
+ created_at=Timestamp.now("UTC"),
+ )
+ _iccs_cache[event.id] = bound
+ return bound
+
+
+def validate_iccs_construction(event: AimbatEvent) -> None:
+ """Try to construct an ICCS instance for the event without caching the result.
+
+ Use this to check whether the event's current (possibly uncommitted) parameters
+ are compatible with ICCS construction before persisting them to the database.
+
+ Args:
+ event: AimbatEvent.
+
+ Raises:
+ Any exception raised by ICCS construction (e.g. invalid parameter values).
+ """
+ _build_iccs(event)
def _write_back_seismograms(session: Session, iccs: ICCS) -> None:
@@ -253,9 +318,7 @@ def update_pick(
_write_back_seismograms(session, iccs)
return None
- logger.warning(
- "Returning figure and axes objects instead of showing the plot. This is intended for testing purposes; in normal usage, return_fig should be False."
- )
+ logger.warning(_RETURN_FIG_WARNING)
return result
@@ -295,9 +358,7 @@ def update_timewindow(
session.commit()
return None
- logger.warning(
- "Returning figure and axes objects instead of showing the plot. This is intended for testing purposes; in normal usage, return_fig should be False."
- )
+ logger.warning(_RETURN_FIG_WARNING)
return result
@@ -332,7 +393,5 @@ def update_min_ccnorm(
session.commit()
return None
- logger.warning(
- "Returning figure and axes objects instead of showing the plot. This is intended for testing purposes; in normal usage, return_fig should be False."
- )
+ logger.warning(_RETURN_FIG_WARNING)
return result
diff --git a/src/aimbat/io/_base.py b/src/aimbat/io/_base.py
index 553a747..ef9a27c 100644
--- a/src/aimbat/io/_base.py
+++ b/src/aimbat/io/_base.py
@@ -220,16 +220,17 @@ def read_seismogram_data(
) -> npt.NDArray[np.float64]:
"""Read seismogram waveform data from a data source.
- Results are cached in memory by `(datasource, datatype)` key. The cache
- entry is invalidated when `write_seismogram_data` is called for the same
- key.
+ Results are cached in memory by `(datasource, datatype)` key. The returned
+ array is read-only; to write new data use `write_seismogram_data`. The
+ cache entry is invalidated when `write_seismogram_data` is called for the
+ same key.
Args:
datasource: Data source path or name.
datatype: Data type of the source.
Returns:
- Seismogram waveform data as a NumPy array.
+ Read-only seismogram waveform data as a NumPy array.
Raises:
NotImplementedError: If `datatype` has no registered data reader.
@@ -242,7 +243,9 @@ def read_seismogram_data(
)
key = (str(datasource), datatype)
if key not in _cache:
- _cache[key] = reader(datasource)
+ arr = reader(datasource)
+ arr.flags.writeable = False
+ _cache[key] = arr
return _cache[key]
diff --git a/tests/functional/test_shell.py b/tests/functional/test_shell.py
new file mode 100644
index 0000000..832f6be
--- /dev/null
+++ b/tests/functional/test_shell.py
@@ -0,0 +1,257 @@
+"""Functional tests for the AIMBAT interactive shell.
+
+Helper functions are tested directly (no subprocess). The REPL itself is
+exercised via subprocess with piped stdin.
+"""
+
+import os
+import subprocess
+import uuid
+from collections.abc import Callable, Sequence
+from pathlib import Path
+
+import pytest
+
+from aimbat._cli.shell import _build_completion_dict, _extract_event_flag, _inject_event
+from aimbat.app import app as aimbat_app
+
+_AIMBAT_LOGFILE = "aimbat_test.log"
+
+
+# ---------------------------------------------------------------------------
+# Local fixture β subprocess with stdin support
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def shell_subprocess(
+ db_path: Path,
+) -> Callable[[str], subprocess.CompletedProcess[str]]:
+ """Run ``aimbat shell`` as a subprocess with stdin piped in.
+
+ Args:
+ db_path: Path to the temporary project database file.
+
+ Returns:
+ A callable that accepts stdin text and returns the completed process.
+ """
+
+ def _run(stdin: str) -> subprocess.CompletedProcess[str]:
+ env = os.environ.copy()
+ env["AIMBAT_DB_URL"] = f"sqlite+pysqlite:///{db_path}"
+ env["AIMBAT_LOGFILE"] = _AIMBAT_LOGFILE
+ env["COLUMNS"] = "1000"
+ return subprocess.run(
+ ["uv", "run", "aimbat", "shell"],
+ input=stdin,
+ capture_output=True,
+ text=True,
+ env=env,
+ )
+
+ return _run
+
+
+# ===========================================================================
+# _extract_event_flag
+# ===========================================================================
+
+
+class TestExtractEventFlag:
+ """Tests for the ``_extract_event_flag`` helper."""
+
+ def test_no_flag_returns_none(self) -> None:
+ assert _extract_event_flag(["event", "list"]) is None
+
+ def test_empty_tokens_returns_none(self) -> None:
+ assert _extract_event_flag([]) is None
+
+ def test_space_separated_event(self) -> None:
+ assert _extract_event_flag(["--event", "abc123"]) == "abc123"
+
+ def test_space_separated_event_id(self) -> None:
+ assert _extract_event_flag(["--event-id", "abc123"]) == "abc123"
+
+ def test_equals_event(self) -> None:
+ assert _extract_event_flag(["--event=abc123"]) == "abc123"
+
+ def test_equals_event_id(self) -> None:
+ assert _extract_event_flag(["--event-id=abc123"]) == "abc123"
+
+ def test_flag_buried_in_token_list(self) -> None:
+ assert _extract_event_flag(["event", "dump", "--event", "myid"]) == "myid"
+
+ def test_flag_at_end_without_value_returns_none(self) -> None:
+ # --event is the last token with no following value
+ assert _extract_event_flag(["event", "list", "--event"]) is None
+
+ def test_other_flags_ignored(self) -> None:
+ assert _extract_event_flag(["--all", "--json", "--verbose"]) is None
+
+
+# ===========================================================================
+# _inject_event
+# ===========================================================================
+
+
+class TestInjectEvent:
+ """Tests for the ``_inject_event`` helper."""
+
+ _UID = uuid.UUID("12345678-1234-5678-1234-567812345678")
+
+ def test_appends_event_flag_when_absent(self) -> None:
+ result = _inject_event(["event", "list"], self._UID)
+ assert "--event" in result
+ assert str(self._UID) in result
+
+ def test_no_change_when_event_already_present(self) -> None:
+ tokens = ["event", "dump", "--event", "other-id"]
+ result = _inject_event(tokens, self._UID)
+ assert result == tokens
+
+ def test_no_change_when_event_id_already_present(self) -> None:
+ tokens = ["event", "dump", "--event-id", "other-id"]
+ result = _inject_event(tokens, self._UID)
+ assert result == tokens
+
+ def test_original_tokens_not_mutated(self) -> None:
+ tokens = ["event", "list"]
+ original = tokens[:]
+ _inject_event(tokens, self._UID)
+ assert tokens == original
+
+ def test_returns_new_list(self) -> None:
+ tokens = ["event", "list"]
+ result = _inject_event(tokens, self._UID)
+ assert result is not tokens
+
+
+# ===========================================================================
+# _build_completion_dict
+# ===========================================================================
+
+
+class TestBuildCompletionDict:
+ """Tests for the ``_build_completion_dict`` helper."""
+
+ def test_returns_dict(self) -> None:
+ assert isinstance(_build_completion_dict(aimbat_app), dict)
+
+ def test_contains_top_level_commands(self) -> None:
+ result = _build_completion_dict(aimbat_app)
+ for key in ("event", "data", "station", "seismogram", "snapshot"):
+ assert key in result, f"'{key}' missing from completion dict"
+
+ def test_event_subcommands_present(self) -> None:
+ result = _build_completion_dict(aimbat_app)
+ event_cmds = result.get("event")
+ assert isinstance(event_cmds, dict)
+ for sub in ("list", "dump", "default", "delete"):
+ assert sub in event_cmds, f"'event {sub}' missing from completion dict"
+
+ def test_help_flags_excluded(self) -> None:
+ result = _build_completion_dict(aimbat_app)
+ assert "--help" not in result
+ assert "-h" not in result
+
+ def test_shell_and_tui_excluded(self) -> None:
+ # shell removes itself and tui from the REPL's completions
+ # (checked at runtime in cli_shell; here we just verify the dict
+ # is built correctly from the top-level app)
+ result = _build_completion_dict(aimbat_app)
+ # shell and tui may or may not appear depending on whether they are
+ # registered subcommands; the key assertion is that the call succeeds
+ # and returns a non-empty mapping
+ assert len(result) > 0
+
+
+# ===========================================================================
+# Shell subprocess (integration)
+# ===========================================================================
+
+
+@pytest.mark.slow
+@pytest.mark.cli
+class TestShellSubprocess:
+ """Integration tests for the shell REPL started as a real subprocess."""
+
+ def test_exits_cleanly_on_exit_command(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """Shell exits with code 0 when the user types 'exit'."""
+ aimbat_subprocess(["project", "create"])
+ result = shell_subprocess("exit\n")
+ assert result.returncode == 0, result.stderr
+
+ def test_exits_cleanly_on_eof(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """Shell exits cleanly when stdin is closed (simulates Ctrl+D)."""
+ aimbat_subprocess(["project", "create"])
+ result = shell_subprocess("")
+ assert result.returncode == 0, result.stderr
+
+ def test_executes_command(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """Commands typed in the shell produce output."""
+ aimbat_subprocess(["project", "create"])
+ result = shell_subprocess("project info\nexit\n")
+ assert result.returncode == 0, result.stderr
+ assert "Project Info" in result.stdout
+
+ def test_unknown_command_does_not_crash(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """An unrecognised command is ignored; the shell keeps running."""
+ aimbat_subprocess(["project", "create"])
+ result = shell_subprocess("notacommand\nexit\n")
+ assert result.returncode == 0, result.stderr
+
+ def test_aimbat_prefix_stripped_and_command_runs(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """Typing 'aimbat ' strips the prefix and still runs the command."""
+ aimbat_subprocess(["project", "create"])
+ result = shell_subprocess("aimbat project info\nexit\n")
+ assert result.returncode == 0, result.stderr
+ assert "Project Info" in result.stdout
+ assert "Tip:" in result.stdout
+
+ def test_event_switch_requires_no_db_write(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """'event switch' without an ID resets to the DB default without error."""
+ aimbat_subprocess(["project", "create"])
+ result = shell_subprocess("event switch\nexit\n")
+ assert result.returncode == 0, result.stderr
+
+ def test_invalid_event_flag_does_not_crash(
+ self,
+ aimbat_subprocess: Callable[[Sequence[str]], subprocess.CompletedProcess[str]],
+ shell_subprocess: Callable[[str], subprocess.CompletedProcess[str]],
+ ) -> None:
+ """The shell should not crash when an invalid --event is specified."""
+ aimbat_subprocess(["project", "create"])
+ # 'event list --event invalid' previously crashed the shell
+ result = shell_subprocess("event list --event invalid\nexit\n")
+ assert result.returncode == 0, result.stderr
+
+ # Check for error in stdout OR stderr
+ combined_output = result.stdout + result.stderr
+ assert "Error" in combined_output
+
+ # We want to ensure there is only ONE error panel (no double errors)
+ assert combined_output.count("Error") == 1
diff --git a/tests/functional/test_tui.py b/tests/functional/test_tui.py
new file mode 100644
index 0000000..58dced1
--- /dev/null
+++ b/tests/functional/test_tui.py
@@ -0,0 +1,245 @@
+"""Functional tests for the AIMBAT Terminal User Interface.
+
+Each test runs the Textual app in headless mode via ``App.run_test()``.
+Because ``aimbat._tui.app`` imports ``engine`` at module level, both
+``aimbat.db.engine`` and ``aimbat._tui.app.engine`` must be monkeypatched
+to the test fixture's database.
+"""
+
+import asyncio
+
+import aimbat.db
+import aimbat._tui.app
+import pytest
+from sqlalchemy import Engine
+from textual.widgets import DataTable, Static, TabbedContent, TabPane
+
+from aimbat._tui.app import AimbatTUI
+
+_TUI_SIZE = (120, 40)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _patch_engine(monkeypatch: pytest.MonkeyPatch, engine: Engine) -> None:
+ """Patch the engine in both the db module and the TUI app module."""
+ monkeypatch.setattr(aimbat.db, "engine", engine)
+ monkeypatch.setattr(aimbat._tui.app, "engine", engine)
+
+
+# ===========================================================================
+# Startup β empty database
+# ===========================================================================
+
+
+@pytest.mark.slow
+class TestTUIEmptyDatabase:
+ """TUI smoke tests against a project with no data."""
+
+ def test_starts_without_error(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """App mounts without raising an exception."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+
+ asyncio.run(_run())
+
+ def test_four_tabs_present(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """The four expected tab panes are mounted."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ tab_ids = {pane.id for pane in pilot.app.query(TabPane)}
+ for expected in (
+ "tab-seismograms",
+ "tab-parameters",
+ "tab-stations",
+ "tab-snapshots",
+ ):
+ assert expected in tab_ids
+
+ asyncio.run(_run())
+
+ def test_event_bar_shows_no_event_message(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Event bar indicates that no event is selected when the DB is empty."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ bar = pilot.app.query_one("#event-bar", Static)
+ assert "No event" in str(bar.render())
+
+ asyncio.run(_run())
+
+ def test_seismogram_table_empty(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Seismogram table has no rows when the project has no data."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ table = pilot.app.query_one("#seismogram-table", DataTable)
+ assert table.row_count == 0
+
+ asyncio.run(_run())
+
+ def test_quit_action_exits(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Pressing 'q' exits the application."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ await pilot.press("q")
+
+ asyncio.run(_run())
+
+
+# ===========================================================================
+# Startup β loaded database
+# ===========================================================================
+
+
+@pytest.mark.slow
+class TestTUIWithData:
+ """TUI tests against a project pre-populated with multi-event data."""
+
+ def test_starts_without_error(
+ self, loaded_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """App mounts without raising an exception when data is present."""
+ _patch_engine(monkeypatch, loaded_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause(delay=0.5)
+
+ asyncio.run(_run())
+
+ def test_seismogram_table_populated(
+ self, loaded_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Seismogram table has rows once a default event is set."""
+ _patch_engine(monkeypatch, loaded_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause(delay=0.5)
+ table = pilot.app.query_one("#seismogram-table", DataTable)
+ assert table.row_count > 0
+
+ asyncio.run(_run())
+
+ def test_station_table_populated(
+ self, loaded_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Station table has rows once a default event is set."""
+ _patch_engine(monkeypatch, loaded_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause(delay=0.5)
+ await pilot.press("L") # switch to next tab
+ await pilot.press("L")
+ await pilot.pause()
+ table = pilot.app.query_one("#station-table", DataTable)
+ assert table.row_count > 0
+
+ asyncio.run(_run())
+
+ def test_snapshot_table_empty_initially(
+ self, loaded_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Snapshot table starts empty before any snapshot is created."""
+ _patch_engine(monkeypatch, loaded_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause(delay=0.5)
+ table = pilot.app.query_one("#snapshot-table", DataTable)
+ assert table.row_count == 0
+
+ asyncio.run(_run())
+
+
+# ===========================================================================
+# Tab navigation
+# ===========================================================================
+
+
+@pytest.mark.slow
+class TestTUITabNavigation:
+ """Tests for keyboard-driven tab switching."""
+
+ def test_vim_right_advances_tab(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Pressing 'L' switches to the next tab."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ tc = pilot.app.query_one(TabbedContent)
+ initial_tab = tc.active
+ await pilot.press("L")
+ await pilot.pause()
+ assert tc.active != initial_tab
+
+ asyncio.run(_run())
+
+ def test_vim_left_wraps_or_stays(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Pressing 'H' on the first tab does not crash."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ await pilot.press("H")
+ await pilot.pause()
+ # App still responsive
+ tc = pilot.app.query_one(TabbedContent)
+ assert tc.active is not None
+
+ asyncio.run(_run())
+
+ def test_full_tab_cycle(
+ self, patched_engine: Engine, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Cycling through all four tabs and back arrives at a known state."""
+ _patch_engine(monkeypatch, patched_engine)
+
+ async def _run() -> None:
+ async with AimbatTUI().run_test(size=_TUI_SIZE) as pilot:
+ await pilot.pause()
+ tc = pilot.app.query_one(TabbedContent)
+ visited: list[str] = [tc.active]
+ for _ in range(3):
+ await pilot.press("L")
+ await pilot.pause()
+ visited.append(tc.active)
+ assert (
+ len(set(visited)) == 4
+ ), f"Expected 4 distinct tabs, got {visited}"
+
+ asyncio.run(_run())
diff --git a/tests/integration/core/test_data.py b/tests/integration/core/test_data.py
index 84eadb3..5cfb4bb 100644
--- a/tests/integration/core/test_data.py
+++ b/tests/integration/core/test_data.py
@@ -273,6 +273,7 @@ def test_get_data_sources_for_default_event(self, session: Session) -> None:
session (Session): Database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
data_sources = get_data_for_event(session, default_event)
assert len(data_sources) != 0, "Expected data sources for the default event."
assert all(
diff --git a/tests/integration/core/test_event.py b/tests/integration/core/test_event.py
index 16dd379..12f666e 100644
--- a/tests/integration/core/test_event.py
+++ b/tests/integration/core/test_event.py
@@ -83,6 +83,7 @@ def test_switch_by_id(self, session: Session) -> None:
session (Session): The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
event_ids = list(session.exec(select(AimbatEvent.id)).all())
event_ids.remove(default_event.id)
@@ -93,8 +94,10 @@ def test_switch_by_id(self, session: Session) -> None:
set_default_event_by_id(session, new_default_event_id)
+ switched_event = get_default_event(session)
+ assert switched_event is not None
assert (
- get_default_event(session).id == new_default_event_id
+ switched_event.id == new_default_event_id
), "expected the default event to switch to the new event by id"
def test_switch_by_id_invalid(self, session: Session) -> None:
@@ -127,8 +130,7 @@ def test_get_default_event_no_default(self, session: Session) -> None:
is None
), "expected no default event in the database after deactivating"
- with pytest.raises(NoResultFound):
- get_default_event(session)
+ assert get_default_event(session) is None
# ===================================================================
@@ -268,6 +270,7 @@ def test_get_timedelta_parameter(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
value = get_event_parameter(session, default_event, EventParameter.WINDOW_PRE)
assert isinstance(value, Timedelta)
@@ -278,6 +281,7 @@ def test_get_float_parameter(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
value = get_event_parameter(session, default_event, EventParameter.MIN_CCNORM)
assert isinstance(value, float)
@@ -288,6 +292,7 @@ def test_get_bool_parameter(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
value = get_event_parameter(session, default_event, EventParameter.COMPLETED)
assert isinstance(value, bool)
@@ -302,6 +307,7 @@ def test_set_timedelta_parameter(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
new_value = Timedelta(seconds=20)
set_event_parameter(
session, default_event, EventParameter.WINDOW_POST, new_value
@@ -318,6 +324,7 @@ def test_set_float_parameter(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
new_value = 0.75
set_event_parameter(
session, default_event, EventParameter.MIN_CCNORM, new_value
@@ -334,6 +341,7 @@ def test_set_bool_parameter(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
set_event_parameter(session, default_event, EventParameter.COMPLETED, True)
assert (
get_event_parameter(session, default_event, EventParameter.COMPLETED)
diff --git a/tests/integration/core/test_seismogram.py b/tests/integration/core/test_seismogram.py
index f6d2b44..b21d808 100644
--- a/tests/integration/core/test_seismogram.py
+++ b/tests/integration/core/test_seismogram.py
@@ -411,5 +411,6 @@ def test_returns_figure(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
fig, _ = plot_all_seismograms(session, event=default_event, return_fig=True)
assert isinstance(fig, Figure)
diff --git a/tests/integration/core/test_snapshots.py b/tests/integration/core/test_snapshots.py
index 9d3685c..fea8c03 100644
--- a/tests/integration/core/test_snapshots.py
+++ b/tests/integration/core/test_snapshots.py
@@ -42,6 +42,7 @@ def snapshot(session: Session) -> AimbatSnapshot:
An AimbatSnapshot for the default event.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
return session.exec(select(AimbatSnapshot)).one()
@@ -57,6 +58,7 @@ def test_creates_snapshot(self, session: Session) -> None:
"""
assert len(session.exec(select(AimbatSnapshot)).all()) == 0
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
assert len(session.exec(select(AimbatSnapshot)).all()) == 1
@@ -67,6 +69,7 @@ def test_snapshot_linked_to_default_event(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
assert snapshot.event_id == default_event.id
@@ -78,6 +81,7 @@ def test_snapshot_with_comment(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event, comment="test comment")
snapshot = session.exec(select(AimbatSnapshot)).one()
assert snapshot.comment == "test comment"
@@ -89,6 +93,7 @@ def test_snapshot_without_comment(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
assert snapshot.comment is None
@@ -100,6 +105,7 @@ def test_snapshot_captures_seismogram_parameters(self, session: Session) -> None
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
n_seismograms = len(default_event.seismograms)
create_snapshot(session, default_event)
@@ -116,6 +122,7 @@ def test_snapshot_captures_event_parameters(
snapshot: An AimbatSnapshot for the default event.
"""
default_event = get_default_event(session)
+ assert default_event is not None
assert (
snapshot.event_parameters_snapshot.parameters_id
== default_event.parameters.id
@@ -170,6 +177,7 @@ def test_rollback_restores_event_parameters(
snapshot: An AimbatSnapshot capturing the original parameters.
"""
default_event = get_default_event(session)
+ assert default_event is not None
original_min_ccnorm = snapshot.event_parameters_snapshot.min_ccnorm
# Mutate the parameter after taking the snapshot
@@ -192,6 +200,7 @@ def test_rollback_restores_seismogram_parameters(
snapshot: An AimbatSnapshot capturing the original parameters.
"""
default_event = get_default_event(session)
+ assert default_event is not None
seismogram = default_event.seismograms[0]
original_select = snapshot.seismogram_parameters_snapshots[0].select
@@ -212,6 +221,7 @@ def test_rollback_by_id(self, session: Session, snapshot: AimbatSnapshot) -> Non
snapshot: An AimbatSnapshot to roll back to.
"""
default_event = get_default_event(session)
+ assert default_event is not None
original_min_ccnorm = snapshot.event_parameters_snapshot.min_ccnorm
default_event.parameters.min_ccnorm = 0.0
@@ -232,6 +242,7 @@ def test_rollback_restores_all_event_parameters(
snapshot: An AimbatSnapshot capturing the original parameters.
"""
default_event = get_default_event(session)
+ assert default_event is not None
params = default_event.parameters
snap = snapshot.event_parameters_snapshot
@@ -271,6 +282,7 @@ def test_rollback_restores_all_seismogram_parameters(
snapshot: An AimbatSnapshot capturing the original parameters.
"""
default_event = get_default_event(session)
+ assert default_event is not None
seismogram = default_event.seismograms[0]
params = seismogram.parameters
snap = next(
@@ -313,6 +325,7 @@ def test_no_snapshots_initially(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
assert len(get_snapshots(session, event=default_event)) == 0
def test_get_snapshots_for_default_event(
@@ -325,6 +338,7 @@ def test_get_snapshots_for_default_event(
snapshot: An AimbatSnapshot for the default event.
"""
default_event = get_default_event(session)
+ assert default_event is not None
snapshots = get_snapshots(session, event=default_event, all_events=False)
assert len(snapshots) == 1
assert snapshots[0].id == snapshot.id
@@ -348,6 +362,7 @@ def test_multiple_snapshots(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event, comment="first")
create_snapshot(session, default_event, comment="second")
assert len(get_snapshots(session, event=default_event)) == 2
@@ -364,6 +379,7 @@ def test_as_string(self, session: Session, snapshot: AimbatSnapshot) -> None:
snapshot: An AimbatSnapshot to include in the dump.
"""
default_event = get_default_event(session)
+ assert default_event is not None
result = dump_snapshot_tables_to_json(
session, all_events=False, as_string=True, event=default_event
)
@@ -381,6 +397,7 @@ def test_as_dict(self, session: Session, snapshot: AimbatSnapshot) -> None:
snapshot: An AimbatSnapshot to include in the dump.
"""
default_event = get_default_event(session)
+ assert default_event is not None
result = dump_snapshot_tables_to_json(
session, all_events=False, as_string=False, event=default_event
)
@@ -398,6 +415,7 @@ def test_all_events_includes_more_snapshots(
snapshot: An AimbatSnapshot to include in the dump.
"""
default_event = get_default_event(session)
+ assert default_event is not None
default_only = dump_snapshot_tables_to_json(
session, all_events=False, as_string=False, event=default_event
)
diff --git a/tests/integration/core/test_station.py b/tests/integration/core/test_station.py
index b61f0fb..cb6ebe3 100644
--- a/tests/integration/core/test_station.py
+++ b/tests/integration/core/test_station.py
@@ -42,6 +42,7 @@ def station(session: Session) -> AimbatStation:
The first AimbatStation in the default event.
"""
default_event = get_default_event(session)
+ assert default_event is not None
return default_event.seismograms[0].station
@@ -96,6 +97,7 @@ def test_returns_stations(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
stations = get_stations_in_event(session, default_event, as_json=False)
assert len(stations) > 0, "Expected at least one station for the default event"
@@ -106,6 +108,7 @@ def test_returns_aimbat_station_instances(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
stations = get_stations_in_event(session, default_event, as_json=False)
assert all(
isinstance(s, AimbatStation) for s in stations
@@ -118,6 +121,7 @@ def test_as_json_returns_list_of_dicts(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
result = get_stations_in_event(session, default_event, as_json=True)
assert isinstance(result, list), "Expected a list when as_json=True"
assert all(
@@ -131,6 +135,7 @@ def test_as_json_count_matches_objects(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
objects = get_stations_in_event(session, default_event, as_json=False)
json_list = get_stations_in_event(session, default_event, as_json=True)
assert len(objects) == len(
@@ -144,6 +149,7 @@ def test_stations_belong_to_default_event(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
default_station_ids = {s.station_id for s in default_event.seismograms}
stations = get_stations_in_event(session, default_event, as_json=False)
returned_ids = {s.id for s in stations}
@@ -162,6 +168,7 @@ def test_returns_stations_for_event(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
stations = get_stations_in_event(session, default_event)
assert len(stations) > 0, "Expected at least one station for the given event"
@@ -172,6 +179,7 @@ def test_returns_aimbat_station_instances(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
stations = get_stations_in_event(session, default_event)
assert all(
isinstance(s, AimbatStation) for s in stations
@@ -184,6 +192,7 @@ def test_station_ids_match_event_seismograms(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
expected_ids = {s.station_id for s in default_event.seismograms}
returned_ids = {s.id for s in get_stations_in_event(session, default_event)}
assert (
diff --git a/tests/integration/models/test_models.py b/tests/integration/models/test_models.py
index 26709c2..f346697 100644
--- a/tests/integration/models/test_models.py
+++ b/tests/integration/models/test_models.py
@@ -227,6 +227,7 @@ def test_delete_event_cascades_to_snapshots(self, session: Session) -> None:
from aimbat.core import create_snapshot, get_default_event
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event, comment="before delete")
assert len(session.exec(select(AimbatSnapshot)).all()) == 1
assert len(session.exec(select(AimbatEventParametersSnapshot)).all()) == 1
@@ -299,6 +300,7 @@ def test_delete_snapshot_cascades_to_parameter_snapshots(
from aimbat.core import create_snapshot, get_default_event
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
diff --git a/tests/integration/models/test_operations.py b/tests/integration/models/test_operations.py
index 353e2c6..70225b9 100644
--- a/tests/integration/models/test_operations.py
+++ b/tests/integration/models/test_operations.py
@@ -215,6 +215,7 @@ def test_snapshot_has_event_parameters_snapshot(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
assert isinstance(
@@ -230,6 +231,7 @@ def test_snapshot_has_seismogram_parameter_snapshots(
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
assert len(snapshot.seismogram_parameters_snapshots) > 0
@@ -245,6 +247,7 @@ def test_snapshot_back_reference_to_event(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
assert isinstance(snapshot.event, AimbatEvent)
@@ -256,6 +259,7 @@ def test_snapshot_seismogram_count(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
session.refresh(snapshot)
@@ -270,6 +274,7 @@ def test_snapshot_selected_seismogram_count(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
session.refresh(snapshot)
@@ -283,6 +288,7 @@ def test_snapshot_flipped_seismogram_count(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
session.refresh(snapshot)
@@ -302,6 +308,7 @@ def test_snapshot_counts_reflect_toggled_flip_and_select(
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
seismograms = default_event.seismograms
assert len(seismograms) >= 2
@@ -504,6 +511,7 @@ def test_parameter_snapshots_deleted(
seismogram: An AimbatSeismogram to delete.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
parameters_id = seismogram.parameters.id
@@ -525,6 +533,7 @@ def test_event_parameters_snapshot_deleted(self, session: Session) -> None:
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
ep_snapshot_id = snapshot.event_parameters_snapshot.id
@@ -541,6 +550,7 @@ def test_seismogram_parameters_snapshots_deleted(self, session: Session) -> None
session: The database session.
"""
default_event = get_default_event(session)
+ assert default_event is not None
create_snapshot(session, default_event)
snapshot = session.exec(select(AimbatSnapshot)).one()
sp_snapshot_ids = [s.id for s in snapshot.seismogram_parameters_snapshots]
diff --git a/tests/unit/_cli/test_common.py b/tests/unit/_cli/test_common.py
index 1136d03..8826e9e 100644
--- a/tests/unit/_cli/test_common.py
+++ b/tests/unit/_cli/test_common.py
@@ -5,8 +5,6 @@
GlobalParameters,
IccsPlotParameters,
TableParameters,
- CliHints,
- HINTS,
simple_exception,
)
from aimbat import settings
@@ -70,30 +68,6 @@ def test_short_can_be_set_false(self) -> None:
assert params.short is False
-class TestCliHints:
- """Tests for the CliHints frozen dataclass."""
-
- def test_set_default_event_hint_content(self) -> None:
- """Verifies that SET_DEFAULT_EVENT hint references the default command."""
- assert "default" in CliHints.SET_DEFAULT_EVENT
- assert "aimbat event default" in CliHints.SET_DEFAULT_EVENT
-
- def test_list_events_hint_content(self) -> None:
- """Verifies that LIST_EVENTS hint references the list command."""
- assert "list" in CliHints.LIST_EVENTS
- assert "aimbat event list" in CliHints.LIST_EVENTS
-
- def test_hints_instance_is_frozen(self) -> None:
- """Verifies that the CliHints dataclass is frozen (immutable)."""
- with pytest.raises((AttributeError, TypeError)):
- HINTS.SET_DEFAULT_EVENT = "new value"
-
- def test_hints_singleton_values(self) -> None:
- """Verifies that the HINTS singleton has the expected attribute values."""
- assert HINTS.SET_DEFAULT_EVENT == CliHints.SET_DEFAULT_EVENT
- assert HINTS.LIST_EVENTS == CliHints.LIST_EVENTS
-
-
class TestSimpleException:
"""Tests for the simple_exception decorator."""
diff --git a/uv.lock b/uv.lock
index dcb780f..7e30b22 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1662,11 +1662,11 @@ wheels = [
[[package]]
name = "platformdirs"
-version = "4.9.2"
+version = "4.9.4"
source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/1b/04/fea538adf7dbbd6d186f551d595961e564a3b6715bdf276b477460858672/platformdirs-4.9.2.tar.gz", hash = "sha256:9a33809944b9db043ad67ca0db94b14bf452cc6aeaac46a88ea55b26e2e9d291", size = 28394, upload-time = "2026-02-16T03:56:10.574Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/19/56/8d4c30c8a1d07013911a8fdbd8f89440ef9f08d07a1b50ab8ca8be5a20f9/platformdirs-4.9.4.tar.gz", hash = "sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934", size = 28737, upload-time = "2026-03-05T18:34:13.271Z" }
wheels = [
- { url = "https://files.pythonhosted.org/packages/48/31/05e764397056194206169869b50cf2fee4dbbbc71b344705b9c0d878d4d8/platformdirs-4.9.2-py3-none-any.whl", hash = "sha256:9170634f126f8efdae22fb58ae8a0eaa86f38365bc57897a6c4f781d1f5875bd", size = 21168, upload-time = "2026-02-16T03:56:08.891Z" },
+ { url = "https://files.pythonhosted.org/packages/63/d7/97f7e3a6abb67d8080dd406fd4df842c2be0efaf712d1c899c32a075027c/platformdirs-4.9.4-py3-none-any.whl", hash = "sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868", size = 21216, upload-time = "2026-03-05T18:34:12.172Z" },
]
[[package]]
@@ -2198,27 +2198,27 @@ wheels = [
[[package]]
name = "ruff"
-version = "0.15.4"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/da/31/d6e536cdebb6568ae75a7f00e4b4819ae0ad2640c3604c305a0428680b0c/ruff-0.15.4.tar.gz", hash = "sha256:3412195319e42d634470cc97aa9803d07e9d5c9223b99bcb1518f0c725f26ae1", size = 4569550, upload-time = "2026-02-26T20:04:14.959Z" }
-wheels = [
- { url = "https://files.pythonhosted.org/packages/f2/82/c11a03cfec3a4d26a0ea1e571f0f44be5993b923f905eeddfc397c13d360/ruff-0.15.4-py3-none-linux_armv6l.whl", hash = "sha256:a1810931c41606c686bae8b5b9a8072adac2f611bb433c0ba476acba17a332e0", size = 10453333, upload-time = "2026-02-26T20:04:20.093Z" },
- { url = "https://files.pythonhosted.org/packages/ce/5d/6a1f271f6e31dffb31855996493641edc3eef8077b883eaf007a2f1c2976/ruff-0.15.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:5a1632c66672b8b4d3e1d1782859e98d6e0b4e70829530666644286600a33992", size = 10853356, upload-time = "2026-02-26T20:04:05.808Z" },
- { url = "https://files.pythonhosted.org/packages/b1/d8/0fab9f8842b83b1a9c2bf81b85063f65e93fb512e60effa95b0be49bfc54/ruff-0.15.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:a4386ba2cd6c0f4ff75252845906acc7c7c8e1ac567b7bc3d373686ac8c222ba", size = 10187434, upload-time = "2026-02-26T20:03:54.656Z" },
- { url = "https://files.pythonhosted.org/packages/85/cc/cc220fd9394eff5db8d94dec199eec56dd6c9f3651d8869d024867a91030/ruff-0.15.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b2496488bdfd3732747558b6f95ae427ff066d1fcd054daf75f5a50674411e75", size = 10535456, upload-time = "2026-02-26T20:03:52.738Z" },
- { url = "https://files.pythonhosted.org/packages/fa/0f/bced38fa5cf24373ec767713c8e4cadc90247f3863605fb030e597878661/ruff-0.15.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3f1c4893841ff2d54cbda1b2860fa3260173df5ddd7b95d370186f8a5e66a4ac", size = 10287772, upload-time = "2026-02-26T20:04:08.138Z" },
- { url = "https://files.pythonhosted.org/packages/2b/90/58a1802d84fed15f8f281925b21ab3cecd813bde52a8ca033a4de8ab0e7a/ruff-0.15.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:820b8766bd65503b6c30aaa6331e8ef3a6e564f7999c844e9a547c40179e440a", size = 11049051, upload-time = "2026-02-26T20:04:03.53Z" },
- { url = "https://files.pythonhosted.org/packages/d2/ac/b7ad36703c35f3866584564dc15f12f91cb1a26a897dc2fd13d7cb3ae1af/ruff-0.15.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c9fb74bab47139c1751f900f857fa503987253c3ef89129b24ed375e72873e85", size = 11890494, upload-time = "2026-02-26T20:04:10.497Z" },
- { url = "https://files.pythonhosted.org/packages/93/3d/3eb2f47a39a8b0da99faf9c54d3eb24720add1e886a5309d4d1be73a6380/ruff-0.15.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f80c98765949c518142b3a50a5db89343aa90f2c2bf7799de9986498ae6176db", size = 11326221, upload-time = "2026-02-26T20:04:12.84Z" },
- { url = "https://files.pythonhosted.org/packages/ff/90/bf134f4c1e5243e62690e09d63c55df948a74084c8ac3e48a88468314da6/ruff-0.15.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:451a2e224151729b3b6c9ffb36aed9091b2996fe4bdbd11f47e27d8f2e8888ec", size = 11168459, upload-time = "2026-02-26T20:04:00.969Z" },
- { url = "https://files.pythonhosted.org/packages/b5/e5/a64d27688789b06b5d55162aafc32059bb8c989c61a5139a36e1368285eb/ruff-0.15.4-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:a8f157f2e583c513c4f5f896163a93198297371f34c04220daf40d133fdd4f7f", size = 11104366, upload-time = "2026-02-26T20:03:48.099Z" },
- { url = "https://files.pythonhosted.org/packages/f1/f6/32d1dcb66a2559763fc3027bdd65836cad9eb09d90f2ed6a63d8e9252b02/ruff-0.15.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:917cc68503357021f541e69b35361c99387cdbbf99bd0ea4aa6f28ca99ff5338", size = 10510887, upload-time = "2026-02-26T20:03:45.771Z" },
- { url = "https://files.pythonhosted.org/packages/ff/92/22d1ced50971c5b6433aed166fcef8c9343f567a94cf2b9d9089f6aa80fe/ruff-0.15.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:e9737c8161da79fd7cfec19f1e35620375bd8b2a50c3e77fa3d2c16f574105cc", size = 10285939, upload-time = "2026-02-26T20:04:22.42Z" },
- { url = "https://files.pythonhosted.org/packages/e6/f4/7c20aec3143837641a02509a4668fb146a642fd1211846634edc17eb5563/ruff-0.15.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:291258c917539e18f6ba40482fe31d6f5ac023994ee11d7bdafd716f2aab8a68", size = 10765471, upload-time = "2026-02-26T20:03:58.924Z" },
- { url = "https://files.pythonhosted.org/packages/d0/09/6d2f7586f09a16120aebdff8f64d962d7c4348313c77ebb29c566cefc357/ruff-0.15.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:3f83c45911da6f2cd5936c436cf86b9f09f09165f033a99dcf7477e34041cbc3", size = 11263382, upload-time = "2026-02-26T20:04:24.424Z" },
- { url = "https://files.pythonhosted.org/packages/1b/fa/2ef715a1cd329ef47c1a050e10dee91a9054b7ce2fcfdd6a06d139afb7ec/ruff-0.15.4-py3-none-win32.whl", hash = "sha256:65594a2d557d4ee9f02834fcdf0a28daa8b3b9f6cb2cb93846025a36db47ef22", size = 10506664, upload-time = "2026-02-26T20:03:50.56Z" },
- { url = "https://files.pythonhosted.org/packages/d0/a8/c688ef7e29983976820d18710f955751d9f4d4eb69df658af3d006e2ba3e/ruff-0.15.4-py3-none-win_amd64.whl", hash = "sha256:04196ad44f0df220c2ece5b0e959c2f37c777375ec744397d21d15b50a75264f", size = 11651048, upload-time = "2026-02-26T20:04:17.191Z" },
- { url = "https://files.pythonhosted.org/packages/3e/0a/9e1be9035b37448ce2e68c978f0591da94389ade5a5abafa4cf99985d1b2/ruff-0.15.4-py3-none-win_arm64.whl", hash = "sha256:60d5177e8cfc70e51b9c5fad936c634872a74209f934c1e79107d11787ad5453", size = 10966776, upload-time = "2026-02-26T20:03:56.908Z" },
+version = "0.15.5"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/77/9b/840e0039e65fcf12758adf684d2289024d6140cde9268cc59887dc55189c/ruff-0.15.5.tar.gz", hash = "sha256:7c3601d3b6d76dce18c5c824fc8d06f4eef33d6df0c21ec7799510cde0f159a2", size = 4574214, upload-time = "2026-03-05T20:06:34.946Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/47/20/5369c3ce21588c708bcbe517a8fbe1a8dfdb5dfd5137e14790b1da71612c/ruff-0.15.5-py3-none-linux_armv6l.whl", hash = "sha256:4ae44c42281f42e3b06b988e442d344a5b9b72450ff3c892e30d11b29a96a57c", size = 10478185, upload-time = "2026-03-05T20:06:29.093Z" },
+ { url = "https://files.pythonhosted.org/packages/44/ed/e81dd668547da281e5dce710cf0bc60193f8d3d43833e8241d006720e42b/ruff-0.15.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6edd3792d408ebcf61adabc01822da687579a1a023f297618ac27a5b51ef0080", size = 10859201, upload-time = "2026-03-05T20:06:32.632Z" },
+ { url = "https://files.pythonhosted.org/packages/c4/8f/533075f00aaf19b07c5cd6aa6e5d89424b06b3b3f4583bfa9c640a079059/ruff-0.15.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:89f463f7c8205a9f8dea9d658d59eff49db05f88f89cc3047fb1a02d9f344010", size = 10184752, upload-time = "2026-03-05T20:06:40.312Z" },
+ { url = "https://files.pythonhosted.org/packages/66/0e/ba49e2c3fa0395b3152bad634c7432f7edfc509c133b8f4529053ff024fb/ruff-0.15.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ba786a8295c6574c1116704cf0b9e6563de3432ac888d8f83685654fe528fd65", size = 10534857, upload-time = "2026-03-05T20:06:19.581Z" },
+ { url = "https://files.pythonhosted.org/packages/59/71/39234440f27a226475a0659561adb0d784b4d247dfe7f43ffc12dd02e288/ruff-0.15.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:fd4b801e57955fe9f02b31d20375ab3a5c4415f2e5105b79fb94cf2642c91440", size = 10309120, upload-time = "2026-03-05T20:06:00.435Z" },
+ { url = "https://files.pythonhosted.org/packages/f5/87/4140aa86a93df032156982b726f4952aaec4a883bb98cb6ef73c347da253/ruff-0.15.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:391f7c73388f3d8c11b794dbbc2959a5b5afe66642c142a6effa90b45f6f5204", size = 11047428, upload-time = "2026-03-05T20:05:51.867Z" },
+ { url = "https://files.pythonhosted.org/packages/5a/f7/4953e7e3287676f78fbe85e3a0ca414c5ca81237b7575bdadc00229ac240/ruff-0.15.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8dc18f30302e379fe1e998548b0f5e9f4dff907f52f73ad6da419ea9c19d66c8", size = 11914251, upload-time = "2026-03-05T20:06:22.887Z" },
+ { url = "https://files.pythonhosted.org/packages/77/46/0f7c865c10cf896ccf5a939c3e84e1cfaeed608ff5249584799a74d33835/ruff-0.15.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1cc6e7f90087e2d27f98dc34ed1b3ab7c8f0d273cc5431415454e22c0bd2a681", size = 11333801, upload-time = "2026-03-05T20:05:57.168Z" },
+ { url = "https://files.pythonhosted.org/packages/d3/01/a10fe54b653061585e655f5286c2662ebddb68831ed3eaebfb0eb08c0a16/ruff-0.15.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1cb7169f53c1ddb06e71a9aebd7e98fc0fea936b39afb36d8e86d36ecc2636a", size = 11206821, upload-time = "2026-03-05T20:06:03.441Z" },
+ { url = "https://files.pythonhosted.org/packages/7a/0d/2132ceaf20c5e8699aa83da2706ecb5c5dcdf78b453f77edca7fb70f8a93/ruff-0.15.5-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:9b037924500a31ee17389b5c8c4d88874cc6ea8e42f12e9c61a3d754ff72f1ca", size = 11133326, upload-time = "2026-03-05T20:06:25.655Z" },
+ { url = "https://files.pythonhosted.org/packages/72/cb/2e5259a7eb2a0f87c08c0fe5bf5825a1e4b90883a52685524596bfc93072/ruff-0.15.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:65bb414e5b4eadd95a8c1e4804f6772bbe8995889f203a01f77ddf2d790929dd", size = 10510820, upload-time = "2026-03-05T20:06:37.79Z" },
+ { url = "https://files.pythonhosted.org/packages/ff/20/b67ce78f9e6c59ffbdb5b4503d0090e749b5f2d31b599b554698a80d861c/ruff-0.15.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d20aa469ae3b57033519c559e9bc9cd9e782842e39be05b50e852c7c981fa01d", size = 10302395, upload-time = "2026-03-05T20:05:54.504Z" },
+ { url = "https://files.pythonhosted.org/packages/5f/e5/719f1acccd31b720d477751558ed74e9c88134adcc377e5e886af89d3072/ruff-0.15.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:15388dd28c9161cdb8eda68993533acc870aa4e646a0a277aa166de9ad5a8752", size = 10754069, upload-time = "2026-03-05T20:06:06.422Z" },
+ { url = "https://files.pythonhosted.org/packages/c3/9c/d1db14469e32d98f3ca27079dbd30b7b44dbb5317d06ab36718dee3baf03/ruff-0.15.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b30da330cbd03bed0c21420b6b953158f60c74c54c5f4c1dabbdf3a57bf355d2", size = 11304315, upload-time = "2026-03-05T20:06:10.867Z" },
+ { url = "https://files.pythonhosted.org/packages/28/3a/950367aee7c69027f4f422059227b290ed780366b6aecee5de5039d50fa8/ruff-0.15.5-py3-none-win32.whl", hash = "sha256:732e5ee1f98ba5b3679029989a06ca39a950cced52143a0ea82a2102cb592b74", size = 10551676, upload-time = "2026-03-05T20:06:13.705Z" },
+ { url = "https://files.pythonhosted.org/packages/b8/00/bf077a505b4e649bdd3c47ff8ec967735ce2544c8e4a43aba42ee9bf935d/ruff-0.15.5-py3-none-win_amd64.whl", hash = "sha256:821d41c5fa9e19117616c35eaa3f4b75046ec76c65e7ae20a333e9a8696bc7fe", size = 11678972, upload-time = "2026-03-05T20:06:45.379Z" },
+ { url = "https://files.pythonhosted.org/packages/fe/4e/cd76eca6db6115604b7626668e891c9dd03330384082e33662fb0f113614/ruff-0.15.5-py3-none-win_arm64.whl", hash = "sha256:b498d1c60d2fe5c10c45ec3f698901065772730b411f164ae270bb6bfcc4740b", size = 10965572, upload-time = "2026-03-05T20:06:16.984Z" },
]
[[package]]
diff --git a/zensical.toml b/zensical.toml
index b00cd2b..5a07e64 100644
--- a/zensical.toml
+++ b/zensical.toml
@@ -18,12 +18,12 @@ nav = [
] },
{ "Usage" = [
{"Using AIMBAT" = "usage/index.md"},
- {"CLI" = "usage/cli.md"},
+ {"Command line" = "usage/cli.md"},
{"Shell" = "usage/shell.md"},
- {"TUI" = "usage/tui.md"},
- {"GUI" = "usage/gui.md"},
- {"API" = "usage/api.md"},
- {"Defaults" = "usage/defaults.md"},
+ {"Terminal UI" = "usage/tui.md"},
+ {"Graphical UI" = "usage/gui.md"},
+ {"Python API" = "usage/api.md"},
+ {"Aimbat Defaults" = "usage/defaults.md"},
] },
{ "API reference" = [
"api/aimbat.md",