# The analyzer implementation is imported inside the function so that
# importing this package does not eagerly pull in the code-analyzer
# subpackages (avoids circular imports during package initialisation).
def analyze_python(
    path: str,
    level: "AnalysisLevel" = AnalysisLevel.FUNCTIONS,
    **kwargs,
) -> "AnalysisResult":
    """
    Build an SGraph model of the Python project rooted at ``path``.

    This is a thin facade: it forwards all arguments unchanged to
    ``sgraph.analyzers.code.python.python_analyzer.analyze_python``.

    Args:
        path: Root directory of the project
        level: Analysis detail level
        **kwargs: Additional AnalyzerConfig parameters, passed through as-is

    Returns:
        AnalysisResult containing the graph, errors and statistics

    Example:
        >>> result = analyze_python("./src/sgraph")
        >>> print(result.graph.rootNode.getNodeCount())
    """
    from sgraph.analyzers.code.python import python_analyzer

    return python_analyzer.analyze_python(path, level, **kwargs)
@dataclass
class AnalysisResult:
    """
    Outcome of one analyzer run.

    Attributes:
        graph: The produced SGraph model
        config: The configuration that was used
        errors: Errors encountered during analysis (empty by default)
        stats: Counters keyed by name, e.g. "files_analyzed", "classes"
    """
    graph: "SGraph"
    config: AnalyzerConfig
    errors: list[AnalysisError] = field(default_factory=list)
    stats: dict[str, int] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        """True when the graph's root node reports a node count above zero."""
        node_count = self.graph.rootNode.getNodeCount()
        return node_count > 0

    @property
    def file_count(self) -> int:
        """Value of the "files_analyzed" counter (0 when absent)."""
        return self.stats.get("files_analyzed", 0)

    @property
    def error_count(self) -> int:
        """Number of recorded errors."""
        return len(self.errors)

    def summary(self) -> str:
        """Return a multi-line, human-readable summary of the counters."""
        counter = self.stats.get
        rows = (
            ("Files analyzed", self.file_count),
            ("Packages", counter("packages", 0)),
            ("Modules", counter("modules", 0)),
            ("Classes", counter("classes", 0)),
            ("Functions", counter("functions", 0)),
            ("Dependencies", counter("dependencies", 0)),
            ("Errors", self.error_count),
        )
        return "\n".join(f"{label}: {value}" for label, value in rows)
def discover_source_files(
    root: Path,
    include_patterns: tuple[str, ...],
    exclude_patterns: tuple[str, ...],
) -> Iterator[SourceFile]:
    """
    Find source files in a directory.

    Each file is yielded at most once, even when several include patterns
    match it, and files are yielded in sorted order per include pattern so
    the result does not depend on filesystem enumeration order.

    Exclude patterns are matched against the POSIX form of the path relative
    to ``root``:

    * ``**/name/**`` and ``**/name/*`` exclude every path that has a
      component matching ``name`` (``name`` may contain fnmatch wildcards,
      e.g. ``*.egg-info``).
    * A leading ``**/`` also matches zero directories, so ``**/.*`` excludes
      hidden entries directly under ``root`` as well as nested ones.
    * Every pattern is additionally tried as a plain ``fnmatch`` pattern
      against the whole relative path (note: fnmatch's ``*`` also matches
      ``/``).
    * Patterns consisting only of ``*`` and ``/`` are ignored.

    Args:
        root: Root directory
        include_patterns: Glob patterns for files to include
        exclude_patterns: Glob patterns for files to skip

    Yields:
        SourceFile objects (without content) for the files found
    """
    root = root.resolve()

    def is_excluded(rel_path: Path) -> bool:
        # Always match on "/"-separated text so patterns behave the same on
        # Windows, where str(path) would use backslashes.
        rel_str = rel_path.as_posix()

        for pat in exclude_patterns:
            # A pattern with nothing but wildcards/slashes names nothing.
            if not pat.strip("*").strip("/"):
                continue

            if pat.startswith("**/"):
                tail = pat[3:]

                # "**/" may stand for zero directories: try the pattern
                # anchored at the root of the relative path.
                if fnmatch.fnmatch(rel_str, tail):
                    return True

                # Directory form ("**/name/**" or "**/name/*"): exclude when
                # any path component matches the directory name pattern.
                for suffix in ("/**", "/*"):
                    if tail.endswith(suffix):
                        dir_pat = tail[: -len(suffix)]
                        if any(fnmatch.fnmatch(part, dir_pat) for part in rel_path.parts):
                            return True
                        break

            # Plain fnmatch against the whole relative path.
            if fnmatch.fnmatch(rel_str, pat):
                return True

        return False

    seen: set[Path] = set()
    for pattern in include_patterns:
        for file_path in sorted(root.glob(pattern)):
            if file_path in seen or not file_path.is_file():
                continue
            rel_path = file_path.relative_to(root)
            if is_excluded(rel_path):
                continue
            seen.add(file_path)
            yield SourceFile(
                path=file_path,
                relative_path=rel_path,
            )
