From 271ab8021a44fb45d37593f0ae52fbec06672bd7 Mon Sep 17 00:00:00 2001 From: Johnny Wan <2695191695@qq.com> Date: Thu, 29 Jan 2026 20:23:53 +0000 Subject: [PATCH 1/3] Refactor evaluation function and update schemas - Corrected the argument order in `evaluation_function` to match the expected signature. - Updated `.gitignore` to exclude Markdown files. - Removed legacy unittest-based test file and replaced it with a placeholder for pytest. - Enhanced `evaluation_function` with detailed error handling and validation using Pydantic. - Added new fields to `PathResult` schema for better path evaluation feedback. - Cleaned up the `EvaluationDetails` schema by removing unused scoring fields. --- .gitignore | 1 + evaluation_function/algorithms/__init__.py | 14 ++ evaluation_function/algorithms/bipartite.py | 84 ++++++++ .../algorithms/connectivity.py | 111 +++++++++++ .../algorithms/shortest_path.py | 180 ++++++++++++++++++ evaluation_function/algorithms/utils.py | 63 ++++++ evaluation_function/dev.py | 3 +- evaluation_function/evaluation.py | 169 +++++++++++++++- evaluation_function/evaluation_test.py | 34 +--- evaluation_function/schemas/result.py | 6 +- tests/test_core_algorithms.py | 122 ++++++++++++ tests/test_evaluation_function_core.py | 46 +++++ 12 files changed, 797 insertions(+), 36 deletions(-) create mode 100644 evaluation_function/algorithms/__init__.py create mode 100644 evaluation_function/algorithms/bipartite.py create mode 100644 evaluation_function/algorithms/connectivity.py create mode 100644 evaluation_function/algorithms/shortest_path.py create mode 100644 evaluation_function/algorithms/utils.py create mode 100644 tests/test_core_algorithms.py create mode 100644 tests/test_evaluation_function_core.py diff --git a/.gitignore b/.gitignore index ef4165d..cb5a15a 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ __pycache__/ # C extensions *.so +*.md # Distribution / packaging .Python diff --git a/evaluation_function/algorithms/__init__.py 
b/evaluation_function/algorithms/__init__.py new file mode 100644 index 0000000..ca3caa7 --- /dev/null +++ b/evaluation_function/algorithms/__init__.py @@ -0,0 +1,14 @@ +""" +Core graph algorithms used by the evaluation function. +""" + +from .connectivity import connectivity_info +from .shortest_path import shortest_path_info +from .bipartite import bipartite_info + +__all__ = [ + "connectivity_info", + "shortest_path_info", + "bipartite_info", +] + diff --git a/evaluation_function/algorithms/bipartite.py b/evaluation_function/algorithms/bipartite.py new file mode 100644 index 0000000..f34839b --- /dev/null +++ b/evaluation_function/algorithms/bipartite.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from collections import deque + +from evaluation_function.schemas import BipartiteResult, Graph + +from .utils import build_adjacency, node_ids + + +def _reconstruct_odd_cycle(u: str, v: str, parent: dict[str, str], depth: dict[str, int]) -> list[str]: + # Build paths to root + pu = [u] + pv = [v] + cu, cv = u, v + while cu in parent: + cu = parent[cu] + pu.append(cu) + while cv in parent: + cv = parent[cv] + pv.append(cv) + + set_pu = {x: i for i, x in enumerate(pu)} + lca = None + j = None + for idx, node in enumerate(pv): + if node in set_pu: + lca = node + j = idx + break + + if lca is None or j is None: + # Fallback: just return the triangle-ish evidence + return [u, v, u] + + i = set_pu[lca] + path_u_to_lca = pu[: i + 1] # u..lca + path_v_to_lca = pv[: j + 1] # v..lca + path_v_to_lca.reverse() # lca..v + + cycle = path_u_to_lca + path_v_to_lca[1:] + [u] + return cycle + + +def bipartite_info( + graph: Graph, + *, + return_partitions: bool = False, + return_odd_cycle: bool = False, +) -> BipartiteResult: + # Bipartite is typically defined for undirected graphs; we treat directed as undirected for checking. 
+ adj = build_adjacency(graph, undirected=True) + + color: dict[str, int] = {} + parent: dict[str, str] = {} + depth: dict[str, int] = {} + + for start in node_ids(graph): + if start in color: + continue + q = deque([start]) + color[start] = 0 + depth[start] = 0 + + while q: + u = q.popleft() + for ae in adj.get(u, []): + v = ae.to + if v not in color: + color[v] = 1 - color[u] + parent[v] = u + depth[v] = depth[u] + 1 + q.append(v) + elif color[v] == color[u]: + cycle = _reconstruct_odd_cycle(u, v, parent, depth) if return_odd_cycle else None + return BipartiteResult(is_bipartite=False, partitions=None, odd_cycle=cycle) + + partitions = None + if return_partitions: + left = [n for n in color.keys() if color[n] == 0] + right = [n for n in color.keys() if color[n] == 1] + partitions = [left, right] + + return BipartiteResult(is_bipartite=True, partitions=partitions, odd_cycle=None) + diff --git a/evaluation_function/algorithms/connectivity.py b/evaluation_function/algorithms/connectivity.py new file mode 100644 index 0000000..f6870b1 --- /dev/null +++ b/evaluation_function/algorithms/connectivity.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +from collections import deque +from typing import Literal, Optional + +from evaluation_function.schemas import ConnectivityResult, Graph + +from .utils import build_adjacency, build_reverse_adjacency, node_ids + + +ConnectivityType = Literal["connected", "strongly_connected", "weakly_connected"] + + +def _components_undirected(graph: Graph) -> list[list[str]]: + adj = build_adjacency(graph, undirected=True) + seen: set[str] = set() + comps: list[list[str]] = [] + for start in node_ids(graph): + if start in seen: + continue + q = deque([start]) + seen.add(start) + comp: list[str] = [] + while q: + u = q.popleft() + comp.append(u) + for ae in adj.get(u, []): + if ae.to not in seen: + seen.add(ae.to) + q.append(ae.to) + comps.append(comp) + return comps + + +def _is_strongly_connected(graph: Graph) -> bool: + ids = 
node_ids(graph) + if len(ids) <= 1: + return True + + adj = build_adjacency(graph, undirected=False) + radj = build_reverse_adjacency(graph) + + def dfs(start: str, adjacency: dict[str, list]) -> set[str]: + stack = [start] + seen: set[str] = set() + while stack: + u = stack.pop() + if u in seen: + continue + seen.add(u) + for ae in adjacency.get(u, []): + if ae.to not in seen: + stack.append(ae.to) + return seen + + s = ids[0] + if len(dfs(s, adj)) != len(ids): + return False + if len(dfs(s, radj)) != len(ids): + return False + return True + + +def connectivity_info( + graph: Graph, + *, + connectivity_type: ConnectivityType = "connected", + return_components: bool = False, +) -> ConnectivityResult: + ids = node_ids(graph) + if len(ids) <= 1: + comps = [ids] + return ConnectivityResult( + is_connected=True, + num_components=len(comps), + components=comps if return_components else None, + connectivity_type=connectivity_type, + largest_component_size=len(ids), + ) + + if connectivity_type == "strongly_connected": + is_conn = _is_strongly_connected(graph) + # Components for SCCs are out-of-scope for this ticket. + return ConnectivityResult( + is_connected=is_conn, + num_components=1 if is_conn else 2, + components=None, + connectivity_type=connectivity_type, + largest_component_size=len(ids) if is_conn else None, + ) + + if connectivity_type == "weakly_connected": + comps = _components_undirected(graph) + return ConnectivityResult( + is_connected=len(comps) == 1, + num_components=len(comps), + components=comps if return_components else None, + connectivity_type=connectivity_type, + largest_component_size=max((len(c) for c in comps), default=0), + ) + + # Default: undirected connectivity. 
+ comps = _components_undirected(graph) + return ConnectivityResult( + is_connected=len(comps) == 1, + num_components=len(comps), + components=comps if return_components else None, + connectivity_type="connected", + largest_component_size=max((len(c) for c in comps), default=0), + ) + diff --git a/evaluation_function/algorithms/shortest_path.py b/evaluation_function/algorithms/shortest_path.py new file mode 100644 index 0000000..3356d05 --- /dev/null +++ b/evaluation_function/algorithms/shortest_path.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import heapq +from typing import Literal, Optional + +from evaluation_function.schemas import Graph, PathResult + +from .utils import build_adjacency, edge_weight_lookup, node_ids, path_weight + + +ShortestPathAlgorithm = Literal["bfs", "dijkstra", "bellman_ford", "auto"] + + +class NegativeCycleError(Exception): + pass + + +def _reconstruct_path(parent: dict[str, str], source: str, target: str) -> Optional[list[str]]: + if source == target: + return [source] + if target not in parent: + return None + cur = target + out: list[str] = [cur] + while cur != source: + cur = parent[cur] + out.append(cur) + out.reverse() + return out + + +def _bfs_shortest_path(graph: Graph, source: str, target: str, *, undirected: bool) -> PathResult: + from collections import deque + + adj = build_adjacency(graph, undirected=undirected) + if source not in adj or target not in adj: + return PathResult(algorithm_used="bfs", path_exists=False) + q = deque([source]) + parent: dict[str, str] = {} + dist: dict[str, int] = {source: 0} + while q: + u = q.popleft() + if u == target: + break + for ae in adj.get(u, []): + v = ae.to + if v not in dist: + dist[v] = dist[u] + 1 + parent[v] = u + q.append(v) + if target not in dist: + return PathResult(algorithm_used="bfs", path_exists=False) + path = _reconstruct_path(parent, source, target) + return PathResult(algorithm_used="bfs", path_exists=True, distance=float(dist[target]), path=path) + + 
+def _dijkstra_shortest_path(graph: Graph, source: str, target: str, *, undirected: bool) -> PathResult: + adj = build_adjacency(graph, undirected=undirected) + if source not in adj or target not in adj: + return PathResult(algorithm_used="dijkstra", path_exists=False) + + dist: dict[str, float] = {source: 0.0} + parent: dict[str, str] = {} + pq: list[tuple[float, str]] = [(0.0, source)] + seen: set[str] = set() + + while pq: + d, u = heapq.heappop(pq) + if u in seen: + continue + seen.add(u) + if u == target: + break + for ae in adj.get(u, []): + if ae.weight < 0: + raise ValueError("Dijkstra cannot be used with negative edge weights.") + nd = d + ae.weight + if nd < dist.get(ae.to, float("inf")): + dist[ae.to] = nd + parent[ae.to] = u + heapq.heappush(pq, (nd, ae.to)) + + if target not in dist: + return PathResult(algorithm_used="dijkstra", path_exists=False) + path = _reconstruct_path(parent, source, target) + return PathResult(algorithm_used="dijkstra", path_exists=True, distance=dist[target], path=path) + + +def _bellman_ford_shortest_path(graph: Graph, source: str, target: str, *, undirected: bool) -> PathResult: + ids = node_ids(graph) + if source not in ids or target not in ids: + return PathResult(algorithm_used="bellman_ford", path_exists=False) + + # Build edge list + edges: list[tuple[str, str, float]] = [] + for e in graph.edges: + w = float(e.weight if e.weight is not None else 1.0) + edges.append((e.source, e.target, w)) + if undirected: + edges.append((e.target, e.source, w)) + + dist: dict[str, float] = {source: 0.0} + parent: dict[str, str] = {} + + # Relax V-1 times + for _ in range(max(0, len(ids) - 1)): + changed = False + for u, v, w in edges: + if u not in dist: + continue + nd = dist[u] + w + if nd < dist.get(v, float("inf")): + dist[v] = nd + parent[v] = u + changed = True + if not changed: + break + + # Detect negative cycle reachable from source + for u, v, w in edges: + if u not in dist: + continue + if dist[u] + w < dist.get(v, 
float("inf")): + raise NegativeCycleError("Negative cycle detected (reachable from source).") + + if target not in dist: + return PathResult(algorithm_used="bellman_ford", path_exists=False) + path = _reconstruct_path(parent, source, target) + return PathResult(algorithm_used="bellman_ford", path_exists=True, distance=dist[target], path=path) + + +def shortest_path_info( + graph: Graph, + *, + source: str, + target: str, + algorithm: ShortestPathAlgorithm = "auto", + supplied_path: Optional[list[str]] = None, +) -> PathResult: + undirected = not bool(graph.directed) + + # Auto-select algorithm + if algorithm == "auto": + weights = [float(e.weight if e.weight is not None else 1.0) for e in graph.edges] + has_negative = any(w < 0 for w in weights) + is_unweighted = all(abs(w - 1.0) < 1e-12 for w in weights) or len(weights) == 0 + if has_negative: + algorithm = "bellman_ford" + elif is_unweighted: + algorithm = "bfs" + else: + algorithm = "dijkstra" + + if algorithm == "bfs": + info = _bfs_shortest_path(graph, source, target, undirected=undirected) + elif algorithm == "dijkstra": + info = _dijkstra_shortest_path(graph, source, target, undirected=undirected) + elif algorithm == "bellman_ford": + info = _bellman_ford_shortest_path(graph, source, target, undirected=undirected) + else: + raise ValueError(f"Unsupported algorithm: {algorithm}") + + # Validate supplied path (if any) + if supplied_path is not None: + w_lookup = edge_weight_lookup(graph, undirected=undirected) + w = path_weight(supplied_path, w_lookup) + is_valid = w is not None and len(supplied_path) >= 1 and supplied_path[0] == source and supplied_path[-1] == target + is_shortest = None + if info.path_exists and info.distance is not None and is_valid and w is not None: + is_shortest = abs(w - info.distance) < 1e-9 + info = info.model_copy( + update={ + "supplied_path_is_valid": is_valid, + "supplied_path_weight": w, + "supplied_path_is_shortest": is_shortest, + } + ) + + return info + diff --git 
a/evaluation_function/algorithms/utils.py b/evaluation_function/algorithms/utils.py new file mode 100644 index 0000000..368d425 --- /dev/null +++ b/evaluation_function/algorithms/utils.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, Optional + +from evaluation_function.schemas import Edge, Graph + + +@dataclass(frozen=True) +class AdjEdge: + to: str + weight: float + edge: Edge + + +def node_ids(graph: Graph) -> list[str]: + return [n.id for n in graph.nodes] + + +def build_adjacency(graph: Graph, *, undirected: bool) -> dict[str, list[AdjEdge]]: + adj: dict[str, list[AdjEdge]] = {n.id: [] for n in graph.nodes} + for e in graph.edges: + w = float(e.weight if e.weight is not None else 1.0) + adj.setdefault(e.source, []).append(AdjEdge(to=e.target, weight=w, edge=e)) + if undirected: + adj.setdefault(e.target, []).append(AdjEdge(to=e.source, weight=w, edge=e)) + return adj + + +def build_reverse_adjacency(graph: Graph) -> dict[str, list[AdjEdge]]: + """Directed reverse adjacency (for strongly connected checks).""" + adj: dict[str, list[AdjEdge]] = {n.id: [] for n in graph.nodes} + for e in graph.edges: + w = float(e.weight if e.weight is not None else 1.0) + adj.setdefault(e.target, []).append(AdjEdge(to=e.source, weight=w, edge=e)) + return adj + + +def edge_weight_lookup(graph: Graph, *, undirected: bool) -> dict[tuple[str, str], float]: + """Lookup of min weight for (u,v). 
Useful for validating user-provided paths.""" + lookup: dict[tuple[str, str], float] = {} + for e in graph.edges: + w = float(e.weight if e.weight is not None else 1.0) + key = (e.source, e.target) + lookup[key] = min(lookup.get(key, w), w) + if undirected: + key2 = (e.target, e.source) + lookup[key2] = min(lookup.get(key2, w), w) + return lookup + + +def path_weight(path: Iterable[str], w_lookup: dict[tuple[str, str], float]) -> Optional[float]: + path_list = list(path) + if len(path_list) <= 1: + return 0.0 + total = 0.0 + for u, v in zip(path_list, path_list[1:]): + w = w_lookup.get((u, v)) + if w is None: + return None + total += w + return total + diff --git a/evaluation_function/dev.py b/evaluation_function/dev.py index 886d641..3db0ce0 100644 --- a/evaluation_function/dev.py +++ b/evaluation_function/dev.py @@ -16,7 +16,8 @@ def dev(): answer = sys.argv[1] response = sys.argv[2] - result = evaluation_function(answer, response, Params()) + # evaluation_function signature is (response, answer, params) + result = evaluation_function(response, answer, Params()) print(result.to_dict()) diff --git a/evaluation_function/evaluation.py b/evaluation_function/evaluation.py index 61ecaa3..3abea5b 100755 --- a/evaluation_function/evaluation.py +++ b/evaluation_function/evaluation.py @@ -1,5 +1,13 @@ -from typing import Any +from __future__ import annotations + +from typing import Any, Optional + from lf_toolkit.evaluation import Result, Params +from pydantic import ValidationError + +from evaluation_function.algorithms import bipartite_info, connectivity_info, shortest_path_info +from evaluation_function.algorithms.shortest_path import NegativeCycleError +from evaluation_function.schemas import Answer, EvaluationParams, Graph, Response def evaluation_function( response: Any, @@ -29,6 +37,159 @@ def evaluation_function( to output the evaluation response. 
""" - return Result( - is_correct=response == answer - ) \ No newline at end of file + def _to_dictish(obj: Any) -> Any: + if obj is None: + return None + if isinstance(obj, (dict, list, str, int, float, bool)): + return obj + if hasattr(obj, "model_dump"): + return obj.model_dump() + if hasattr(obj, "dict"): + return obj.dict() + if hasattr(obj, "to_dict"): + return obj.to_dict() + if hasattr(obj, "__dict__"): + return obj.__dict__ + return obj + + def _ok() -> Result: + return Result(is_correct=True) + + def _err(msg: str) -> Result: + # lf_toolkit.Result does not take `feedback=...`; it takes feedback_items. + return Result(is_correct=False, feedback_items=[("error", msg)]) + + try: + resp = Response.model_validate(_to_dictish(response) or {}) + except ValidationError as e: + return _err(f"Invalid response schema: {e}") + + try: + ans = Answer.model_validate(_to_dictish(answer) or {}) + except ValidationError as e: + return _err(f"Invalid answer schema: {e}") + + # lf_toolkit Params may not be a plain dict; best-effort coercion + raw_params = _to_dictish(params) or {} + try: + p = EvaluationParams.model_validate(raw_params) + except ValidationError as e: + return _err( + "Invalid params schema. Expected e.g. " + "{evaluation_type: 'connectivity'|'shortest_path'|'bipartite', ...}. " + f"Error: {e}" + ) + + # Graph selection: + # - If the task is 'compute a property', the graph is typically in answer.graph (question graph). + # - If the task is 'build a graph with a property', the graph is typically in response.graph. 
+ expected_graph: Optional[Graph] = ans.graph or resp.graph + student_graph: Optional[Graph] = resp.graph or ans.graph + + if expected_graph is None or student_graph is None: + return _err("No graph provided in either response.graph or answer.graph.") + + eval_type = p.evaluation_type + + if eval_type == "connectivity": + conn_params = p.connectivity + check_type = conn_params.check_type if conn_params else "connected" + want_components = bool(conn_params.return_components) if conn_params else False + + expected = ans.is_connected + if expected is None: + expected = connectivity_info(expected_graph, connectivity_type=check_type, return_components=False).is_connected + + # If student explicitly provided a boolean answer, grade that; otherwise grade the graph property. + student_value = resp.is_connected + if student_value is None: + student_value = connectivity_info(student_graph, connectivity_type=check_type, return_components=False).is_connected + + details = connectivity_info(student_graph, connectivity_type=check_type, return_components=want_components) + is_correct = bool(student_value) == bool(expected) + fb = f"Connectivity ({check_type}): expected={expected}, got={student_value}." 
+ if want_components and details.components is not None: + fb += f" components={details.components}" + return _ok() if is_correct else _err(fb) + + if eval_type == "bipartite": + b_params = p.bipartite + want_parts = bool(b_params.return_partitions) if b_params else False + want_odd = bool(b_params.return_odd_cycle) if b_params else False + + expected = ans.is_bipartite + if expected is None: + expected = bipartite_info(expected_graph).is_bipartite + + student_value = resp.is_bipartite + if student_value is None: + student_value = bipartite_info(student_graph).is_bipartite + + details = bipartite_info(student_graph, return_partitions=want_parts, return_odd_cycle=want_odd) + is_correct = bool(student_value) == bool(expected) + fb = f"Bipartite: expected={expected}, got={student_value}." + if want_parts and details.partitions is not None: + fb += f" partitions={details.partitions}" + if want_odd and details.odd_cycle is not None: + fb += f" odd_cycle={details.odd_cycle}" + return _ok() if is_correct else _err(fb) + + if eval_type == "shortest_path": + sp = p.shortest_path + if sp is None: + return _err("Missing params.shortest_path for evaluation_type='shortest_path'.") + + try: + expected_info = shortest_path_info( + expected_graph, + source=sp.source_node, + target=sp.target_node, + algorithm=sp.algorithm if sp.algorithm != "auto" else "auto", + ) + except NegativeCycleError as e: + return _err(f"Expected graph has a negative cycle: {e}") + + try: + student_info = shortest_path_info( + student_graph, + source=sp.source_node, + target=sp.target_node, + algorithm=sp.algorithm if sp.algorithm != "auto" else "auto", + supplied_path=resp.path, + ) + except NegativeCycleError as e: + return _err(f"Student graph has a negative cycle: {e}") + except ValueError as e: + return _err(str(e)) + + # Determine expected distance: + expected_distance = ans.shortest_distance if ans.shortest_distance is not None else ans.distance + if expected_distance is None: + expected_distance = 
expected_info.distance + + # Determine student distance: + student_distance = resp.distance if resp.distance is not None else student_info.distance + + if expected_distance is None: + return _err("Could not determine expected shortest distance.") + if student_distance is None: + return _err("Could not determine student's shortest distance (distance/path).") + + tol = ans.tolerance if hasattr(ans, "tolerance") else 1e-9 + is_correct = abs(float(student_distance) - float(expected_distance)) <= float(tol) + + fb = ( + f"Shortest path {sp.source_node}->{sp.target_node}: " + f"expected_distance={expected_distance}, got_distance={student_distance}, " + f"algorithm_used={student_info.algorithm_used}." + ) + if resp.path is not None: + fb += ( + f" supplied_path_valid={student_info.supplied_path_is_valid}," + f" supplied_path_weight={student_info.supplied_path_weight}," + f" supplied_path_is_shortest={student_info.supplied_path_is_shortest}." + ) + + return _ok() if is_correct else _err(fb) + + return _err(f"Unsupported evaluation_type: {eval_type}") \ No newline at end of file diff --git a/evaluation_function/evaluation_test.py b/evaluation_function/evaluation_test.py index 7a5c5bd..ee5a933 100755 --- a/evaluation_function/evaluation_test.py +++ b/evaluation_function/evaluation_test.py @@ -1,30 +1,8 @@ -import unittest +""" +Legacy placeholder test file. -from .evaluation import Params, evaluation_function +We use pytest tests under the top-level `tests/` folder instead. +""" -class TestEvaluationFunction(unittest.TestCase): - """ - TestCase Class used to test the algorithm. - --- - Tests are used here to check that the algorithm written - is working as it should. - - It's best practise to write these tests first to get a - kind of 'specification' for how your algorithm should - work, and you should run these tests before committing - your code to AWS. 
- - Read the docs on how to use unittest here: - https://docs.python.org/3/library/unittest.html - - Use evaluation_function() to check your algorithm works - as it should. - """ - - def test_evaluation(self): - response, answer, params = "Hello, World", "Hello, World", Params() - - result = evaluation_function(response, answer, params).to_dict() - - self.assertEqual(result.get("is_correct"), True) - self.assertFalse(result.get("feedback", False)) +def test_placeholder(): + assert True diff --git a/evaluation_function/schemas/result.py b/evaluation_function/schemas/result.py index 6a09f4a..8ae5828 100644 --- a/evaluation_function/schemas/result.py +++ b/evaluation_function/schemas/result.py @@ -21,6 +21,9 @@ class PathResult(BaseModel): path_exists: bool = Field(True, description="Whether a path exists") algorithm_used: Optional[str] = Field(None, description="Algorithm used") all_paths: Optional[list[list[str]]] = Field(None, description="All shortest paths if multiple exist") + supplied_path_is_valid: Optional[bool] = Field(None, description="Whether a supplied path is valid") + supplied_path_weight: Optional[float] = Field(None, description="Total weight of a supplied path") + supplied_path_is_shortest: Optional[bool] = Field(None, description="Whether a supplied path is a shortest path") class ConnectivityResult(BaseModel): @@ -207,9 +210,6 @@ class EvaluationDetails(BaseModel): computation_steps: list[ComputationStep] = Field(default_factory=list, description="Step-by-step computation") hints: list[str] = Field(default_factory=list, description="Hints for incorrect answers") - # Scoring - partial_score: Optional[float] = Field(None, ge=0.0, le=1.0, description="Partial credit score") - scoring_breakdown: Optional[dict[str, float]] = Field(None, description="Breakdown of partial scoring") # ============================================================================= diff --git a/tests/test_core_algorithms.py b/tests/test_core_algorithms.py new file mode 100644 
index 0000000..721b90d --- /dev/null +++ b/tests/test_core_algorithms.py @@ -0,0 +1,122 @@ +import pytest + +from evaluation_function.algorithms.bipartite import bipartite_info +from evaluation_function.algorithms.connectivity import connectivity_info +from evaluation_function.algorithms.shortest_path import NegativeCycleError, shortest_path_info +from evaluation_function.schemas import Edge, Graph, Node + + +def g(nodes, edges, *, directed=False): + return Graph(nodes=[Node(id=n) for n in nodes], edges=[Edge(**e) for e in edges], directed=directed) + + +class TestConnectivity: + def test_empty_graph_is_connected(self): + graph = Graph(nodes=[], edges=[], directed=False) + info = connectivity_info(graph) + assert info.is_connected is True + + def test_undirected_disconnected(self): + graph = g(["A", "B"], [], directed=False) + info = connectivity_info(graph, return_components=True) + assert info.is_connected is False + assert sorted([sorted(c) for c in info.components]) == [["A"], ["B"]] + + def test_directed_strongly_connected(self): + graph = g(["A", "B"], [{"source": "A", "target": "B"}, {"source": "B", "target": "A"}], directed=True) + info = connectivity_info(graph, connectivity_type="strongly_connected") + assert info.is_connected is True + + def test_directed_weakly_connected(self): + graph = g(["A", "B"], [{"source": "A", "target": "B"}], directed=True) + info = connectivity_info(graph, connectivity_type="weakly_connected") + assert info.is_connected is True + + def test_directed_not_strongly_connected(self): + graph = g(["A", "B"], [{"source": "A", "target": "B"}], directed=True) + info = connectivity_info(graph, connectivity_type="strongly_connected") + assert info.is_connected is False + + +class TestShortestPath: + def test_unweighted_bfs(self): + graph = g(["A", "B", "C"], [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}], directed=False) + info = shortest_path_info(graph, source="A", target="C", algorithm="auto") + assert 
info.path_exists is True + assert info.algorithm_used == "bfs" + assert info.distance == 2.0 + assert info.path == ["A", "B", "C"] + + def test_weighted_dijkstra(self): + graph = g( + ["A", "B", "C"], + [ + {"source": "A", "target": "B", "weight": 2}, + {"source": "B", "target": "C", "weight": 2}, + {"source": "A", "target": "C", "weight": 10}, + ], + directed=False, + ) + info = shortest_path_info(graph, source="A", target="C", algorithm="auto") + assert info.algorithm_used == "dijkstra" + assert info.distance == 4.0 + assert info.path == ["A", "B", "C"] + + def test_negative_weight_bellman_ford(self): + graph = g( + ["A", "B", "C"], + [ + {"source": "A", "target": "B", "weight": -1}, + {"source": "B", "target": "C", "weight": 2}, + {"source": "A", "target": "C", "weight": 5}, + ], + directed=True, + ) + info = shortest_path_info(graph, source="A", target="C", algorithm="auto") + assert info.algorithm_used == "bellman_ford" + assert info.distance == 1.0 + assert info.path == ["A", "B", "C"] + + def test_negative_cycle_raises(self): + graph = g( + ["A", "B"], + [{"source": "A", "target": "B", "weight": 1}, {"source": "B", "target": "A", "weight": -2}], + directed=True, + ) + with pytest.raises(NegativeCycleError): + shortest_path_info(graph, source="A", target="B", algorithm="auto") + + def test_supplied_path_validation(self): + graph = g(["A", "B", "C"], [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}], directed=False) + info = shortest_path_info(graph, source="A", target="C", algorithm="auto", supplied_path=["A", "B", "C"]) + assert info.supplied_path_is_valid is True + assert info.supplied_path_is_shortest is True + + +class TestBipartite: + def test_bipartite_square(self): + graph = g( + ["A", "B", "C", "D"], + [ + {"source": "A", "target": "B"}, + {"source": "B", "target": "C"}, + {"source": "C", "target": "D"}, + {"source": "D", "target": "A"}, + ], + directed=False, + ) + info = bipartite_info(graph, return_partitions=True) + assert 
info.is_bipartite is True + assert info.partitions is not None + + def test_not_bipartite_triangle(self): + graph = g( + ["A", "B", "C"], + [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}, {"source": "C", "target": "A"}], + directed=False, + ) + info = bipartite_info(graph, return_odd_cycle=True) + assert info.is_bipartite is False + assert info.odd_cycle is not None + assert len(info.odd_cycle) >= 3 + diff --git a/tests/test_evaluation_function_core.py b/tests/test_evaluation_function_core.py new file mode 100644 index 0000000..b26e397 --- /dev/null +++ b/tests/test_evaluation_function_core.py @@ -0,0 +1,46 @@ +from lf_toolkit.evaluation import Params + +from evaluation_function.evaluation import evaluation_function + + +def test_evaluation_connectivity_property_question(): + # Question graph is in answer.graph; student provides boolean. + answer = { + "graph": {"nodes": [{"id": "A"}, {"id": "B"}], "edges": [{"source": "A", "target": "B"}], "directed": False}, + "is_connected": True, + } + response = {"is_connected": True} + params = {"evaluation_type": "connectivity", "connectivity": {"check_type": "connected"}} + result = evaluation_function(response, answer, Params(params)).to_dict() + assert result["is_correct"] is True + + +def test_evaluation_bipartite_graph_building_task(): + # Student builds a bipartite graph; answer encodes the property. 
+ answer = {"is_bipartite": True} + response = { + "graph": { + "nodes": [{"id": "A"}, {"id": "B"}, {"id": "X"}], + "edges": [{"source": "A", "target": "X"}, {"source": "B", "target": "X"}], + "directed": False, + } + } + params = {"evaluation_type": "bipartite"} + result = evaluation_function(response, answer, Params(params)).to_dict() + assert result["is_correct"] is True + + +def test_evaluation_shortest_path_distance(): + answer = { + "graph": { + "nodes": [{"id": "A"}, {"id": "B"}, {"id": "C"}], + "edges": [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}], + "directed": False, + }, + "shortest_distance": 2, + } + response = {"distance": 2} + params = {"evaluation_type": "shortest_path", "shortest_path": {"source_node": "A", "target_node": "C", "algorithm": "auto"}} + result = evaluation_function(response, answer, Params(params)).to_dict() + assert result["is_correct"] is True + From 232806b55b574e9e14bc6593d063bae8b17b65e7 Mon Sep 17 00:00:00 2001 From: Johnny Wan <2695191695@qq.com> Date: Fri, 30 Jan 2026 02:47:05 +0000 Subject: [PATCH 2/3] Add cycle detection parameters and include cycle info in algorithms - Added `max_nodes` and `max_cycles` fields to `CycleDetectionParams` for enhanced cycle enumeration control. - Included `cycle_info` in the algorithms module to support cycle detection functionality. 
# Cycle algorithms (evaluation_function/algorithms/cycles.py).
#
# The definitions below expect these names at module scope, exactly as the
# original file header imports them:
#   deque                      (collections)
#   Optional                   (typing)
#   CycleResult, Graph         (evaluation_function.schemas)
#   build_adjacency, node_ids  (.utils)


