How to set up a batch pipeline in Google Cloud Platform
Step 1: Design your pipeline
- Define the data sources and sinks, and the processing steps that transform the data. For this example, we’ll build a simple Apache Beam pipeline, run on Dataflow, that reads a CSV file from Google Cloud Storage (GCS), applies a transformation to the data, and writes the result to a BigQuery table.
- Here’s the code for the pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

# Define the options for the pipeline
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'your-project-id'
google_cloud_options.job_name = 'dataflow-job-name'
google_cloud_options.region = 'us-central1'
google_cloud_options.staging_location = 'gs://your-staging-bucket/staging'
google_cloud_options.temp_location = 'gs://your-temp-bucket/temp'
standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DataflowRunner'
# Only needed if your pipeline packages local dependencies in a setup.py;
# omit these two lines otherwise
setup_options = options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'

# Parse one CSV line into a dictionary keyed by column name, matching the
# BigQuery schema below (WriteToBigQuery expects dictionaries, not lists)
def parse_csv_line(line):
    column1, column2, column3 = line.split(',')
    return {
        'column1': column1,
        'column2': int(column2),
        'column3': float(column3),
    }

# Define the pipeline
with beam.Pipeline(options=options) as pipeline:
    # Read data from GCS (add skip_header_lines=1 if the file has a header row)
    data = (
        pipeline
        | 'ReadData' >> beam.io.ReadFromText('gs://your-input-bucket/input_data.csv')
    )
    # Transform the data: parse each line into a BigQuery-compatible dictionary
    transformed_data = (
        data
        | 'TransformData' >> beam.Map(parse_csv_line)
    )
    # Write data to BigQuery
    transformed_data | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table='your-project-id:your_dataset.your_table',
        schema='column1:STRING,column2:INTEGER,column3:FLOAT',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )
In this pipeline, we first define the pipeline options, including the project ID, region, runner, and the staging and temporary GCS locations. We then read the CSV file from GCS using the beam.io.ReadFromText transform, parse each line into a dictionary of column values with the beam.Map transform, and write the transformed data to BigQuery using the beam.io.WriteToBigQuery transform.
Step 2: Create a GCP project
If you haven’t already done so, create a new GCP project or use an existing one. You can do this from the Google Cloud Console.
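If you prefer the command line, the gcloud commands below sketch the same setup; your-project-id is a placeholder for your own project ID, and the Dataflow API must be enabled before you can submit jobs:
gcloud projects create your-project-id
gcloud config set project your-project-id
gcloud services enable dataflow.googleapis.com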
Step 3: Create a Google Cloud Storage (GCS) bucket
Dataflow requires a GCS bucket to store temporary files and job resources. Create a new bucket or use an existing one. You can create a new bucket from the Google Cloud Console.
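As an alternative to the Console, you can create the buckets with gsutil. The bucket names below are placeholders and should match the staging, temp, and input locations used in the pipeline options:
gsutil mb -l us-central1 gs://your-staging-bucket
gsutil mb -l us-central1 gs://your-temp-bucket
gsutil mb -l us-central1 gs://your-input-bucket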
Step 4: Create a BigQuery dataset (optional)
If you plan to load data into BigQuery, create a new dataset or use an existing one. You can create a new dataset from the Google Cloud Console.
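You can also create the dataset with the bq command-line tool; the project and dataset names below are the placeholders used in the pipeline code:
bq mk --dataset your-project-id:your_dataset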
Step 5: Upload data to GCS
If your input data is not already in GCS, upload it to your bucket. You can do this from the Google Cloud Console or using the gsutil command-line tool.
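For example, assuming a local file named input_data.csv and the input bucket referenced in the pipeline code:
gsutil cp input_data.csv gs://your-input-bucket/input_data.csv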
Step 6: Set up authentication
To authenticate your pipeline with GCP, you’ll need to create a service account key and download it as a JSON file. You can do this from the Google Cloud Console.
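If you prefer the CLI, the commands below are a rough sketch: they create a service account (the name dataflow-batch-sa is a placeholder), grant it roles commonly needed for this pipeline (adjust to your own access policy), and download a key file:
gcloud iam service-accounts create dataflow-batch-sa
gcloud projects add-iam-policy-binding your-project-id --member="serviceAccount:dataflow-batch-sa@your-project-id.iam.gserviceaccount.com" --role="roles/dataflow.admin"
gcloud projects add-iam-policy-binding your-project-id --member="serviceAccount:dataflow-batch-sa@your-project-id.iam.gserviceaccount.com" --role="roles/storage.objectAdmin"
gcloud projects add-iam-policy-binding your-project-id --member="serviceAccount:dataflow-batch-sa@your-project-id.iam.gserviceaccount.com" --role="roles/bigquery.dataEditor"
gcloud iam service-accounts keys create credentials.json --iam-account=dataflow-batch-sa@your-project-id.iam.gserviceaccount.com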
Once you have the JSON file, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the file. For example:
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials.json
Step 7: Install the required Python packages
To run the pipeline, you’ll need to install the apache-beam[gcp] and google-cloud-bigquery packages. You can install these using pip:
pip install 'apache-beam[gcp]'
pip install google-cloud-bigquery
Step 8: Execute the Dataflow job
To execute the pipeline, save the code from Step 1 as my_pipeline.py (the file name is up to you) and run it:
python my_pipeline.py
Because the script already sets the runner, project, and staging/temp locations in its pipeline options, no command-line flags are needed. If you’d rather supply them as flags (for example --runner DataflowRunner --project your-project-id --temp_location gs://your-temp-bucket/temp), remove the corresponding hard-coded assignments from the script, since values set in code take precedence over the flags.
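Once the job has been submitted, you can optionally confirm that it is running and, after it completes, peek at the output table from the command line:
gcloud dataflow jobs list --region=us-central1
bq head -n 10 your_dataset.your_table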