class _ModuleVisitor(ast.NodeVisitor):
    """
    AST visitor that builds the SElement structure for one module.

    Classes and functions become child SElements of the current scope
    (subject to ``config.level``); import statements are recorded in
    ``pending_imports`` for later resolution.

    Imports are collected wherever they appear. Imports nested in compound
    statements such as ``if``/``try`` are reached through the default
    ``generic_visit`` traversal; imports inside function bodies, and inside
    class bodies that are not descended into at the configured level, are
    gathered explicitly so lazy/local imports still produce dependencies.

    Attributes:
        file_element: Element representing the file/package being analyzed
        source: Source file information
        config: Analyzer configuration
        stats: Shared statistics dict, mutated in place
        current_scope: Element new classes/functions are attached to
        pending_imports: Collected import descriptions (dicts)
    """

    def __init__(
        self,
        file_element: SElement,
        source: SourceFile,
        config: AnalyzerConfig,
        stats: dict[str, int],
    ):
        self.file_element = file_element
        self.source = source
        self.config = config
        self.stats = stats
        self.current_scope: SElement = file_element
        self.pending_imports: list[dict[str, Any]] = []

    def visit_Module(self, node: ast.Module) -> None:
        """Visit every top-level statement of the module."""
        for child in node.body:
            self.visit(child)

    def visit_Import(self, node: ast.Import) -> None:
        """Record 'import x' and 'import x as y' statements."""
        for alias in node.names:
            self.pending_imports.append({
                "module": alias.name,
                "alias": alias.asname,
                "is_from": False,
                "line": node.lineno,
            })

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Record 'from x import y' statements (absolute and relative)."""
        if node.module is None and node.level == 0:
            return  # Invalid import

        names = [alias.name for alias in node.names]
        self.pending_imports.append({
            "module": node.module or "",
            "names": names,
            "level": node.level,  # Relative import level (0 = absolute)
            "is_from": True,
            "line": node.lineno,
        })

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Create a "class" element and, at FUNCTIONS level or above, its members."""
        if self.config.level.value < AnalysisLevel.CLASSES.value:
            # No element is created, but imports in the body still count.
            self._collect_nested_imports(node.body)
            return

        # NOTE(review): a second definition with the same name in one scope
        # (overloads, conditional definitions) creates another SElement with
        # that name - confirm how SElement handles duplicate child names.
        class_elem = SElement(self.current_scope, node.name)
        class_elem.setType("class")
        class_elem.addAttribute("line", node.lineno)

        # Decorators (FULL level only); skipped when no name could be
        # extracted, matching the handling of function decorators.
        if node.decorator_list and self.config.level == AnalysisLevel.FULL:
            decorators = [_get_decorator_name(d) for d in node.decorator_list]
            decorators = [d for d in decorators if d]
            if decorators:
                class_elem.addAttribute("decorators", ";".join(decorators))

        # Inheritance - stored for later resolution
        if node.bases:
            base_names = [_get_name_from_node(b) for b in node.bases]
            base_names = [b for b in base_names if b]  # Filter out empties
            if base_names:
                class_elem.addAttribute("_pending_bases", base_names)

        self.stats["classes"] = self.stats.get("classes", 0) + 1

        if self.config.level.value >= AnalysisLevel.FUNCTIONS.value:
            # Descend into the class body with the class as current scope.
            old_scope = self.current_scope
            self.current_scope = class_elem
            try:
                for child in node.body:
                    self.visit(child)
            finally:
                self.current_scope = old_scope
        else:
            # Body is not modeled at this level; keep its imports anyway.
            self._collect_nested_imports(node.body)

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Handle function/method definitions."""
        self._handle_function(node, is_async=False)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Handle async function/method definitions."""
        self._handle_function(node, is_async=True)

    def _handle_function(
        self,
        node: ast.FunctionDef | ast.AsyncFunctionDef,
        is_async: bool = False,
    ) -> None:
        """
        Shared handling for functions and methods.

        The function body is not modeled as elements, but imports anywhere
        inside it are recorded regardless of the analysis level.
        """
        self._collect_nested_imports(node.body)

        if self.config.level.value < AnalysisLevel.FUNCTIONS.value:
            return

        func_elem = SElement(self.current_scope, node.name)

        # A function whose enclosing scope is a class element is a method.
        is_method = self.current_scope.getType() == "class"
        func_elem.setType("method" if is_method else "function")
        func_elem.addAttribute("line", node.lineno)

        if is_async:
            func_elem.addAttribute("async", "true")

        if self.config.level == AnalysisLevel.FULL:
            # Decorators
            if node.decorator_list:
                decorators = [_get_decorator_name(d) for d in node.decorator_list]
                decorators = [d for d in decorators if d]
                if decorators:
                    func_elem.addAttribute("decorators", ";".join(decorators))

            # Parameters
            params = _extract_parameters(node.args)
            if params:
                func_elem.addAttribute("parameters", ";".join(params))

            # Return type annotation
            if node.returns:
                return_type = _get_name_from_node(node.returns)
                if return_type:
                    func_elem.addAttribute("return_type", return_type)

        self.stats["functions"] = self.stats.get("functions", 0) + 1

    def _collect_nested_imports(self, body: list[ast.stmt]) -> None:
        """Record every import statement found anywhere inside ``body``."""
        for stmt in body:
            for sub in ast.walk(stmt):
                if isinstance(sub, ast.Import):
                    self.visit_Import(sub)
                elif isinstance(sub, ast.ImportFrom):
                    self.visit_ImportFrom(sub)