def _close_cycle(cycle: list[str]) -> list[str]:
    """Return *cycle* in closed form (first node repeated at the end).

    An empty list is returned unchanged.  A single node denotes a self-loop and
    is closed to ``[x, x]``; the previous implementation returned ``[x]``
    because a one-element list trivially satisfies ``cycle[0] == cycle[-1]``.
    """
    if not cycle:
        return cycle
    if len(cycle) > 1 and cycle[0] == cycle[-1]:
        return cycle
    return cycle + [cycle[0]]


def _canonical_rotation(nodes: list[str]) -> tuple[str, ...]:
    """Return the lexicographically smallest rotation of an open cyclic sequence."""
    if not nodes:
        return ()
    return min(tuple(nodes[i:] + nodes[:i]) for i in range(len(nodes)))


def _canonical_cycle(cycle: list[str], *, undirected: bool) -> tuple[str, ...]:
    """Canonicalize a cycle so equivalent cycles compare equal.

    The input may be closed (start == end) or open; the output is a closed
    tuple.  For undirected graphs the traversal direction is ignored as well.
    """
    c = cycle[:]
    if len(c) >= 2 and c[0] == c[-1]:
        c = c[:-1]
    if not c:
        return ()
    if len(c) == 1:
        return (c[0], c[0])

    fwd = _canonical_rotation(c)
    if not undirected:
        return fwd + (fwd[0],)

    rev = _canonical_rotation(list(reversed(c)))
    best = min(fwd, rev)
    return best + (best[0],)


