11 changes: 11 additions & 0 deletions README.md
@@ -120,6 +120,17 @@ checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
```

#### Row-Level Results

You can also get row-level results to see which individual rows passed or failed each check. This is useful for quarantining rows with data quality issues:

```python
rowLevelResult_df = VerificationResult.rowLevelResultsAsDataFrame(spark, checkResult, df)
rowLevelResult_df.show()
```

Each check produces a Boolean column (named after the check description) indicating pass/fail per row. Only checks with row-level-capable constraints (e.g., `isComplete`, `isContainedIn`, `hasPattern`, `isUnique`) will produce output columns.
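As a minimal sketch (assuming a check described as `completeness_check`, so the row-level result column shares that name), rows that failed the check can be split out for quarantine like this:

```python
from pyspark.sql import functions as F

# "completeness_check" is an assumed check description; the Boolean column
# produced for that check is named after the description.
passed_df = rowLevelResult_df.filter(F.col("completeness_check"))
quarantine_df = rowLevelResult_df.filter(~F.col("completeness_check"))
```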

### Repository

Save to a Metrics Repository by adding the `useRepository()` and `saveOrAppendResult()` calls to your Analysis Runner.
28 changes: 28 additions & 0 deletions pydeequ/verification.py
@@ -143,6 +143,34 @@ def checkResultsAsDataFrame(
)
return DataFrame(df, sql_ctx).toPandas() if pandas else DataFrame(df, sql_ctx)

@classmethod
def rowLevelResultsAsDataFrame(
cls, spark_session: SparkSession, verificationResult, data: DataFrame, pandas: bool = False
):
"""
Returns the original DataFrame with additional Boolean columns indicating which rows
passed or failed each Check. Each Check produces one Boolean column named after its
description, where multiple constraints within a Check are ANDed together.

Only checks with row-level-capable constraints (e.g., isComplete, hasPattern, isContainedIn,
isUnique) will produce output columns. Aggregate-only checks (e.g., hasSize) are skipped.

:param SparkSession spark_session: SparkSession
:param verificationResult: The results of the verification run
:param DataFrame data: The original input DataFrame that was verified
:param bool pandas: If True, return a Pandas DataFrame instead of PySpark
:return: DataFrame with original columns plus Boolean columns per qualifying Check
"""
df = spark_session._jvm.com.amazon.deequ.VerificationResult.rowLevelResultsAsDataFrame(
spark_session._jsparkSession, verificationResult.verificationRun, data._jdf
)
sql_ctx = SQLContext(
sparkContext=spark_session._sc,
sparkSession=spark_session,
jsqlContext=spark_session._jsparkSession.sqlContext(),
)
return DataFrame(df, sql_ctx).toPandas() if pandas else DataFrame(df, sql_ctx)


class VerificationRunBuilder:
# TODO Remaining Methods
128 changes: 128 additions & 0 deletions tests/test_verification.py
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
import unittest

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.types import BooleanType

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite
from tests.conftest import setup_pyspark


class TestRowLevelResults(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark = setup_pyspark().appName("test-row-level-results-local").getOrCreate()
cls.sc = cls.spark.sparkContext
cls.df = cls.sc.parallelize(
[
Row(a="foo", b=1, c=5),
Row(a="bar", b=2, c=6),
Row(a="baz", b=3, c=None),
]
).toDF()

@classmethod
def tearDownClass(cls):
# Must shutdown callback for tests to stop
# TODO Document this call to users or encapsulate in PyDeequSession
cls.spark.sparkContext._gateway.shutdown_callback_server()
cls.spark.stop()

def test_row_level_results_with_completeness(self):
"""Test that isComplete produces a Boolean column with correct per-row values."""
check = Check(self.spark, CheckLevel.Error, "completeness_check")
check = check.isComplete("c")

result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

# Should have same row count as original DataFrame
self.assertEqual(row_level_df.count(), self.df.count())

# Should have original columns (a, b, c) plus one Boolean column for the check
self.assertIn("completeness_check", row_level_df.columns)
self.assertTrue(isinstance(row_level_df.schema["completeness_check"].dataType, BooleanType))

# Order by b to ensure deterministic row ordering
# b=1: c=5 (complete), b=2: c=6 (complete), b=3: c=None (incomplete)
results = row_level_df.orderBy("b").select("completeness_check").collect()
values = [row["completeness_check"] for row in results]
self.assertEqual(values, [True, True, False])

def test_row_level_results_with_contained_in(self):
"""Test that isContainedIn produces correct row-level results."""
check = Check(self.spark, CheckLevel.Error, "contained_check")
check = check.isContainedIn("a", ["foo", "bar"])

result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

self.assertIn("contained_check", row_level_df.columns)

# Order by a to ensure deterministic row ordering
# a="bar" (contained), a="baz" (not contained), a="foo" (contained)
results = row_level_df.orderBy("a").select("contained_check").collect()
values = [row["contained_check"] for row in results]
self.assertEqual(values, [True, False, True])

def test_row_level_results_multiple_constraints_anded(self):
"""Test that multiple constraints in one Check are ANDed into a single column."""
check = Check(self.spark, CheckLevel.Error, "multi_check")
check = check.isComplete("a").isComplete("c")

result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

self.assertIn("multi_check", row_level_df.columns)

# Order by b to ensure deterministic row ordering
# b=1: a,c complete -> True, b=2: a,c complete -> True, b=3: c=None -> False
results = row_level_df.orderBy("b").select("multi_check").collect()
values = [row["multi_check"] for row in results]
self.assertEqual(values, [True, True, False])

def test_row_level_results_aggregate_only_check(self):
"""Test that aggregate-only checks (hasSize) don't add columns."""
check = Check(self.spark, CheckLevel.Warning, "size_check")
check = check.hasSize(lambda x: x >= 3)

result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

# hasSize is aggregate-only, so no new column should be added
self.assertEqual(sorted(row_level_df.columns), sorted(self.df.columns))

def test_row_level_results_preserves_original_columns(self):
"""Test that the original DataFrame columns are preserved."""
check = Check(self.spark, CheckLevel.Error, "preserve_check")
check = check.isComplete("c")

result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df)

for col in self.df.columns:
self.assertIn(col, row_level_df.columns)

# Verify original data is unchanged (ordered for deterministic comparison)
original_values = self.df.orderBy("b").select("a", "b").collect()
result_values = row_level_df.orderBy("b").select("a", "b").collect()
self.assertEqual(original_values, result_values)

def test_row_level_results_as_pandas(self):
"""Test the pandas=True option returns a Pandas DataFrame."""
check = Check(self.spark, CheckLevel.Error, "pandas_check")
check = check.isComplete("c")

result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run()
row_level_df = VerificationResult.rowLevelResultsAsDataFrame(
self.spark, result, self.df, pandas=True
)

self.assertIsInstance(row_level_df, pd.DataFrame)
self.assertIn("pandas_check", row_level_df.columns)


if __name__ == "__main__":
unittest.main()