list[str] -> list + return _get_name_from_node(value) + case ast.Constant(value=value): + # String annotation "SomeType" + return str(value) if isinstance(value, str) else "" + case ast.BinOp(): + # Union type X | Y (Python 3.10+) + return "" + case _: + return "" + + +def _extract_parameters(args: ast.arguments) -> list[str]: + """Extract function parameters.""" + params: list[str] = [] + + # Positional-only parameters (Python 3.8+) + for arg in args.posonlyargs: + params.append(arg.arg) + + # Regular positional/keyword parameters + for arg in args.args: + params.append(arg.arg) + + # *args + if args.vararg: + params.append(f"*{args.vararg.arg}") + + # Keyword-only parameters + for arg in args.kwonlyargs: + params.append(arg.arg) + + # **kwargs + if args.kwarg: + params.append(f"**{args.kwarg.arg}") + + return params diff --git a/src/sgraph/analyzers/code/python/import_resolver.py b/src/sgraph/analyzers/code/python/import_resolver.py new file mode 100644 index 0000000..b59e457 --- /dev/null +++ b/src/sgraph/analyzers/code/python/import_resolver.py @@ -0,0 +1,167 @@ +"""Python import resolution for creating dependencies.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from sgraph import SElement + +from ...base import AnalyzerConfig + + +@dataclass(slots=True) +class ImportTarget: + """A resolved import target.""" + element: SElement + module_path: str + is_external: bool = False + + +def resolve_import( + import_info: dict[str, Any], + from_element: SElement, + module_registry: dict[str, SElement], + config: AnalyzerConfig, +) -> list[ImportTarget]: + """ + Resolve an import statement into target elements. 
+ + Args: + import_info: Import information (from ast_visitor) + from_element: The element the import is made from + module_registry: Registered modules (module_path -> SElement) + config: Analyzer configuration + + Returns: + List of ImportTarget objects (may be empty if nothing is found) + """ + results: list[ImportTarget] = [] + module_name = import_info.get("module", "") + level = import_info.get("level", 0) + names = import_info.get("names", []) + + # Handle relative imports + if level > 0: + base_path = _resolve_relative_import_base( + level=level, + from_element=from_element, + ) + if base_path is None: + return [] + + if module_name: + # "from .subpkg import x" -> base_path.module_name + full_module = f"{base_path}.{module_name}" if base_path else module_name + target = _find_module(full_module, module_registry, config) + if target: + results.append(target) + elif names: + # "from . import x, y" -> resolve each name separately + for name in names: + full_module = f"{base_path}.{name}" if base_path else name + target = _find_module(full_module, module_registry, config) + if target: + results.append(target) + else: + # Plain package import + target = _find_module(base_path, module_registry, config) + if target: + results.append(target) + else: + # Absolute import + if module_name: + target = _find_module(module_name, module_registry, config) + if target: + results.append(target) + + return results + + +def _find_module( + module_name: str, + module_registry: dict[str, SElement], + config: AnalyzerConfig, +) -> ImportTarget | None: + """Find a module in the registry.""" + if not module_name: + return None + + # Check for a direct match + if module_name in module_registry: + return ImportTarget( + element=module_registry[module_name], + module_path=module_name, + is_external=False, + ) + + # Try to find a parent module + parts = module_name.split(".") + for i in range(len(parts), 0, -1): + partial = ".".join(parts[:i]) + if partial in module_registry: + return 
ImportTarget( + element=module_registry[partial], + module_path=module_name, + is_external=False, + ) + + # External module - skipped unless we follow them + if not config.follow_external_imports: + return None + + return None + + +def _resolve_relative_import_base( + level: int, + from_element: SElement, +) -> str | None: + """ + Resolve the base path for a relative import. + + Args: + level: Number of dots (1 = ".", 2 = "..", etc.) + from_element: The element the import is made from + + Returns: + Base path (e.g. "pkg" when in pkg/main.py) or None + + Example: + # If we are in pkg/sub/module.py + # level=1 -> "pkg.sub" + # level=2 -> "pkg" + """ + # Collect the element's path from the root + path_parts: list[str] = [] + current = from_element + while current is not None and current.name != '': + path_parts.insert(0, current.name) + current = current.parent + + # If a file (not __init__.py), drop the file name + # __init__.py represents the package; other files are modules within the package + if from_element.getType() == "file": + if path_parts: + path_parts = path_parts[:-1] + + # Move up level-1 levels + # level=1 (from . import x) = same package + # level=2 (from .. 
def get_dependency_type(import_info: dict[str, Any]) -> str:
    """
    Map an import description to its dependency type string.

    Returns "from_import" when the description's "is_from" entry is truthy,
    otherwise "import".
    """
    return "from_import" if import_info.get("is_from") else "import"