def _reconstruct_cycle_from_tree_edge(
    u: str, v: str, parent: dict[str, Optional[str]]
) -> list[str]:
    """Rebuild the cycle closed by the non-tree edge (u, v) of a BFS/DFS tree.

    ``parent`` maps each visited node to its tree parent (``None`` for a root).
    The result is closed: ``u -> ... -> lca -> ... -> v -> u``.
    """
    anc_u: set[str] = set()
    x: Optional[str] = u
    while x is not None:
        anc_u.add(x)
        x = parent.get(x)

    # Walk v upward until we hit an ancestor of u (the lowest common ancestor).
    lca = v
    while lca not in anc_u:
        nxt = parent.get(lca)
        if nxt is None:
            break
        lca = nxt

    path_u: list[str] = []
    x = u
    while x is not None and x != lca:
        path_u.append(x)
        x = parent.get(x)
    path_u.append(lca)

    path_v: list[str] = []
    x = v
    while x is not None and x != lca:
        path_v.append(x)
        x = parent.get(x)
    path_v.append(lca)

    # u -> ... -> lca, then lca -> ... -> v (dropping the duplicate lca), then back to u.
    return path_u + list(reversed(path_v))[1:] + [u]


def _find_any_cycle_directed(graph: "Graph") -> Optional[list[str]]:
    """Return one closed directed cycle found by an iterative DFS, or ``None``."""
    adj = build_adjacency(graph, undirected=False)
    color: dict[str, int] = {}  # 0 unvisited, 1 on the DFS stack, 2 finished
    parent: dict[str, str] = {}

    for start in node_ids(graph):
        if color.get(start, 0) != 0:
            continue

        stack: list[tuple[str, int]] = [(start, 0)]
        color[start] = 1

        while stack:
            u, idx = stack[-1]
            neigh = adj.get(u, [])
            if idx >= len(neigh):
                color[u] = 2
                stack.pop()
                continue

            v = neigh[idx].to
            stack[-1] = (u, idx + 1)

            cv = color.get(v, 0)
            if cv == 0:
                parent[v] = u
                color[v] = 1
                stack.append((v, 0))
            elif cv == 1:
                # Back edge u -> v: walk parents from u up to v.
                cur = u
                tmp = [cur]
                while cur != v and cur in parent:
                    cur = parent[cur]
                    tmp.append(cur)
                if tmp[-1] != v:
                    # v should be on the stack; fall back to minimal evidence.
                    return [v, u, v]
                tmp.reverse()  # v ... u
                return tmp + [v]

    return None


