Python script to migrate data from an on-premises PostgreSQL database to Google BigQuery

Biswanath Giri

Python Script

Below is a Python script that performs the following steps:

  1. Extracts data from PostgreSQL.
  2. Transforms the data to match the BigQuery schema.
  3. Uploads the transformed data to Google Cloud Storage.
  4. Loads the data from GCS into BigQuery.

Prerequisites

  • Install the required Python libraries:
pip install pandas sqlalchemy psycopg2 google-cloud-storage google-cloud-bigquery

Set up environment variables for your PostgreSQL and Google Cloud credentials.
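The Google Cloud clients below rely on Application Default Credentials, so either point GOOGLE_APPLICATION_CREDENTIALS at a service-account key file or run gcloud auth application-default login. A minimal sanity check (a sketch, assuming the variable names used in the script below) that fails fast if anything is missing:

import os

REQUIRED_VARS = [
    'POSTGRES_USER', 'POSTGRES_PASSWORD', 'POSTGRES_HOST', 'POSTGRES_DB',
    'GCS_BUCKET_NAME', 'BQ_PROJECT_ID', 'BQ_DATASET_ID', 'BQ_TABLE_ID',
]

# Fail early with a clear message instead of failing mid-migration
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")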

import os
import pandas as pd
from sqlalchemy import create_engine
from google.cloud import storage, bigquery

# PostgreSQL connection details
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', 5432)
POSTGRES_DB = os.getenv('POSTGRES_DB')

# GCS and BigQuery details
GCS_BUCKET_NAME = os.getenv('GCS_BUCKET_NAME')
GCS_FILE_NAME = 'data.csv'
BQ_PROJECT_ID = os.getenv('BQ_PROJECT_ID')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
BQ_TABLE_ID = os.getenv('BQ_TABLE_ID')

# Initialize clients
storage_client = storage.Client()
bigquery_client = bigquery.Client()

# PostgreSQL connection URL
postgres_url = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}'

# Extract data from PostgreSQL
def extract_data(query):
    engine = create_engine(postgres_url)
    with engine.connect() as connection:
        df = pd.read_sql(query, connection)
    return df

# Transform data to match BigQuery schema
def transform_data(df):
    # Add transformation logic if necessary
    return df

# Upload data to GCS
def upload_to_gcs(df):
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(GCS_FILE_NAME)
    df.to_csv(GCS_FILE_NAME, index=False)
    blob.upload_from_filename(GCS_FILE_NAME)
    os.remove(GCS_FILE_NAME)
    print(f'File {GCS_FILE_NAME} uploaded to GCS bucket {GCS_BUCKET_NAME}.')

# Load data from GCS to BigQuery
def load_to_bigquery():
    table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )

    uri = f'gs://{GCS_BUCKET_NAME}/{GCS_FILE_NAME}'
    load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Wait for the job to complete
    print(f'Loaded data into BigQuery table {table_id}.')

# Main function
def main():
    query = "SELECT * FROM your_table_name"
    df = extract_data(query)
    df = transform_data(df)
    upload_to_gcs(df)
    load_to_bigquery()

if __name__ == '__main__':
    main()
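Note that with schema autodetection and the default write disposition, repeated runs of this script append rows to the destination table. A minimal variant of the job configuration (a sketch, not part of the original script) that overwrites the table on each run:

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Replace the table contents on each load
)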

Below is an example of dynamically detecting the schema and adjusting the transformation:

def extract_data(query):
    engine = create_engine(postgres_url)
    with engine.connect() as connection:
        df = pd.read_sql(query, connection)
    return df

def transform_data(df):
    # Dynamic schema detection
    expected_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']
    current_columns = df.columns.tolist()

    # Reorder columns if necessary
    if set(expected_columns) == set(current_columns):
        df = df[expected_columns]
    else:
        raise ValueError("Schema mismatch detected.")

    return df
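If columns can occasionally be missing on the source side, a more tolerant variant (a sketch; it keeps the same expected_columns list but fills gaps with nulls instead of raising) could look like this:

def transform_data(df):
    expected_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']

    # Add any missing expected columns as nulls, then drop extras and reorder
    for column in expected_columns:
        if column not in df.columns:
            df[column] = pd.NA

    return df[expected_columns]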

Option 2

from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
import psycopg2

def extract_data(query, gcs_bucket_name=None):
    """Extracts data from PostgreSQL and uploads it to Cloud Storage (optional)"""
    try:
        pg_conn = psycopg2.connect(**pg_conn_params)  # Connection details defined below
        cur = pg_conn.cursor()
        cur.execute(query)
        data = cur.fetchall()
        column_names = [desc[0] for desc in cur.description]
        cur.close()
        pg_conn.close()

        if gcs_bucket_name:
            # Convert data to CSV (basic transformation; no quoting or escaping)
            header = ",".join(column_names)
            csv_rows = [",".join(str(x) for x in row) for row in data]
            csv_content = "\n".join([header] + csv_rows)

            # Upload data to Cloud Storage
            gcs_client = storage.Client()
            bucket = gcs_client.bucket(gcs_bucket_name)
            blob = bucket.blob("data.csv")
            blob.upload_from_string(csv_content)

            print(f"Data extracted and uploaded to gs://{gcs_bucket_name}/data.csv")
            return f"gs://{gcs_bucket_name}/data.csv"
        else:
            return data  # Return rows directly if Cloud Storage is not used

    except Exception as e:
        print(f"Error extracting data: {e}")
        return None

def load_to_bigquery(data_uri, table_name, schema):
    """Loads data from Cloud Storage (or directly) into a BigQuery table"""
    bq_client = bigquery.Client()
    # table_name is expected in "dataset.table" form; prefix it with the client's project
    table_ref = f"{bq_client.project}.{table_name}"

    if isinstance(data_uri, str) and data_uri.startswith("gs://"):
        load_job_config = bigquery.LoadJobConfig(
            schema=schema,
            skip_leading_rows=1,  # The first row of the CSV written above is a header
            field_delimiter=","
        )
        load_job = bq_client.load_table_from_uri(
            data_uri,
            table_ref,
            job_config=load_job_config
        )
    else:
        # Rows returned directly from PostgreSQL; wrap them in a DataFrame
        # (load_table_from_dataframe requires pyarrow to be installed)
        df = pd.DataFrame(data_uri, columns=[field.name for field in schema])
        load_job = bq_client.load_table_from_dataframe(
            df, table_ref, job_config=bigquery.LoadJobConfig(schema=schema)
        )

    load_job.result()  # Wait for the load to complete
    print(f"Data loaded to BigQuery table: {table_name}")

# Replace with your PostgreSQL connection details
pg_conn_params = {
    "host": "your_postgres_host",
    "database": "your_database",
    "user": "your_user",
    "password": "your_password"
}

# Replace with your BigQuery schema (adjust data types as needed)
schema = [
    bigquery.SchemaField("column1", "STRING"),
    bigquery.SchemaField("column2", "INTEGER"),
]

# Replace with your PostgreSQL query
query = "SELECT column1, column2 FROM your_table"

# Choose one approach based on your needs:

# Option 1: Extract directly and load to BigQuery (no intermediate file in Cloud Storage)
data = extract_data(query)
if data is not None:
    load_to_bigquery(data, "your_dataset.your_table", schema)

# Option 2: Extract to Cloud Storage, then load to BigQuery (uncomment to use)
# gcs_bucket_name = "your_gcs_bucket"
# data_uri = extract_data(query, gcs_bucket_name)
# if data_uri:
#     load_to_bigquery(data_uri, "your_dataset.your_table", schema)
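After either option completes, a quick row-count check against the destination table (a sketch that reuses the fully qualified table name from above) can confirm the load:

bq_client = bigquery.Client()
count_query = f"SELECT COUNT(*) AS row_count FROM `{bq_client.project}.your_dataset.your_table`"
for row in bq_client.query(count_query).result():
    print(f"Rows in destination table: {row.row_count}")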
