Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dist/
build/
eggs/
*.whl
.claude/

# Virtual environments
.venv/
Expand Down
27 changes: 5 additions & 22 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,19 @@ dependencies = [
"pyyaml>=6.0",
"tiktoken>=0.5.0",
"regex>=2023.0",
"transformers>=4.30.0",
"torch>=2.0.0",
"numpy>=1.24.0",
]

[project.optional-dependencies]
all = [
"sentinelguard[pii]",
"sentinelguard[adversarial]",
"sentinelguard[advanced]",
"sentinelguard[api]",
"sentinelguard[monitoring]",
]
pii = [
# PII detection (Presidio)
"presidio-analyzer>=2.2.0",
"presidio-anonymizer>=2.2.0",
"spacy>=3.6.0",
]
adversarial = [
# Model-based detection (HuggingFace)
"transformers>=4.30.0",
"torch>=2.0.0",
"numpy>=1.24.0",
]
advanced = [
"transformers>=4.30.0",
"torch>=2.0.0",
"numpy>=1.24.0",
"scikit-learn>=1.3.0",
"sentence-transformers>=2.2.0",
"scikit-learn>=1.3.0",
]

[project.optional-dependencies]
api = [
"fastapi>=0.100.0",
"uvicorn>=0.23.0",
Expand Down
189 changes: 35 additions & 154 deletions sentinelguard/pii/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""PII (Personally Identifiable Information) detection and anonymization module.

Provides enterprise-grade PII detection using Presidio with 50+ entity types,
and multiple anonymization strategies.
Provides enterprise-grade PII detection using Microsoft Presidio (mandatory
dependency) with 30+ entity types, and multiple anonymization strategies.

Usage:
from sentinelguard.pii import PIIDetector, PIIAnonymizer

detector = PIIDetector(
language="en",
entities=["EMAIL", "PHONE", "CREDIT_CARD", "SSN"],
score_threshold=0.5
entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "US_SSN"],
)
entities = detector.detect(text)

Expand All @@ -21,48 +20,17 @@

import hashlib
import logging
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)
from presidio_analyzer import AnalyzerEngine

# Built-in entity patterns for fallback
BUILTIN_PATTERNS: Dict[str, re.Pattern] = {
"EMAIL_ADDRESS": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"),
"PHONE_NUMBER": re.compile(r"\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b"),
"US_SSN": re.compile(r"\b\d{3}[-]?\d{2}[-]?\d{4}\b"),
"CREDIT_CARD": re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b"),
"IP_ADDRESS": re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
"IBAN_CODE": re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b"),
"US_PASSPORT": re.compile(r"\b[A-Z]\d{8}\b"),
"US_DRIVER_LICENSE": re.compile(r"\b[A-Z]\d{7,14}\b"),
"DATE_TIME": re.compile(
r"\b(?:0[1-9]|1[0-2])[/\-](?:0[1-9]|[12]\d|3[01])[/\-](?:19|20)\d{2}\b"
),
"PERSON": re.compile(r"\b[A-Z][a-z]+\s+[A-Z][a-z]+\b"),
"LOCATION": re.compile(
r"\b\d{1,5}\s+\w+\s+(?:Street|St|Avenue|Ave|Boulevard|Blvd|Road|Rd|Drive|Dr|Lane|Ln)\b",
re.IGNORECASE,
),
"URL": re.compile(r"https?://[^\s]+"),
"MEDICAL_LICENSE": re.compile(r"\b[A-Z]{2}\d{6,8}\b"),
"CRYPTO": re.compile(r"\b(?:0x[a-fA-F0-9]{40}|[13][a-km-zA-HJ-NP-Z1-9]{25,34})\b"),
"US_BANK_NUMBER": re.compile(r"\b\d{8,17}\b"),
}
logger = logging.getLogger(__name__)


@dataclass
class PIIEntity:
"""Represents a detected PII entity.

Attributes:
entity_type: Type of PII (e.g., EMAIL_ADDRESS, PHONE_NUMBER).
start: Start position in text.
end: End position in text.
score: Confidence score (0.0-1.0).
text: The actual matched text.
"""
"""Represents a detected PII entity."""