def _find_any_cycle_undirected(graph: "Graph") -> Optional[list[str]]:
    """Return one closed cycle of an undirected graph via iterative DFS, or ``None``.

    An edge back to the DFS parent is never treated as a cycle, so a duplicated
    edge between two nodes does not count.
    """
    adj = build_adjacency(graph, undirected=True)
    seen: set[str] = set()
    parent: dict[str, Optional[str]] = {}

    for start in node_ids(graph):
        if start in seen:
            continue
        parent[start] = None
        stack = [start]
        while stack:
            u = stack.pop()
            if u in seen:
                continue
            seen.add(u)
            for ae in adj.get(u, []):
                v = ae.to
                if v not in seen:
                    parent[v] = u
                    stack.append(v)
                elif parent.get(u) != v:
                    return _reconstruct_cycle_from_tree_edge(u, v, parent)
    return None


def _all_simple_cycles_bruteforce(
    graph: "Graph",
    *,
    min_length: int,
    max_length: Optional[int],
    max_cycles: int,
    max_nodes: int,
) -> list[list[str]]:
    """Enumerate simple cycles by DFS from every start node.

    Returns closed cycles, deduplicated via ``_canonical_cycle`` and sorted by
    ``(len, nodes)``.  Enumeration is skipped (empty list) when the graph has
    more than ``max_nodes`` nodes, and stops after ``max_cycles`` results.
    Lengths are measured in edges.
    """
    ids = sorted(node_ids(graph))
    if len(ids) > max_nodes:
        return []

    directed = bool(graph.directed)
    undirected = not directed
    adj = build_adjacency(graph, undirected=undirected)
    index = {n: i for i, n in enumerate(ids)}

    seen_cycles: set[tuple[str, ...]] = set()
    out: list[list[str]] = []

    def add_cycle(cycle: list[str]) -> None:
        if len(out) >= max_cycles:
            return
        key = _canonical_cycle(cycle, undirected=undirected)
        if not key or key in seen_cycles:
            return
        k = len(key) - 1  # number of edges (== distinct vertices) in the cycle
        if k < min_length:
            return
        if max_length is not None and k > max_length:
            return
        seen_cycles.add(key)
        out.append(list(key))

    for start in ids:
        if len(out) >= max_cycles:
            break
        start_i = index[start]
        path = [start]
        in_path = {start}

        def dfs(u: str, prev: Optional[str]) -> None:
            if len(out) >= max_cycles:
                return
            # Prune once the path already holds more vertices than allowed.
            if max_length is not None and len(path) > max_length:
                return

            for ae in adj.get(u, []):
                v = ae.to
                if index.get(v, -1) < start_i:
                    continue  # keep start as the minimum-id vertex of the cycle
                if undirected and prev is not None and v == prev:
                    continue  # do not walk straight back over the same undirected edge
                if v == start:
                    add_cycle(path + [start])
                    continue
                if v in in_path:
                    continue
                in_path.add(v)
                path.append(v)
                dfs(v, u)
                path.pop()
                in_path.remove(v)

        dfs(start, None)

    out.sort(key=lambda c: (len(c), c))
    return out


