How to set up a batch pipeline in Google Cloud Platform

Biswanath Giri

--

Step 1: Design your pipeline

  1. Define the data sources and sinks, and the processing steps that transform the data. For this example, we’ll create a simple pipeline that reads a CSV file from Google Cloud Storage (GCS), applies a transformation to the data, and writes the result to a BigQuery table.
  2. Here’s the code for the pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

# Define the options for the pipeline
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'your-project-id'
google_cloud_options.job_name = 'dataflow-job-name'
google_cloud_options.region = 'us-central1'
google_cloud_options.staging_location = 'gs://your-staging-bucket/staging'
google_cloud_options.temp_location = 'gs://your-temp-bucket/temp'
standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DataflowRunner'
# Optional: only needed if your pipeline packages local dependencies with a setup.py file
setup_options = options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'

# Parse a CSV line into a dictionary that matches the BigQuery schema declared below
def parse_csv_line(line):
    column1, column2, column3 = line.split(',')
    return {
        'column1': column1,
        'column2': int(column2),
        'column3': float(column3),
    }

# Define the pipeline
with beam.Pipeline(options=options) as pipeline:
    # Read data from GCS
    # (pass skip_header_lines=1 to ReadFromText if the CSV file has a header row)
    data = (
        pipeline
        | 'ReadData' >> beam.io.ReadFromText('gs://your-input-bucket/input_data.csv')
    )

    # Transform the data
    transformed_data = (
        data
        | 'TransformData' >> beam.Map(parse_csv_line)
    )

    # Write data to BigQuery
    transformed_data | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table='your-project-id:your_dataset.your_table',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        schema='column1:STRING,column2:INTEGER,column3:FLOAT'
    )

In this pipeline, we first define the pipeline options, including the project ID, region, and staging and temp locations. We then read the CSV file from GCS using the beam.io.ReadFromText transform, parse each line into a dictionary that matches the BigQuery table schema using the beam.Map transform, and write the resulting rows to BigQuery using the beam.io.WriteToBigQuery transform.

Step 2: Create a GCP project

If you haven’t already done so, create a new GCP project or use an existing one. You can do this from the Google Cloud Console.
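If you prefer the command line, you can also create and select a project with the gcloud CLI and enable the APIs this pipeline relies on. The project ID below is the same placeholder used in the pipeline code, and the exact set of APIs you need to enable may vary; Dataflow also requires billing to be enabled on the project.

gcloud projects create your-project-id
gcloud config set project your-project-id

# Enable the services used by this pipeline
gcloud services enable dataflow.googleapis.com bigquery.googleapis.com storage.googleapis.com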

Step 3: Create a Google Cloud Storage (GCS) bucket

Dataflow requires a GCS bucket to store temporary files and job resources. Create a new bucket or use an existing one. You can create a new bucket from the Google Cloud Console.
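For example, you can create the buckets referenced in the pipeline code with the gsutil tool. The bucket names below are placeholders, and a single bucket can serve all three purposes if you prefer.

gsutil mb -l us-central1 gs://your-staging-bucket
gsutil mb -l us-central1 gs://your-temp-bucket
gsutil mb -l us-central1 gs://your-input-bucket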

Step 4: Create a BigQuery dataset (optional)

If you plan to load data into BigQuery, create a new dataset or use an existing one. You can create a new dataset from the Google Cloud Console.
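As an alternative to the console, the bq command-line tool can create the dataset referenced in the pipeline code. The dataset name is a placeholder, and the dataset location should be compatible with where your buckets and Dataflow job run.

bq mk --dataset your-project-id:your_dataset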

Step 5: Upload data to GCS

If your input data is not already in GCS, upload it to your bucket. You can do this from the Google Cloud Console or using the gsutil command-line tool.
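For example, to copy a local CSV file to the input path the pipeline reads from:

gsutil cp input_data.csv gs://your-input-bucket/input_data.csv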

Step 6: Set up authentication

To authenticate your pipeline with GCP, you’ll need to create a service account key and download it as a JSON file. You can do this from the Google Cloud Console.
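One possible way to do this from the command line is shown below. The service account name is a placeholder, and roles/dataflow.admin is only an example; grant the narrowest set of roles your job actually needs (typically Dataflow, Cloud Storage, and BigQuery access).

# Create a service account for the pipeline
gcloud iam service-accounts create dataflow-batch-sa

# Grant it a role that allows it to run Dataflow jobs
gcloud projects add-iam-policy-binding your-project-id \
  --member="serviceAccount:dataflow-batch-sa@your-project-id.iam.gserviceaccount.com" \
  --role="roles/dataflow.admin"

# Create and download a JSON key for local authentication
gcloud iam service-accounts keys create credentials.json \
  --iam-account=dataflow-batch-sa@your-project-id.iam.gserviceaccount.com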

Once you have the JSON file, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the file. For example:

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials.json

Step 7: Install the required Python packages

To run the pipeline, you’ll need to install the apache-beam[gcp] and google-cloud-bigquery packages. You can install them with pip (the quotes keep your shell from interpreting the square brackets):

pip install "apache-beam[gcp]"
pip install google-cloud-bigquery

Step 8: Execute the Dataflow job

To execute the pipeline, run the script. Because the example sets the runner, project, region, and staging/temp locations in code, those assignments take precedence over the equivalent command-line flags, so you can simply run python my_pipeline.py; alternatively, remove the hard-coded assignments and pass the values as flags:

python my_pipeline.py --runner DataflowRunner --project your-project-id --region us-central1 --staging_location gs://your-staging-bucket/staging --temp_location gs://your-temp-bucket/temp