+ + Args: + config: Analyzer configuration + + Returns: + AnalysisResult containing the graph, errors and statistics + + Example: + >>> config = AnalyzerConfig(root_path=Path("./src")) + >>> result = analyze_python_project(config) + >>> print(f"Analyzed {result.file_count} files") + >>> result.graph.to_xml("model.xml") + """ + graph = SGraph(SElement(None, '')) + errors: list[AnalysisError] = [] + stats: dict[str, int] = { + "files_analyzed": 0, + "files_skipped": 0, + "packages": 0, + "modules": 0, + "classes": 0, + "functions": 0, + "dependencies": 0, + } + + # Module registry for import resolution + module_registry: dict[str, SElement] = {} + + # Collect pending imports from all files + all_pending_imports: list[tuple[SElement, list[dict[str, Any]]]] = [] + + # Phase 1: Discover and analyze files + source_files = list(discover_source_files( + config.root_path, + tuple(config.include_patterns), + tuple(config.exclude_patterns), + )) + + for source in source_files: + try: + source_with_content = read_source_file(source) + file_element, pending_imports = _analyze_file( + graph=graph, + source=source_with_content, + config=config, + stats=stats, + module_registry=module_registry, + ) + if pending_imports: + all_pending_imports.append((file_element, pending_imports)) + + except SyntaxError as e: + errors.append(AnalysisError( + file=source.path, + message=f"Syntax error: {e.msg}", + line=e.lineno, + exception=e, + )) + stats["files_skipped"] += 1 + + except Exception as e: + errors.append(AnalysisError( + file=source.path, + message=str(e), + exception=e, + )) + stats["files_skipped"] += 1 + + # Phase 2: Resolve imports into dependencies + _resolve_all_imports( + all_pending_imports=all_pending_imports, + module_registry=module_registry, + config=config, + stats=stats, + ) + + return AnalysisResult( + graph=graph, + config=config, + errors=errors, + stats=stats, + ) + + +def _analyze_file( + graph: SGraph, + source: SourceFile, + config: AnalyzerConfig, + stats: 
dict[str, int], + module_registry: dict[str, SElement], +) -> tuple[SElement, list[dict[str, Any]]]: + """ + Analyze a single Python file. + + Returns: + tuple[SElement, list]: (created element, pending imports) + """ + if source.content is None: + raise ValueError(f"Source content is None for {source.path}") + + # Parse the AST + tree = ast.parse(source.content, filename=str(source.path)) + + # Build the element path + path_parts = list(source.relative_path.parts) + + # Handle __init__.py specially (it represents a package) + is_package_init = source.is_package_init + + if is_package_init: + path_parts = path_parts[:-1] # Drop __init__.py + element_type = "package" + else: + # Strip the .py suffix from the file name + if path_parts: + path_parts[-1] = path_parts[-1].removesuffix('.py') + element_type = "file" + + # Create or get the element + if path_parts: + element_path = "/" + "/".join(path_parts) + file_element = graph.createOrGetElementFromPath(element_path) + else: + # Empty path = root (e.g. 
a lone __init__.py) + file_element = graph.rootNode + + file_element.setType(element_type) + file_element.addAttribute("source_path", str(source.path)) + + # Register the module + module_path = source.module_path + if module_path: + module_registry[module_path] = file_element + + # Update statistics + stats["files_analyzed"] += 1 + if is_package_init: + stats["packages"] += 1 + else: + stats["modules"] += 1 + + # Collect content from the AST (classes, functions, imports) + pending_imports: list[dict[str, Any]] = [] + if config.level.value >= AnalysisLevel.FILES.value: + pending_imports = visit_module( + tree=tree, + file_element=file_element, + source=source, + config=config, + stats=stats, + ) + + return file_element, pending_imports + + +def _resolve_all_imports( + all_pending_imports: list[tuple[SElement, list[dict[str, Any]]]], + module_registry: dict[str, SElement], + config: AnalyzerConfig, + stats: dict[str, int], +) -> None: + """Resolve all imports into dependencies.""" + for from_element, pending_imports in all_pending_imports: + for import_info in pending_imports: + targets = resolve_import( + import_info=import_info, + from_element=from_element, + module_registry=module_registry, + config=config, + ) + for target in targets: + _create_import_dependency( + from_elem=from_element, + to_elem=target.element, + import_info=import_info, + stats=stats, + ) + + +def _create_import_dependency( + from_elem: SElement, + to_elem: SElement, + import_info: dict[str, Any], + stats: dict[str, int], +) -> None: + """Create an import dependency between elements.""" + # Avoid self-references + if from_elem is to_elem: + return + + dep_type = get_dependency_type(import_info) + + # Collect attributes + attrs: dict[str, str | int | list[str]] = {} + if "line" in import_info: + attrs["line"] = import_info["line"] + if "names" in import_info: + attrs["imported_names"] = import_info["names"] + + # Check whether the same dependency already exists + for existing in 
def analyze_python(
    path: str | Path,
    level: AnalysisLevel = AnalysisLevel.FUNCTIONS,
    **kwargs: Any,
) -> AnalysisResult:
    """
    Analyze the Python project under ``path`` with a one-call interface.

    Builds an AnalyzerConfig from the arguments and delegates to
    ``analyze_python_project``.

    Args:
        path: Root directory of the project (string paths are converted to Path)
        level: Analysis detail level
        **kwargs: Other AnalyzerConfig parameters

    Returns:
        AnalysisResult

    Example:
        >>> result = analyze_python("./src/sgraph")
        >>> print(result.graph.rootNode.getNodeCount())
        >>> result.graph.to_xml("model.xml")
    """
    root = path if not isinstance(path, str) else Path(path)
    return analyze_python_project(
        AnalyzerConfig(root_path=root, level=level, **kwargs)
    )
class TestAnalyzePython:
    """Integration tests for analyze_python()."""

    @pytest.fixture
    def sgraph_src_path(self) -> Path:
        """The sgraph project's src/sgraph directory (dogfooding)."""
        # tests/analyzers/code/python/<this file> -> repository root is 5 levels up
        return Path(__file__).parents[4] / "src" / "sgraph"

    def test_analyze_sgraph_project(self, sgraph_src_path: Path):
        """Analyze the sgraph project itself (dogfooding)."""
        if not sgraph_src_path.exists():
            pytest.skip("sgraph src directory not found")

        result = analyze_python(sgraph_src_path)

        assert result.success
        assert result.file_count > 10
        assert result.error_count == 0
        assert result.stats["classes"] > 0
        assert result.stats["functions"] > 0

    def test_analyze_with_different_levels(self, sgraph_src_path: Path):
        """A higher detail level never produces fewer elements."""
        if not sgraph_src_path.exists():
            pytest.skip("sgraph src directory not found")

        levels = [AnalysisLevel.FILES, AnalysisLevel.CLASSES, AnalysisLevel.FUNCTIONS]
        counts = {
            level: analyze_python(sgraph_src_path, level=level).graph.rootNode.getNodeCount()
            for level in levels
        }

        assert (
            counts[AnalysisLevel.FUNCTIONS]
            >= counts[AnalysisLevel.CLASSES]
            >= counts[AnalysisLevel.FILES]
        )

    def test_handles_syntax_errors_gracefully(self, tmp_path: Path):
        """Invalid syntax is reported as an error instead of crashing."""
        bad_file = tmp_path / "bad.py"
        bad_file.write_text("def broken(:\n    pass")

        result = analyze_python(tmp_path)

        assert len(result.errors) == 1
        assert "Syntax error" in result.errors[0].message
        # The unparsable file is counted as skipped, not as analyzed.
        assert result.stats["files_skipped"] == 1
        assert result.file_count == 0

    def test_empty_directory(self, tmp_path: Path):
        """An empty directory does not crash the analyzer."""
        result = analyze_python(tmp_path)

        assert result.file_count == 0
        assert result.error_count == 0

    def test_single_file(self, tmp_path: Path):
        """A single file is analyzed."""
        test_file = tmp_path / "test.py"
        test_file.write_text("""
def hello():
    pass

class MyClass:
    def method(self):
        pass
""")
        result = analyze_python(tmp_path)

        assert result.file_count == 1
        assert result.stats["functions"] == 2  # hello + method
        assert result.stats["classes"] == 1

    def test_package_with_init(self, tmp_path: Path):
        """A package with __init__.py is recognized."""
        pkg = tmp_path / "mypackage"
        pkg.mkdir()
        (pkg / "__init__.py").write_text("# Package init")
        (pkg / "module.py").write_text("def func(): pass")

        result = analyze_python(tmp_path)

        assert result.stats["packages"] == 1
        assert result.stats["modules"] == 1

    def test_excludes_pycache(self, tmp_path: Path):
        """__pycache__ is skipped."""
        cache = tmp_path / "__pycache__"
        cache.mkdir()
        (cache / "cached.py").write_text("# Should be ignored")
        (tmp_path / "real.py").write_text("def func(): pass")

        result = analyze_python(tmp_path)

        assert result.file_count == 1

    def test_import_dependencies_created(self, tmp_path: Path):
        """Both 'from x import y' and 'import x' create dependencies."""
        (tmp_path / "main.py").write_text("""
from utils import helper
import other
""")
        (tmp_path / "utils.py").write_text("def helper(): pass")
        (tmp_path / "other.py").write_text("x = 1")

        result = analyze_python(tmp_path)

        main_elem = result.graph.findElementFromPath("/main")
        assert main_elem is not None
        # Check the actual targets, not just that "something" was linked.
        targets = {ea.toElement.name for ea in main_elem.outgoing}
        assert {"utils", "other"} <= targets