def _directed_girth(ids, adj: dict) -> tuple[Optional[int], Optional[list[str]]]:
    """BFS from every node; return (length, closed cycle) of a shortest directed cycle.

    ``adj`` maps a node id to a list of adjacency entries exposing ``.to``.
    Returns ``(None, None)`` when no cycle passes through any node of ``ids``.
    """
    best_len: Optional[int] = None
    best_cycle: Optional[list[str]] = None

    for start in ids:
        dist: dict[str, int] = {start: 0}
        parent: dict[str, Optional[str]] = {start: None}
        q = deque([start])
        while q:
            u = q.popleft()
            du = dist[u]
            if best_len is not None and du + 1 >= best_len:
                continue  # cannot improve on the best cycle from here
            for ae in adj.get(u, []):
                v = ae.to
                if v == start:
                    cand = du + 1
                    if best_len is None or cand < best_len:
                        # Rebuild start -> ... -> u, then close back to start.
                        path: list[str] = []
                        x: Optional[str] = u
                        while x is not None:
                            path.append(x)
                            x = parent.get(x)
                        path.reverse()
                        best_len = cand
                        best_cycle = path + [start]
                    continue
                if v not in dist:
                    dist[v] = du + 1
                    parent[v] = u
                    q.append(v)
    return best_len, best_cycle


