Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 59 additions & 38 deletions R/frs_cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,21 @@ frs_cluster <- function(conn, table, habitat,
}


#' Both-direction cluster connectivity check
#' Both-direction cluster connectivity check (three-phase)
#'
#' Evaluates upstream and downstream independently on the original data,
#' then combines: a cluster is valid if connected in either direction.
#' Uses a temp table for clustering so both checks see the same clusters.
#' Phase 1: segments with BOTH `label_cluster` and `label_connect` TRUE
#' are always valid (on-spawning rearing). These are excluded from
#' clustering so they don't dilute cluster boundaries.
#'
#' Phase 2: upstream boolean check — validates rearing clusters below
#' spawning. Uses `FWA_Upstream()` to check whether any `label_connect`
#' (spawning) segment exists anywhere upstream of the cluster.
#'
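#' Phase 3: downstream trace — validates rearing clusters above spawning.
#' Uses `FWA_Downstream()` on the working ("broken") streams table passed
#' as `table`, restricted to mainstem segments
#' (`blue_line_key = watershed_key`), with a gradient bridge and a
#' distance cap.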
#'
#' A cluster is valid if it passes Phase 2 OR Phase 3. Phase 1 segments
#' are always valid regardless of clustering.
#'
#' @noRd
.frs_cluster_both <- function(conn, table, habitat,
Expand All @@ -401,7 +411,19 @@ frs_cluster <- function(conn, table, habitat,
tmp_clusters <- sprintf("pg_temp.frs_clusters_%s",
gsub("[^a-z0-9]", "", tolower(species)))

# Create shared clustering temp table
# Phase 1: on-spawning segments — always valid, excluded from clustering
phase1_ids <- DBI::dbGetQuery(conn, sprintf(
"SELECT h.id_segment FROM %s h
WHERE h.species_code = %s AND h.%s IS TRUE AND h.%s IS TRUE",
habitat, sp_quoted, label_cluster, label_connect))$id_segment

# Cluster remaining (non-phase1) segments
phase1_filter <- if (length(phase1_ids) > 0) {
sprintf("AND h.id_segment NOT IN (%s)", paste(phase1_ids, collapse = ", "))
} else {
""
}

.frs_db_execute(conn, sprintf("DROP TABLE IF EXISTS %s", tmp_clusters))
.frs_db_execute(conn, sprintf(
"CREATE TEMP TABLE %s AS
Expand All @@ -415,14 +437,14 @@ frs_cluster <- function(conn, table, habitat,
FROM %s h
INNER JOIN %s s ON h.id_segment = s.id_segment
WHERE h.species_code = %s
AND h.%s IS TRUE",
tmp_clusters, habitat, table, sp_quoted, label_cluster))
AND h.%s IS TRUE %s",
tmp_clusters, habitat, table, sp_quoted, label_cluster, phase1_filter))

.frs_db_execute(conn, sprintf(
"CREATE INDEX ON %s (cluster_id)", tmp_clusters))

# Upstream valid clusters — boolean check (spawning exists anywhere upstream)
sql_upstream <- sprintf(
# Phase 2: upstream boolean — rearing below spawning
sql_phase2 <- sprintf(
"WITH cluster_minimums AS (
SELECT DISTINCT ON (cluster_id)
cluster_id, wscode_ltree, localcode_ltree,
Expand Down Expand Up @@ -454,10 +476,12 @@ frs_cluster <- function(conn, table, habitat,
habitat, sp_quoted, label_connect, table,
conf_m)

valid_up <- DBI::dbGetQuery(conn, sql_upstream)$cluster_id
valid_phase2 <- DBI::dbGetQuery(conn, sql_phase2)$cluster_id

# Downstream valid clusters
sql_downstream <- sprintf(
# Phase 3: downstream trace on broken streams table — rearing above spawning
# Uses FWA_Downstream on the working streams table (not fwa_downstreamtrace)
# Mainstem only (blue_line_key = watershed_key) for linear path
sql_phase3 <- sprintf(
"WITH cluster_minimums AS (
SELECT DISTINCT ON (cluster_id)
cluster_id, wscode_ltree, localcode_ltree,
Expand All @@ -469,33 +493,29 @@ frs_cluster <- function(conn, table, habitat,
downstream AS (
SELECT
cm.cluster_id,
t.linear_feature_id,
t.wscode,
t.downstream_route_measure,
t.gradient,
EXISTS (
SELECT 1 FROM %s s
INNER JOIN %s h2 ON s.id_segment = h2.id_segment
WHERE s.linear_feature_id = t.linear_feature_id
AND h2.species_code = %s
AND h2.%s IS TRUE
) AS has_connect,
-t.length_metre + SUM(t.length_metre) OVER (
st.gradient,
st.wscode_ltree,
st.downstream_route_measure,
COALESCE(h2.%s, false) AS has_connect,
-st.length_metre + SUM(st.length_metre) OVER (
PARTITION BY cm.cluster_id
ORDER BY t.wscode DESC, t.downstream_route_measure DESC
ORDER BY st.wscode_ltree DESC, st.downstream_route_measure DESC
) AS dist_to_cluster
FROM cluster_minimums cm
CROSS JOIN LATERAL whse_basemapping.fwa_downstreamtrace(
cm.blue_line_key,
cm.downstream_route_measure
) t
WHERE t.blue_line_key = t.watershed_key
INNER JOIN %s st ON FWA_Downstream(
cm.blue_line_key, cm.downstream_route_measure,
cm.wscode_ltree, cm.localcode_ltree,
st.blue_line_key, st.downstream_route_measure,
st.wscode_ltree, st.localcode_ltree)
LEFT JOIN %s h2 ON st.id_segment = h2.id_segment
AND h2.species_code = %s
WHERE st.blue_line_key = st.watershed_key
),
downstream_capped AS (
SELECT
row_number() OVER (
PARTITION BY cluster_id
ORDER BY wscode DESC, downstream_route_measure DESC
ORDER BY wscode_ltree DESC, downstream_route_measure DESC
) AS rn,
*
FROM downstream
Expand All @@ -505,29 +525,30 @@ frs_cluster <- function(conn, table, habitat,
SELECT DISTINCT ON (cluster_id) *
FROM downstream_capped
WHERE has_connect IS TRUE
ORDER BY cluster_id, wscode DESC, downstream_route_measure DESC
ORDER BY cluster_id, wscode_ltree DESC, downstream_route_measure DESC
),
nearest_barrier AS (
SELECT DISTINCT ON (cluster_id) *
FROM downstream_capped
WHERE gradient >= %s
ORDER BY cluster_id, wscode DESC, downstream_route_measure DESC
ORDER BY cluster_id, wscode_ltree DESC, downstream_route_measure DESC
)
SELECT a.cluster_id
FROM nearest_connect a
LEFT JOIN nearest_barrier b ON a.cluster_id = b.cluster_id
WHERE b.rn IS NULL OR b.rn > a.rn",
tmp_clusters,
table, habitat, sp_quoted, label_connect,
label_connect,
table, habitat, sp_quoted,
bd,
bg)

valid_down <- DBI::dbGetQuery(conn, sql_downstream)$cluster_id
valid_phase3 <- DBI::dbGetQuery(conn, sql_phase3)$cluster_id

# Union of both valid sets
all_valid <- unique(c(valid_up, valid_down))
# Union: valid in Phase 2 OR Phase 3
all_valid <- unique(c(valid_phase2, valid_phase3))

# UPDATE: set FALSE for segments in clusters NOT valid in either direction
# UPDATE: set FALSE for clustered segments NOT valid in either phase
if (length(all_valid) == 0) {
.frs_db_execute(conn, sprintf(
"UPDATE %s h SET %s = FALSE
Expand Down
Loading