diff --git a/R/frs_habitat.R b/R/frs_habitat.R index 83c7d74..069cd7b 100644 --- a/R/frs_habitat.R +++ b/R/frs_habitat.R @@ -1198,28 +1198,25 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks, if (is.na(fp$cluster_spawn_bridge_distance)) 3000 else fp$cluster_spawn_bridge_distance - # Detect lake-connected rearing: use two-phase approach + # Detect waterbody-connected rearing: use two-phase approach rear_rules <- ps[["rules"]][["rear"]] - has_lake_rear <- length(rear_rules) > 0 && any(vapply( - rear_rules, - function(r) identical(r[["waterbody_type"]], "L"), - logical(1))) - - if (has_lake_rear) { - # Extract lake_ha_min from the first lake rearing rule - lhm <- 200 # default - for (rr in rear_rules) { - if (identical(rr[["waterbody_type"]], "L") && - !is.null(rr[["lake_ha_min"]])) { - lhm <- rr[["lake_ha_min"]] - break - } + wb_type <- NULL + wb_ha_min <- 200 + for (rr in rear_rules) { + wt <- rr[["waterbody_type"]] + if (!is.null(wt) && wt %in% c("L", "W")) { + wb_type <- wt + if (!is.null(rr[["lake_ha_min"]])) wb_ha_min <- rr[["lake_ha_min"]] + break } - # Two-phase: downstream trace + upstream lake proximity - .frs_connected_spawning(conn, table, habitat, - species = sp, bridge_gradient = bg, - distance_max = bd, lake_ha_min = lhm, - verbose = verbose) + } + + if (!is.null(wb_type)) { + wb_poly <- .frs_waterbody_tables(wb_type) + .frs_connected_waterbody(conn, table, habitat, + species = sp, waterbody_poly = wb_poly, + waterbody_ha_min = wb_ha_min, bridge_gradient = bg, + distance_max = bd, verbose = verbose) } else { # Generic cluster approach for non-lake rearing dir <- if (is.na(fp$cluster_spawn_direction)) "both" else @@ -1347,80 +1344,152 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks, #' (`fwa_lakes_poly` with `area_ha >= lake_ha_min`). #' #' @noRd -.frs_connected_spawning <- function(conn, table, habitat, - species, bridge_gradient = 0.05, - distance_max = 3000, - lake_ha_min = 200, - verbose = TRUE) { - sp_quoted <- .frs_quote_string(species) - bg <- .frs_sql_num(bridge_gradient) +#' Trace downstream from origins with distance cap and gradient stop +#' +#' Given a set of origin points (waterbody outlets), trace downstream via +#' `fwa_downstreamtrace`, accumulate distance per origin group, cap at +#' `distance_max`, and stop at the first segment exceeding `gradient_max`. +#' Returns qualifying `linear_feature_id`s into a target table. +#' +#' @param conn DBI connection. +#' @param origins_sql Character. SQL that produces columns: `origin_id` +#' (grouping key), `blue_line_key`, `downstream_route_measure`. +#' @param target Character. Table to INSERT `linear_feature_id` results into. +#' @param distance_max Numeric. Maximum cumulative trace distance (metres). +#' @param gradient_max Numeric. Gradient threshold — trace stops at the first +#' segment exceeding this value. +#' @noRd +.frs_trace_downstream <- function(conn, origins_sql, target, + distance_max, gradient_max) { dm <- .frs_sql_num(distance_max) - lhm <- .frs_sql_num(lake_ha_min) - - n_before <- 0L - if (verbose) { - n_before <- DBI::dbGetQuery(conn, sprintf( - "SELECT count(*) FILTER (WHERE spawning)::int AS n FROM %s - WHERE species_code = %s", habitat, sp_quoted))$n - } + gm <- .frs_sql_num(gradient_max) - # Build a temp table of qualifying segment IDs from both phases. - # Then set spawning = FALSE for segments NOT in the temp table. - # This preserves spawn thresholds from frs_habitat_classify(). - qual_tbl <- sprintf("pg_temp.frs_qual_spawn_%s", tolower(species)) - .frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", qual_tbl)) - .frs_db_execute(conn, sprintf("CREATE TEMP TABLE %s (id_segment integer)", - qual_tbl)) - - # Phase 1: Downstream — trace from rearing lake outlets, - # cap at distance_max, stop at first gradient > bridge_gradient .frs_db_execute(conn, sprintf( - "INSERT INTO %s (id_segment) - WITH lake_outlets AS ( - SELECT DISTINCT ON (s2.waterbody_key) - s2.blue_line_key, s2.downstream_route_measure - FROM %s s2 - INNER JOIN %s hr ON s2.id_segment = hr.id_segment - WHERE hr.species_code = %s AND hr.rearing IS TRUE - ORDER BY s2.waterbody_key, s2.downstream_route_measure ASC - ), + "INSERT INTO %s (linear_feature_id) + WITH origins AS (%s), downstream AS ( - SELECT lo.blue_line_key AS lake_blk, + SELECT o.origin_id, t.linear_feature_id, t.gradient, t.wscode, t.downstream_route_measure, -t.length_metre + SUM(t.length_metre) OVER ( - PARTITION BY lo.blue_line_key + PARTITION BY o.origin_id ORDER BY t.wscode DESC, t.downstream_route_measure DESC - ) AS dist_to_lake - FROM lake_outlets lo + ) AS dist_to_origin + FROM origins o CROSS JOIN LATERAL whse_basemapping.fwa_downstreamtrace( - lo.blue_line_key, lo.downstream_route_measure) t + o.blue_line_key, o.downstream_route_measure) t WHERE t.blue_line_key = t.watershed_key ), downstream_capped AS ( SELECT row_number() OVER ( - PARTITION BY lake_blk + PARTITION BY origin_id ORDER BY wscode DESC, downstream_route_measure DESC ) AS rn, * - FROM downstream WHERE dist_to_lake < %s + FROM downstream WHERE dist_to_origin < %s ), nearest_barrier AS ( - SELECT DISTINCT ON (lake_blk) * + SELECT DISTINCT ON (origin_id) * FROM downstream_capped WHERE gradient > %s - ORDER BY lake_blk, wscode DESC, downstream_route_measure DESC + ORDER BY origin_id, wscode DESC, downstream_route_measure DESC ), valid_downstream AS ( SELECT d.linear_feature_id FROM downstream_capped d - LEFT JOIN nearest_barrier nb ON d.lake_blk = nb.lake_blk + LEFT JOIN nearest_barrier nb ON d.origin_id = nb.origin_id WHERE nb.rn IS NULL OR d.rn < nb.rn ) + SELECT DISTINCT linear_feature_id FROM valid_downstream", + target, origins_sql, dm, gm)) +} + + +#' Filter spawning to segments connected to a qualifying waterbody +#' +#' Two-phase connectivity check for species whose spawning depends on +#' proximity to a waterbody (e.g. SK/KO lake-connected spawning): +#' +#' Phase 1: Trace downstream from waterbody outlets, capped at +#' `distance_max`, stopped at first gradient exceeding `bridge_gradient`. +#' +#' Phase 2: Find spawn-eligible segments upstream of rearing, cluster +#' spatially, keep clusters within 2m of a qualifying waterbody polygon. +#' +#' Subtractive: sets `spawning = FALSE` for segments not found in either +#' phase, preserving threshold classification from `frs_habitat_classify()`. +#' +#' @param conn DBI connection. +#' @param table Character. Schema-qualified streams table. +#' @param habitat Character. Schema-qualified habitat table. +#' @param species Character. Species code. +#' @param waterbody_poly Character vector. Schema-qualified waterbody polygon +#' table(s) (e.g. `c("whse_basemapping.fwa_lakes_poly", +#' "whse_basemapping.fwa_manmade_waterbodies_poly")`). Multiple tables are +#' checked via UNION ALL — a cluster near ANY qualifying polygon qualifies. +#' @param waterbody_ha_min Numeric. Minimum area (ha) for qualifying +#' waterbody polygons. +#' @param bridge_gradient Numeric. Gradient threshold for downstream trace +#' barrier detection. +#' @param distance_max Numeric. Maximum downstream trace distance (metres). +#' @param verbose Logical. Print before/after counts. +#' @noRd +.frs_connected_waterbody <- function(conn, table, habitat, + species, waterbody_poly, + waterbody_ha_min = 200, + bridge_gradient = 0.05, + distance_max = 3000, + verbose = TRUE) { + sp_quoted <- .frs_quote_string(species) + lhm <- .frs_sql_num(waterbody_ha_min) + + n_before <- 0L + if (verbose) { + n_before <- DBI::dbGetQuery(conn, sprintf( + "SELECT count(*) FILTER (WHERE spawning)::int AS n FROM %s + WHERE species_code = %s", habitat, sp_quoted))$n + } + + # Temp table for qualifying segment IDs from both phases + qual_tbl <- sprintf("pg_temp.frs_qual_spawn_%s", tolower(species)) + lfid_tbl <- sprintf("pg_temp.frs_trace_lfid_%s", tolower(species)) + .frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", qual_tbl)) + .frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", lfid_tbl)) + .frs_db_execute(conn, sprintf("CREATE TEMP TABLE %s (id_segment integer)", + qual_tbl)) + .frs_db_execute(conn, sprintf( + "CREATE TEMP TABLE %s (linear_feature_id bigint)", lfid_tbl)) + + # Phase 1: Downstream trace from waterbody outlets + origins_sql <- sprintf( + "SELECT DISTINCT ON (s2.waterbody_key) + s2.waterbody_key AS origin_id, + s2.blue_line_key, s2.downstream_route_measure + FROM %s s2 + INNER JOIN %s hr ON s2.id_segment = hr.id_segment + WHERE hr.species_code = %s AND hr.rearing IS TRUE + ORDER BY s2.waterbody_key, s2.wscode_ltree, s2.localcode_ltree, + s2.downstream_route_measure", + table, habitat, sp_quoted) + + .frs_trace_downstream(conn, origins_sql, lfid_tbl, + distance_max, bridge_gradient) + + # Map traced linear_feature_ids back to id_segments + .frs_db_execute(conn, sprintf( + "INSERT INTO %s (id_segment) SELECT seg.id_segment FROM %s seg - WHERE seg.linear_feature_id IN ( - SELECT linear_feature_id FROM valid_downstream)", - qual_tbl, table, habitat, sp_quoted, dm, bg, table)) + WHERE seg.linear_feature_id IN (SELECT linear_feature_id FROM %s)", + qual_tbl, table, lfid_tbl)) + .frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", lfid_tbl)) # Phase 2: Upstream — spawn-eligible segments upstream of rearing, - # clustered, kept only if cluster touches qualifying lake polygon + # clustered, kept only if cluster touches qualifying waterbody polygon + # Build EXISTS clause that checks all waterbody polygon tables (UNION ALL) + wb_exists <- paste(vapply(waterbody_poly, function(wt) { + sprintf( + "EXISTS (SELECT 1 FROM %s lp + WHERE lp.area_ha >= %s AND ST_DWithin(cg.geom, lp.geom, 2))", + wt, lhm) + }, character(1)), collapse = " OR ") + .frs_db_execute(conn, sprintf( "INSERT INTO %s (id_segment) WITH rearing_segs AS ( @@ -1455,14 +1524,12 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks, ), valid_clusters AS ( SELECT cg.cluster_id FROM cluster_geoms cg - WHERE EXISTS ( - SELECT 1 FROM whse_basemapping.fwa_lakes_poly lp - WHERE lp.area_ha >= %s AND ST_DWithin(cg.geom, lp.geom, 2)) + WHERE %s ) SELECT c.id_segment FROM clustered c WHERE c.cluster_id IN (SELECT cluster_id FROM valid_clusters)", qual_tbl, table, habitat, sp_quoted, - table, habitat, sp_quoted, lhm)) + table, habitat, sp_quoted, wb_exists)) # Subtractive: remove spawning NOT found in either phase .frs_db_execute(conn, sprintf( @@ -1477,7 +1544,7 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks, n_after <- DBI::dbGetQuery(conn, sprintf( "SELECT count(*) FILTER (WHERE spawning)::int AS n FROM %s WHERE species_code = %s", habitat, sp_quoted))$n - cat(" ", species, ": connected spawning ", + cat(" ", species, ": connected waterbody ", n_before, " -> ", n_after, "\n", sep = "") } } diff --git a/R/utils.R b/R/utils.R index f5d172f..d70d86a 100644 --- a/R/utils.R +++ b/R/utils.R @@ -155,6 +155,27 @@ } +#' Resolve waterbody type to FWA polygon table(s) +#' +#' Maps a single-character waterbody type code to the FWA polygon table(s) +#' that contain those features. `"L"` returns both natural lakes and +#' manmade waterbodies (reservoirs) — the FWA splits them by digitization +#' origin, not ecology. +#' +#' @param type Character. One of `"L"` (lakes + reservoirs), `"R"` (rivers), +#' `"W"` (wetlands). +#' @return Character vector of schema-qualified table names. +#' @noRd +.frs_waterbody_tables <- function(type) { + switch(type, + "L" = c("whse_basemapping.fwa_lakes_poly", + "whse_basemapping.fwa_manmade_waterbodies_poly"), + "R" = "whse_basemapping.fwa_rivers_poly", + "W" = "whse_basemapping.fwa_wetlands_poly", + stop("Unknown waterbody_type: ", type)) +} + + #' Convert a single rule to a SQL AND predicate #' #' Translates one habitat rule (a list of predicates) into a @@ -211,19 +232,18 @@ } if (!is.null(rule[["waterbody_type"]])) { - wb_table <- switch(rule[["waterbody_type"]], - "L" = "whse_basemapping.fwa_lakes_poly", - "R" = "whse_basemapping.fwa_rivers_poly", - "W" = "whse_basemapping.fwa_wetlands_poly") + wb_tables <- .frs_waterbody_tables(rule[["waterbody_type"]]) if (!is.null(rule[["lake_ha_min"]])) { - # Already validated as L by parser - parts <- c(parts, sprintf( - "s.waterbody_key IN (SELECT waterbody_key FROM %s WHERE area_ha >= %s)", - wb_table, .frs_sql_num(rule[["lake_ha_min"]]))) + ha_sql <- .frs_sql_num(rule[["lake_ha_min"]]) + wb_sql <- paste(vapply(wb_tables, function(wt) { + sprintf("SELECT waterbody_key FROM %s WHERE area_ha >= %s", wt, ha_sql) + }, character(1)), collapse = " UNION ALL ") + parts <- c(parts, sprintf("s.waterbody_key IN (%s)", wb_sql)) } else { - parts <- c(parts, sprintf( - "s.waterbody_key IN (SELECT waterbody_key FROM %s)", - wb_table)) + wb_sql <- paste(vapply(wb_tables, function(wt) { + sprintf("SELECT waterbody_key FROM %s", wt) + }, character(1)), collapse = " UNION ALL ") + parts <- c(parts, sprintf("s.waterbody_key IN (%s)", wb_sql)) } } diff --git a/planning/active/findings.md b/planning/active/findings.md new file mode 100644 index 0000000..b2469fd --- /dev/null +++ b/planning/active/findings.md @@ -0,0 +1,29 @@ +# Findings + +## Bug 1: lake_outlets picks wrong tributary +`DISTINCT ON (waterbody_key)` with `ORDER BY waterbody_key, downstream_route_measure ASC` picks the segment with the smallest DRM on any BLK. For multi-BLK waterbodies, DRM is relative to each BLK — comparing across BLKs is meaningless. Need `ORDER BY waterbody_key, wscode_ltree, localcode_ltree, downstream_route_measure` to pick the network-topological outlet. + +**Example**: waterbody_key 329064462 (BULK) spans 10 BLKs. DRM ordering picks BLK 360504780 DRM 0 (wrong tributary). wscode ordering picks BLK 360846413 DRM 9718 (actual outlet). + +## Bug 2: cumulative distance partitions by BLK not waterbody +`PARTITION BY lo.blue_line_key` in the window function means the cumulative distance resets per-BLK. For a lake with one outlet but downstream traces crossing BLKs, this is wrong — distance should accumulate per waterbody (per lake trace). Same issue in `downstream_capped` and `nearest_barrier` partitions. + +## Original two-phase design (from PR #148) +- Phase 1: downstream trace from lake outlets via `fwa_downstreamtrace`, capped at `connected_distance_max`, stopped at first gradient > `bridge_gradient` +- Phase 2: upstream cluster via `fwa_upstream` + `ST_ClusterDBSCAN` + `ST_DWithin` against lake polygons +- Detection: dispatched when rearing rules include `waterbody_type: L` +- Subtractive approach preserves spawn threshold classification +- `lake_ha_min` read from rearing rules, not hardcoded + +## Key lesson from original implementation +Two fundamentally different algorithms (downstream trace vs upstream cluster) can't be replicated by a single generic `frs_cluster` call. The downstream phase uses `fwa_downstreamtrace` with cumulative distance; the upstream phase uses network traversal + spatial clustering + lake polygon proximity. + +## Refactor: decompose into reusable pieces +- `.frs_trace_downstream(conn, origins_sql, target, distance_max, gradient_max)` — generic downstream trace. Takes any origins SQL producing `(origin_id, blue_line_key, downstream_route_measure)`. Returns `linear_feature_id`s into a target table. Reusable for any downstream trace use case (waterbody outlets, barrier impacts, reach delineation). +- `.frs_connected_waterbody()` replaces `.frs_connected_spawning()` — name reflects mechanism (waterbody connection) not use-case (spawning). Takes `waterbody_poly` as a character vector so multiple FWA tables can be checked. +- Caller in `.frs_run_connectivity()` detects `waterbody_type` from rearing rules generically — not hardcoded to `L`. Supports `L` and `W`; `R` (rivers) excluded since river polygons don't have area thresholds in the same way. + +## Reservoirs are lakes (FWA digitization artifact) +`fwa_lakes_poly` = natural lakes, `fwa_manmade_waterbodies_poly` = reservoirs. The split is digitization origin, not ecology — a 200ha reservoir functions identically to a 200ha lake for fish rearing. bcfishpass checks BOTH in `load_habitat_linear_sk.sql` lines 217-240 (UNION of lakes + reservoirs in `clusters_near_rearing`). + +Resolution: `.frs_waterbody_tables("L")` returns both table names. One helper, used by both `.frs_rule_to_sql()` (rearing classification) and `.frs_connected_waterbody()` (upstream cluster proximity). The rules YAML stays `waterbody_type: L` — users don't need to know about FWA table structure. diff --git a/planning/active/progress.md b/planning/active/progress.md new file mode 100644 index 0000000..cd769ab --- /dev/null +++ b/planning/active/progress.md @@ -0,0 +1,9 @@ +# Progress + +## Session 2026-04-14 +- Created branch `147-sk-outlet-ordering` from main (10cd96b) +- Read issue #147: two bugs in `.frs_connected_spawning` — outlet ordering and partition key +- Phase 1 complete: fixed outlet ordering + partition key (8f9544d) +- Code-check clean, 696 tests pass +- Phase 2: extracted `.frs_trace_downstream()`, renamed to `.frs_connected_waterbody()` +- Tests pending diff --git a/planning/active/task_plan.md b/planning/active/task_plan.md new file mode 100644 index 0000000..0b0d734 --- /dev/null +++ b/planning/active/task_plan.md @@ -0,0 +1,22 @@ +# Task Plan: Issue #147 — SK outlet ordering fix + refactor + +## Phase 1: Fix lake outlet ordering + partition +- [x] Fix `ORDER BY` in `lake_outlets` CTE: add `wscode_ltree, localcode_ltree` before `downstream_route_measure` +- [x] Add `waterbody_key` to `lake_outlets` SELECT list so it's available downstream +- [x] Fix `PARTITION BY` in `downstream` CTE: change `lo.blue_line_key` to `lo.waterbody_key` +- [x] Fix all downstream CTEs: rename `lake_blk` to `lake_wbk`, partition/join on `waterbody_key` +- [x] Commit + code-check + +## Phase 2: Extract reusable trace + rename to mechanism +- [x] Extract `.frs_trace_downstream()` — generic downstream trace with distance cap + gradient stop +- [x] Rename `.frs_connected_spawning()` to `.frs_connected_waterbody()` — mechanism not use-case +- [x] Parameterize `waterbody_poly` — dispatch from `waterbody_type` in rules (L, W) +- [x] Caller detects waterbody type generically, not hardcoded to lakes +- [x] Commit + code-check + +## Phase 3: Reservoir support via shared helper +- [x] Add `.frs_waterbody_tables(type)` helper — L resolves to lakes + reservoirs +- [x] Update `.frs_rule_to_sql()` to use helper (UNION ALL for multi-table types) +- [x] Update `.frs_connected_waterbody()` Phase 2 to accept vector of poly tables +- [x] Update caller to use helper instead of hardcoded switch +- [x] Commit + code-check