def _undirected_girth(ids, adj: dict) -> tuple[Optional[int], Optional[list[str]]]:
    """BFS from every node; return (length, closed cycle) of a shortest undirected cycle.

    ``adj`` maps a node id to a list of adjacency entries exposing ``.to``.
    A tree edge is skipped in both orientations (``v`` is the parent of ``u``
    or ``u`` is the parent of ``v``), so a duplicated edge is not reported as a
    2-cycle.  This matches ``_find_any_cycle_undirected`` and the enumerator.
    """
    best_len: Optional[int] = None
    best_cycle: Optional[list[str]] = None

    for start in ids:
        dist: dict[str, int] = {start: 0}
        parent: dict[str, Optional[str]] = {start: None}
        q = deque([start])
        while q:
            u = q.popleft()
            du = dist[u]
            if best_len is not None and du * 2 + 1 >= best_len:
                continue  # any cycle found from here is at least this long
            for ae in adj.get(u, []):
                v = ae.to
                if v not in dist:
                    dist[v] = du + 1
                    parent[v] = u
                    q.append(v)
                elif parent.get(u) != v and parent.get(v) != u:
                    cand = du + dist[v] + 1
                    if best_len is None or cand < best_len:
                        best_len = cand
                        best_cycle = _reconstruct_cycle_from_tree_edge(u, v, parent)

    return best_len, best_cycle


def _girth_and_shortest_cycle(graph: "Graph") -> tuple[Optional[int], Optional[list[str]]]:
    """Return (girth, one shortest closed cycle), or ``(None, None)`` if acyclic/empty."""
    ids = node_ids(graph)
    if len(ids) == 0:
        return None, None

    directed = bool(graph.directed)
    adj = build_adjacency(graph, undirected=not directed)
    return _directed_girth(ids, adj) if directed else _undirected_girth(ids, adj)


def _negative_cycle_from_edges(
    ids: list[str],
    edges: list[tuple[str, str, float]],
    source_node: Optional[str] = None,
) -> tuple[bool, Optional[list[str]]]:
    """Bellman-Ford negative-cycle detection on a plain directed edge list.

    With ``source_node=None`` every node starts at distance 0 ("super source"),
    so a negative cycle anywhere is found; otherwise only cycles reachable from
    ``source_node`` are.  Returns ``(found, closed_cycle_or_None)``.
    """
    if source_node is None:
        dist = {v: 0.0 for v in ids}
    else:
        dist = {source_node: 0.0}

    pred: dict[str, str] = {}
    last_relaxed: Optional[str] = None

    # Relax |V| times; a relaxation in the |V|-th pass proves a negative cycle.
    for _ in range(len(ids)):
        last_relaxed = None
        for u, v, w in edges:
            if u not in dist:
                continue
            nd = dist[u] + w
            if nd < dist.get(v, float("inf")):
                dist[v] = nd
                pred[v] = u
                last_relaxed = v
        if last_relaxed is None:
            # Distances converged; later passes could not relax anything either.
            return False, None

    if last_relaxed is None:  # only reachable when ids is empty
        return False, None

    # Step back |V| predecessors so that y is guaranteed to lie on the cycle.
    y = last_relaxed
    for _ in range(len(ids)):
        y = pred.get(y, y)

    # Collect the cycle by following predecessors until y repeats.
    cycle = [y]
    cur = pred.get(y)
    while cur is not None and cur != y and cur not in cycle:
        cycle.append(cur)
        cur = pred.get(cur)

    cycle.reverse()  # predecessor order -> forward edge order
    return True, _close_cycle(cycle)