entity_type: str
start: int
Expand All @@ -73,34 +41,27 @@ class PIIEntity:

@dataclass
class AnonymizedResult:
"""Result of anonymization.

Attributes:
text: The anonymized text.
items: List of anonymization operations performed.
mapping: Mapping from anonymous tokens to original values.
"""
"""Result of anonymization."""

text: str
items: List[Dict[str, Any]] = field(default_factory=list)
mapping: Dict[str, str] = field(default_factory=dict)


class PIIDetector:
"""Enterprise-grade PII detection.
"""Enterprise-grade PII detection powered by Microsoft Presidio.

Uses Presidio when available (50+ entity types), falling back to
built-in regex patterns.
Detects 30+ entity types: EMAIL_ADDRESS, PHONE_NUMBER, CREDIT_CARD,
US_SSN, IBAN_CODE, US_PASSPORT, IP_ADDRESS, PERSON, LOCATION,
CRYPTO, MEDICAL_LICENSE, US_DRIVER_LICENSE, and more.

Args:
language: Detection language. Default "en".
entities: List of entity types to detect. None = all available.
score_threshold: Minimum confidence score. Default 0.5.
use_presidio: Try to use Presidio if available. Default True.
entities: List of entity types to detect. ``None`` = all supported.
score_threshold: Minimum confidence score (0.0–1.0). Default 0.5.
"""

# Full list of Presidio-supported entity types
PRESIDIO_ENTITIES = [
SUPPORTED_ENTITIES = [
"CREDIT_CARD", "CRYPTO", "DATE_TIME", "EMAIL_ADDRESS",
"IBAN_CODE", "IP_ADDRESS", "NRP", "LOCATION", "PERSON",
"PHONE_NUMBER", "MEDICAL_LICENSE", "URL",
Expand All @@ -119,51 +80,28 @@ def __init__(
language: str = "en",
entities: Optional[List[str]] = None,
score_threshold: float = 0.5,
use_presidio: bool = True,
):
self.language = language
self.entities = entities
self.score_threshold = score_threshold
self._use_presidio = use_presidio
self._analyzer = None
self._presidio_available = None

def _init_presidio(self) -> bool:
"""Initialize Presidio analyzer."""
if self._presidio_available is not None:
return self._presidio_available
try:
from presidio_analyzer import AnalyzerEngine
self._analyzer = AnalyzerEngine()
self._presidio_available = True
logger.info("Presidio analyzer initialized successfully")
except ImportError:
self._presidio_available = False
logger.info("Presidio not available, using built-in patterns")
return self._presidio_available
self._analyzer = AnalyzerEngine()
logger.info("Presidio AnalyzerEngine initialized")

def detect(self, text: str) -> List[PIIEntity]:
"""Detect PII entities in text.
"""Detect PII entities in text using Presidio.

Args:
text: The text to analyze.

Returns:
List of detected PIIEntity objects.
List of detected PIIEntity objects sorted by position.
"""
if self._use_presidio and self._init_presidio():
return self._detect_presidio(text)
return self._detect_builtin(text)

def _detect_presidio(self, text: str) -> List[PIIEntity]:
"""Detect using Presidio."""
results = self._analyzer.analyze(
text=text,
entities=self.entities,
language=self.language,
score_threshold=self.score_threshold,
)

return [
PIIEntity(
entity_type=r.entity_type,
Expand All @@ -172,36 +110,9 @@ def _detect_presidio(self, text: str) -> List[PIIEntity]:
score=r.score,
text=text[r.start:r.end],
)
for r in results
for r in sorted(results, key=lambda r: r.start)
]

def _detect_builtin(self, text: str) -> List[PIIEntity]:
"""Detect using built-in regex patterns."""
entities = []

patterns = BUILTIN_PATTERNS
if self.entities:
patterns = {
k: v for k, v in BUILTIN_PATTERNS.items()
if k in self.entities
}

for entity_type, pattern in patterns.items():
for match in pattern.finditer(text):
entity = PIIEntity(
entity_type=entity_type,
start=match.start(),
end=match.end(),
score=0.85, # Fixed confidence for regex matches
text=match.group(),
)
if entity.score >= self.score_threshold:
entities.append(entity)

