diff --git a/R/frs_cluster.R b/R/frs_cluster.R index 5f4cdc0..3f1d349 100644 --- a/R/frs_cluster.R +++ b/R/frs_cluster.R @@ -382,11 +382,21 @@ frs_cluster <- function(conn, table, habitat, } -#' Both-direction cluster connectivity check +#' Both-direction cluster connectivity check (three-phase) #' -#' Evaluates upstream and downstream independently on the original data, -#' then combines: a cluster is valid if connected in either direction. -#' Uses a temp table for clustering so both checks see the same clusters. +#' Phase 1: segments with BOTH `label_cluster` and `label_connect` TRUE +#' are always valid (on-spawning rearing). These are excluded from +#' clustering so they don't dilute cluster boundaries. +#' +#' Phase 2: upstream boolean — rearing clusters below spawning. Uses +#' `FWA_Upstream()` to check if spawning exists anywhere upstream. +#' +#' Phase 3: downstream trace — rearing clusters above spawning. Uses +#' `FWA_Downstream()` on the broken streams table (mainstem only) with +#' gradient bridge and distance cap. +#' +#' A cluster is valid if it passes Phase 2 OR Phase 3. Phase 1 segments +#' are always valid regardless of clustering. #' #' @noRd .frs_cluster_both <- function(conn, table, habitat, @@ -401,7 +411,19 @@ frs_cluster <- function(conn, table, habitat, tmp_clusters <- sprintf("pg_temp.frs_clusters_%s", gsub("[^a-z0-9]", "", tolower(species))) - # Create shared clustering temp table + # Phase 1: on-spawning segments — always valid, excluded from clustering + phase1_ids <- DBI::dbGetQuery(conn, sprintf( + "SELECT h.id_segment FROM %s h + WHERE h.species_code = %s AND h.%s IS TRUE AND h.%s IS TRUE", + habitat, sp_quoted, label_cluster, label_connect))$id_segment + + # Cluster remaining (non-phase1) segments + phase1_filter <- if (length(phase1_ids) > 0) { + sprintf("AND h.id_segment NOT IN (%s)", paste(phase1_ids, collapse = ", ")) + } else { + "" + } + .frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", tmp_clusters)) .frs_db_execute(conn, sprintf( "CREATE TEMP TABLE %s AS @@ -415,14 +437,14 @@ frs_cluster <- function(conn, table, habitat, FROM %s h INNER JOIN %s s ON h.id_segment = s.id_segment WHERE h.species_code = %s - AND h.%s IS TRUE", - tmp_clusters, habitat, table, sp_quoted, label_cluster)) + AND h.%s IS TRUE %s", + tmp_clusters, habitat, table, sp_quoted, label_cluster, phase1_filter)) .frs_db_execute(conn, sprintf( "CREATE INDEX ON %s (cluster_id)", tmp_clusters)) - # Upstream valid clusters — boolean check (spawning exists anywhere upstream) - sql_upstream <- sprintf( + # Phase 2: upstream boolean — rearing below spawning + sql_phase2 <- sprintf( "WITH cluster_minimums AS ( SELECT DISTINCT ON (cluster_id) cluster_id, wscode_ltree, localcode_ltree, @@ -454,10 +476,12 @@ frs_cluster <- function(conn, table, habitat, habitat, sp_quoted, label_connect, table, conf_m) - valid_up <- DBI::dbGetQuery(conn, sql_upstream)$cluster_id + valid_phase2 <- DBI::dbGetQuery(conn, sql_phase2)$cluster_id - # Downstream valid clusters - sql_downstream <- sprintf( + # Phase 3: downstream trace on broken streams table — rearing above spawning + # Uses FWA_Downstream on the working streams table (not fwa_downstreamtrace) + # Mainstem only (blue_line_key = watershed_key) for linear path + sql_phase3 <- sprintf( "WITH cluster_minimums AS ( SELECT DISTINCT ON (cluster_id) cluster_id, wscode_ltree, localcode_ltree, @@ -469,33 +493,29 @@ frs_cluster <- function(conn, table, habitat, downstream AS ( SELECT cm.cluster_id, - t.linear_feature_id, - t.wscode, - t.downstream_route_measure, - t.gradient, - EXISTS ( - SELECT 1 FROM %s s - INNER JOIN %s h2 ON s.id_segment = h2.id_segment - WHERE s.linear_feature_id = t.linear_feature_id - AND h2.species_code = %s - AND h2.%s IS TRUE - ) AS has_connect, - -t.length_metre + SUM(t.length_metre) OVER ( + st.gradient, + st.wscode_ltree, + st.downstream_route_measure, + COALESCE(h2.%s, false) AS has_connect, + -st.length_metre + SUM(st.length_metre) OVER ( PARTITION BY cm.cluster_id - ORDER BY t.wscode DESC, t.downstream_route_measure DESC + ORDER BY st.wscode_ltree DESC, st.downstream_route_measure DESC ) AS dist_to_cluster FROM cluster_minimums cm - CROSS JOIN LATERAL whse_basemapping.fwa_downstreamtrace( - cm.blue_line_key, - cm.downstream_route_measure - ) t - WHERE t.blue_line_key = t.watershed_key + INNER JOIN %s st ON FWA_Downstream( + cm.blue_line_key, cm.downstream_route_measure, + cm.wscode_ltree, cm.localcode_ltree, + st.blue_line_key, st.downstream_route_measure, + st.wscode_ltree, st.localcode_ltree) + LEFT JOIN %s h2 ON st.id_segment = h2.id_segment + AND h2.species_code = %s + WHERE st.blue_line_key = st.watershed_key ), downstream_capped AS ( SELECT row_number() OVER ( PARTITION BY cluster_id - ORDER BY wscode DESC, downstream_route_measure DESC + ORDER BY wscode_ltree DESC, downstream_route_measure DESC ) AS rn, * FROM downstream @@ -505,29 +525,30 @@ frs_cluster <- function(conn, table, habitat, SELECT DISTINCT ON (cluster_id) * FROM downstream_capped WHERE has_connect IS TRUE - ORDER BY cluster_id, wscode DESC, downstream_route_measure DESC + ORDER BY cluster_id, wscode_ltree DESC, downstream_route_measure DESC ), nearest_barrier AS ( SELECT DISTINCT ON (cluster_id) * FROM downstream_capped WHERE gradient >= %s - ORDER BY cluster_id, wscode DESC, downstream_route_measure DESC + ORDER BY cluster_id, wscode_ltree DESC, downstream_route_measure DESC ) SELECT a.cluster_id FROM nearest_connect a LEFT JOIN nearest_barrier b ON a.cluster_id = b.cluster_id WHERE b.rn IS NULL OR b.rn > a.rn", tmp_clusters, - table, habitat, sp_quoted, label_connect, + label_connect, + table, habitat, sp_quoted, bd, bg) - valid_down <- DBI::dbGetQuery(conn, sql_downstream)$cluster_id + valid_phase3 <- DBI::dbGetQuery(conn, sql_phase3)$cluster_id - # Union of both valid sets - all_valid <- unique(c(valid_up, valid_down)) + # Union: valid in Phase 2 OR Phase 3 + all_valid <- unique(c(valid_phase2, valid_phase3)) - # UPDATE: set FALSE for segments in clusters NOT valid in either direction + # UPDATE: set FALSE for clustered segments NOT valid in either phase if (length(all_valid) == 0) { .frs_db_execute(conn, sprintf( "UPDATE %s h SET %s = FALSE