Ibis integration¶
Ibis is an open-source Python library that provides a dataframe-style, Pythonic interface for working with SQL databases. You write queries as Python expressions, and Ibis translates them into SQL for execution on the backend of your choice (see the short sketch after the list below). Ibis supports many engines, including PostgreSQL, MySQL, SQLite, Google BigQuery, and DuckDB.
Integrating Ibis with Kedro lets you execute SQL-backed transformations as part of your data workflows without hardcoding SQL strings. This integration provides several benefits:
Centralized SQL Query Management: Define SQL queries in Python code, allowing for version control within your Kedro project repository
Database Engine Flexibility: Switch between different database backends without changing your query syntax
Performance Optimizations: Push down SQL operations to the database, optimizing performance and reducing data movement
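To make this concrete, here is a minimal, self-contained sketch of what Ibis does (the table and column names are illustrative; DuckDB runs in memory):
import ibis

con = ibis.duckdb.connect()  # in-memory DuckDB database
t = con.create_table("events", ibis.memtable({"user": ["a", "b"], "amount": [10.0, 25.0]}))

# A lazy expression: nothing executes until .execute() is called
expr = t.filter(t.amount > 15).group_by("user").aggregate(total=t.amount.sum())
print(ibis.to_sql(expr))  # the SQL that will be sent to the backend
print(expr.execute())     # runs on DuckDB and returns a pandas DataFrame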
Prerequisites¶
You will need the following:
A working Kedro project in a virtual environment
Ibis installed in the same virtual environment
To set yourself up, install Ibis and the appropriate database connector. We recommend using DuckDB as it’s easy to install and use:
# Install Ibis together with the DuckDB backend (recommended)
pip install 'ibis-framework[duckdb]'
# Or install other backend connectors if needed
# For PostgreSQL
pip install 'ibis-framework[postgres]'
# For MySQL
pip install 'ibis-framework[mysql]'
# For SQLite
pip install 'ibis-framework[sqlite]'
# For BigQuery
pip install 'ibis-framework[bigquery]'
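After installing, you can quickly check that the backend is available (this one-liner assumes the DuckDB backend; an empty list of tables is the expected output for a fresh in-memory database):
python -c "import ibis; print(ibis.duckdb.connect().list_tables())"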
Setting up Ibis in your Kedro project¶
Configure database connection¶
The first step is to configure your database connection in your Kedro project. You should store your database connection parameters in your credentials configuration files for better security.
Create or update conf/local/credentials.yml with your database connection parameters:
# conf/local/credentials.yml
database:
  backend: duckdb  # or postgres, mysql, sqlite, bigquery, etc.
  # For DuckDB, you only need the path to the database file
  # (relative paths resolve from the directory you run Kedro from)
  database: data/database.duckdb
  # For a client-server backend such as PostgreSQL, you would instead include:
  # host: localhost
  # port: 5432
  # database: mydatabase
  # user: myuser
  # password: ${oc.env:DB_PASSWORD}  # read from an environment variable
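At runtime, these parameters map directly onto Ibis's backend connect functions (a sketch, assuming the credentials layout above):
import ibis

# DuckDB: a single file on disk (or ":memory:")
con = ibis.duckdb.connect(database="data/database.duckdb")

# PostgreSQL equivalent; parameter names mirror the credentials entry
# con = ibis.postgres.connect(
#     host="localhost", port=5432, database="mydatabase",
#     user="myuser", password="...",
# )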
Using the Ibis TableDataset with Kedro’s DataCatalog¶
Kedro provides a built-in TableDataset for Ibis in the kedro-datasets package. You can use this dataset to integrate Ibis tables with your Kedro pipelines.
First, install the required package:
pip install "kedro-datasets[ibis]"
Then, configure your DataCatalog to use the Ibis TableDataset. The dataset takes its connection settings from a connection key in each catalog entry; a YAML anchor keeps this DRY, and catalog keys starting with an underscore are ignored as datasets. Add the following to your conf/base/catalog.yml:
# conf/base/catalog.yml
_duckdb: &duckdb  # reusable YAML anchor, not a dataset
  backend: duckdb
  database: data/database.duckdb

customers_table:
  type: kedro_datasets.ibis.TableDataset
  table_name: customers
  connection: *duckdb

orders_table:
  type: kedro_datasets.ibis.TableDataset
  table_name: orders
  connection: *duckdb
Note that a TableDataset represents a table to load from or save to; query logic such as filtering, joining, and aggregation does not belong in the catalog. Instead, you express it in your nodes as Ibis expressions, as shown in the sections below. To persist a node's output back to the database, point another TableDataset entry at a target table:
# conf/base/catalog.yml (continued)
filtered_customers:
  type: kedro_datasets.ibis.TableDataset
  table_name: filtered_customers
  connection: *duckdb
  save_args:
    materialized: table  # persist the node output as a table (or "view")
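You can sanity-check the wiring from an interactive session (a quick check, assuming the catalog above; kedro ipython exposes the catalog object):
# e.g. inside `kedro ipython`
customers = catalog.load("customers_table")
print(type(customers))               # an Ibis table expression, not a DataFrame
print(customers.limit(5).execute())  # runs a LIMIT 5 query on DuckDB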
Creating a database connection from credentials¶
Some nodes need the Ibis connection object itself rather than a single table, for example to create tables or run database-level operations. kedro-datasets does not provide a ready-made connection dataset, but you can add a small custom dataset to your project and register it in the catalog:
# conf/base/catalog.yml
database_connection:
  type: <package_name>.datasets.IbisConnectionDataset  # custom dataset, defined below
  credentials: database  # Kedro injects the matching entry from credentials.yml
When loaded, this entry provides an Ibis connection object that your nodes can use to interact with the database.
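A minimal implementation sketch (the class name and module path are this guide's own choices; Kedro resolves the credentials reference and passes the dict into the constructor):
# src/<package_name>/datasets.py
import ibis
from kedro.io import AbstractDataset


class IbisConnectionDataset(AbstractDataset):
    """Expose an Ibis backend connection as a catalog entry."""

    def __init__(self, credentials: dict):
        # Kedro resolves `credentials: database` and passes the dict here
        self._credentials = dict(credentials)

    def _load(self):
        params = dict(self._credentials)
        backend = params.pop("backend")  # e.g. "duckdb" or "postgres"
        # Equivalent to ibis.duckdb.connect(...), ibis.postgres.connect(...), etc.
        return getattr(ibis, backend).connect(**params)

    def _save(self, data) -> None:
        raise NotImplementedError("This dataset only provides a connection.")

    def _describe(self) -> dict:
        return {"backend": self._credentials.get("backend")}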
Using Ibis in Kedro nodes¶
Now you can use Ibis in your Kedro nodes to perform SQL operations. Here are two example nodes that transform data with Ibis table expressions:
import ibis


def filter_customers(customers_table: ibis.expr.types.Table) -> ibis.expr.types.Table:
    """Filter customers by age and calculate average purchase amount."""
    # Filter customers older than 25
    filtered = customers_table.filter(customers_table.age > 25)
    # Group by customer_id and calculate purchase statistics
    return filtered.group_by("customer_id").aggregate(
        avg_purchase=filtered.purchase_amount.mean(),
        total_purchases=filtered.purchase_amount.count(),
    )


def join_customer_orders(
    customers: ibis.expr.types.Table,
    orders: ibis.expr.types.Table,
) -> ibis.expr.types.Table:
    """Join customers with their orders."""
    # Join customers and orders on customer_id
    joined = customers.join(orders, customers.id == orders.customer_id)
    # Select only the needed columns; the projection is pushed down to the database
    return joined.select(
        customers.id,
        customers.name,
        orders.order_date,
        orders.amount,
    )
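Both functions return unevaluated table expressions; no query runs until something executes them. A quick demonstration (assuming customers_table was loaded from the catalog as above):
expr = filter_customers(customers_table)  # only builds the query plan
print(ibis.to_sql(expr))                  # inspect the generated SQL
df = expr.execute()                       # the database runs the query here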
Add these nodes to your pipeline:
from kedro.pipeline import Pipeline, node

from .nodes import filter_customers, join_customer_orders


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=filter_customers,
                inputs="customers_table",
                outputs="filtered_customers",
                name="filter_customers_node",
            ),
            node(
                func=join_customer_orders,
                inputs=["customers_table", "orders_table"],
                outputs="customer_orders",
                name="join_customer_orders_node",
            ),
        ]
    )
Best practices¶
Structuring SQL queries in your Kedro project¶
Modularize your queries: Create reusable Ibis expressions in separate modules
Avoid hardcoded SQL: Use Ibis expressions instead of raw SQL strings
Use Kedro parameters: Parameterize your queries using Kedro's parameter system (see the sketch after this list)
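For example, a reusable, parameterized expression can live in its own module (a sketch; the module path and column names are illustrative):
# src/<package_name>/queries.py
import ibis


def customers_older_than(customers: ibis.expr.types.Table, min_age: int) -> ibis.expr.types.Table:
    """A reusable, parameterized Ibis expression."""
    return customers.filter(customers.age > min_age)

# Wired into a pipeline with the threshold coming from conf/base/parameters.yml:
# node(customers_older_than, inputs=["customers_table", "params:min_age"], outputs="adult_customers")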
Performance optimization¶
Push operations to the database: Let Ibis push down operations to the database when possible
Use appropriate indexes: Ensure your database tables have proper indexes for your queries
Limit data transfer: Only select the columns you need to minimize data movement (see the example after this list)
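A before/after sketch of column pruning (assuming orders is an Ibis table expression; column names are illustrative):
from ibis import _

# Before: every column of the orders table is carried through the query
wide = orders.filter(orders.amount > 100)

# After: the projection and the filter both compile into the SQL that runs
# on the backend, so only two columns ever leave the database
narrow = orders.select("customer_id", "amount").filter(_.amount > 100)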
Error handling and debugging¶
Log query execution: Use Kedro’s logging system to log query execution
Handle database errors: Implement proper error handling for database connection issues
Debug with .compile(): Use Ibis's .compile() method, or the ibis.to_sql() helper, to see the generated SQL
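For example, a self-contained check (ibis.to_sql works on any table expression):
import ibis

t = ibis.memtable({"a": [1, 2, 3]})
expr = t.filter(t.a > 1)
print(ibis.to_sql(expr))  # the SQL Ibis will send to the backend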
Example: Complete pipeline with Ibis and DuckDB¶
Here’s a complete example of a Kedro pipeline that uses Ibis with DuckDB for SQL operations. Only the database_connection entry from earlier is required in the catalog; the intermediate datasets can stay in memory:
# src/<package_name>/pipelines/data_processing/nodes.py
import ibis
import pandas as pd


def create_sample_data(conn) -> bool:
    """Create sample tables in DuckDB for demonstration."""
    # Create customers table
    customers_df = pd.DataFrame({
        'id': range(1, 11),
        'name': [f'Customer {i}' for i in range(1, 11)],
        'age': [25, 40, 35, 28, 52, 19, 31, 45, 33, 60],
        'lifetime_value': [100.0, 2500.0, 550.0, 1200.0, 3000.0,
                           50.0, 750.0, 1800.0, 400.0, 5000.0],
    })
    # Create orders table
    orders_df = pd.DataFrame({
        'order_id': range(1, 21),
        'customer_id': [1, 2, 2, 3, 4, 5, 5, 5, 6, 7, 7, 8, 8, 8, 9, 9, 10, 10, 10, 10],
        'order_date': pd.date_range('2023-01-01', periods=20),
        'amount': [50.0, 100.0, 200.0, 150.0, 300.0, 500.0, 400.0, 600.0, 50.0, 75.0,
                   80.0, 200.0, 300.0, 150.0, 100.0, 200.0, 1000.0, 1500.0, 800.0, 1200.0],
    })
    # Create (or replace) the tables in DuckDB
    conn.create_table('customers', customers_df, overwrite=True)
    conn.create_table('orders', orders_df, overwrite=True)
    # Return a flag so the load nodes can depend on this step
    return True


def load_customers(conn, tables_ready: bool) -> ibis.expr.types.Table:
    """Load the customers table from the database (tables_ready only enforces ordering)."""
    return conn.table('customers')


def load_orders(conn, tables_ready: bool) -> ibis.expr.types.Table:
    """Load the orders table from the database (tables_ready only enforces ordering)."""
    return conn.table('orders')


def filter_high_value_customers(customers: ibis.expr.types.Table,
                                min_value: float) -> ibis.expr.types.Table:
    """Filter customers with a lifetime value above the threshold."""
    return customers.filter(customers.lifetime_value > min_value)


def join_with_orders(customers: ibis.expr.types.Table,
                     orders: ibis.expr.types.Table) -> ibis.expr.types.Table:
    """Join customers with their orders."""
    return customers.join(orders, customers.id == orders.customer_id)


def aggregate_order_stats(joined_data: ibis.expr.types.Table) -> ibis.expr.types.Table:
    """Calculate order statistics by customer."""
    return joined_data.group_by(joined_data.customer_id).aggregate(
        total_orders=joined_data.order_id.count(),
        avg_order_value=joined_data.amount.mean(),
        max_order_value=joined_data.amount.max(),
        min_order_value=joined_data.amount.min(),
    )


def to_pandas(expr: ibis.expr.types.Table) -> pd.DataFrame:
    """Execute the Ibis expression and return a pandas DataFrame."""
    return expr.execute()
# src/<package_name>/pipelines/data_processing/pipeline.py
from kedro.pipeline import Pipeline, node

from .nodes import (
    create_sample_data,
    load_customers,
    load_orders,
    filter_high_value_customers,
    join_with_orders,
    aggregate_order_stats,
    to_pandas,
)


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=create_sample_data,
                inputs="database_connection",  # Ibis connection from the catalog
                outputs="sample_data_ready",   # flag that orders the load nodes after this one
                name="create_sample_data_node",
            ),
            node(
                func=load_customers,
                inputs=["database_connection", "sample_data_ready"],
                outputs="customers_table",
                name="load_customers_node",
            ),
            node(
                func=load_orders,
                inputs=["database_connection", "sample_data_ready"],
                outputs="orders_table",
                name="load_orders_node",
            ),
            node(
                func=filter_high_value_customers,
                inputs=["customers_table", "params:min_customer_value"],
                outputs="high_value_customers",
                name="filter_high_value_customers_node",
            ),
            node(
                func=join_with_orders,
                inputs=["high_value_customers", "orders_table"],
                outputs="joined_customer_orders",
                name="join_with_orders_node",
            ),
            node(
                func=aggregate_order_stats,
                inputs="joined_customer_orders",
                outputs="customer_order_stats_ibis",
                name="aggregate_order_stats_node",
            ),
            node(
                func=to_pandas,
                inputs="customer_order_stats_ibis",
                outputs="customer_order_stats_df",
                name="to_pandas_node",
            ),
        ]
    )
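The params:min_customer_value input must exist in your parameters file (the threshold value here is illustrative):
# conf/base/parameters.yml
min_customer_value: 1000.0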
Conclusion¶
Integrating Ibis with Kedro provides a powerful way to work with SQL databases in your data pipelines. By using Ibis, you can write SQL queries in a Pythonic way, making them more maintainable and easier to version control. The integration allows you to leverage the strengths of both tools: Kedro’s pipeline management and Ibis’s SQL query capabilities.
This approach offers several advantages:
Maintainability: SQL queries are written in Python, making them easier to maintain and version control
Flexibility: Switch between different database backends without changing your query syntax
Performance: Push down operations to the database for optimal performance
Integration: Seamlessly integrate SQL operations with the rest of your Kedro pipeline
By following the best practices outlined in this guide, you can build efficient, maintainable data pipelines that leverage the power of SQL databases through Ibis and Kedro.