
How to Test PySpark ETL Data Pipeline | by Edwin Tan | Dec, 2022


Validate big data pipelines with Great Expectations

Photo by Erlend Ekseth on Unsplash

Garbage in, garbage out is a common expression used to emphasize the importance of data quality for tasks such as machine learning, data analytics and business intelligence. With the increasing volume of data being created and stored, building high-quality data pipelines has never been more challenging.

PySpark is a commonly used tool for building ETL pipelines for large datasets. A common question that arises while building a data pipeline is "How do we know that our data pipeline is transforming the data in the way that is intended?". To answer this question, we borrow the idea of the unit test from the software development paradigm. The purpose of a unit test is to validate that each component of the code performs as expected, by using a test to check whether the output meets expectations. In similar fashion, we can validate whether our data pipeline is working as intended by writing a test to check the output data.
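To make the analogy concrete, here is a minimal sketch of what a conventional unit test for a single transformation step could look like (the clean_job function and the sample rows are hypothetical and only serve as an illustration):

from pyspark.sql import SparkSession, functions as f

def clean_job(df):
    """Replace the placeholder value 'unknown' in the job column with null."""
    return df.withColumn(
        'job',
        f.when(f.col('job') == 'unknown', f.lit(None)).otherwise(f.col('job'))
    )

def test_clean_job():
    # build a tiny in-memory DataFrame and assert on the transformed output
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([('admin.',), ('unknown',)], ['job'])
    assert [row.job for row in clean_job(df).collect()] == ['admin.', None]

Great Expectations applies the same idea to the data itself rather than to individual functions, which is what we focus on below.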

In this article we will walk through an example of how to leverage Great Expectations to validate your PySpark data pipeline.

This example uses the following setup:

  1. PySpark
  2. Great Expectations==0.15.34
  3. Databricks notebook

We will be using a Databricks notebook in Databricks Community Edition. However, you are free to use any integrated development environment and any cloud or local Spark cluster.

We are using the UCI bank marketing dataset [2], which contains data on a direct marketing campaign of a Portuguese bank. Here is what the data looks like.

Image by author.

Understanding the data is paramount to creating comprehensive tests for your PySpark data pipeline. Common considerations when it comes to data quality include, but are not limited to:

  • Completeness
  • Consistency
  • Correctness
  • Uniqueness

The acceptable level of quality for each of the above-mentioned items depends heavily on the context and use case.

In this section, we explore and create expectations which will be used in a later section to test new, unseen data.

So what are expectations? Expectations are assertions about the data. As the name implies, we are validating whether the data is what we expect it to be. Great Expectations comes with predefined expectations for common data quality checks. Here are some examples of predefined expectations:

expect_column_values_to_not_be_null
expect_column_values_to_be_unique
expect_column_values_to_match_regex
expect_column_median_to_be_between
expect_table_row_count_to_be_between

The names of these expectations are rather descriptive of what they do. If the predefined expectations do not suit your needs, Great Expectations also allows you to create custom expectations.

Imports

import great_expectations as ge
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from pyspark.sql import functions as f, Window
import json

Load the data

We load the data from a CSV file and perform some processing steps on the dataset:

  • Replace the "unknown" value in the job column with null
  • Create an "id" column which contains a unique identifier for each row
df = (
    spark
    .read
    .format('csv')
    .option('header', True)
    .load('/FileStore/tables/bank_full.csv', sep = ';')
    # replace the 'unknown' placeholder in the job column with null
    .withColumn('job', f.when(f.col('job') == 'unknown', f.lit(None)).otherwise(f.col('job')))
    # add a unique identifier for each row
    .withColumn('id', f.monotonically_increasing_id())
)

SparkDFDataset is a thin wrapper around the PySpark DataFrame which allows us to use Great Expectations methods on a PySpark DataFrame.

gdf = SparkDFDataset(df)
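Once the DataFrame is wrapped, each predefined expectation listed earlier is available as a method on gdf. For example, a purely illustrative check (not part of the suite built in this article) that asserts the age column contains only digits could look like this:

# illustrative only: age was read as a string, so a regex can assert it is numeric
gdf.expect_column_values_to_match_regex(column = 'age', regex = r'^\d+$')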

Check column names

Let's validate whether the DataFrame contains the correct set of columns by providing the list of expected columns to the expect_table_columns_to_match_set method.

expected_columns = ['age', 'job', 'marital',
                    'education', 'default', 'balance',
                    'housing', 'loan', 'contact',
                    'day', 'month', 'duration',
                    'campaign', 'pdays', 'previous',
                    'poutcome', 'y']
gdf.expect_table_columns_to_match_set(column_set = expected_columns)

Running the code above returns the output below. "success": true indicates that the check has passed.

# output
{
"consequence": {
"observed_value": [
"age",
"job",
"marital",
"education",
"default",
"balance",
"housing",
"loan",
"contact",
"day",
"month",
"duration",
"campaign",
"pdays",
"previous",
"poutcome",
"y"
]
},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
},
"meta": {},
"success": true
}

Check values in a categorical column

We can check whether a categorical column contains unexpected data by using the expect_column_values_to_be_in_set method. We expect the marital column to contain only the following values: single, married and divorced.

gdf.expect_column_values_to_be_in_set(column = 'marital', value_set = {'single', 'married', 'divorced'})

Great Expectations will fail the check if the marital column contains any values that are not found in the value set.

# output
{
"consequence": {
"element_count": 45211,
"missing_count": 0,
"missing_percent": 0.0,
"unexpected_count": 0,
"unexpected_percent": 0.0,
"unexpected_percent_total": 0.0,
"unexpected_percent_nonmissing": 0.0,
"partial_unexpected_list": []
},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
},
"meta": {},
"success": true
}
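If a small fraction of unexpected values is tolerable, many of the column-level expectations accept an optional mostly argument (a fraction between 0 and 1); the check then succeeds as long as at least that fraction of non-missing values meets the expectation. A sketch:

# pass the check as long as at least 95% of marital values fall in the expected set
gdf.expect_column_values_to_be_in_set(
    column = 'marital',
    value_set = {'single', 'married', 'divorced'},
    mostly = 0.95
)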

Check that a column does not contain null values

If we expect a column to not contain any null values, we can use the expect_column_values_to_not_be_null method and specify the column of interest in the argument.

gdf.expect_column_values_to_not_be_null(column = 'job')

In this case, the job column fails the check as there are null values in the column.

{
"consequence": {
"element_count": 45211,
"unexpected_count": 288,
"unexpected_percent": 0.6370131162770122,
"unexpected_percent_total": 0.6370131162770122,
"partial_unexpected_list": [
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
]
},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
},
"meta": {},
"success": false
}
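The partial_unexpected_list above only shows the offending values (here all null), so to inspect the full records behind them we can fall back on plain PySpark filtering:

# look at the rows whose job value is null (originally the 'unknown' placeholder)
df.filter(f.col('job').isNull()).show(5)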

Check uniqueness of a column

Great Expectations also provides an out-of-the-box method to check that the values in a given column are unique. Let's check whether the id column contains unique values.

gdf.expect_column_values_to_be_unique('id')

As expected, the column contains unique values.

# output
{
"consequence": {
"element_count": 45211,
"missing_count": 0,
"missing_percent": 0.0,
"unexpected_count": 0,
"unexpected_percent": 0.0,
"unexpected_percent_total": 0.0,
"unexpected_percent_nonmissing": 0.0,
"partial_unexpected_list": []
},
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
},
"meta": {},
"success": true
}

Now that we have created various expectations, we can put them all together in an expectation suite.

expectation_suite = gdf.get_expectation_suite(discard_failed_expectations=False)

The expectation suite is nothing more than just a collection of expectations.

#expectation_suite
{
"expectations": [
{
"kwargs": {
"column_set": [
"age",
"job",
"marital",
"education",
"default",
"balance",
"housing",
"loan",
"contact",
"day",
"month",
"duration",
"campaign",
"pdays",
"previous",
"poutcome",
"y"
]
},
"expectation_type": "expect_table_columns_to_match_set",
"meta": {}
},
{
"kwargs": {
"column": "marital",
"value_set": [
"married",
"divorced",
"single"
]
},
"expectation_type": "expect_column_values_to_be_in_set",
"meta": {}
},
{
"kwargs": {
"column": "job"
},
"expectation_type": "expect_column_values_to_not_be_null",
"meta": {}
},
{
"kwargs": {
"column": "id"
},
"expectation_type": "expect_column_values_to_be_unique",
"meta": {}
}
],
"data_asset_type": "Dataset",
"meta": {
"great_expectations_version": "0.15.34"
},
"expectation_suite_name": "default",
"ge_cloud_id": null
}

Let's save the expectation suite in JSON format.

# save expectation suite
with open('my_expectation_suite.json', 'w') as my_file:
    my_file.write(
        json.dumps(expectation_suite.to_json_dict())
    )

Assume that we have a new set of data that requires checking. We use the previously saved expectation suite to perform the data quality check on the unseen data in a new notebook.

Imports

import great_expectations as ge
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
import pyspark
from pyspark.sql import functions as f, Window
import json

Load the data

df = (
    spark
    .read
    .format('csv')
    .option('header', True)
    .load('/FileStore/tables/bank_full_new.csv', sep = ';')
    .withColumn('job', f.when(f.col('job') == 'unknown', f.lit(None)).otherwise(f.col('job')))
    .withColumn('id', f.monotonically_increasing_id())
)

We create two functions to (a) load the expectation suite and (b) validate the data against the expectation suite.

def load_expectation_suite(path: str) -> dict:
    """Load expectation suite saved in JSON format
    and convert into dictionary.

    Args:
        path (str): path to expectation suite json file

    Returns:
        dict: expectation suite
    """
    with open(path, 'r') as f:
        expectation_suite = json.load(f)
    return expectation_suite

def great_expectation_validation(df: pyspark.sql.DataFrame,
                                 expectation_suite_path: str) -> dict:
    """Run validation on a DataFrame based on an expectation suite.

    Args:
        df (pyspark.sql.DataFrame): DataFrame to validate
        expectation_suite_path (str): path to expectation suite json file

    Returns:
        dict: validation result
    """
    expectation_suite = load_expectation_suite(expectation_suite_path)
    gdf = SparkDFDataset(df)
    validation_results = gdf.validate(expectation_suite = expectation_suite,
                                      result_format = 'SUMMARY',
                                      catch_exceptions = True)
    return validation_results

Run the validation

validation_result = great_expectation_validation(
    df = df,
    expectation_suite_path = 'my_expectation_suite.json')

Perform the check

False indicates that the validation failed, as at least one expectation in the expectation suite has failed.

validation_result['success']

# output:
# False

Great Expectations also shows the number of successful and failed checks.

validation_result['statistics']

#output:
#{'evaluated_expectations': 4,
# 'successful_expectations': 2,
# 'unsuccessful_expectations': 2,
# 'success_percent': 50.0}
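To see exactly which expectations failed, we can also iterate over the individual results on the validation object; a small sketch, assuming the attribute names exposed by Great Expectations 0.15 (results, success, expectation_config):

# print the expectation type and arguments of every failed check
for r in validation_result.results:
    if not r.success:
        print(r.expectation_config.expectation_type, r.expectation_config.kwargs)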

In this article we discussed the importance of testing your data pipeline and walked through an example of how we can leverage Great Expectations to create a comprehensive suite of tests.

[1] Great Expectations Home Page • Great Expectations

[2] Moro, S., Rita, P. & Cortez, P. (2012). Bank Marketing. UCI Machine Learning Repository. CC BY 4.0.