class TestAnalyzePythonProject:
    """Tests for analyze_python_project()."""

    def test_custom_exclude_patterns(self, tmp_path: Path):
        """Custom exclude patterns work."""
        (tmp_path / "main.py").write_text("def main(): pass")
        test_dir = tmp_path / "tests"
        test_dir.mkdir()
        (test_dir / "test_main.py").write_text("def test(): pass")

        config = AnalyzerConfig(
            root_path=tmp_path,
            exclude_patterns=("**/tests/**",),
        )
        result = analyze_python_project(config)

        assert result.file_count == 1

    def test_full_level_extracts_parameters(self, tmp_path: Path):
        """The FULL level extracts parameters."""
        (tmp_path / "test.py").write_text("""
def func(a, b, *args, key=None, **kwargs):
    pass
""")
        result = analyze_python(tmp_path, level=AnalysisLevel.FULL)

        func_elem = result.graph.findElementFromPath("/test/func")
        assert func_elem is not None
        params = func_elem.attrs.get("parameters", "")
        assert "a" in params
        assert "b" in params
        assert "*args" in params
        assert "**kwargs" in params

    def test_decorators_extracted(self, tmp_path: Path):
        """Decorators are extracted at the FULL level."""
        (tmp_path / "test.py").write_text("""
@staticmethod
def static_func():
    pass

@property
def prop(self):
    pass
""")
        result = analyze_python(tmp_path, level=AnalysisLevel.FULL)

        static_elem = result.graph.findElementFromPath("/test/static_func")
        assert static_elem is not None
        decorators = static_elem.attrs.get("decorators", "")
        assert "staticmethod" in decorators