# Sort by position
entities.sort(key=lambda e: e.start)
return entities

def detect_batch(self, texts: List[str]) -> List[List[PIIEntity]]:
"""Detect PII in multiple texts.

Expand All @@ -218,11 +129,11 @@ class PIIAnonymizer:
"""PII anonymization with multiple strategies.

Strategies:
- replace: Replace with entity type tag (e.g., <EMAIL_ADDRESS>)
- mask: Replace with asterisks
- redact: Remove entirely
- hash: Replace with hash value
- fake: Replace with fake data (requires faker)
- replace: Replace with entity type tag (e.g. ``<EMAIL_ADDRESS>``)
- mask: Replace with asterisks
- redact: Remove entirely
- hash: Replace with SHA-256 hash (12 chars)
- fake: Replace with synthetic data (requires ``faker``)

Args:
default_strategy: Default anonymization strategy. Default "replace".
Expand All @@ -238,11 +149,7 @@ def __init__(
self.entity_strategies = entity_strategies or {}
self._faker = None

def anonymize(
self,
text: str,
entities: List[PIIEntity],
) -> AnonymizedResult:
def anonymize(self, text: str, entities: List[PIIEntity]) -> AnonymizedResult:
"""Anonymize detected PII entities in text.

Args:
Expand All @@ -255,26 +162,16 @@ def anonymize(
if not entities:
return AnonymizedResult(text=text)

# Sort by position (reverse for replacement)
sorted_entities = sorted(entities, key=lambda e: e.start, reverse=True)

result_text = text
items = []
mapping = {}

for entity in sorted_entities:
strategy = self.entity_strategies.get(
entity.entity_type, self.default_strategy
)
strategy = self.entity_strategies.get(entity.entity_type, self.default_strategy)
replacement = self._apply_strategy(entity, strategy)
mapping[replacement] = entity.text

result_text = (
result_text[:entity.start]
+ replacement
+ result_text[entity.end:]
)

result_text = result_text[:entity.start] + replacement + result_text[entity.end:]
items.append({
"entity_type": entity.entity_type,
"original_start": entity.start,
Expand All @@ -283,16 +180,10 @@ def anonymize(
"replacement": replacement,
})

items.reverse() # Restore original order

return AnonymizedResult(
text=result_text,
items=items,
mapping=mapping,
)
items.reverse()
return AnonymizedResult(text=result_text, items=items, mapping=mapping)

def _apply_strategy(self, entity: PIIEntity, strategy: str) -> str:
"""Apply anonymization strategy to an entity."""
if strategy == "replace":
return f"<{entity.entity_type}>"
elif strategy == "mask":
Expand All @@ -303,17 +194,14 @@ def _apply_strategy(self, entity: PIIEntity, strategy: str) -> str:
return hashlib.sha256(entity.text.encode()).hexdigest()[:12]
elif strategy == "fake":
return self._generate_fake(entity.entity_type)
else:
return f"<{entity.entity_type}>"
return f"<{entity.entity_type}>"

def _generate_fake(self, entity_type: str) -> str:
"""Generate fake data for an entity type."""
try:
if self._faker is None:
from faker import Faker
self._faker = Faker()

fake_generators = {
generators = {
"EMAIL_ADDRESS": self._faker.email,
"PHONE_NUMBER": self._faker.phone_number,
"PERSON": self._faker.name,
Expand All @@ -323,19 +211,12 @@ def _generate_fake(self, entity_type: str) -> str:
"URL": self._faker.url,
"IP_ADDRESS": self._faker.ipv4,
}

generator = fake_generators.get(entity_type)
if generator:
return generator()
gen = generators.get(entity_type)
if gen:
return gen()
except ImportError:
pass

return f"<{entity_type}>"


__all__ = [
"PIIDetector",
"PIIAnonymizer",
"PIIEntity",
"AnonymizedResult",
]
__all__ = ["PIIDetector", "PIIAnonymizer", "PIIEntity", "AnonymizedResult"]
Loading
Loading