Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/profiles.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ Here are the current supported functionalities of Profiles.
| | property: profiles | Done |
| | property: numRecords | Done |
| StandardColumnProfile | StandardColumnProfile(spark_session, column, java_column_profile) | Done |
| StringColumnProfile | StringColumnProfile(spark_session, column, java_column_profile) | Done |
| NumericColumnProfile | NumericColumnProfile(spark_session, column, java_column_profile) | Done |
41 changes: 39 additions & 2 deletions pydeequ/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
""" Profiles file for all the Profiles classes in Deequ"""
import json
from collections import namedtuple
from typing import Optional
Comment thread
nikie marked this conversation as resolved.

Comment thread
nikie marked this conversation as resolved.
from pyspark.sql import DataFrame, SparkSession
from pydeequ.analyzers import KLLParameters
Expand Down Expand Up @@ -241,9 +242,8 @@ def __init__(self, spark_session: SparkSession):
self._numRecords = 0
self.columnProfileClasses = {
"StandardColumnProfile": StandardColumnProfile,
"StringColumnProfile": StandardColumnProfile,
"StringColumnProfile": StringColumnProfile,
"NumericColumnProfile": NumericColumnProfile,

}

def _columnProfilesFromColumnRunBuilderRun(self, run):
Expand Down Expand Up @@ -528,3 +528,40 @@ def approxPercentiles(self):
"""
return self._approxPercentiles

Comment thread
nikie marked this conversation as resolved.
Comment thread
nikie marked this conversation as resolved.

class StringColumnProfile(ColumnProfile):
    """
    String Column Profile class.

    Extends ColumnProfile with string-specific statistics: the minimum and
    maximum observed string lengths, read from the underlying Java column
    profile object.

    :param SparkSession spark_session: sparkSession
    :param str column: the designated column of which the profile is run on
    :param JavaObject java_column_profile: The profile mapped as a Java map
    """

    def __init__(
        self, spark_session: SparkSession, column: str, java_column_profile
    ) -> None:
        super().__init__(spark_session, column, java_column_profile)
        # minLength()/maxLength() are optional on the JVM side;
        # get_or_else_none maps an empty value to Python None.
        self._minLength = get_or_else_none(java_column_profile.minLength())
        self._maxLength = get_or_else_none(java_column_profile.maxLength())
        # Flat dict of every metric this profile exposes; __str__ renders it
        # as JSON, so values must be JSON-serializable (or None).
        self.all = {
            "completeness": self.completeness,
            "approximateNumDistinctValues": self.approximateNumDistinctValues,
            "dataType": self.dataType,
            "isDataTypeInferred": self.isDataTypeInferred,
            "typeCounts": self.typeCounts,
            "histogram": self.histogram,
            "minLength": self._minLength,
            "maxLength": self._maxLength,
        }

    @property
    def minLength(self) -> Optional[int]:
        """Minimum string length observed in the column, or None if unavailable."""
        return self._minLength

    @property
    def maxLength(self) -> Optional[int]:
        """Maximum string length observed in the column, or None if unavailable."""
        return self._maxLength

    def __str__(self) -> str:
        """Human-readable dump of all collected metrics as indented JSON."""
        return f"StringProfiles for column: {self.column}: {json.dumps(self.all, indent=4)}"
67 changes: 61 additions & 6 deletions tests/test_profiles.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
# -*- coding: utf-8 -*-
import unittest
from pyspark.sql import Row
from pydeequ.analyzers import KLLParameters
from pydeequ.profiles import ColumnProfilerRunBuilder, ColumnProfilerRunner
from pydeequ.profiles import (
ColumnProfilerRunBuilder,
ColumnProfilerRunner,
DistributionValue,
StringColumnProfile,
)
from pydeequ.analyzers import KLLParameters, DataTypeInstances
from tests.conftest import setup_pyspark


class TestProfiles(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark = setup_pyspark().appName("test-profiles-local").getOrCreate()
cls.sc = cls.spark.sparkContext
cls.df = cls.sc.parallelize([Row(a="foo", b=1, c=5), Row(a="bar", b=2, c=6), Row(a="baz", b=3, c=None)]).toDF()
cls.df = cls.sc.parallelize(
[Row(a="foo", b=1, c=5), Row(a="bar", b=2, c=6), Row(a="baz", b=3, c=None)]
).toDF()

@classmethod
def tearDownClass(cls):
cls.spark.sparkContext._gateway.shutdown_callback_server()
cls.spark.stop()

def test_setPredefinedTypes(self):
result = ColumnProfilerRunner(self.spark) \
.onData(self.df) \
.setPredefinedTypes({'a': DataTypeInstances.Unknown, 'b': DataTypeInstances.String, 'c': DataTypeInstances.Fractional}) \
result = (
ColumnProfilerRunner(self.spark)
.onData(self.df)
.setPredefinedTypes(
{
"a": DataTypeInstances.Unknown,
"b": DataTypeInstances.String,
"c": DataTypeInstances.Fractional,
}
)
.run()
)
print(result)
for col, profile in result.profiles.items():
print("Profiles:", profile)
Expand Down Expand Up @@ -76,6 +91,46 @@ def test_profile_numRecords(self):
result = ColumnProfilerRunner(self.spark).onData(self.df).run()
self.assertEqual(result.numRecords, 3)

def test_StringColumnProfile(self):
Comment thread
nikie marked this conversation as resolved.
Comment thread
nikie marked this conversation as resolved.
Comment thread
nikie marked this conversation as resolved.
df = self.sc.parallelize(
[
Row(a="ant", b="dragonfly"),
Row(a="bee", b="earwig"),
Row(a="bee", b=None),
Row(a="cricket", b=None),
]
).toDF()
result = ColumnProfilerRunner(self.spark).onData(df).run()
column_profile = result.profiles["a"]
self.assertIsInstance(column_profile, StringColumnProfile)
self.assertEqual(column_profile.minLength, 3)
Comment thread
nikie marked this conversation as resolved.
Comment thread
nikie marked this conversation as resolved.
Comment thread
nikie marked this conversation as resolved.
Comment thread
nikie marked this conversation as resolved.
self.assertEqual(column_profile.maxLength, 7)
self.assertEqual(str(column_profile)[0:29], "StringProfiles for column: a:")
self.assertIn('"minLength": 3', str(column_profile))

self.assertEqual(column_profile.completeness, 1)
self.assertEqual(column_profile.approximateNumDistinctValues, 3)
self.assertEqual(column_profile.typeCounts["String"], 4)
self.assertEqual(column_profile.isDataTypeInferred, False)
actual_histogram = sorted(column_profile.histogram, key=lambda x: x.value)
self.assertEqual(len(actual_histogram), 3)
expected_histogram = [
DistributionValue("ant", 1, 0.25),
DistributionValue("bee", 2, 0.5),
DistributionValue("cricket", 1, 0.25),
]
for actual, expected in zip(actual_histogram, expected_histogram):
self.assertEqual(actual.value, expected.value)
Comment thread
nikie marked this conversation as resolved.
self.assertEqual(actual.count, expected.count)
self.assertAlmostEqual(actual.ratio, expected.ratio)

column_profile = result.profiles["b"]
self.assertEqual(column_profile.completeness, 0.5)
self.assertEqual(column_profile.approximateNumDistinctValues, 2)
self.assertEqual(column_profile.typeCounts["String"], 2)
self.assertEqual(column_profile.minLength, 0)
Comment thread
nikie marked this conversation as resolved.
self.assertEqual(column_profile.maxLength, 9)


if __name__ == "__main__":
unittest.main()
Loading