class TestRelativeImports:
    """Tests for relative imports."""

    def test_relative_import_same_package(self, tmp_path: Path):
        """'from . import x' links to the sibling module x."""
        pkg = tmp_path / "pkg"
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")
        (pkg / "main.py").write_text("from . import utils")
        (pkg / "utils.py").write_text("x = 1")

        result = analyze_python(tmp_path)

        main_elem = result.graph.findElementFromPath("/pkg/main")
        assert main_elem is not None
        # pkg/utils.py exists, so the import must resolve to it rather than
        # merely to the enclosing package.
        deps = [ea.toElement.name for ea in main_elem.outgoing]
        assert "utils" in deps

    def test_relative_import_parent_package(self, tmp_path: Path):
        """'from .. import x' links to module x of the parent package."""
        pkg = tmp_path / "pkg"
        sub = pkg / "sub"
        pkg.mkdir()
        sub.mkdir()
        (pkg / "__init__.py").write_text("ROOT = 1")
        (pkg / "utils.py").write_text("x = 1")
        (sub / "__init__.py").write_text("")
        (sub / "module.py").write_text("from .. import utils")

        result = analyze_python(tmp_path)

        module_elem = result.graph.findElementFromPath("/pkg/sub/module")
        assert module_elem is not None
        utils_elem = result.graph.findElementFromPath("/pkg/utils")
        assert utils_elem is not None
        # The import must produce a dependency on pkg/utils.py itself.
        assert any(ea.toElement is utils_elem for ea in module_elem.outgoing)


