
Python/PostgreSQL Wrapper

The Python wrapper provides programmatic access to FastTransfer from Python applications, and includes a PostgreSQL function that allows you to execute FastTransfer directly from within PostgreSQL using PL/Python.

Overview

The FastTransfer Python wrapper offers two main components:

  1. Python Library: A Python module for executing FastTransfer from Python scripts
  2. PostgreSQL PL/Python Function: A function for executing FastTransfer from within PostgreSQL functions and stored procedures

Benefits:

  • Integrate FastTransfer into Python data pipelines
  • Call FastTransfer from Apache Airflow, Prefect, or other Python orchestrators
  • Execute transfers from PostgreSQL functions and triggers
  • Programmatic error handling and logging
  • Dynamic parameter generation

Repository

The Python wrapper source code and documentation are available on GitHub:

FastTransfer Python Wrapper on GitHub: https://github.com/aetperf/FastTransfer-Python-Wrapper

Python Library

Installation

Install the wrapper using pip:

pip install fasttransfer-wrapper

Or install from source:

git clone https://github.com/aetperf/FastTransfer-Python-Wrapper.git
cd FastTransfer-Python-Wrapper
pip install .

Requirements

  • Python 3.8 or later
  • FastTransfer binary installed and accessible
  • Database drivers for source/target databases (if testing connections)

Basic Usage

from fasttransfer import FastTransfer

# Create FastTransfer instance
ft = FastTransfer(executable_path="/opt/fasttransfer/FastTransfer")

# Execute a simple transfer
result = ft.transfer(
    source_type="pgsql",
    source_server="pg.example.com",
    source_user="pguser",
    source_password="pgpass",
    source_database="sales",
    source_table="orders",
    target_type="mssql",
    target_server="sql.example.com",
    target_user="sqluser",
    target_password="sqlpass",
    target_database="warehouse",
    target_table="orders"
)

if result.success:
    print("Transfer completed successfully")
    print(f"Rows transferred: {result.row_count}")
    print(f"Duration: {result.duration_seconds}s")
else:
    print(f"Transfer failed: {result.error_message}")
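The row count and duration reported above can be combined into a throughput figure, which is often easier to compare between runs. The following is a minimal sketch; the helper name `rows_per_second` is hypothetical and the numbers are made up, standing in for `result.row_count` and `result.duration_seconds`.

```python
from typing import Optional


def rows_per_second(row_count: int, duration_seconds: float) -> Optional[float]:
    """Return throughput in rows/s, or None when the duration is zero or negative."""
    if duration_seconds <= 0:
        return None
    return row_count / duration_seconds


# Made-up numbers standing in for result.row_count and result.duration_seconds.
rate = rows_per_second(1_200_000, 48.0)
print(f"Throughput: {rate:,.0f} rows/s")  # prints "Throughput: 25,000 rows/s"
```

Returning `None` for a non-positive duration avoids a division by zero on very small transfers.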

Advanced Usage

Transfer with Custom Query

from fasttransfer import FastTransfer

ft = FastTransfer()

result = ft.transfer(
    source_type="pgsql",
    source_server="pg.example.com",
    source_user="pguser",
    source_password="pgpass",
    source_database="sales",
    query="SELECT * FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'",
    target_type="mssql",
    target_server="sql.example.com",
    target_user="sqluser",
    target_password="sqlpass",
    target_database="warehouse",
    target_table="recent_orders",
    load_mode="Truncate"
)

Parallel Transfer

from fasttransfer import FastTransfer, ParallelMethod

ft = FastTransfer()

result = ft.transfer(
    source_type="pgsql",
    source_server="pg.example.com",
    source_database="warehouse",
    source_table="large_fact_table",
    target_type="mssql",
    target_server="sql.example.com",
    target_database="analytics",
    target_table="fact_table",
    method=ParallelMethod.RANDOM,
    degree=8,
    batch_size=10000
)

Configuration from Dict

from fasttransfer import FastTransfer

config = {
    "source_type": "mysql",
    "source_server": "mysql.example.com:3306",
    "source_user": "mysqluser",
    "source_password": "mysqlpass",
    "source_database": "ecommerce",
    "source_table": "products",
    "target_type": "pgsql",
    "target_server": "pg.example.com",
    "target_user": "pguser",
    "target_password": "pgpass",
    "target_database": "catalog",
    "target_schema": "public",
    "target_table": "products",
    "map_method": "Name",
    "use_work_tables": True
}

ft = FastTransfer()
result = ft.transfer(**config)
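Because the configuration is a plain dictionary, it can also be generated at run time. The sketch below builds one dictionary per table from a shared base configuration. The helper name `build_transfer_configs` and the table list are hypothetical; the keys mirror the `transfer()` parameters used in the example above.

```python
from typing import Dict, Iterable, List


def build_transfer_configs(base: Dict[str, object],
                           tables: Iterable[str]) -> List[Dict[str, object]]:
    """Return one config per table, copying `base` and filling in the table names."""
    configs = []
    for table in tables:
        config = dict(base)  # shallow copy so `base` is never mutated
        config["source_table"] = table
        config["target_table"] = table
        configs.append(config)
    return configs


base_config = {
    "source_type": "mysql",
    "source_server": "mysql.example.com:3306",
    "source_database": "ecommerce",
    "target_type": "pgsql",
    "target_server": "pg.example.com",
    "target_database": "catalog",
}

configs = build_transfer_configs(base_config, ["products", "categories"])
for config in configs:
    print(config["source_table"], "->", config["target_table"])
    # Each dict could then be passed on as: ft.transfer(**config)
```

Copying the base dictionary for each table keeps the shared settings in one place and avoids one transfer's values leaking into the next.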

Integration with Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from fasttransfer import FastTransfer

def transfer_data():
    ft = FastTransfer(executable_path="/opt/fasttransfer/FastTransfer")

    result = ft.transfer(
        source_type="pgsql",
        source_server="pg.example.com",
        source_database="production",
        source_table="daily_metrics",
        target_type="mssql",
        target_server="sql.example.com",
        target_database="analytics",
        target_table="daily_metrics",
        load_mode="Truncate",
        log_level="Debug"
    )

    # Raising marks the Airflow task as failed so that retries apply
    if not result.success:
        raise Exception(f"Transfer failed: {result.error_message}")

    return result.row_count

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'postgres_to_sqlserver_transfer',
    default_args=default_args,
    description='Daily transfer from PostgreSQL to SQL Server',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False
)

transfer_task = PythonOperator(
    task_id='transfer_daily_metrics',
    python_callable=transfer_data,
    dag=dag
)

Error Handling and Logging

from fasttransfer import FastTransfer, TransferException
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_transfer(config):
    ft = FastTransfer()

    try:
        logger.info(f"Starting transfer from {config['source_table']} to {config['target_table']}")

        result = ft.transfer(**config)

        if result.success:
            logger.info(f"Transfer completed: {result.row_count} rows in {result.duration_seconds}s")
            return True
        else:
            logger.error(f"Transfer failed: {result.error_message}")
            return False

    except TransferException as e:
        logger.error(f"Transfer exception: {str(e)}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return False

# Usage
config = {
    "source_type": "pgsql",
    "source_server": "pg.example.com",
    "source_table": "customers",
    "target_type": "mssql",
    "target_server": "sql.example.com",
    "target_table": "customers"
}

success = safe_transfer(config)
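Outside an orchestrator that retries tasks for you, transient failures can be handled with a small retry loop and exponential backoff. The following is a sketch under stated assumptions: `run_with_retries` is a hypothetical helper, not part of the wrapper, and `fake_transfer` is a stand-in for `ft.transfer(**config)` that only mimics the `success` attribute used in the examples above.

```python
import time
from types import SimpleNamespace
from typing import Callable


def run_with_retries(run: Callable[[], object], attempts: int = 3,
                     base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep):
    """Call `run` until its result has a truthy `.success` or attempts run out.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    returns the last result either way. Exceptions raised by `run` propagate.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    result = None
    for attempt in range(attempts):
        result = run()
        if getattr(result, "success", False):
            break
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)
    return result


# Stand-in for ft.transfer(**config): fails twice, then succeeds.
outcomes = iter([False, False, True])


def fake_transfer():
    return SimpleNamespace(success=next(outcomes), error_message=None)


delays = []  # record the waits instead of actually sleeping
final = run_with_retries(fake_transfer, attempts=3, base_delay=0.5, sleep=delays.append)
print(final.success, delays)  # prints "True [0.5, 1.0]"
```

Passing the `sleep` function as a parameter keeps the helper easy to test; in real use the default `time.sleep` applies.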

PostgreSQL PL/Python Function

Prerequisites

  • PostgreSQL 9.1 or later
  • PL/Python extension installed (plpython3u)
  • FastTransfer binary accessible from PostgreSQL server
  • Python 3.8+ on PostgreSQL server

Installation

Step 1: Enable PL/Python

CREATE EXTENSION plpython3u;

Step 2: Install Python Wrapper

On the PostgreSQL server machine:

sudo -u postgres pip3 install fasttransfer-wrapper

Step 3: Create PostgreSQL Function

CREATE OR REPLACE FUNCTION public.execute_fasttransfer(
    p_source_type TEXT,
    p_source_server TEXT,
    p_source_user TEXT DEFAULT NULL,
    p_source_password TEXT DEFAULT NULL,
    p_source_database TEXT DEFAULT NULL,
    p_source_table TEXT DEFAULT NULL,
    p_target_type TEXT DEFAULT 'pgsql',
    p_target_server TEXT DEFAULT 'localhost',
    p_target_user TEXT DEFAULT NULL,
    p_target_password TEXT DEFAULT NULL,
    p_target_database TEXT DEFAULT NULL,
    p_target_table TEXT DEFAULT NULL,
    p_query TEXT DEFAULT NULL,
    p_load_mode TEXT DEFAULT 'Append',
    p_degree INTEGER DEFAULT 1,
    p_fasttransfer_path TEXT DEFAULT '/opt/fasttransfer/FastTransfer'
)
RETURNS TABLE(success BOOLEAN, row_count BIGINT, duration_seconds NUMERIC, error_message TEXT)
AS $$
from fasttransfer import FastTransfer

ft = FastTransfer(executable_path=p_fasttransfer_path)

# Required parameters
kwargs = {
    'source_type': p_source_type,
    'source_server': p_source_server,
    'target_type': p_target_type,
    'target_server': p_target_server,
}

# Add optional parameters only when they are set
if p_source_user: kwargs['source_user'] = p_source_user
if p_source_password: kwargs['source_password'] = p_source_password
if p_source_database: kwargs['source_database'] = p_source_database
if p_source_table: kwargs['source_table'] = p_source_table
if p_target_user: kwargs['target_user'] = p_target_user
if p_target_password: kwargs['target_password'] = p_target_password
if p_target_database: kwargs['target_database'] = p_target_database
if p_target_table: kwargs['target_table'] = p_target_table
if p_query: kwargs['query'] = p_query
if p_load_mode: kwargs['load_mode'] = p_load_mode
if p_degree: kwargs['degree'] = p_degree

result = ft.transfer(**kwargs)

# One row matching the RETURNS TABLE column order
return [(result.success, result.row_count, result.duration_seconds, result.error_message)]
$$ LANGUAGE plpython3u;
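The chain of `if` statements in the function body drops parameters that were left at `NULL` (which PL/Python passes as `None`). One way this filtering could be condensed is a small helper, shown here as plain Python so it can be tried outside PostgreSQL. The name `drop_unset` is hypothetical and not part of the wrapper.

```python
from typing import Any, Dict


def drop_unset(params: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `params` without entries whose value is None."""
    return {key: value for key, value in params.items() if value is not None}


# Values as they might arrive from the SQL function's parameters.
kwargs = drop_unset({
    "source_type": "mssql",
    "source_server": "sql.example.com",
    "source_user": None,
    "query": None,
    "load_mode": "Append",
    "degree": 1,
})
print(sorted(kwargs))
```

Note one behavioral difference: the truthiness checks in the function above also skip empty strings and `0`, whereas this sketch only skips `None`.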

Usage in PostgreSQL

Basic Transfer

SELECT * FROM public.execute_fasttransfer(
    p_source_type := 'mssql',
    p_source_server := 'sql.example.com',
    p_source_user := 'sqluser',
    p_source_password := 'sqlpass',
    p_source_database := 'sales',
    p_source_table := 'orders',
    p_target_database := 'warehouse',
    p_target_table := 'orders_staging'
);

Transfer with Query

SELECT * FROM public.execute_fasttransfer(
    p_source_type := 'mysql',
    p_source_server := 'mysql.example.com:3306',
    p_source_user := 'mysqluser',
    p_source_password := 'mysqlpass',
    p_source_database := 'ecommerce',
    p_query := 'SELECT * FROM products WHERE updated_at >= DATE_SUB(NOW(), INTERVAL 1 DAY)',
    p_target_database := 'staging',
    p_target_table := 'recent_products',
    p_load_mode := 'Truncate'
);

Use in Stored Procedure

CREATE OR REPLACE PROCEDURE sync_customer_data()
LANGUAGE plpgsql
AS $$
DECLARE
    v_success BOOLEAN;
    v_row_count BIGINT;
    v_error TEXT;
BEGIN
    -- Transfer customers from SQL Server
    SELECT success, row_count, error_message
    INTO v_success, v_row_count, v_error
    FROM public.execute_fasttransfer(
        p_source_type := 'mssql',
        p_source_server := 'sql.example.com',
        p_source_database := 'crm',
        p_source_table := 'customers',
        p_target_database := 'analytics',
        p_target_table := 'customers'
    );

    IF NOT v_success THEN
        RAISE EXCEPTION 'Transfer failed: %', v_error;
    END IF;

    RAISE NOTICE 'Successfully transferred % rows', v_row_count;
END;
$$;

-- Execute the procedure
CALL sync_customer_data();

Scheduled Transfer with pg_cron

-- Install pg_cron extension
CREATE EXTENSION pg_cron;

-- Schedule daily transfer at 2 AM
SELECT cron.schedule(
    'daily-customer-sync',
    '0 2 * * *',
    $$
    SELECT * FROM public.execute_fasttransfer(
        p_source_type := 'mssql',
        p_source_server := 'sql.example.com',
        p_source_database := 'production',
        p_source_table := 'customers',
        p_target_database := 'warehouse',
        p_target_table := 'customers',
        p_load_mode := 'Truncate'
    );
    $$
);

Security Best Practices

Credential Management

Don't hardcode credentials:

# Bad - hardcoded credentials
result = ft.transfer(
    source_password="mysecretpassword",  # Don't do this!
    target_password="anothersecret"
)

# Good - use environment variables
import os

result = ft.transfer(
    source_password=os.getenv("PG_PASSWORD"),
    target_password=os.getenv("SQL_PASSWORD")
)
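One caveat with `os.getenv` is that it returns `None` when a variable is missing, so a misconfigured environment may only surface later as an authentication failure. A minimal sketch of a stricter lookup follows. The helper `require_env` and the `DEMO_*` variable names are hypothetical and exist only for this illustration.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Demonstration with hypothetical variable names used only in this example.
os.environ["DEMO_PG_PASSWORD"] = "example-only"
os.environ.pop("DEMO_MISSING_PASSWORD", None)

password = require_env("DEMO_PG_PASSWORD")

try:
    require_env("DEMO_MISSING_PASSWORD")
except RuntimeError as exc:
    print(exc)  # prints "Environment variable DEMO_MISSING_PASSWORD is not set"
```

Failing before the transfer starts gives a clearer error message than a rejected login on the database side.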

Use secrets managers:

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

credential = DefaultAzureCredential()
client = SecretClient(vault_url="https://myvault.vault.azure.net/", credential=credential)

pg_password = client.get_secret("pg-password").value
sql_password = client.get_secret("sql-password").value

result = ft.transfer(
    source_password=pg_password,
    target_password=sql_password
)
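Wherever the credentials come from, they end up in the same configuration dictionary as the other parameters, so it is worth masking them before that dictionary is logged. The sketch below assumes that secret values live under keys ending in `password`, as in the examples on this page. The helper name `redact_secrets` is hypothetical.

```python
from typing import Any, Dict


def redact_secrets(config: Dict[str, Any], mask: str = "***") -> Dict[str, Any]:
    """Return a copy of `config` with every non-empty '*password' value masked."""
    return {
        key: (mask if key.lower().endswith("password") and value is not None else value)
        for key, value in config.items()
    }


config = {
    "source_server": "pg.example.com",
    "source_user": "pguser",
    "source_password": "not-a-real-secret",
    "target_password": None,
}
safe = redact_secrets(config)
print(safe)  # the original `config` is left untouched
```

Logging `safe` rather than `config` keeps passwords out of log files while preserving the rest of the context for debugging.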

PostgreSQL Function Security

Restrict who can execute the function:

-- Revoke public access
REVOKE EXECUTE ON FUNCTION public.execute_fasttransfer FROM PUBLIC;

-- Grant to specific role
GRANT EXECUTE ON FUNCTION public.execute_fasttransfer TO etl_users;

Troubleshooting

Python: Module Not Found

Error: ModuleNotFoundError: No module named 'fasttransfer'

Solution:

pip install fasttransfer-wrapper

PostgreSQL: PL/Python Not Available

Error: ERROR: language "plpython3u" does not exist

Solution:

# Ubuntu/Debian (replace 14 with your PostgreSQL major version)
sudo apt-get install postgresql-plpython3-14

# Then in PostgreSQL
CREATE EXTENSION plpython3u;

Permission Denied Executing FastTransfer

Error: "Permission denied executing FastTransfer"

Solution:

  • Ensure FastTransfer binary has execute permissions
  • Verify PostgreSQL user can access the FastTransfer path
  • Check SELinux or AppArmor policies on Linux
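The first two checks above can be scripted. The following sketch reports why a given path might not be runnable; for meaningful results it should be run as the operating-system user that will launch FastTransfer (for example the PostgreSQL service account). The helper name `diagnose_executable` is hypothetical, and the sketch does not inspect SELinux or AppArmor policies.

```python
import os
import tempfile
from typing import List


def diagnose_executable(path: str) -> List[str]:
    """Return a list of human-readable problems found for `path` (empty if none)."""
    problems = []
    if not os.path.exists(path):
        problems.append(f"{path} does not exist or is not reachable by this user")
    elif not os.path.isfile(path):
        problems.append(f"{path} is not a regular file")
    elif not os.access(path, os.X_OK):
        problems.append(f"{path} is not executable by this user (try chmod +x)")
    return problems


# Demonstration on a temporary file created without execute permission.
with tempfile.NamedTemporaryFile(delete=False) as handle:
    demo_path = handle.name
os.chmod(demo_path, 0o644)
demo_problems = diagnose_executable(demo_path)
print(demo_problems)
os.remove(demo_path)
```

An empty list means the basic file checks passed, in which case mandatory access control policies are the next thing to look at.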

Further Information

For complete source code, API documentation, and examples:

GitHub repository: https://github.com/aetperf/FastTransfer-Python-Wrapper

For FastTransfer documentation:

Copyright © 2026 Architecture & Performance.