Python scripts that set up a Google Cloud Function to trigger on file uploads to a Google Cloud Storage (GCS) bucket, transform the JSON data, and load it into a BigQuery table


Prerequisites

  1. Google Cloud SDK: Make sure you have the Google Cloud SDK installed and configured.
  2. Create a GCS Bucket: Create a GCS bucket where the JSON files will be uploaded.
  3. Create a BigQuery Dataset and Table: Create a dataset and table in BigQuery to store the transformed data (see the setup sketch after this list).
  4. Enable APIs: Ensure that the Google Cloud Functions, Google Cloud Storage, and BigQuery APIs are enabled in your Google Cloud project.
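
If you prefer to script the setup instead of using the console, the sketch below creates the bucket, dataset, and table with the Python client libraries. All names are placeholders, and the schema is only an assumed example that matches the required columns used later in main.py; adjust everything to your own project.

from google.cloud import storage, bigquery

# Placeholder names -- replace with your own project, bucket, dataset, and table
PROJECT_ID = 'your-project-id'
BUCKET_NAME = 'your-bucket-name'
DATASET_ID = 'your_dataset'
TABLE_ID = 'your_table'

# Create the GCS bucket that will receive the JSON uploads
storage_client = storage.Client(project=PROJECT_ID)
storage_client.create_bucket(BUCKET_NAME, location='US')

# Create the BigQuery dataset and table that will hold the transformed data
bigquery_client = bigquery.Client(project=PROJECT_ID)
bigquery_client.create_dataset(f'{PROJECT_ID}.{DATASET_ID}', exists_ok=True)

schema = [  # assumed example schema; align it with your real data
    bigquery.SchemaField('client_id', 'STRING'),
    bigquery.SchemaField('hospital_id', 'STRING'),
    bigquery.SchemaField('plan_name', 'STRING'),
    bigquery.SchemaField('active_status', 'BOOLEAN'),
]
table = bigquery.Table(f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', schema=schema)
bigquery_client.create_table(table, exists_ok=True)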

Google Cloud Function

  1. main.py: Create a file named main.py with the following content:
import json
import pandas as pd
from google.cloud import storage, bigquery, logging
import os

# Initialize logging
logging_client = logging.Client()
logger = logging_client.logger('data_transformation_errors')

# Environment variables (set these in your Cloud Function environment)
BQ_PROJECT_ID = os.getenv('BQ_PROJECT_ID')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
BQ_TABLE_ID = os.getenv('BQ_TABLE_ID')

def gcs_to_bigquery(event, context):
    file_name = event['name']
    bucket_name = event['bucket']

    # Initialize clients
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()

    # Load JSON data from GCS
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    json_data = blob.download_as_string()

    try:
        # Transform JSON data
        df = transform_data(json.loads(json_data))

        # Load data to BigQuery
        load_to_bigquery(df, bigquery_client)
    except Exception as e:
        logger.log_text(f"Error processing file {file_name}: {e}")
        raise

def transform_data(json_data):
    try:
        # Normalize nested JSON
        df = pd.json_normalize(json_data)
        # Add additional transformation logic if necessary
        validate_data(df)
        return df
    except Exception as e:
        logger.log_text(f"Error in data transformation: {e}")
        raise

def validate_data(df):
    # Example validation: Check for required columns
    required_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

def load_to_bigquery(df, bigquery_client):
    table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
    )

    try:
        load_job = bigquery_client.load_table_from_dataframe(df, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete
        print(f"Loaded {len(df)} rows into {table_id}.")
    except Exception as e:
        logger.log_text(f"Error loading data to BigQuery: {e}")
        raise
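
To see what transform_data produces, you can run pd.json_normalize locally on a hypothetical record (the values below are made up; only the required column names come from validate_data). Note that nested objects are flattened into dot-separated column names, which must match the BigQuery table schema or the load will fail, so adjust the transformation or the schema accordingly.

import pandas as pd

sample = {
    'client_id': 'C001',
    'hospital_id': 'H042',
    'plan_name': 'gold',
    'active_status': True,
    'address': {'city': 'Pune', 'zip': '411001'},  # nested object
}

df = pd.json_normalize(sample)
print(list(df.columns))
# ['client_id', 'hospital_id', 'plan_name', 'active_status', 'address.city', 'address.zip']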

  2. requirements.txt: Create a file named requirements.txt with the following content:

google-cloud-storage
google-cloud-bigquery
google-cloud-logging
pandas
pyarrow  # required by load_table_from_dataframe

  3. Deploy the Cloud Function: Deploy it using the Google Cloud CLI:

gcloud functions deploy gcsToBigQuery \
--runtime python39 \
--trigger-resource YOUR_GCS_BUCKET_NAME \
--trigger-event google.storage.object.finalize \
--entry-point gcs_to_bigquery \
--set-env-vars BQ_PROJECT_ID=YOUR_BQ_PROJECT_ID,BQ_DATASET_ID=YOUR_BQ_DATASET_ID,BQ_TABLE_ID=YOUR_BQ_TABLE_ID
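
Once deployed, any JSON object written to the bucket fires the google.storage.object.finalize event. A quick way to test the pipeline end to end (the file name and record below are just an example) is to upload a small file with the storage client and then check the function logs and the BigQuery table:

import json
from google.cloud import storage

# Uploading this hypothetical test file triggers the function on object finalize
client = storage.Client()
bucket = client.bucket('YOUR_GCS_BUCKET_NAME')
blob = bucket.blob('test/sample.json')
blob.upload_from_string(json.dumps({
    'client_id': 'C001',
    'hospital_id': 'H042',
    'plan_name': 'gold',
    'active_status': True,
}))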

Option 2: a leaner variant of the same Cloud Function

import json

# Import libraries for BigQuery and Cloud Storage (add them to your requirements.txt)
from google.cloud import bigquery
from google.cloud import storage

def process_data(event, context):
    """Triggered by a change to a Cloud Storage bucket."""
    bucket_name = event['bucket']
    file_name = event['name']

    # Download JSON data from GCS
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    data_json = json.loads(blob.download_as_string())

    # Transform data (replace with your transformation logic)
    transformed_data = normalize_json(data_json)  # Replace with your function

    # Load data to BigQuery
    load_to_bigquery(transformed_data)

    print(f"Processed file: {file_name}")

def normalize_json(data):
    """Normalize nested JSON structures (example)."""
    # Implement your transformation logic here
    # This example flattens a single level of nested objects
    if isinstance(data, dict):
        flat_data = {}
        for key, value in data.items():
            if isinstance(value, dict):
                for inner_key, inner_value in value.items():
                    flat_data[f"{key}_{inner_key}"] = inner_value
            else:
                flat_data[key] = value
        return flat_data
    return data

def load_to_bigquery(data):
    # Define your BigQuery client and schema (replace with details)
    bq_client = bigquery.Client()
    dataset_id = "your_dataset"
    table_id = "your_table"
    schema = [
        # Define your schema fields here (e.g., bigquery.SchemaField("field_name", "STRING"))
    ]

    # Load data into the BigQuery table (accepts a dict or a list of dicts)
    rows = data if isinstance(data, list) else [data]
    job_config = bigquery.LoadJobConfig(
        schema=schema or None,
        autodetect=not schema,  # let BigQuery infer the schema if none is given
    )
    job = bq_client.load_table_from_json(
        rows,
        f"{dataset_id}.{table_id}",
        job_config=job_config,
    )
    job.result()  # Wait for the load to complete

# The trigger (for example google.storage.object.finalize) and the service account
# with the required permissions are configured when the function is deployed
# (for example with `gcloud functions deploy`), not in the function code itself.
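
As a quick illustration of the flattening in normalize_json (the record below is made up), a single level of nesting becomes underscore-joined keys:

record = {
    'client_id': 'C001',
    'plan': {'name': 'gold', 'active': True},  # one nested level
}
print(normalize_json(record))
# {'client_id': 'C001', 'plan_name': 'gold', 'plan_active': True}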

A revised version of the Airflow DAG script. It integrates the necessary components to read data from GCS, transform it by calling the Cloud Function over HTTP, write the transformed data back to GCS, and then load it into BigQuery:

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import storage, bigquery
import requests
import json
from datetime import datetime

# Set variables
gcs_bucket = 'bucket_name'
input_json = 'input_file.json'
transformed_file = 'transformed_file.json'
cloud_function_url = 'your_cloud_function_url'
bq_dataset_id = 'your_dataset_id'
bq_table_id = 'your_table_id'

# Initialize clients
storage_client = storage.Client()
bigquery_client = bigquery.Client()

def read_from_gcs(bucket_name, file_name):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    data = blob.download_as_string()
    return json.loads(data)

def write_to_gcs(bucket_name, file_name, data):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    # BigQuery expects newline-delimited JSON, so write one JSON object per line
    records = data if isinstance(data, list) else [data]
    blob.upload_from_string('\n'.join(json.dumps(record) for record in records))
    print(f'File {file_name} uploaded to GCS bucket {bucket_name}.')

def load_into_bigquery(dataset_id, table_id, bucket_name, file_name):
    dataset_ref = bigquery_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    # Load directly from the GCS object rather than a local file
    uri = f'gs://{bucket_name}/{file_name}'
    load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)

    load_job.result()
    print(f'Loaded data into BigQuery table {table_id}.')

def call_cloud_function(url, data):
    response = requests.post(url, json=data)
    response.raise_for_status()
    return response.json()

def main():
    print('Read data from GCS')
    data = read_from_gcs(gcs_bucket, input_json)

    print('Transform data')
    transformed_data = call_cloud_function(cloud_function_url, data)

    print('Write transformed data to GCS')
    write_to_gcs(gcs_bucket, transformed_file, transformed_data)

    print('Loading into BigQuery')
    load_into_bigquery(bq_dataset_id, bq_table_id, gcs_bucket, transformed_file)

# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'gcs_to_bigquery_dag',
    default_args=default_args,
    description='A simple DAG to load data from GCS to BigQuery',
    schedule_interval='@daily',
)

# Define tasks
main_task = PythonOperator(
    task_id='main_task',
    python_callable=main,
    dag=dag,
)

# Setting the task sequence
main_task
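
Note that call_cloud_function posts JSON to an HTTP endpoint, so the transformation has to be exposed as an HTTP-triggered Cloud Function rather than the GCS-triggered one shown earlier. A minimal sketch of such a function (reusing the same one-level flattening logic; the name transform_http is just an assumption) could look like this:

# main.py for an HTTP-triggered Cloud Function that the DAG calls
def transform_http(request):
    data = request.get_json(silent=True) or {}
    # Flatten one level of nesting, as in normalize_json above
    flat = {}
    for key, value in data.items():
        if isinstance(value, dict):
            for inner_key, inner_value in value.items():
                flat[f"{key}_{inner_key}"] = inner_value
        else:
            flat[key] = value
    return flat  # returning a dict sends a JSON response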

Or, equivalently, using the Google provider transfer operators:

import datetime
import json

import requests  # Assuming the transformation is an external HTTP call (the Cloud Function)

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

# Replace with your project IDs and bucket/file details
PROJECT_ID = 'your-project-id'
BQ_DATASET = 'your-dataset'
BQ_TABLE = 'your-table'
GCS_BUCKET = 'your-bucket-name'
INPUT_JSON = 'input.json'
OUTPUT_JSON = 'transformed.json'
CLOUD_FUNCTION_URL = 'your_cloud_function_url'  # HTTP endpoint of the transformation function

# Define BigQuery schema (adjust data types as needed)
BQ_SCHEMA = [
    {'name': 'column1', 'type': 'STRING'},
    {'name': 'column2', 'type': 'INTEGER'},
    # Add more schema fields as required
]

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2024, 7, 18),
    'depends_on_past': False,
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

with DAG(
    dag_id='gcs_to_bq_transform',
    default_args=default_args,
    schedule_interval='@daily',  # Adjust schedule as needed
) as dag:

    # Download JSON from GCS to temporary local storage
    download_json_task = GCSToLocalFilesystemOperator(
        task_id='download_json',
        bucket=GCS_BUCKET,
        object_name=INPUT_JSON,
        filename=f'/tmp/{INPUT_JSON}',
    )

    # Define transformation function
    def transform_data(input_path):
        # Modify this to handle your specific transformation logic.
        # Here the downloaded JSON is posted to the Cloud Function for transformation.
        with open(input_path) as f:
            data = json.load(f)
        response = requests.post(CLOUD_FUNCTION_URL, json=data)
        response.raise_for_status()
        transformed_data = response.json()
        # Write the result as newline-delimited JSON so BigQuery can load it
        records = transformed_data if isinstance(transformed_data, list) else [transformed_data]
        with open(f'/tmp/{OUTPUT_JSON}', 'w') as f:
            f.write('\n'.join(json.dumps(record) for record in records))

    # Transform data using a Python operator
    transform_data_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        op_args=[f'/tmp/{INPUT_JSON}'],  # Pass the downloaded JSON path
    )

    # Upload transformed JSON to GCS
    upload_transformed_task = LocalFilesystemToGCSOperator(
        task_id='upload_transformed',
        src=f'/tmp/{OUTPUT_JSON}',  # Temporary local storage for transformed data
        dst=OUTPUT_JSON,
        bucket=GCS_BUCKET,
    )

    # Load transformed data into BigQuery
    load_to_bq_task = GCSToBigQueryOperator(
        task_id='load_to_bq',
        bucket=GCS_BUCKET,
        source_objects=[OUTPUT_JSON],
        destination_project_dataset_table=f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}",
        schema_fields=BQ_SCHEMA,
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_TRUNCATE',  # Adjust write mode as needed
    )

    # Set task dependencies
    download_json_task >> transform_data_task >> upload_transformed_task >> load_to_bq_task
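
For reference, with the placeholder BQ_SCHEMA above, the transformed.json object that load_to_bq reads must be newline-delimited JSON, one object per line; the values here are purely illustrative:

{"column1": "abc", "column2": 1}
{"column1": "def", "column2": 2}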
