Python script to migrate data from an on-premises PostgreSQL database to Google BigQuery
Jul 18, 2024
Python Script
Below is a Python script that performs the following steps:
- Extracts data from PostgreSQL.
- Transforms the data to match the BigQuery schema.
- Uploads the transformed data to Google Cloud Storage.
- Loads the data from GCS into BigQuery.
Prerequisites
- Install the required Python libraries:
pip install pandas sqlalchemy psycopg2 google-cloud-storage google-cloud-bigquery
- Set up environment variables for your PostgreSQL and Google Cloud credentials.
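Before running anything, it helps to confirm the credentials are actually present in the environment. Below is a minimal pre-flight check; the variable names follow the script below, and GOOGLE_APPLICATION_CREDENTIALS is only needed if you authenticate with a service-account key file.

import os

# Variables the migration script reads; adjust the list to your setup.
REQUIRED_VARS = [
    'POSTGRES_USER', 'POSTGRES_PASSWORD', 'POSTGRES_HOST', 'POSTGRES_DB',
    'GCS_BUCKET_NAME', 'BQ_PROJECT_ID', 'BQ_DATASET_ID', 'BQ_TABLE_ID',
]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")

The full migration script: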
import os
import pandas as pd
from sqlalchemy import create_engine
from google.cloud import storage, bigquery
# PostgreSQL connection details
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', 5432)
POSTGRES_DB = os.getenv('POSTGRES_DB')
# GCS and BigQuery details
GCS_BUCKET_NAME = os.getenv('GCS_BUCKET_NAME')
GCS_FILE_NAME = 'data.csv'
BQ_PROJECT_ID = os.getenv('BQ_PROJECT_ID')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
BQ_TABLE_ID = os.getenv('BQ_TABLE_ID')
# Initialize clients
storage_client = storage.Client()
bigquery_client = bigquery.Client()
# PostgreSQL connection URL
postgres_url = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}'
# Extract data from PostgreSQL
def extract_data(query):
    engine = create_engine(postgres_url)
    with engine.connect() as connection:
        df = pd.read_sql(query, connection)
    return df
# Transform data to match BigQuery schema
def transform_data(df):
    # Add transformation logic if necessary
    return df
# Upload data to GCS
def upload_to_gcs(df):
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(GCS_FILE_NAME)
    df.to_csv(GCS_FILE_NAME, index=False)
    blob.upload_from_filename(GCS_FILE_NAME)
    os.remove(GCS_FILE_NAME)
    print(f'File {GCS_FILE_NAME} uploaded to GCS bucket {GCS_BUCKET_NAME}.')
# Load data from GCS to BigQuery
def load_to_bigquery():
    table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )
    uri = f'gs://{GCS_BUCKET_NAME}/{GCS_FILE_NAME}'
    load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Wait for the job to complete
    print(f'Loaded data into BigQuery table {table_id}.')
# Main function
def main():
    query = "SELECT * FROM your_table_name"
    df = extract_data(query)
    df = transform_data(df)
    upload_to_gcs(df)
    load_to_bigquery()

if __name__ == '__main__':
    main()
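For large tables, extracting everything into a single DataFrame may not fit in memory. Here is a sketch of a chunked variant that reuses the configuration and helpers defined above; the chunk size and the chunks/ file prefix are assumptions, not part of the original script.

def migrate_in_chunks(query, chunksize=50_000):
    # Stream the query result in fixed-size chunks instead of one large DataFrame.
    engine = create_engine(postgres_url)
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    for i, chunk in enumerate(pd.read_sql(query, engine, chunksize=chunksize)):
        chunk = transform_data(chunk)
        blob = bucket.blob(f'chunks/data_{i:05d}.csv')
        blob.upload_from_string(chunk.to_csv(index=False), content_type='text/csv')
    # A wildcard URI lets a single load job pick up every chunk file.
    uri = f'gs://{GCS_BUCKET_NAME}/chunks/data_*.csv'
    table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Each chunk file carries its own header row
        autodetect=True,
    )
    bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config).result()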
Here is an example of dynamically detecting the schema and adjusting the transformation:
def extract_data(query):
    engine = create_engine(postgres_url)
    with engine.connect() as connection:
        df = pd.read_sql(query, connection)
    return df
def transform_data(df):
    # Dynamic schema detection
    expected_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']
    current_columns = df.columns.tolist()
    # Reorder columns if necessary
    if set(expected_columns) == set(current_columns):
        df = df[expected_columns]
    else:
        raise ValueError("Schema mismatch detected.")
    return df
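Beyond checking and reordering columns, the transformation step often needs to coerce values into a BigQuery-friendly shape. The sketch below shows what that might look like; the column-name cleanup and timestamp handling are illustrative assumptions, not requirements of the script above.

import re
import pandas as pd

def transform_data(df):
    # Normalise column names to letters, digits and underscores for BigQuery.
    df.columns = [re.sub(r'\W+', '_', col).strip('_').lower() for col in df.columns]
    # Convert datetime columns to UTC so they load cleanly as TIMESTAMP.
    for col in df.select_dtypes(include=['datetime64[ns]', 'datetimetz']).columns:
        df[col] = pd.to_datetime(df[col], utc=True)
    return df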
Option 2: an alternative script using psycopg2 and the BigQuery client
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
import psycopg2
def extract_data(query, gcs_bucket_name=None):
    """Extracts data from PostgreSQL and uploads it to Cloud Storage (optional)"""
    try:
        pg_conn = psycopg2.connect(**pg_conn_params)  # Replace with your connection details
        cur = pg_conn.cursor()
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]  # Column names for the CSV header
        data = cur.fetchall()
        cur.close()
        pg_conn.close()
        if gcs_bucket_name:
            # Convert data to CSV (basic transformation; values containing commas
            # or newlines would need proper CSV quoting)
            csv_rows = [",".join(columns)]
            csv_rows += [",".join(str(x) for x in row) for row in data]
            csv_content = "\n".join(csv_rows)
            # Upload data to Cloud Storage
            gcs_client = storage.Client()
            bucket = gcs_client.bucket(gcs_bucket_name)
            blob = bucket.blob("data.csv")
            blob.upload_from_string(csv_content)
            print(f"Data extracted and uploaded to gs://{gcs_bucket_name}/data.csv")
            return f"gs://{gcs_bucket_name}/data.csv"
        else:
            return data  # Return rows directly if no Cloud Storage is used
    except Exception as e:
        print(f"Error extracting data: {e}")
        return None
def load_to_bigquery(data, table_name, schema):
    """Loads data from Cloud Storage (or an in-memory DataFrame) into a BigQuery table"""
    bq_client = bigquery.Client()
    # table_name is expected as "dataset.table"; the client's default project is used
    table_ref = bigquery.TableReference.from_string(table_name, default_project=bq_client.project)
    if isinstance(data, str) and data.startswith("gs://"):
        load_job_config = bigquery.LoadJobConfig(
            schema=schema,
            skip_leading_rows=1,  # First row is the header in the generated CSV
            field_delimiter=","
        )
        load_job = bq_client.load_table_from_uri(
            data,
            table_ref,
            job_config=load_job_config
        )
    else:
        # Direct load from a pandas DataFrame (requires the pyarrow package)
        load_job = bq_client.load_table_from_dataframe(
            data, table_ref, job_config=bigquery.LoadJobConfig(schema=schema)
        )
    load_job.result()  # Wait for load to complete
    print(f"Data loaded to BigQuery table: {table_name}")
# Replace with your PostgreSQL connection details
pg_conn_params = {
    "host": "your_postgres_host",
    "database": "your_database",
    "user": "your_user",
    "password": "your_password"
}
# Replace with your BigQuery schema (adjust data types as needed)
schema = [
    bigquery.SchemaField("column1", "STRING"),
    bigquery.SchemaField("column2", "INTEGER"),
]
# Replace with your PostgreSQL query
query = "SELECT column1, column2 FROM your_table"
# Choose one approach based on your needs:

# Option 1: Extract and load directly to BigQuery (no intermediate Cloud Storage file)
data = extract_data(query)
if data is not None:
    df = pd.DataFrame(data, columns=[field.name for field in schema])
    load_to_bigquery(df, "your_dataset.your_table", schema)

# Option 2: Stage the data in Cloud Storage, then load it into BigQuery
# gcs_bucket_name = "your_gcs_bucket"  # Uncomment these lines if using Cloud Storage
# data_uri = extract_data(query, gcs_bucket_name)
# if data_uri:
#     load_to_bigquery(data_uri, "your_dataset.your_table", schema)
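Whichever path you choose, a quick row-count check against the target table is an easy way to confirm the load finished as expected. A minimal sketch (the table name is a placeholder):

def verify_row_count(table_name):
    # Count rows in the freshly loaded table as a basic sanity check.
    bq_client = bigquery.Client()
    result = bq_client.query(f"SELECT COUNT(*) AS row_count FROM `{table_name}`").result()
    for row in result:
        print(f"{table_name} now has {row.row_count} rows")

# verify_row_count("your_dataset.your_table")

Comparing this number against a SELECT COUNT(*) on the PostgreSQL side gives a simple end-to-end validation of the migration.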