diff --git a/docs/profiles.md b/docs/profiles.md index ee6b993..dfc0bbe 100644 --- a/docs/profiles.md +++ b/docs/profiles.md @@ -22,4 +22,5 @@ Here are the current supported functionalities of Profiles. | | property: profiles | Done | | | property: numRecords | Done | | StandardColumnProfile | StandardColumnProfile(spark_session, column, java_column_profile) | Done | +| StringColumnProfile | StringColumnProfile(spark_session, column, java_column_profile) | Done | | NumericColumnProfile | NumericColumnProfile(spark_session, column, java_column_profile) | Done | diff --git a/pydeequ/profiles.py b/pydeequ/profiles.py index 72ccc21..c0f4c6c 100644 --- a/pydeequ/profiles.py +++ b/pydeequ/profiles.py @@ -2,6 +2,7 @@ """ Profiles file for all the Profiles classes in Deequ""" import json from collections import namedtuple +from typing import Optional from pyspark.sql import DataFrame, SparkSession from pydeequ.analyzers import KLLParameters @@ -241,9 +242,8 @@ def __init__(self, spark_session: SparkSession): self._numRecords = 0 self.columnProfileClasses = { "StandardColumnProfile": StandardColumnProfile, - "StringColumnProfile": StandardColumnProfile, + "StringColumnProfile": StringColumnProfile, "NumericColumnProfile": NumericColumnProfile, - } def _columnProfilesFromColumnRunBuilderRun(self, run): @@ -528,3 +528,40 @@ def approxPercentiles(self): """ return self._approxPercentiles + +class StringColumnProfile(ColumnProfile): + """ + String Column Profile class + + :param SparkSession spark_session: sparkSession + :param str column: the designated column of which the profile is run on + :param JavaObject java_column_profile: The profile mapped as a Java map + """ + + def __init__( + self, spark_session: SparkSession, column: str, java_column_profile + ) -> None: + super().__init__(spark_session, column, java_column_profile) + self._minLength = get_or_else_none(java_column_profile.minLength()) + self._maxLength = get_or_else_none(java_column_profile.maxLength()) + self.all = { + "completeness": self.completeness, + "approximateNumDistinctValues": self.approximateNumDistinctValues, + "dataType": self.dataType, + "isDataTypeInferred": self.isDataTypeInferred, + "typeCounts": self.typeCounts, + "histogram": self.histogram, + "minLength": self._minLength, + "maxLength": self._maxLength, + } + + @property + def minLength(self) -> Optional[int]: + return self._minLength + + @property + def maxLength(self) -> Optional[int]: + return self._maxLength + + def __str__(self) -> str: + return f"StringProfiles for column: {self.column}: {json.dumps(self.all, indent=4)}" diff --git a/tests/test_profiles.py b/tests/test_profiles.py index fd46a2c..9ac45c2 100644 --- a/tests/test_profiles.py +++ b/tests/test_profiles.py @@ -1,17 +1,24 @@ # -*- coding: utf-8 -*- import unittest from pyspark.sql import Row -from pydeequ.analyzers import KLLParameters -from pydeequ.profiles import ColumnProfilerRunBuilder, ColumnProfilerRunner +from pydeequ.profiles import ( + ColumnProfilerRunBuilder, + ColumnProfilerRunner, + DistributionValue, + StringColumnProfile, +) from pydeequ.analyzers import KLLParameters, DataTypeInstances from tests.conftest import setup_pyspark + class TestProfiles(unittest.TestCase): @classmethod def setUpClass(cls): cls.spark = setup_pyspark().appName("test-profiles-local").getOrCreate() cls.sc = cls.spark.sparkContext - cls.df = cls.sc.parallelize([Row(a="foo", b=1, c=5), Row(a="bar", b=2, c=6), Row(a="baz", b=3, c=None)]).toDF() + cls.df = cls.sc.parallelize( + [Row(a="foo", b=1, c=5), Row(a="bar", b=2, c=6), Row(a="baz", b=3, c=None)] + ).toDF() @classmethod def tearDownClass(cls): @@ -19,10 +26,18 @@ def tearDownClass(cls): cls.spark.stop() def test_setPredefinedTypes(self): - result = ColumnProfilerRunner(self.spark) \ - .onData(self.df) \ - .setPredefinedTypes({'a': DataTypeInstances.Unknown, 'b': DataTypeInstances.String, 'c': DataTypeInstances.Fractional}) \ + result = ( + ColumnProfilerRunner(self.spark) + .onData(self.df) + .setPredefinedTypes( + { + "a": DataTypeInstances.Unknown, + "b": DataTypeInstances.String, + "c": DataTypeInstances.Fractional, + } + ) .run() + ) print(result) for col, profile in result.profiles.items(): print("Profiles:", profile) @@ -76,6 +91,46 @@ def test_profile_numRecords(self): result = ColumnProfilerRunner(self.spark).onData(self.df).run() self.assertEqual(result.numRecords, 3) + def test_StringColumnProfile(self): + df = self.sc.parallelize( + [ + Row(a="ant", b="dragonfly"), + Row(a="bee", b="earwig"), + Row(a="bee", b=None), + Row(a="cricket", b=None), + ] + ).toDF() + result = ColumnProfilerRunner(self.spark).onData(df).run() + column_profile = result.profiles["a"] + self.assertIsInstance(column_profile, StringColumnProfile) + self.assertEqual(column_profile.minLength, 3) + self.assertEqual(column_profile.maxLength, 7) + self.assertEqual(str(column_profile)[0:29], "StringProfiles for column: a:") + self.assertIn('"minLength": 3', str(column_profile)) + + self.assertEqual(column_profile.completeness, 1) + self.assertEqual(column_profile.approximateNumDistinctValues, 3) + self.assertEqual(column_profile.typeCounts["String"], 4) + self.assertEqual(column_profile.isDataTypeInferred, False) + actual_histogram = sorted(column_profile.histogram, key=lambda x: x.value) + self.assertEqual(len(actual_histogram), 3) + expected_histogram = [ + DistributionValue("ant", 1, 0.25), + DistributionValue("bee", 2, 0.5), + DistributionValue("cricket", 1, 0.25), + ] + for actual, expected in zip(actual_histogram, expected_histogram): + self.assertEqual(actual.value, expected.value) + self.assertEqual(actual.count, expected.count) + self.assertAlmostEqual(actual.ratio, expected.ratio) + + column_profile = result.profiles["b"] + self.assertEqual(column_profile.completeness, 0.5) + self.assertEqual(column_profile.approximateNumDistinctValues, 2) + self.assertEqual(column_profile.typeCounts["String"], 2) + self.assertEqual(column_profile.minLength, 0) + self.assertEqual(column_profile.maxLength, 9) + if __name__ == "__main__": unittest.main()