class TestAsyncFunctions:
    """Tests for async functions."""

    def test_async_function_marked(self, tmp_path: Path):
        """Async functions carry async="true"; sync functions have no such attribute."""
        (tmp_path / "test.py").write_text("""
async def async_func():
    pass

def sync_func():
    pass
""")
        result = analyze_python(tmp_path)

        async_elem = result.graph.findElementFromPath("/test/async_func")
        sync_elem = result.graph.findElementFromPath("/test/sync_func")

        assert async_elem is not None
        assert async_elem.attrs.get("async") == "true"
        assert sync_elem is not None
        assert sync_elem.attrs.get("async") is None
class TestAnalysisLevel:
    """Tests for the AnalysisLevel enum."""

    def test_level_ordering(self):
        """Each level's value is strictly smaller than the next, more detailed one."""
        # The members are declared with auto(), so their values grow in
        # declaration order; other code may rely on comparing these values.
        ordered = [
            AnalysisLevel.PACKAGES_ONLY,
            AnalysisLevel.FILES,
            AnalysisLevel.CLASSES,
            AnalysisLevel.FUNCTIONS,
            AnalysisLevel.FULL,
        ]
        for lower, higher in zip(ordered, ordered[1:]):
            assert lower.value < higher.value, f"{lower} should sort before {higher}"


class TestAnalyzerConfig:
    """Tests for AnalyzerConfig."""

    def test_string_path_converted(self):
        """A string root_path is converted to a Path object."""
        config = AnalyzerConfig(root_path="./src")
        assert isinstance(config.root_path, Path)
        assert config.root_path == Path("./src")

    def test_default_values(self):
        """A config built from only root_path gets the documented defaults."""
        config = AnalyzerConfig(root_path=Path("."))
        assert config.level == AnalysisLevel.FUNCTIONS
        assert "**/*.py" in config.include_patterns
        assert "**/__pycache__/**" in config.exclude_patterns
        assert config.follow_external_imports is False

    def test_custom_values(self):
        """Every explicitly passed value is reflected on the config."""
        config = AnalyzerConfig(
            root_path=Path("/test"),
            level=AnalysisLevel.CLASSES,
            include_patterns=("*.py",),
            exclude_patterns=("**/test/**",),
            follow_external_imports=True,
        )
        assert config.root_path == Path("/test")
        assert config.level == AnalysisLevel.CLASSES
        assert config.include_patterns == ("*.py",)
        # Membership rather than equality: this only requires that the custom
        # pattern is present, not that it is the sole entry.
        assert "**/test/**" in config.exclude_patterns
        assert config.follow_external_imports is True