def _detect_negative_cycle_bellman_ford(
    graph: "Graph", *, source_node: Optional[str] = None
) -> tuple[bool, Optional[list[str]]]:
    """Detect a negative-weight cycle in *graph* using Bellman-Ford.

    Edges without a weight count as 1.0.  Undirected edges are expanded into
    both directions.  If ``source_node`` is given but is not a node of the
    graph, ``(False, None)`` is returned.
    """
    ids = node_ids(graph)
    if not ids:
        return False, None

    directed = bool(graph.directed)
    edges: list[tuple[str, str, float]] = []
    for e in graph.edges:
        w = float(e.weight if e.weight is not None else 1.0)
        edges.append((e.source, e.target, w))
        if not directed:
            edges.append((e.target, e.source, w))

    if source_node is not None and source_node not in set(ids):
        return False, None

    return _negative_cycle_from_edges(list(ids), edges, source_node)


def cycle_info(
    graph: "Graph",
    *,
    find_all: bool = False,
    min_length: int = 3,
    max_length: Optional[int] = None,
    max_cycles: int = 1000,
    max_nodes: int = 15,
    return_cycles: bool = True,
    return_shortest_cycle: bool = True,
    return_girth: bool = True,
    detect_negative_cycle: bool = False,
    return_negative_cycle: bool = True,
    negative_cycle_source_node: Optional[str] = None,
) -> "CycleResult":
    """Compute cycle-related information for a graph.

    - Cycle existence uses DFS.
    - Girth / shortest cycle uses BFS from every node on unweighted edges.
    - ``find_all`` enumerates simple cycles by brute force, bounded by
      ``max_nodes`` and ``max_cycles`` and filtered by ``min_length`` /
      ``max_length`` (edge counts).
    - Negative-cycle detection uses Bellman-Ford.

    The girth and enumeration passes are skipped when the existence check
    already found the graph acyclic; their results (``None`` / ``[]``) are
    the same as before for graphs without duplicated edges.

    Returns a ``CycleResult``; fields that were not requested are ``None``.
    All returned cycles are closed (first node repeated at the end).
    """
    directed = bool(graph.directed)

    # Fast existence check first.
    any_cycle = (
        _find_any_cycle_directed(graph) if directed else _find_any_cycle_undirected(graph)
    )
    has_cycle = any_cycle is not None

    cycles: Optional[list[list[str]]] = None
    if find_all and return_cycles:
        if has_cycle:
            cycles = _all_simple_cycles_bruteforce(
                graph,
                min_length=max(1, int(min_length)),
                max_length=max_length,
                max_cycles=max(1, int(max_cycles)),
                max_nodes=max(1, int(max_nodes)),
            )
        else:
            cycles = []
    elif return_cycles and any_cycle is not None:
        cycles = [_close_cycle(any_cycle)]

    girth: Optional[int] = None
    shortest_cycle: Optional[list[str]] = None
    if has_cycle and (return_girth or return_shortest_cycle):
        g_len, g_cycle = _girth_and_shortest_cycle(graph)
        girth = g_len if return_girth else None
        shortest_cycle = _close_cycle(g_cycle) if (return_shortest_cycle and g_cycle) else None

    has_negative_cycle: Optional[bool] = None
    negative_cycle: Optional[list[str]] = None
    if detect_negative_cycle:
        has_negative_cycle, neg = _detect_negative_cycle_bellman_ford(
            graph, source_node=negative_cycle_source_node
        )
        negative_cycle = _close_cycle(neg) if (return_negative_cycle and neg) else None

    return CycleResult(
        has_cycle=has_cycle,
        cycles=cycles,
        shortest_cycle=shortest_cycle,
        girth=girth,
        has_negative_cycle=has_negative_cycle,
        negative_cycle=negative_cycle,
    )
# Tests for cycle_info (tests/test_cycles.py).
#
# Expects at module scope, as in the original file header:
#   cycle_info          (evaluation_function.algorithms.cycles)
#   Edge, Graph, Node   (evaluation_function.schemas)


def g(nodes, edges, *, directed=False):
    """Build a Graph from a list of node ids and a list of edge dicts."""
    node_objs = [Node(id=name) for name in nodes]
    edge_objs = [Edge(**spec) for spec in edges]
    return Graph(nodes=node_objs, edges=edge_objs, directed=directed)


def _links(*pairs):
    """Turn (source, target) pairs into unweighted edge dicts."""
    return [{"source": src, "target": dst} for src, dst in pairs]


def _basic_info(graph):
    """Run cycle_info in single-cycle mode with girth and shortest cycle enabled."""
    return cycle_info(
        graph,
        find_all=False,
        return_cycles=True,
        return_girth=True,
        return_shortest_cycle=True,
    )


def _square_with_diagonal():
    """Square A-B-C-D plus diagonal A-C: two triangles and one 4-cycle."""
    return g(
        ["A", "B", "C", "D"],
        _links(("A", "B"), ("B", "C"), ("C", "D"), ("D", "A"), ("A", "C")),
        directed=False,
    )


def _negative_info(graph):
    """Run cycle_info with only negative-cycle detection switched on."""
    return cycle_info(
        graph,
        detect_negative_cycle=True,
        return_negative_cycle=True,
        return_girth=False,
        return_shortest_cycle=False,
        return_cycles=False,
    )


class TestCycleDetection:
    def test_undirected_tree_is_acyclic(self):
        path_graph = g(
            ["A", "B", "C", "D"],
            _links(("A", "B"), ("B", "C"), ("C", "D")),
            directed=False,
        )
        info = _basic_info(path_graph)
        assert info.has_cycle is False
        assert info.girth is None
        assert info.shortest_cycle is None
        assert info.cycles in (None, [])

    def test_undirected_triangle_has_cycle(self):
        triangle = g(
            ["A", "B", "C"],
            _links(("A", "B"), ("B", "C"), ("C", "A")),
            directed=False,
        )
        info = _basic_info(triangle)
        assert info.has_cycle is True
        assert info.girth == 3
        assert info.shortest_cycle is not None
        assert len(info.shortest_cycle) == 4  # closed cycle
        assert info.shortest_cycle[0] == info.shortest_cycle[-1]

    def test_directed_dag_is_acyclic(self):
        dag = g(
            ["1", "2", "3"],
            _links(("1", "2"), ("2", "3"), ("1", "3")),
            directed=True,
        )
        info = _basic_info(dag)
        assert info.has_cycle is False
        assert info.girth is None
        assert info.shortest_cycle is None

    def test_directed_cycle_detected(self):
        ring = g(
            ["A", "B", "C"],
            _links(("A", "B"), ("B", "C"), ("C", "A")),
            directed=True,
        )
        info = _basic_info(ring)
        assert info.has_cycle is True
        assert info.girth == 3
        assert info.shortest_cycle is not None
        assert info.shortest_cycle[0] == info.shortest_cycle[-1]

    def test_directed_self_loop_is_cycle_length_1(self):
        loop = g(["A"], _links(("A", "A")), directed=True)
        info = _basic_info(loop)
        assert info.has_cycle is True
        assert info.girth == 1
        assert info.shortest_cycle == ["A", "A"]


