From c7293c2bd4b815676faf25c47a4c079ea6c6b494 Mon Sep 17 00:00:00 2001 From: billpratt Date: Tue, 12 May 2026 18:08:10 -0400 Subject: [PATCH 1/3] Add VerificationResult.rowLevelResultsAsDataFrame support Wrap deequ's VerificationResult.rowLevelResultsAsDataFrame as a classmethod on pydeequ's VerificationResult. This returns the original DataFrame with additional Boolean columns indicating which rows passed or failed each Check. - Add rowLevelResultsAsDataFrame classmethod to VerificationResult - Add tests covering completeness, containedIn, ANDed constraints, aggregate-only checks, column preservation, and pandas output - Update README with usage example Closes awslabs/python-deequ#261 --- README.md | 11 ++++ pydeequ/verification.py | 28 +++++++++ tests/test_verification.py | 122 +++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 tests/test_verification.py diff --git a/README.md b/README.md index a6003c9..4bcfb3d 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,17 @@ checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult) checkResult_df.show() ``` +#### Row-Level Results + +You can also get row-level results to see which individual rows passed or failed each check. This is useful for quarantining rows with data quality issues: + +```python +rowLevelResult_df = VerificationResult.rowLevelResultsAsDataFrame(spark, checkResult, df) +rowLevelResult_df.show() +``` + +Each check produces a Boolean column (named after the check description) indicating pass/fail per row. Only checks with row-level-capable constraints (e.g., `isComplete`, `isContainedIn`, `hasPattern`, `isUnique`) will produce output columns. + ### Repository Save to a Metrics Repository by adding the `useRepository()` and `saveOrAppendResult()` calls to your Analysis Runner. diff --git a/pydeequ/verification.py b/pydeequ/verification.py index c164246..38da74f 100644 --- a/pydeequ/verification.py +++ b/pydeequ/verification.py @@ -143,6 +143,34 @@ def checkResultsAsDataFrame( ) return DataFrame(df, sql_ctx).toPandas() if pandas else DataFrame(df, sql_ctx) + @classmethod + def rowLevelResultsAsDataFrame( + cls, spark_session: SparkSession, verificationResult, data: DataFrame, pandas: bool = False + ): + """ + Returns the original DataFrame with additional Boolean columns indicating which rows + passed or failed each Check. Each Check produces one Boolean column named after its + description, where multiple constraints within a Check are ANDed together. + + Only checks with row-level-capable constraints (e.g., isComplete, hasPattern, isContainedIn, + isUnique) will produce output columns. Aggregate-only checks (e.g., hasSize) are skipped. + + :param SparkSession spark_session: SparkSession + :param verificationResult: The results of the verification run + :param DataFrame data: The original input DataFrame that was verified + :param bool pandas: If True, return a Pandas DataFrame instead of PySpark + :return: DataFrame with original columns plus Boolean columns per qualifying Check + """ + df = spark_session._jvm.com.amazon.deequ.VerificationResult.rowLevelResultsAsDataFrame( + spark_session._jsparkSession, verificationResult.verificationRun, data._jdf + ) + sql_ctx = SQLContext( + sparkContext=spark_session._sc, + sparkSession=spark_session, + jsqlContext=spark_session._jsparkSession.sqlContext(), + ) + return DataFrame(df, sql_ctx).toPandas() if pandas else DataFrame(df, sql_ctx) + class VerificationRunBuilder: # TODO Remaining Methods diff --git a/tests/test_verification.py b/tests/test_verification.py new file mode 100644 index 0000000..551534a --- /dev/null +++ b/tests/test_verification.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +import unittest + +import pandas as pd +from pyspark.sql import Row +from pyspark.sql.types import BooleanType + +from pydeequ.checks import Check, CheckLevel +from pydeequ.verification import VerificationResult, VerificationSuite +from tests.conftest import setup_pyspark + + +class TestRowLevelResults(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.spark = setup_pyspark().appName("test-row-level-results-local").getOrCreate() + cls.sc = cls.spark.sparkContext + cls.df = cls.sc.parallelize( + [ + Row(a="foo", b=1, c=5), + Row(a="bar", b=2, c=6), + Row(a="baz", b=3, c=None), + ] + ).toDF() + + @classmethod + def tearDownClass(cls): + # Must shutdown callback for tests to stop + # TODO Document this call to users or encapsulate in PyDeequSession + cls.spark.sparkContext._gateway.shutdown_callback_server() + cls.spark.stop() + + def test_row_level_results_with_completeness(self): + """Test that isComplete produces a Boolean column with correct per-row values.""" + check = Check(self.spark, CheckLevel.Error, "completeness_check") + check = check.isComplete("c") + + result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() + row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df) + + # Should have original columns (a, b, c) plus one Boolean column for the check + self.assertIn("completeness_check", row_level_df.columns) + self.assertTrue(isinstance(row_level_df.schema["completeness_check"].dataType, BooleanType)) + + # Row 0: c=5 (complete), Row 1: c=6 (complete), Row 2: c=None (incomplete) + results = row_level_df.select("completeness_check").collect() + values = [row["completeness_check"] for row in results] + self.assertEqual(values, [True, True, False]) + + def test_row_level_results_with_contained_in(self): + """Test that isContainedIn produces correct row-level results.""" + check = Check(self.spark, CheckLevel.Error, "contained_check") + check = check.isContainedIn("a", ["foo", "bar"]) + + result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() + row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df) + + self.assertIn("contained_check", row_level_df.columns) + + # Row 0: a="foo" (contained), Row 1: a="bar" (contained), Row 2: a="baz" (not contained) + results = row_level_df.select("contained_check").collect() + values = [row["contained_check"] for row in results] + self.assertEqual(values, [True, True, False]) + + def test_row_level_results_multiple_constraints_anded(self): + """Test that multiple constraints in one Check are ANDed into a single column.""" + check = Check(self.spark, CheckLevel.Error, "multi_check") + check = check.isComplete("a").isComplete("c") + + result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() + row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df) + + self.assertIn("multi_check", row_level_df.columns) + + # a is always complete, c is None for row 2 -> AND produces [True, True, False] + results = row_level_df.select("multi_check").collect() + values = [row["multi_check"] for row in results] + self.assertEqual(values, [True, True, False]) + + def test_row_level_results_aggregate_only_check(self): + """Test that aggregate-only checks (hasSize) don't add columns.""" + check = Check(self.spark, CheckLevel.Warning, "size_check") + check = check.hasSize(lambda x: x >= 3) + + result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() + row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df) + + # hasSize is aggregate-only, so no new column should be added + self.assertEqual(sorted(row_level_df.columns), sorted(self.df.columns)) + + def test_row_level_results_preserves_original_columns(self): + """Test that the original DataFrame columns are preserved.""" + check = Check(self.spark, CheckLevel.Error, "preserve_check") + check = check.isComplete("c") + + result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() + row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df) + + for col in self.df.columns: + self.assertIn(col, row_level_df.columns) + + # Verify original data is unchanged + original_values = self.df.select("a", "b").collect() + result_values = row_level_df.select("a", "b").collect() + self.assertEqual(original_values, result_values) + + def test_row_level_results_as_pandas(self): + """Test the pandas=True option returns a Pandas DataFrame.""" + check = Check(self.spark, CheckLevel.Error, "pandas_check") + check = check.isComplete("c") + + result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() + row_level_df = VerificationResult.rowLevelResultsAsDataFrame( + self.spark, result, self.df, pandas=True + ) + + self.assertIsInstance(row_level_df, pd.DataFrame) + self.assertIn("pandas_check", row_level_df.columns) + + +if __name__ == "__main__": + unittest.main() From abb2fe016d6a58f4611da668f51b36bc0010f3ba Mon Sep 17 00:00:00 2001 From: billpratt Date: Tue, 12 May 2026 18:32:01 -0400 Subject: [PATCH 2/3] Add orderBy to tests for deterministic row ordering Address review feedback: Spark DataFrames have no guaranteed row order, so add explicit orderBy() before collect() in all tests that assert row-level values. --- tests/test_verification.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/test_verification.py b/tests/test_verification.py index 551534a..8bcd908 100644 --- a/tests/test_verification.py +++ b/tests/test_verification.py @@ -42,8 +42,9 @@ def test_row_level_results_with_completeness(self): self.assertIn("completeness_check", row_level_df.columns) self.assertTrue(isinstance(row_level_df.schema["completeness_check"].dataType, BooleanType)) - # Row 0: c=5 (complete), Row 1: c=6 (complete), Row 2: c=None (incomplete) - results = row_level_df.select("completeness_check").collect() + # Order by b to ensure deterministic row ordering + # b=1: c=5 (complete), b=2: c=6 (complete), b=3: c=None (incomplete) + results = row_level_df.orderBy("b").select("completeness_check").collect() values = [row["completeness_check"] for row in results] self.assertEqual(values, [True, True, False]) @@ -57,10 +58,11 @@ def test_row_level_results_with_contained_in(self): self.assertIn("contained_check", row_level_df.columns) - # Row 0: a="foo" (contained), Row 1: a="bar" (contained), Row 2: a="baz" (not contained) - results = row_level_df.select("contained_check").collect() + # Order by a to ensure deterministic row ordering + # a="bar" (contained), a="baz" (not contained), a="foo" (contained) + results = row_level_df.orderBy("a").select("contained_check").collect() values = [row["contained_check"] for row in results] - self.assertEqual(values, [True, True, False]) + self.assertEqual(values, [True, False, True]) def test_row_level_results_multiple_constraints_anded(self): """Test that multiple constraints in one Check are ANDed into a single column.""" @@ -72,8 +74,9 @@ def test_row_level_results_multiple_constraints_anded(self): self.assertIn("multi_check", row_level_df.columns) - # a is always complete, c is None for row 2 -> AND produces [True, True, False] - results = row_level_df.select("multi_check").collect() + # Order by b to ensure deterministic row ordering + # b=1: a,c complete -> True, b=2: a,c complete -> True, b=3: c=None -> False + results = row_level_df.orderBy("b").select("multi_check").collect() values = [row["multi_check"] for row in results] self.assertEqual(values, [True, True, False]) @@ -99,9 +102,9 @@ def test_row_level_results_preserves_original_columns(self): for col in self.df.columns: self.assertIn(col, row_level_df.columns) - # Verify original data is unchanged - original_values = self.df.select("a", "b").collect() - result_values = row_level_df.select("a", "b").collect() + # Verify original data is unchanged (ordered for deterministic comparison) + original_values = self.df.orderBy("b").select("a", "b").collect() + result_values = row_level_df.orderBy("b").select("a", "b").collect() self.assertEqual(original_values, result_values) def test_row_level_results_as_pandas(self): From 0d37d0a7eabe3c67bf5e37cff7360190ae6eaf96 Mon Sep 17 00:00:00 2001 From: billpratt Date: Tue, 12 May 2026 18:48:36 -0400 Subject: [PATCH 3/3] Add row count assertion to completeness test Verify that rowLevelResultsAsDataFrame preserves the same number of rows as the original DataFrame. --- tests/test_verification.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_verification.py b/tests/test_verification.py index 8bcd908..6ca6144 100644 --- a/tests/test_verification.py +++ b/tests/test_verification.py @@ -38,6 +38,9 @@ def test_row_level_results_with_completeness(self): result = VerificationSuite(self.spark).onData(self.df).addCheck(check).run() row_level_df = VerificationResult.rowLevelResultsAsDataFrame(self.spark, result, self.df) + # Should have same row count as original DataFrame + self.assertEqual(row_level_df.count(), self.df.count()) + # Should have original columns (a, b, c) plus one Boolean column for the check self.assertIn("completeness_check", row_level_df.columns) self.assertTrue(isinstance(row_level_df.schema["completeness_check"].dataType, BooleanType))