class TestAnalysisError:
    """Tests for AnalysisError."""

    def test_str_representation(self):
        """str() renders '<file>:<line>: <message>' when a line is given."""
        error = AnalysisError(
            file=Path("/test/file.py"),
            message="Test error",
            line=42,
        )
        assert str(error) == "/test/file.py:42: Test error"

    def test_str_without_line(self):
        """str() renders '<file>: <message>' when no line is given."""
        error = AnalysisError(
            file=Path("/test/file.py"),
            message="Test error",
        )
        assert str(error) == "/test/file.py: Test error"


class TestAnalysisResult:
    """Tests for AnalysisResult."""

    def test_success_with_elements(self):
        """success is True when the graph contains an element and no errors are given."""
        graph = SGraph(SElement(None, ''))
        # Constructing the element with the root as parent attaches it to the
        # graph; the returned object itself is not needed.
        SElement(graph.rootNode, "test")
        result = AnalysisResult(
            graph=graph,
            config=AnalyzerConfig(root_path=Path(".")),
        )
        assert result.success is True

    def test_file_count(self):
        """file_count reports the 'files_analyzed' statistic."""
        graph = SGraph(SElement(None, ''))
        result = AnalysisResult(
            graph=graph,
            config=AnalyzerConfig(root_path=Path(".")),
            stats={"files_analyzed": 10},
        )
        assert result.file_count == 10

    def test_error_count(self):
        """error_count equals the number of collected errors."""
        graph = SGraph(SElement(None, ''))
        result = AnalysisResult(
            graph=graph,
            config=AnalyzerConfig(root_path=Path(".")),
            errors=[
                AnalysisError(file=Path("a.py"), message="Error 1"),
                AnalysisError(file=Path("b.py"), message="Error 2"),
            ],
        )
        assert result.error_count == 2

    def test_summary(self):
        """summary() includes the file, class and function counts."""
        graph = SGraph(SElement(None, ''))
        result = AnalysisResult(
            graph=graph,
            config=AnalyzerConfig(root_path=Path(".")),
            stats={
                "files_analyzed": 5,
                "packages": 1,
                "modules": 4,
                "classes": 3,
                "functions": 10,
                "dependencies": 2,
            },
        )
        summary = result.summary()
        assert "Files analyzed: 5" in summary
        assert "Classes: 3" in summary
        assert "Functions: 10" in summary


class TestSourceLocation:
    """Tests for SourceLocation."""

    def test_frozen_dataclass(self):
        """SourceLocation is immutable."""
        loc = SourceLocation(file=Path("test.py"), line=10)
        # Frozen dataclasses raise dataclasses.FrozenInstanceError, which is a
        # subclass of AttributeError.
        with pytest.raises(AttributeError):
            loc.line = 20  # type: ignore

    def test_optional_fields(self):
        """Explicitly passed optional fields are stored as given."""
        loc = SourceLocation(
            file=Path("test.py"),
            line=10,
            column=5,
            end_line=15,
            end_column=10,
        )
        assert loc.column == 5
        assert loc.end_line == 15
        assert loc.end_column == 10

    def test_optional_fields_default(self):
        """Omitted optional fields fall back to their declared defaults."""
        loc = SourceLocation(file=Path("test.py"), line=10)
        assert loc.column == 0
        assert loc.end_line is None
        assert loc.end_column is None


class TestDependencyKind:
    """Tests for DependencyKind."""

    def test_values(self):
        """Every enum member maps to its expected string value."""
        assert DependencyKind.IMPORT.value == "import"
        assert DependencyKind.FROM_IMPORT.value == "from_import"
        assert DependencyKind.INHERITS.value == "inherits"
        assert DependencyKind.IMPLEMENTS.value == "implements"
        assert DependencyKind.CALLS.value == "calls"
        assert DependencyKind.TYPE_REF.value == "type_ref"