TLDR
Conditional blocks in Mage are powerful tools for creating dynamic, decision-making data pipelines. They allow you to route data based on specified conditions, enabling parallel processing of different data types within a single pipeline. This article explains how to implement conditional blocks, using a banking example of processing Suspicious Activity Reports (SARs) for transactions of $10,000 or more. By leveraging conditional blocks, you can build more efficient, compliant, and adaptable data workflows.
Outline
The Critical Role of Decision-Making in Financial Data Pipelines
The Magic of Parallel Processing with Conditional Blocks
Create a Pipeline in Mage with Conditional Blocks
Mage Install and Project Setup
Step 1: Generate Mock Financial Data
Step 2: Create Transformer Blocks
Step 3: Add Conditional Blocks to Transformer Blocks
Step 4: Load data to PostgreSQL
Conclusion
The Critical Role of Decision-Making in Financial Data Pipelines
Conditional blocks in Mage are add-on blocks that attach to main pipeline blocks and evaluate a condition before the parent block executes. Their primary function is to perform critical checks up front, ensuring that predefined conditions are met before any processing happens. This functionality enables dynamic and responsive financial data routing:
When transaction data meets specified conditions (e.g., exceeding SAR thresholds), it flows seamlessly through the compliance pipeline, undergoing the required reporting processes.
Conversely, if no transactions satisfy the set conditions, the compliance reporting block is intelligently skipped, preventing unnecessary SAR filings and potential regulatory scrutiny.
This conditional execution not only streamlines financial data flow but also enhances pipeline efficiency and regulatory compliance. By allowing for granular control over transaction processing, conditional blocks empower Bank Secrecy Act (BSA) professionals and consultants to create more nuanced, flexible, and robust compliance pipelines that can adapt to varying regulatory scenarios and institutional risk appetites.
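To make this concrete, here is a minimal sketch of what a conditional block looks like in Python (the decorator import follows Mage's conditional block template; the full, tutorial-specific version is built step by step in Step 3):

```python
# Minimal sketch of a Mage conditional block. If evaluate_condition
# returns False, the block it is attached to is skipped.
if 'condition' not in globals():
    from mage_ai.data_preparation.decorators import condition


@condition
def evaluate_condition(data, *args, **kwargs) -> bool:
    # Run the attached block only when at least one transaction
    # meets the $10,000 reporting threshold; otherwise skip it.
    return (data['amount'] >= 10000).any()
```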
The Magic of Parallel Processing with Conditional Blocks
One of the standout features of conditional blocks in Mage is their ability to facilitate parallel processing within a single pipeline. This capability enhances financial data workflows, offering several significant advantages:
Streamlined Compliance Architecture
: Instead of creating separate pipelines for different regulatory requirements (e.g., SARs, CTRs, OFAC screening), you can consolidate logic into a single, comprehensive pipeline. This simplification reduces overhead and minimizes the risk of inconsistencies across compliance processes.
Enhanced AML Performance
: By processing different transaction types simultaneously, conditional blocks can significantly improve overall AML efficiency. This parallel execution leverages computational resources more effectively, potentially reducing processing time for high-volume transaction monitoring.
Improved Regulatory Maintainability
: With all compliance logic centralized in one pipeline, updates to regulatory requirements become more straightforward to implement. This centralization ensures that changes are applied consistently across all transaction processing scenarios, reducing the risk of compliance gaps.
Scalability for Growing Financial Institutions
: As your institution's transaction volume or regulatory obligations grow, conditional blocks allow for easy expansion. New compliance checks or processing paths can be added to the existing pipeline without major restructuring of your AML systems.
Better Resource Utilization in Compliance Operations
: By dynamically routing transactions based on risk-based conditions, you ensure that each compliance step only handles relevant data, optimizing resource usage and focusing investigative efforts on high-risk activities.
Let's explore how these advantages play out in a real-world scenario from the banking sector, demonstrating the transformative potential of conditional blocks in Mage for your financial compliance workflows.
Create a Pipeline in Mage with Conditional Blocks
Putting theory into practice, let's walk through the process of building a pipeline in Mage that leverages conditional blocks for financial data processing. This hands-on approach will demonstrate how to implement intelligent decision-making in your AML and compliance workflows. By following these steps, you'll be able to create a robust pipeline that efficiently handles various transaction scenarios while maintaining regulatory compliance.
Mage Install and Project Setup
Prior to completing Step 1, if you are new to Mage and do not have it installed, check out their installation guide to install the open source version.
Once Mage is installed, make the following adjustments to the io_config.yml file (a sketch of the finished section follows this list):
Enter the database name that matches your database in PostgreSQL
Enter the schema where you want your table to live
Enter the Postgres username for your database
Enter the Postgres password used for your database
If you want to create a Mage Secret for the password, check out the Mage documentation on secrets
Finally, make sure to include ‘host.docker.internal’ as your Postgres host if you installed Mage using Docker
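For reference, the PostgreSQL section of io_config.yml should end up looking roughly like the sketch below. Key names follow Mage's default io_config.yml; the values shown are placeholders for this tutorial's financial_database database and test schema, and may vary slightly between Mage versions:

```yaml
default:
  POSTGRES_DBNAME: financial_database   # database name in PostgreSQL
  POSTGRES_SCHEMA: test                 # schema where your tables will live
  POSTGRES_USER: your_username          # placeholder: your Postgres username
  POSTGRES_PASSWORD: your_password      # placeholder: or reference a Mage Secret
  POSTGRES_HOST: host.docker.internal   # use this host if Mage runs in Docker
  POSTGRES_PORT: 5432
```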
After completing the steps above, you are ready to create a Mage data pipeline using conditional blocks.
Step 1: Generate Mock Financial Data
For this tutorial, we’re going to generate mock financial data using the Faker library in Python. To do this, complete the following steps:
Open Mage
From the Overview Dashboard, click the ‘+ New Pipeline’ button at the top left of the Mage UI
Name the pipeline conditional_sar_pipeline, or whatever you feel like naming it, and click ‘Create’
Add a data loader block to the pipeline by clicking ‘All blocks’, hovering over ‘Data loaders’, and clicking ‘Base template (generic)’
Enter a name for the data loader block, something that will remind you of the block’s purpose, then click ‘Save and add’
Replace the templated code in the data loader block with the code below and hit the run button located at the top right side of the block
```python
import random
from datetime import datetime, timedelta

import pandas as pd
from faker import Faker

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data(*args, **kwargs):
    fake = Faker()

    # Number of records to generate
    num_records = 20000

    # List of channels
    channels = ["Cash", "Wire-Transfer"]

    # Function to generate a transaction amount
    def generate_amount():
        if random.random() <= 0.05:  # 5% chance for amounts > 2500
            return round(random.uniform(2500.01, 15000.00), 2)
        else:  # 95% chance for amounts <= 2500
            return round(random.uniform(0.01, 2500.00), 2)

    # Function to generate a random time on the fixed date
    def generate_timestamp():
        base_date = datetime(2024, 6, 27)
        # Random second in the day
        random_time = base_date + timedelta(seconds=random.randint(0, 86399))
        return random_time.isoformat()

    # Generate fake records
    records = []
    for _ in range(num_records):
        record = {
            "transaction_id": fake.bothify(text="??######"),
            "channel": random.choice(channels),
            "timestamp": generate_timestamp(),  # Random timestamp on 2024-06-27
            "amount": generate_amount(),
            "currency": "USD",
            "merchant": fake.company(),
            "location": f"{fake.city()}, {fake.country()}",
        }
        records.append(record)

    # Convert the records to a pandas DataFrame and return it
    df = pd.DataFrame(records)
    return df


@test
def test_output(output, *args) -> None:
    """
    Test the output of the block.
    """
    assert output is not None, 'The output is undefined'
```
You should see that the block returned 20,000 rows of mock financial data tagged as Cash or Wire-Transfer, with amounts between $0.01 and $15,000.00 USD. The output gives you some sense of real-life banking transactions, some of which will require SAR reporting. Complete Step 2 to transform the data and prepare it for export to the PostgreSQL database.
Step 2: Create Transformer Blocks
Now it’s time to create the transformer blocks for this tutorial. Here’s the step-by-step:
Create a transformer block by clicking ‘All blocks’, hovering over ‘Transformer’, and clicking ‘Base template (generic)’ under Python
Give the block a meaningful name similar to transactions_greater_than_10000
Repeat these steps to create a second transformer block
Give the second block a meaningful name similar to transactions_less_than_10000
Next, remove the connection between the two transformer blocks by clicking on the connector line and then clicking ‘Remove connection’
Hover over the bubble at the top of the disconnected transformer block and connect it to the data loader block
After connecting the blocks, your tree view should show the data loader feeding both transformer blocks in parallel.
Replace the code underneath the @transformer decorator in the transactions_greater_than_10000 block with the code below
```python
def transform(data, *args, **kwargs):
    # Filter records with an amount of 10000 or more
    suspicious_transactions = data[data['amount'] >= 10000].copy()

    # Mark the filtered transactions as suspicious
    suspicious_transactions['suspicious'] = 'yes'

    return suspicious_transactions
```
Replace the code underneath the @transformer decorator in the transactions_less_than_10000 block with the code below
```python
def transform(data, *args, **kwargs):
    # Filter records with an amount less than 10000
    non_suspicious_transactions = data[data['amount'] < 10000].copy()

    # Mark the filtered transactions as not suspicious
    non_suspicious_transactions['suspicious'] = 'no'

    return non_suspicious_transactions
```
Before running the transformer blocks, let’s create the conditional blocks, which define the rules for when each transformer runs. Remember, if the batch or stream fails a block’s condition, the block will not run in production.
Step 3: Add Conditional Blocks to Transformer Blocks
This step is where we add our ‘decision makers’ to the example. Our goal is to automate decisions on whether to store data in the SARs table or the regular transactions table. Here’s how we do it:
From the navigation menu on the right side of the Mage UI, click ‘Add-on blocks’
Click the ‘Conditionals’ add-on block
Next, click ‘+ Conditional Block’, hover over ‘Python’, and click ‘Base template’
Give the conditional block a meaningful name, such as greater_than_10000, and then click ‘Save and add’
Replace the code underneath the @condition decorator in the greater_than_10000 conditional block with the code below
```python
def evaluate_condition(data, *args, **kwargs) -> bool:
    # Check if 'amount' column exists
    if 'amount' not in data.columns:
        return False

    # Check if there are any amounts >= 10000
    large_amounts = data['amount'] >= 10000

    # Return True if there are any large amounts, False otherwise
    return large_amounts.any()
```
Repeat the steps above to create a second conditional block, then replace the code underneath the @condition decorator in the less_than_10000 conditional block with the code below
```python
def evaluate_condition(data, *args, **kwargs) -> bool:
    # Check if 'amount' column exists
    if 'amount' not in data.columns:
        return False

    # Check if there are any amounts < 10000
    small_amounts = data['amount'] < 10000

    # Return True if there are any small amounts, False otherwise
    return small_amounts.any()
```
Apply each conditional block to the correct transformer block by clicking the ‘Select blocks to add conditionals to’ button, checking the block to add the condition to, and then clicking ‘Save selected blocks’
If the incoming data fails the condition when you run a block in the pipeline editor, Mage will report that the condition was not met and skip the block.
By adding these conditional blocks, we’re setting up the logic to automate our decision-making process. This ensures that transactions are directed to the appropriate tables—either the SARs table for suspicious transactions or the regular transactions table for the rest. With these steps, you can now create a more intelligent and efficient data pipeline in Mage.
Step 4: Load data to PostgreSQL
Finally, we’re going to load the data into two separate tables in our PostgreSQL financial database. Based on the io_config.yml file, you should have a database in PostgreSQL called financial_database and a schema called test. Don’t worry about creating the tables; Mage will handle this part for you. To load data to Postgres, complete the following steps:
Create two data exporter blocks, greater_than_10000_to_postgres and less_than_10000_to_postgres (a sketch of the exporter code follows this list)
You may have to remove and reconnect the data exporters to the correct transformer blocks. Follow the steps above for breaking and connecting blocks
Change the schema_name to the schema where you want the table to live in your Postgres database
Add the name you want your table to be called
Ensure the config_profile matches the one in your YAML file
Run the block by clicking the yellow arrow button
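For reference, each exporter block should look roughly like the sketch below, which follows Mage's generic Postgres exporter template (exact template code and import paths may vary by Mage version; the schema_name and table_name values here are placeholders):

```python
from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'test'                    # placeholder: schema from io_config.yml
    table_name = 'suspicious_transactions'  # placeholder: name the table whatever you like
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'              # must match the profile in your YAML file

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,           # do not write the DataFrame index
            if_exists='replace',   # replace the table if it already exists
        )
```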
After a successful run, Mage will display a message confirming the export succeeded.
Make sure to complete this step for both the greater-than-10000 side of the pipeline and the less-than-10000 side. Then check your PostgreSQL database: you should see the two tables you named in the data exporters, one for each side of the pipeline.
Conclusion
Conditional blocks in Mage offer a powerful way to create intelligent, dynamic financial data pipelines. By implementing decision-making logic directly within your compliance workflows, you can create more efficient, compliant, and adaptable transaction monitoring systems. Whether you're focusing on AML, fraud detection, or general regulatory compliance, mastering conditional blocks in Mage can significantly enhance your financial data processing capabilities.
Remember, effective financial data pipeline design is not just about moving transactions from point A to point B, but about making smart, risk-based decisions along the way. With Mage's conditional blocks, BSA professionals and consultants are well-equipped to build pipelines that don't just process financial data, but truly understand and act on it in alignment with regulatory expectations and institutional risk policies.