TLDR
Maximize the quality and productivity of your data pipelines.
In my previous articles, I talked a lot about the importance of testing in data pipelines and how to create data tests and unit tests respectively. While testing plays an essential role, it may not always be the most exciting part of the development cycle. As a result, many modern data stacks have introduced frameworks or plugins to expedite the implementation of data tests. In addition, unit testing frameworks in Python such as Pytest and unittest have been around for a long time, helping engineers efficiently create unit tests for data pipelines and any other Python application.
In this article, I want to introduce a setup that uses two modern techniques: Behaviour Driven Development (BDD), a business-oriented testing framework, and Mage, a modern data pipeline tool. By combining the two, the objective is to create high-quality unit tests for data pipelines while keeping the developer experience seamless.
What is Behaviour Driven Development (BDD)?
When building data pipelines for a business, it’s highly likely that we will encounter complicated and tricky business logic. One example is defining customer segmentation based on a combination of age, income, and past purchases. The following rules represent only a fraction of the complexity that business logic can entail, and they become progressively more complicated as more attributes and finer granularity within each attribute are added. Think about one example from your daily job!
1. People between 19 and 60 AND with high past purchases are "premium".
2. People between 19 and 60 AND with high income are "premium".
3. People above 60 AND with high income AND with high past purchases are "premium".
4. Others are "basic".
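To make the rules concrete, here is a minimal sketch of what the transformation function could look like, using the get_user_segment name and the age, income, and past_purchases columns that appear in the test code later in this article; the actual implementation may differ.

import pandas as pd


def get_user_segment(df: pd.DataFrame) -> pd.DataFrame:
    # Rules 1 and 2: customers between 19 and 60 (inclusive) with high past purchases or high income
    working_age = df["age"].between(19, 60)
    premium_working_age = working_age & (
        (df["past_purchases"] == "high") | (df["income"] == "high")
    )
    # Rule 3: customers above 60 with both high income and high past purchases
    premium_senior = (
        (df["age"] > 60) & (df["income"] == "high") & (df["past_purchases"] == "high")
    )
    # Rule 4: everyone else is "basic"
    df["segment"] = "basic"
    df.loc[premium_working_age | premium_senior, "segment"] = "premium"
    return df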
So the question is: where should the business rules be documented, and how do we keep the documentation and the code in sync? One common approach is to include comments alongside the code, or to strive for code that is self-explanatory and easy to understand. But there is still a risk of outdated comments, or of code that stakeholders find challenging to comprehend.
Ultimately, what we are looking for is a “documentation-as-code” solution that can benefit both engineers and business stakeholders and this is exactly what BDD can provide. If you are familiar with the concept of “data contract”, BDD can be seen as a form of data contract, but with a focus on the stakeholders rather than the data source.
It can be very beneficial, particularly for data pipelines with complicated business logic, and it helps prevent debates regarding “feature or bug”.
BDD is essentially a software development approach that emphasizes collaboration and communication between stakeholders and developers to ensure that software meets the desired business outcomes. The behavior is described in scenarios that illustrate the expected inputs and outcomes. Each scenario is written in a specific format of “Given-When-Then” where each step describes a specific condition or action.
Let’s see what the scenarios may look like for the customer segmentation example. Since the feature file is written in English, it can be well understood by business stakeholders and they can even contribute to it. It works like a contract between stakeholders and engineers, where engineers are responsible for accurately implementing the requirements, and stakeholders are expected to provide all the necessary information.
Having a clear contract between stakeholders and engineers helps correctly categorize the data issue, distinguishing between “software bugs” resulting from implementation errors and “feature requests” due to missing requirements.
Feature: Customer Segmentation
  As a retail business
  In order to personalize marketing strategies
  I want to segment customers based on their attributes and behavior

  Scenario Outline: Label customers as premium if they are between 19 and 60 and made high past purchases
    Given a customer is <age> years old
    And the customer has <income_level> income
    And the customer has made high past purchases
    When the segmentation process is executed
    Then the customer should be labeled as premium

    Examples:
      | income_level | age |
      | low          | 19  |
      | low          | 35  |
      | medium       | 28  |
      | high         | 42  |
      | high         | 60  |

  Scenario Outline: Label customers as premium if they are between 19 and 60 and have high income
    Given a customer is <age> years old
    And the customer has high income
    And the customer has made <purchases_level> past purchases
    When the segmentation process is executed
    Then the customer should be labeled as premium

    Examples:
      | purchases_level | age |
      | low             | 19  |
      | low             | 35  |
      | medium          | 28  |
      | high            | 42  |
      | high            | 60  |

  Scenario Outline: Label customers as premium if they are above 60, have high income and made high past purchases
    Given a customer is <age> years old
    And the customer has high income
    And the customer has made high past purchases
    When the segmentation process is executed
    Then the customer should be labeled as premium

    Examples:
      | age |
      | 61  |
      | 70  |
      | 99  |
      | 42  |
      | 60  |

  Scenario Outline: Label customers as basic if they do not meet the above requirements
    Given a customer is <age> years old
    And the customer has <income_level> income
    And the customer has made <purchases_level> past purchases
    When the segmentation process is executed
    Then the customer should be labeled as basic

    Examples:
      | age | income_level | purchases_level |
      | 18  | low          | low             |
      | 17  | high         | high            |
      | 28  | low          | medium          |
      | 42  | medium       | low             |
      | 61  | medium       | high            |
Feature file (Created by Author)
The next step is to create test code from the feature file, using the pytest-bdd plugin, and that is where the connection happens. The Pytest code acts as a bridge between the documentation and the implementation code. When there is any misalignment between them, the tests fail, highlighting the need to keep documentation and implementation in sync.
Test code acts as a bridge (Created by Author)
Here is what the test code looks like. To keep the example short, I only implement the test code for the first scenario. The Given steps set up the initial context for the scenario, which in this case means getting the customer’s age, income, and past-purchases data from the examples. The When step triggers the behavior being tested, which is the get_user_segment function. In the Then step, we compare the result from the When step with the expected output from the scenario example.
import pytest
from pytest_bdd import given, when, then, scenario, parsers, scenarios
from demo_project.transformers.user_segment import get_user_segment
import pandas as pd


@scenario(
    "test_customer_segment.feature",
    "Label customers as premium if they are between 19 and 60 and made high past purchases",
)
def test_label_customers_high_past_purchases():
    pass


@given(
    parsers.parse("a customer is {age:d} years old"),
    target_fixture="customer_age",
)
def customer_age(age):
    return age


@given(
    parsers.parse("the customer has {income_level} income"),
    target_fixture="customer_income",
)
def customer_income(income_level):
    return income_level


@given(
    "the customer has made high past purchases",
    target_fixture="customer_purchases",
)
def customer_purchases():
    return "high"


@when(
    "the segmentation process is executed",
    target_fixture="segmentation_execution",
)
def segmentation_execution(customer_age, customer_income, customer_purchases):
    df = pd.DataFrame(
        {
            "age": [customer_age],
            "income": [customer_income],
            "past_purchases": [customer_purchases],
        },
        columns=["age", "income", "past_purchases"],
    )
    return get_user_segment(df)


@then("the customer should be labeled as premium")
def verify_segmentation(segmentation_execution):
    assert segmentation_execution.iloc[0]["segment"] == "premium"
Test code of the 1st scenario in the feature file (Created by Author)
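To keep the example short I bound only the first scenario above, but pytest-bdd can also bind every scenario in the feature file at once through the scenarios helper that is already imported. Once step definitions exist for all the steps, a single call is enough; a minimal sketch:

from pytest_bdd import scenarios

# Generate a test per scenario in the feature file, reusing the step definitions above.
scenarios("test_customer_segment.feature")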
Imagine a change to the age range in the first scenario, say an example age of 62 is added without updating the code. In such a case, the test would immediately fail because the documentation and the code now have conflicting expectations.
What is Mage?
So far, we’ve seen the potential of BDD and learned how to implement it using Python. Now it’s time to incorporate BDD into our data pipelines. When it comes to data orchestration, Airflow, as the first Python-based orchestrator with a web interface, has become the most commonly used tool for executing data pipelines.
But it certainly is not perfect. For example, testing pipelines outside of the production environment, especially when using operators like KubernetesPodOperator, can be challenging. Additionally, a DAG can be cluttered with boilerplate code and complicated configuration, making it less straightforward to tell the purpose of each task, be it ingestion, transformation, or exportation. Furthermore, Airflow is not focused on being a data-driven orchestration tool: it cares more about the successful execution of tasks than about the quality of the final data assets.
As the data engineering community grows, many Airflow alternatives have emerged to fill these gaps. Mage is one of the growing data pipeline tools positioned as a modern replacement for Airflow. Its four design concepts set Mage apart from Airflow, and we can sense the difference right from the beginning of the development cycle.
Design principles of Mage (Created by Author)
Mage has a very intuitive UI that allows engineers to swiftly edit and test pipelines.
Each pipeline is composed of several types of blocks: @data_loader, @transformer, @data_exporter, etc., each with a clear purpose. This is one of my favorite features because I can immediately understand the objective of each task and focus on the business logic rather than getting caught up in boilerplate code.
Mage UI (Created by Author)
BDD + Mage
A regular data pipeline has three main steps: ingestion, transformation, and exportation. Transformation is the place where all the complicated business logic is implemented, and it’s not uncommon to have multiple transformation steps incorporated.
The clear segregation of the ingestion task and transformation task makes it incredibly straightforward and intuitive to apply BDD to your transformation logic. In fact, it feels just like testing a regular Python function, ignoring the fact that it’s part of a data pipeline.
Let’s go back to the user segmentation example. The business rules are supposed to sit in the @transformer block, which is decoupled from the loader and the exporter.
@transformer block in data pipeline (Created by Author)
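For illustration, here is roughly what such a transformer block could look like. This is a sketch based on the scaffolding Mage generates for transformer blocks, and it assumes that get_user_segment, the rules function tested earlier, lives in this same block file, which is why the test code imports it from demo_project.transformers.user_segment.

import pandas as pd

# Mage injects these decorators at runtime; fall back to importing them when the
# file is used outside a Mage run (e.g. when the pytest suite imports this module).
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


def get_user_segment(df: pd.DataFrame) -> pd.DataFrame:
    # The business rules sketched earlier (rules 1-4) go here.
    ...


@transformer
def transform(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    # Mage passes the upstream @data_loader output in as `df`, so no manual data passing is needed.
    return get_user_segment(df)


@test
def test_output(output, *args) -> None:
    # Lightweight in-pipeline check; the BDD suite covers the business rules in depth.
    assert output is not None, 'The output is undefined'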
The same @transformer block can be plugged into multiple pipelines as long as the loader returns a pandas dataframe. To run the tests, we just need to run the pytest command in the terminal or in the CI/CD pipeline. The pipeline configuration, such as the trigger, lives in a separate file, which keeps the main pipeline file as clean as possible.
Let’s imagine what would happen if we implemented this in Airflow. As it’s not a complicated example, Airflow can certainly handle it well. But there are a few details that made me go “errrr” when I switched from Mage to Airflow.
1. The DAG file gets cluttered because each DAG needs a sizeable code block just to define its metadata. In Mage, this configuration lives in a separate yaml file, so the pipeline file stays concise.
import datetime
import pendulum
from airflow.decorators import dag

@dag(
    dag_id="user_segment",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def user_segment():
    ...
2. Data passing is tricky in Airflow. XCom is Airflow’s mechanism for passing data between tasks, but it is not recommended to push large datasets such as dataframes through XCom directly. As a workaround, we need to persist the data in temporary storage first, which feels like unnecessary engineering effort. Mage handles data passing between blocks natively, so we don’t need to worry about the size of the dataset.
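To make the contrast concrete, here is a minimal sketch of that workaround using Airflow’s TaskFlow API; the task names and the /tmp paths are made up for the example, and the get_user_segment import reuses the hypothetical module path from the test code above.

import pandas as pd
from airflow.decorators import task

from demo_project.transformers.user_segment import get_user_segment


@task
def load_users() -> str:
    # Persist the dataframe to temporary storage and pass only the file path through XCom.
    df = pd.DataFrame({"age": [25], "income": ["high"], "past_purchases": ["low"]})
    path = "/tmp/users.parquet"  # made-up location for the example
    df.to_parquet(path)
    return path


@task
def segment_users(users_path: str) -> str:
    # The downstream task re-reads the data from storage instead of receiving the dataframe directly.
    df = pd.read_parquet(users_path)
    segments_path = "/tmp/user_segments.parquet"
    get_user_segment(df).to_parquet(segments_path)
    return segments_path

In Mage, the equivalent is simply returning the dataframe from one block and receiving it in the next, as in the transformer sketch above.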
3. Technically, Airflow can support different versions of Python packages, but at a significant cost. KubernetesPodOperator and PythonVirtualenvOperator allow you to run a task in an isolated environment, but you lose much of the convenience that comes out of the box with Airflow, such as using other operators. In contrast, Mage addresses this challenge with one centralized requirements.txt, ensuring that all tasks have access to all the native features of Mage.
Conclusion
In this article, I brought two technologies together with the goal of improving test quality and developer experience. BDD aims to enhance the collaboration between stakeholders and engineers by creating a contract, in the form of a feature file, that is directly embedded in the codebase. Mage, on the other hand, is a great data pipeline tool that keeps developer experience as its top priority and truly treats data as a first-class citizen.
I hope you find it inspiring and feel motivated to explore and incorporate at least one technology into your daily work. Making the right tooling choice can certainly amplify your team’s productivity. I’m curious about what you think. Let me know in the comments. Cheers!