Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 140 additions & 73 deletions R/frs_habitat.R
Original file line number Diff line number Diff line change
Expand Up @@ -1198,28 +1198,25 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks,
if (is.na(fp$cluster_spawn_bridge_distance)) 3000 else
fp$cluster_spawn_bridge_distance

# Detect lake-connected rearing: use two-phase approach
# Detect waterbody-connected rearing: use two-phase approach
rear_rules <- ps[["rules"]][["rear"]]
has_lake_rear <- length(rear_rules) > 0 && any(vapply(
rear_rules,
function(r) identical(r[["waterbody_type"]], "L"),
logical(1)))

if (has_lake_rear) {
# Extract lake_ha_min from the first lake rearing rule
lhm <- 200 # default
for (rr in rear_rules) {
if (identical(rr[["waterbody_type"]], "L") &&
!is.null(rr[["lake_ha_min"]])) {
lhm <- rr[["lake_ha_min"]]
break
}
wb_type <- NULL
wb_ha_min <- 200
for (rr in rear_rules) {
wt <- rr[["waterbody_type"]]
if (!is.null(wt) && wt %in% c("L", "W")) {
wb_type <- wt
if (!is.null(rr[["lake_ha_min"]])) wb_ha_min <- rr[["lake_ha_min"]]
break
}
# Two-phase: downstream trace + upstream lake proximity
.frs_connected_spawning(conn, table, habitat,
species = sp, bridge_gradient = bg,
distance_max = bd, lake_ha_min = lhm,
verbose = verbose)
}

if (!is.null(wb_type)) {
wb_poly <- .frs_waterbody_tables(wb_type)
.frs_connected_waterbody(conn, table, habitat,
species = sp, waterbody_poly = wb_poly,
waterbody_ha_min = wb_ha_min, bridge_gradient = bg,
distance_max = bd, verbose = verbose)
} else {
# Generic cluster approach for non-lake rearing
dir <- if (is.na(fp$cluster_spawn_direction)) "both" else
Expand Down Expand Up @@ -1347,80 +1344,152 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks,
#' (`fwa_lakes_poly` with `area_ha >= lake_ha_min`).
#'
#' @noRd
.frs_connected_spawning <- function(conn, table, habitat,
species, bridge_gradient = 0.05,
distance_max = 3000,
lake_ha_min = 200,
verbose = TRUE) {
sp_quoted <- .frs_quote_string(species)
bg <- .frs_sql_num(bridge_gradient)
#' Trace downstream from origins with distance cap and gradient stop
#'
#' Given a set of origin points (waterbody outlets), trace downstream via
#' `fwa_downstreamtrace`, accumulate distance per origin group, cap at
#' `distance_max`, and stop at the first segment exceeding `gradient_max`.
#' Returns qualifying `linear_feature_id`s into a target table.
#'
#' @param conn DBI connection.
#' @param origins_sql Character. SQL that produces columns: `origin_id`
#' (grouping key), `blue_line_key`, `downstream_route_measure`.
#' @param target Character. Table to INSERT `linear_feature_id` results into.
#' @param distance_max Numeric. Maximum cumulative trace distance (metres).
#' @param gradient_max Numeric. Gradient threshold — trace stops at the first
#' segment exceeding this value.
#' @noRd
.frs_trace_downstream <- function(conn, origins_sql, target,
distance_max, gradient_max) {
dm <- .frs_sql_num(distance_max)
lhm <- .frs_sql_num(lake_ha_min)

n_before <- 0L
if (verbose) {
n_before <- DBI::dbGetQuery(conn, sprintf(
"SELECT count(*) FILTER (WHERE spawning)::int AS n FROM %s
WHERE species_code = %s", habitat, sp_quoted))$n
}
gm <- .frs_sql_num(gradient_max)

# Build a temp table of qualifying segment IDs from both phases.
# Then set spawning = FALSE for segments NOT in the temp table.
# This preserves spawn thresholds from frs_habitat_classify().
qual_tbl <- sprintf("pg_temp.frs_qual_spawn_%s", tolower(species))
.frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", qual_tbl))
.frs_db_execute(conn, sprintf("CREATE TEMP TABLE %s (id_segment integer)",
qual_tbl))

# Phase 1: Downstream — trace from rearing lake outlets,
# cap at distance_max, stop at first gradient > bridge_gradient
.frs_db_execute(conn, sprintf(
"INSERT INTO %s (id_segment)
WITH lake_outlets AS (
SELECT DISTINCT ON (s2.waterbody_key)
s2.blue_line_key, s2.downstream_route_measure
FROM %s s2
INNER JOIN %s hr ON s2.id_segment = hr.id_segment
WHERE hr.species_code = %s AND hr.rearing IS TRUE
ORDER BY s2.waterbody_key, s2.downstream_route_measure ASC
),
"INSERT INTO %s (linear_feature_id)
WITH origins AS (%s),
downstream AS (
SELECT lo.blue_line_key AS lake_blk,
SELECT o.origin_id,
t.linear_feature_id, t.gradient, t.wscode,
t.downstream_route_measure,
-t.length_metre + SUM(t.length_metre) OVER (
PARTITION BY lo.blue_line_key
PARTITION BY o.origin_id
ORDER BY t.wscode DESC, t.downstream_route_measure DESC
) AS dist_to_lake
FROM lake_outlets lo
) AS dist_to_origin
FROM origins o
CROSS JOIN LATERAL whse_basemapping.fwa_downstreamtrace(
lo.blue_line_key, lo.downstream_route_measure) t
o.blue_line_key, o.downstream_route_measure) t
WHERE t.blue_line_key = t.watershed_key
),
downstream_capped AS (
SELECT row_number() OVER (
PARTITION BY lake_blk
PARTITION BY origin_id
ORDER BY wscode DESC, downstream_route_measure DESC
) AS rn, *
FROM downstream WHERE dist_to_lake < %s
FROM downstream WHERE dist_to_origin < %s
),
nearest_barrier AS (
SELECT DISTINCT ON (lake_blk) *
SELECT DISTINCT ON (origin_id) *
FROM downstream_capped WHERE gradient > %s
ORDER BY lake_blk, wscode DESC, downstream_route_measure DESC
ORDER BY origin_id, wscode DESC, downstream_route_measure DESC
),
valid_downstream AS (
SELECT d.linear_feature_id FROM downstream_capped d
LEFT JOIN nearest_barrier nb ON d.lake_blk = nb.lake_blk
LEFT JOIN nearest_barrier nb ON d.origin_id = nb.origin_id
WHERE nb.rn IS NULL OR d.rn < nb.rn
)
SELECT DISTINCT linear_feature_id FROM valid_downstream",
target, origins_sql, dm, gm))
}


#' Filter spawning to segments connected to a qualifying waterbody
#'
#' Two-phase connectivity check for species whose spawning depends on
#' proximity to a waterbody (e.g. SK/KO lake-connected spawning):
#'
#' Phase 1: Trace downstream from waterbody outlets, capped at
#' `distance_max`, stopped at first gradient exceeding `bridge_gradient`.
#'
#' Phase 2: Find spawn-eligible segments upstream of rearing, cluster
#' spatially, keep clusters within 2m of a qualifying waterbody polygon.
#'
#' Subtractive: sets `spawning = FALSE` for segments not found in either
#' phase, preserving threshold classification from `frs_habitat_classify()`.
#'
#' @param conn DBI connection.
#' @param table Character. Schema-qualified streams table.
#' @param habitat Character. Schema-qualified habitat table.
#' @param species Character. Species code.
#' @param waterbody_poly Character vector. Schema-qualified waterbody polygon
#' table(s) (e.g. `c("whse_basemapping.fwa_lakes_poly",
#' "whse_basemapping.fwa_manmade_waterbodies_poly")`). Multiple tables are
#' checked via UNION ALL — a cluster near ANY qualifying polygon qualifies.
#' @param waterbody_ha_min Numeric. Minimum area (ha) for qualifying
#' waterbody polygons.
#' @param bridge_gradient Numeric. Gradient threshold for downstream trace
#' barrier detection.
#' @param distance_max Numeric. Maximum downstream trace distance (metres).
#' @param verbose Logical. Print before/after counts.
#' @noRd
.frs_connected_waterbody <- function(conn, table, habitat,
species, waterbody_poly,
waterbody_ha_min = 200,
bridge_gradient = 0.05,
distance_max = 3000,
verbose = TRUE) {
sp_quoted <- .frs_quote_string(species)
lhm <- .frs_sql_num(waterbody_ha_min)

n_before <- 0L
if (verbose) {
n_before <- DBI::dbGetQuery(conn, sprintf(
"SELECT count(*) FILTER (WHERE spawning)::int AS n FROM %s
WHERE species_code = %s", habitat, sp_quoted))$n
}

# Temp table for qualifying segment IDs from both phases
qual_tbl <- sprintf("pg_temp.frs_qual_spawn_%s", tolower(species))
lfid_tbl <- sprintf("pg_temp.frs_trace_lfid_%s", tolower(species))
.frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", qual_tbl))
.frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", lfid_tbl))
.frs_db_execute(conn, sprintf("CREATE TEMP TABLE %s (id_segment integer)",
qual_tbl))
.frs_db_execute(conn, sprintf(
"CREATE TEMP TABLE %s (linear_feature_id bigint)", lfid_tbl))

# Phase 1: Downstream trace from waterbody outlets
origins_sql <- sprintf(
"SELECT DISTINCT ON (s2.waterbody_key)
s2.waterbody_key AS origin_id,
s2.blue_line_key, s2.downstream_route_measure
FROM %s s2
INNER JOIN %s hr ON s2.id_segment = hr.id_segment
WHERE hr.species_code = %s AND hr.rearing IS TRUE
ORDER BY s2.waterbody_key, s2.wscode_ltree, s2.localcode_ltree,
s2.downstream_route_measure",
table, habitat, sp_quoted)

.frs_trace_downstream(conn, origins_sql, lfid_tbl,
distance_max, bridge_gradient)

# Map traced linear_feature_ids back to id_segments
.frs_db_execute(conn, sprintf(
"INSERT INTO %s (id_segment)
SELECT seg.id_segment FROM %s seg
WHERE seg.linear_feature_id IN (
SELECT linear_feature_id FROM valid_downstream)",
qual_tbl, table, habitat, sp_quoted, dm, bg, table))
WHERE seg.linear_feature_id IN (SELECT linear_feature_id FROM %s)",
qual_tbl, table, lfid_tbl))
.frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", lfid_tbl))

# Phase 2: Upstream — spawn-eligible segments upstream of rearing,
# clustered, kept only if cluster touches qualifying lake polygon
# clustered, kept only if cluster touches qualifying waterbody polygon
# Build EXISTS clause that checks all waterbody polygon tables (UNION ALL)
wb_exists <- paste(vapply(waterbody_poly, function(wt) {
sprintf(
"EXISTS (SELECT 1 FROM %s lp
WHERE lp.area_ha >= %s AND ST_DWithin(cg.geom, lp.geom, 2))",
wt, lhm)
}, character(1)), collapse = " OR ")

.frs_db_execute(conn, sprintf(
"INSERT INTO %s (id_segment)
WITH rearing_segs AS (
Expand Down Expand Up @@ -1455,14 +1524,12 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks,
),
valid_clusters AS (
SELECT cg.cluster_id FROM cluster_geoms cg
WHERE EXISTS (
SELECT 1 FROM whse_basemapping.fwa_lakes_poly lp
WHERE lp.area_ha >= %s AND ST_DWithin(cg.geom, lp.geom, 2))
WHERE %s
)
SELECT c.id_segment FROM clustered c
WHERE c.cluster_id IN (SELECT cluster_id FROM valid_clusters)",
qual_tbl, table, habitat, sp_quoted,
table, habitat, sp_quoted, lhm))
table, habitat, sp_quoted, wb_exists))

# Subtractive: remove spawning NOT found in either phase
.frs_db_execute(conn, sprintf(
Expand All @@ -1477,7 +1544,7 @@ frs_habitat_species <- function(conn, species_code, base_tbl, breaks,
n_after <- DBI::dbGetQuery(conn, sprintf(
"SELECT count(*) FILTER (WHERE spawning)::int AS n FROM %s
WHERE species_code = %s", habitat, sp_quoted))$n
cat(" ", species, ": connected spawning ",
cat(" ", species, ": connected waterbody ",
n_before, " -> ", n_after, "\n", sep = "")
}
}
42 changes: 31 additions & 11 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,27 @@
}


#' Resolve waterbody type to FWA polygon table(s)
#'
#' Maps a single-character waterbody type code to the FWA polygon table(s)
#' that contain those features. `"L"` returns both natural lakes and
#' manmade waterbodies (reservoirs) — the FWA splits them by digitization
#' origin, not ecology.
#'
#' @param type Character. One of `"L"` (lakes + reservoirs), `"R"` (rivers),
#' `"W"` (wetlands).
#' @return Character vector of schema-qualified table names.
#' @noRd
.frs_waterbody_tables <- function(type) {
switch(type,
"L" = c("whse_basemapping.fwa_lakes_poly",
"whse_basemapping.fwa_manmade_waterbodies_poly"),
"R" = "whse_basemapping.fwa_rivers_poly",
"W" = "whse_basemapping.fwa_wetlands_poly",
stop("Unknown waterbody_type: ", type))
}


#' Convert a single rule to a SQL AND predicate
#'
#' Translates one habitat rule (a list of predicates) into a
Expand Down Expand Up @@ -211,19 +232,18 @@
}

if (!is.null(rule[["waterbody_type"]])) {
wb_table <- switch(rule[["waterbody_type"]],
"L" = "whse_basemapping.fwa_lakes_poly",
"R" = "whse_basemapping.fwa_rivers_poly",
"W" = "whse_basemapping.fwa_wetlands_poly")
wb_tables <- .frs_waterbody_tables(rule[["waterbody_type"]])
if (!is.null(rule[["lake_ha_min"]])) {
# Already validated as L by parser
parts <- c(parts, sprintf(
"s.waterbody_key IN (SELECT waterbody_key FROM %s WHERE area_ha >= %s)",
wb_table, .frs_sql_num(rule[["lake_ha_min"]])))
ha_sql <- .frs_sql_num(rule[["lake_ha_min"]])
wb_sql <- paste(vapply(wb_tables, function(wt) {
sprintf("SELECT waterbody_key FROM %s WHERE area_ha >= %s", wt, ha_sql)
}, character(1)), collapse = " UNION ALL ")
parts <- c(parts, sprintf("s.waterbody_key IN (%s)", wb_sql))
} else {
parts <- c(parts, sprintf(
"s.waterbody_key IN (SELECT waterbody_key FROM %s)",
wb_table))
wb_sql <- paste(vapply(wb_tables, function(wt) {
sprintf("SELECT waterbody_key FROM %s", wt)
}, character(1)), collapse = " UNION ALL ")
parts <- c(parts, sprintf("s.waterbody_key IN (%s)", wb_sql))
}
}

Expand Down
29 changes: 29 additions & 0 deletions planning/active/findings.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Findings

## Bug 1: lake_outlets picks wrong tributary
`DISTINCT ON (waterbody_key)` with `ORDER BY waterbody_key, downstream_route_measure ASC` picks the segment with the smallest DRM on any BLK. For multi-BLK waterbodies, DRM is relative to each BLK — comparing across BLKs is meaningless. Need `ORDER BY waterbody_key, wscode_ltree, localcode_ltree, downstream_route_measure` to pick the network-topological outlet.

**Example**: waterbody_key 329064462 (BULK) spans 10 BLKs. DRM ordering picks BLK 360504780 DRM 0 (wrong tributary). wscode ordering picks BLK 360846413 DRM 9718 (actual outlet).

## Bug 2: cumulative distance partitions by BLK not waterbody
`PARTITION BY lo.blue_line_key` in the window function means the cumulative distance resets per-BLK. For a lake with one outlet but downstream traces crossing BLKs, this is wrong — distance should accumulate per waterbody (per lake trace). Same issue in `downstream_capped` and `nearest_barrier` partitions.

## Original two-phase design (from PR #148)
- Phase 1: downstream trace from lake outlets via `fwa_downstreamtrace`, capped at `connected_distance_max`, stopped at first gradient > `bridge_gradient`
- Phase 2: upstream cluster via `fwa_upstream` + `ST_ClusterDBSCAN` + `ST_DWithin` against lake polygons
- Detection: dispatched when rearing rules include `waterbody_type: L`
- Subtractive approach preserves spawn threshold classification
- `lake_ha_min` read from rearing rules, not hardcoded

## Key lesson from original implementation
Two fundamentally different algorithms (downstream trace vs upstream cluster) can't be replicated by a single generic `frs_cluster` call. The downstream phase uses `fwa_downstreamtrace` with cumulative distance; the upstream phase uses network traversal + spatial clustering + lake polygon proximity.

## Refactor: decompose into reusable pieces
- `.frs_trace_downstream(conn, origins_sql, target, distance_max, gradient_max)` — generic downstream trace. Takes any origins SQL producing `(origin_id, blue_line_key, downstream_route_measure)`. Returns `linear_feature_id`s into a target table. Reusable for any downstream trace use case (waterbody outlets, barrier impacts, reach delineation).
- `.frs_connected_waterbody()` replaces `.frs_connected_spawning()` — name reflects mechanism (waterbody connection) not use-case (spawning). Takes `waterbody_poly` as a character vector so multiple FWA tables can be checked.
- Caller in `.frs_run_connectivity()` detects `waterbody_type` from rearing rules generically — not hardcoded to `L`. Supports `L` and `W`; `R` (rivers) excluded since river polygons don't have area thresholds in the same way.

## Reservoirs are lakes (FWA digitization artifact)
`fwa_lakes_poly` = natural lakes, `fwa_manmade_waterbodies_poly` = reservoirs. The split is digitization origin, not ecology — a 200ha reservoir functions identically to a 200ha lake for fish rearing. bcfishpass checks BOTH in `load_habitat_linear_sk.sql` lines 217-240 (UNION of lakes + reservoirs in `clusters_near_rearing`).

Resolution: `.frs_waterbody_tables("L")` returns both table names. One helper, used by both `.frs_rule_to_sql()` (rearing classification) and `.frs_connected_waterbody()` (upstream cluster proximity). The rules YAML stays `waterbody_type: L` — users don't need to know about FWA table structure.
9 changes: 9 additions & 0 deletions planning/active/progress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Progress

## Session 2026-04-14
- Created branch `147-sk-outlet-ordering` from main (10cd96b)
- Read issue #147: two bugs in `.frs_connected_spawning` — outlet ordering and partition key
- Phase 1 complete: fixed outlet ordering + partition key (8f9544d)
- Code-check clean, 696 tests pass
- Phase 2: extracted `.frs_trace_downstream()`, renamed to `.frs_connected_waterbody()`
- Tests pending
22 changes: 22 additions & 0 deletions planning/active/task_plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Task Plan: Issue #147 — SK outlet ordering fix + refactor

## Phase 1: Fix lake outlet ordering + partition
- [x] Fix `ORDER BY` in `lake_outlets` CTE: add `wscode_ltree, localcode_ltree` before `downstream_route_measure`
- [x] Add `waterbody_key` to `lake_outlets` SELECT list so it's available downstream
- [x] Fix `PARTITION BY` in `downstream` CTE: change `lo.blue_line_key` to `lo.waterbody_key`
- [x] Fix all downstream CTEs: rename `lake_blk` to `lake_wbk`, partition/join on `waterbody_key`
- [x] Commit + code-check

## Phase 2: Extract reusable trace + rename to mechanism
- [x] Extract `.frs_trace_downstream()` — generic downstream trace with distance cap + gradient stop
- [x] Rename `.frs_connected_spawning()` to `.frs_connected_waterbody()` — mechanism not use-case
- [x] Parameterize `waterbody_poly` — dispatch from `waterbody_type` in rules (L, W)
- [x] Caller detects waterbody type generically, not hardcoded to lakes
- [x] Commit + code-check

## Phase 3: Reservoir support via shared helper
- [x] Add `.frs_waterbody_tables(type)` helper — L resolves to lakes + reservoirs
- [x] Update `.frs_rule_to_sql()` to use helper (UNION ALL for multi-table types)
- [x] Update `.frs_connected_waterbody()` Phase 2 to accept vector of poly tables
- [x] Update caller to use helper instead of hardcoded switch
- [x] Commit + code-check
Loading