Python script that sets up a Google Cloud Function to trigger on file uploads to a Google Cloud Storage (GCS) bucket, transform the JSON data, and load it into a BigQuery table
Jul 18, 2024
Prerequisites
- Google Cloud SDK: Make sure you have the Google Cloud SDK installed and configured.
- Create a GCS Bucket: Create a GCS bucket where the JSON files will be uploaded.
- Create a BigQuery Dataset and Table: Create a dataset and table in BigQuery to store the transformed data.
- Enable APIs: Ensure that the Google Cloud Functions, Google Cloud Storage, and BigQuery APIs are enabled in your Google Cloud project. Example setup commands for these prerequisites follow this list.
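If any of these pieces are missing, the commands below sketch one way to create them. The bucket, dataset, and table names are placeholders, and the table columns simply mirror the fields validated later in main.py (the column types are assumptions):

# Enable the required APIs
gcloud services enable cloudfunctions.googleapis.com storage.googleapis.com bigquery.googleapis.com

# Create the GCS bucket
gsutil mb gs://YOUR_GCS_BUCKET_NAME

# Create the BigQuery dataset and table (column types are assumptions)
bq mk --dataset YOUR_BQ_PROJECT_ID:YOUR_BQ_DATASET_ID
bq mk --table YOUR_BQ_PROJECT_ID:YOUR_BQ_DATASET_ID.YOUR_BQ_TABLE_ID \
    client_id:STRING,hospital_id:STRING,plan_name:STRING,active_status:BOOLEAN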
Google Cloud Function
- main.py: Create a file named main.py with the following content:
import json
import pandas as pd
from google.cloud import storage, bigquery, logging
import os

# Initialize logging
logging_client = logging.Client()
logger = logging_client.logger('data_transformation_errors')

# Environment variables (set these in your Cloud Function environment)
BQ_PROJECT_ID = os.getenv('BQ_PROJECT_ID')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
BQ_TABLE_ID = os.getenv('BQ_TABLE_ID')


def gcs_to_bigquery(event, context):
    file_name = event['name']
    bucket_name = event['bucket']

    # Initialize clients
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()

    # Load JSON data from GCS
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    json_data = blob.download_as_string()

    try:
        # Transform JSON data
        df = transform_data(json.loads(json_data))
        # Load data to BigQuery
        load_to_bigquery(df, bigquery_client)
    except Exception as e:
        logger.log_text(f"Error processing file {file_name}: {e}")
        raise


def transform_data(json_data):
    try:
        # Normalize nested JSON
        df = pd.json_normalize(json_data)
        # Add additional transformation logic if necessary
        validate_data(df)
        return df
    except Exception as e:
        logger.log_text(f"Error in data transformation: {e}")
        raise


def validate_data(df):
    # Example validation: check for required columns
    required_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")


def load_to_bigquery(df, bigquery_client):
    table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
    )
    try:
        load_job = bigquery_client.load_table_from_dataframe(df, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete
        print(f"Loaded {len(df)} rows into {table_id}.")
    except Exception as e:
        logger.log_text(f"Error loading data to BigQuery: {e}")
        raise
- requirements.txt: Create a file named requirements.txt with the following content:
google-cloud-storage
google-cloud-bigquery
google-cloud-logging
pandas
- Deploy the Cloud Function: Deploy the function using the Google Cloud CLI:
gcloud functions deploy gcsToBigQuery \
--runtime python39 \
--trigger-resource YOUR_GCS_BUCKET_NAME \
--trigger-event google.storage.object.finalize \
--entry-point gcs_to_bigquery \
--set-env-vars BQ_PROJECT_ID=YOUR_BQ_PROJECT_ID,BQ_DATASET_ID=YOUR_BQ_DATASET_ID,BQ_TABLE_ID=YOUR_BQ_TABLE_ID
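Once deployed, a quick way to verify the trigger end to end is to upload a sample file and tail the function's logs (the sample file name is a placeholder):

gsutil cp sample.json gs://YOUR_GCS_BUCKET_NAME/
gcloud functions logs read gcsToBigQuery --limit 20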
Option 2
import json

# Libraries for BigQuery and Cloud Storage (add them to your requirements.txt)
from google.cloud import bigquery
from google.cloud import storage


def process_data(event, context):
    """Triggered by a change to a Cloud Storage bucket."""
    bucket_name = event['bucket']
    file_name = event['name']

    # Download JSON data from GCS
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    data_json = json.loads(blob.download_as_string())

    # Transform data (replace with your transformation logic)
    transformed_data = normalize_json(data_json)

    # Load data to BigQuery
    load_to_bigquery(transformed_data)
    print(f"Processed file: {file_name}")


def normalize_json(data):
    """Normalize nested JSON structures (example)."""
    # Implement your transformation logic here.
    # This example flattens a single level of nested objects.
    if isinstance(data, dict):
        flat_data = {}
        for key, value in data.items():
            if isinstance(value, dict):
                for inner_key, inner_value in value.items():
                    flat_data[f"{key}_{inner_key}"] = inner_value
            else:
                flat_data[key] = value
        return flat_data
    return data


def load_to_bigquery(data):
    # Define your BigQuery client and schema (replace with your details)
    bq_client = bigquery.Client()
    dataset_id = "your_dataset"
    table_id = "your_table"
    schema = [
        # Define your BigQuery schema fields here
        # (e.g., bigquery.SchemaField("field_name", "STRING"))
    ]

    # Load rows into the BigQuery table (rows must be a list of dictionaries)
    rows = data if isinstance(data, list) else [data]
    job_config = bigquery.LoadJobConfig(schema=schema)
    job = bq_client.load_table_from_json(
        rows,
        f"{bq_client.project}.{dataset_id}.{table_id}",
        job_config=job_config,
    )
    job.result()  # Wait for the load to complete


# The Cloud Storage trigger (e.g., google.storage.object.finalize) and the service
# account the function runs as are configured when you deploy the function, not in
# code -- see the deploy command below.
Below is an Airflow DAG that ties the pieces together: it reads data from GCS, transforms it by calling a Cloud Function, writes the transformed data back to GCS, and then loads it into BigQuery.
from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import storage, bigquery
import requests
import json
from datetime import datetime

# Set variables
gcs_bucket = 'bucket_name'
input_json = 'input_file.json'
transformed_file = 'transformed_file.json'
cloud_function_url = 'your_cloud_function_url'
bq_dataset_id = 'your_dataset_id'
bq_table_id = 'your_table_id'

# Initialize clients
storage_client = storage.Client()
bigquery_client = bigquery.Client()


def read_from_gcs(bucket_name, file_name):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    data = blob.download_as_string()
    return json.loads(data)


def write_to_gcs(bucket_name, file_name, data):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    # BigQuery expects newline-delimited JSON, so write one record per line
    records = data if isinstance(data, list) else [data]
    blob.upload_from_string('\n'.join(json.dumps(record) for record in records))
    print(f'File {file_name} uploaded to GCS bucket {bucket_name}.')


def load_into_bigquery(dataset_id, table_id, bucket_name, file_name):
    dataset_ref = bigquery_client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    # Load directly from the GCS object written in the previous step
    uri = f'gs://{bucket_name}/{file_name}'
    load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)
    load_job.result()
    print(f'Loaded data into BigQuery table {table_id}.')


def call_cloud_function(url, data):
    response = requests.post(url, json=data)
    response.raise_for_status()
    return response.json()


def main():
    print('Read data from GCS')
    data = read_from_gcs(gcs_bucket, input_json)
    print('Transform data')
    transformed_data = call_cloud_function(cloud_function_url, data)
    print('Write transformed data to GCS')
    write_to_gcs(gcs_bucket, transformed_file, transformed_data)
    print('Loading into BigQuery')
    load_into_bigquery(bq_dataset_id, bq_table_id, gcs_bucket, transformed_file)


# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'gcs_to_bigquery_dag',
    default_args=default_args,
    description='A simple DAG to load data from GCS to BigQuery',
    schedule_interval='@daily',
)

# Define tasks
main_task = PythonOperator(
    task_id='main_task',
    python_callable=main,
    dag=dag,
)

# Setting the task sequence
main_task
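This DAG assumes an HTTP-triggered Cloud Function sits behind cloud_function_url and returns the transformed payload as JSON. A minimal sketch of such an endpoint, reusing the single-level flattening from Option 2 (the function name transform_http is an assumption):

import json


def transform_http(request):
    """HTTP entry point: accepts a JSON payload and returns a flattened version."""
    data = request.get_json(silent=True) or {}
    # Flatten one level of nesting, mirroring normalize_json above
    flat = {}
    for key, value in data.items():
        if isinstance(value, dict):
            for inner_key, inner_value in value.items():
                flat[f"{key}_{inner_key}"] = inner_value
        else:
            flat[key] = value
    return json.dumps(flat), 200, {'Content-Type': 'application/json'}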
Or, using the Google provider transfer operators:
import datetime
import json

import requests  # Assuming an external API call for transformation

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

# Replace with your project IDs and bucket/file details
PROJECT_ID = 'your-project-id'
BQ_DATASET = 'your-dataset'
BQ_TABLE = 'your-table'
GCS_BUCKET = 'your-bucket-name'
INPUT_JSON = 'input.json'
OUTPUT_JSON = 'transformed.json'
CLOUD_FUNCTION_URL = 'your_cloud_function_url'

# Define BigQuery schema (adjust data types as needed)
BQ_SCHEMA = [
    {'name': 'column1', 'type': 'STRING'},
    {'name': 'column2', 'type': 'INTEGER'},
    # Add more schema fields as required
]

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2024, 7, 18),
    'depends_on_past': False,
    'email': ['your_email@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}


# Transformation callable: read the downloaded JSON, call the Cloud Function,
# and write the transformed result to a local file for upload
def transform_data(input_path, output_path):
    # Modify this function to handle your specific transformation logic;
    # here we assume an external HTTP call to a Cloud Function
    with open(input_path) as f:
        data = json.load(f)
    response = requests.post(CLOUD_FUNCTION_URL, json=data)
    response.raise_for_status()
    with open(output_path, 'w') as f:
        json.dump(response.json(), f)


with DAG(
    dag_id='gcs_to_bq_transform',
    default_args=default_args,
    schedule_interval='@daily',  # Adjust schedule as needed
) as dag:

    # Download JSON from GCS to local storage on the worker
    download_json_task = GCSToLocalFilesystemOperator(
        task_id='download_json',
        bucket=GCS_BUCKET,
        object_name=INPUT_JSON,
        filename=f'/tmp/{INPUT_JSON}',  # Temporary local storage
    )

    # Transform data using the Python operator
    transform_data_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        op_args=[f'/tmp/{INPUT_JSON}', f'/tmp/{OUTPUT_JSON}'],
    )

    # Upload transformed JSON to GCS
    upload_transformed_task = LocalFilesystemToGCSOperator(
        task_id='upload_transformed',
        src=f'/tmp/{OUTPUT_JSON}',  # Temporary local storage for transformed data
        dst=OUTPUT_JSON,
        bucket=GCS_BUCKET,
    )

    # Load transformed data into BigQuery
    load_to_bq_task = GCSToBigQueryOperator(
        task_id='load_to_bq',
        bucket=GCS_BUCKET,
        source_objects=[OUTPUT_JSON],
        destination_project_dataset_table=f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}",
        schema_fields=BQ_SCHEMA,
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_TRUNCATE',  # Adjust write mode as needed
    )

    # Set task dependencies
    download_json_task >> transform_data_task >> upload_transformed_task >> load_to_bq_task
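With the file in place in your DAG folder, you can exercise the whole pipeline for a single run without waiting for the schedule; the date below is only an example:

airflow dags test gcs_to_bq_transform 2024-07-18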