class TestAllCycles:
    def test_find_all_cycles_undirected_small_graph(self):
        # Square with a diagonal -> 2 triangles + 1 square
        info = cycle_info(
            _square_with_diagonal(),
            find_all=True,
            min_length=3,
            max_length=None,
            max_nodes=15,
            max_cycles=100,
            return_cycles=True,
            return_girth=True,
            return_shortest_cycle=True,
        )
        assert info.has_cycle is True
        assert info.cycles is not None
        edge_counts = [len(c) - 1 for c in info.cycles]
        assert sorted(set(edge_counts)) == [3, 4]
        assert edge_counts.count(3) == 2
        assert edge_counts.count(4) == 1

    def test_find_all_cycles_respects_max_length(self):
        info = cycle_info(
            _square_with_diagonal(),
            find_all=True,
            min_length=3,
            max_length=3,
            max_nodes=15,
            max_cycles=100,
            return_cycles=True,
            return_girth=False,
            return_shortest_cycle=False,
        )
        assert info.cycles is not None
        assert all((len(c) - 1) == 3 for c in info.cycles)

    def test_find_all_cycles_guard_max_nodes(self):
        # A single 16-node ring: one node over the guard, so enumeration is skipped.
        size = 16
        names = [str(i) for i in range(size)]
        ring_edges = [{"source": str(i), "target": str((i + 1) % size)} for i in range(size)]
        info = cycle_info(
            g(names, ring_edges, directed=False),
            find_all=True,
            max_nodes=15,
            max_cycles=100,
            return_cycles=True,
            return_girth=True,
            return_shortest_cycle=True,
        )
        assert info.has_cycle is True
        assert info.cycles == []  # enumeration skipped
        assert info.girth == 16


class TestNegativeCycle:
    def test_negative_cycle_directed_detected(self):
        graph = g(
            ["A", "B"],
            [
                {"source": "A", "target": "B", "weight": 1},
                {"source": "B", "target": "A", "weight": -2},
            ],
            directed=True,
        )
        info = _negative_info(graph)
        assert info.has_negative_cycle is True
        assert info.negative_cycle is not None
        assert info.negative_cycle[0] == info.negative_cycle[-1]

    def test_negative_cycle_undirected_negative_edge_detected(self):
        # Under our undirected-as-bidirectional model, a single negative edge
        # implies a negative 2-cycle.
        graph = g(
            ["A", "B"],
            [{"source": "A", "target": "B", "weight": -1}],
            directed=False,
        )
        info = _negative_info(graph)
        assert info.has_negative_cycle is True
        assert info.negative_cycle is not None

    def test_no_negative_cycle(self):
        unit_weight_ring = [
            {"source": "A", "target": "B", "weight": 1},
            {"source": "B", "target": "C", "weight": 1},
            {"source": "C", "target": "A", "weight": 1},
        ]
        info = _negative_info(g(["A", "B", "C"], unit_weight_ring, directed=True))
        assert info.has_negative_cycle is False
        assert info.negative_cycle is None
--- evaluation_function/evaluation.py | 116 +++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/evaluation_function/evaluation.py b/evaluation_function/evaluation.py index 3abea5b..fe71498 100755 --- a/evaluation_function/evaluation.py +++ b/evaluation_function/evaluation.py @@ -5,7 +5,7 @@ from lf_toolkit.evaluation import Result, Params from pydantic import ValidationError -from evaluation_function.algorithms import bipartite_info, connectivity_info, shortest_path_info +from evaluation_function.algorithms import bipartite_info, connectivity_info, cycle_info, shortest_path_info from evaluation_function.algorithms.shortest_path import NegativeCycleError from evaluation_function.schemas import Answer, EvaluationParams, Graph, Response @@ -134,6 +134,120 @@ def _err(msg: str) -> Result: fb += f" odd_cycle={details.odd_cycle}" return _ok() if is_correct else _err(fb) + if eval_type in {"cycle_detection", "find_all_cycles"}: + c_params = p.cycle_detection + find_all = True if eval_type == "find_all_cycles" else bool(c_params.find_all) if c_params else False + min_len = int(c_params.min_length) if c_params else 3 + max_len = c_params.max_length if c_params else None + max_nodes = int(c_params.max_nodes) if c_params else 15 + max_cycles = int(c_params.max_cycles) if c_params else 1000 + want_cycles = bool(c_params.return_cycles) if c_params else True + + expected = ans.has_cycle + if expected is None: + expected = cycle_info( + expected_graph, + find_all=False, + return_cycles=False, + return_girth=False, + return_shortest_cycle=False, + ).has_cycle + + student_value = resp.has_cycle + if student_value is None: + student_value = cycle_info( + student_graph, + find_all=False, + return_cycles=False, + return_girth=False, + return_shortest_cycle=False, + ).has_cycle + + details = cycle_info( + student_graph, + find_all=find_all, + min_length=min_len, + max_length=max_len, + max_nodes=max_nodes, + max_cycles=max_cycles, + 
return_cycles=want_cycles, + return_girth=False, + return_shortest_cycle=False, + ) + + is_correct = bool(student_value) == bool(expected) + fb = f"Cycle detection: expected={expected}, got={student_value}." + if want_cycles and details.cycles is not None: + fb += f" cycles_found={len(details.cycles)}" + return _ok() if is_correct else _err(fb) + + if eval_type == "shortest_cycle": + c_params = p.cycle_detection + min_len = int(c_params.min_length) if c_params else 3 + max_len = c_params.max_length if c_params else None + + expected_girth = ans.girth + if expected_girth is None: + expected_girth = cycle_info( + expected_graph, + find_all=False, + min_length=min_len, + max_length=max_len, + return_cycles=False, + return_girth=True, + return_shortest_cycle=False, + ).girth + + student_girth = resp.girth + if student_girth is None: + student_girth = cycle_info( + student_graph, + find_all=False, + min_length=min_len, + max_length=max_len, + return_cycles=False, + return_girth=True, + return_shortest_cycle=False, + ).girth + + # Treat acyclic graphs as girth=None + is_correct = student_girth == expected_girth + fb = f"Shortest cycle (girth): expected={expected_girth}, got={student_girth}." 
+ return _ok() if is_correct else _err(fb) + + if eval_type == "negative_cycle": + n_params = p.negative_cycle + want_cycle = bool(n_params.return_cycle) if n_params else True + source_node = n_params.source_node if n_params else None + + expected_neg = cycle_info( + expected_graph, + detect_negative_cycle=True, + return_negative_cycle=False, + negative_cycle_source_node=source_node, + return_cycles=False, + return_girth=False, + return_shortest_cycle=False, + ).has_negative_cycle + expected_neg_bool = bool(expected_neg) + + details = cycle_info( + student_graph, + detect_negative_cycle=True, + return_negative_cycle=want_cycle, + negative_cycle_source_node=source_node, + return_cycles=False, + return_girth=False, + return_shortest_cycle=False, + ) + student_neg_bool = bool(details.has_negative_cycle) + + is_correct = student_neg_bool == expected_neg_bool + fb = f"Negative cycle: expected={expected_neg_bool}, got={student_neg_bool}." + if want_cycle and details.negative_cycle is not None: + fb += f" negative_cycle={details.negative_cycle}" + return _ok() if is_correct else _err(fb) + if eval_type == "shortest_path": sp = p.shortest_path if sp is None: