Batch Data Pipeline from Iterable to GCS to BigQuery (via Cloud Run)
This project demonstrates how to extract event data from Iterable (a marketing automation platform) and load it into Google BigQuery using a batch process on GCP.
Overview
The pipeline pulls the previous day’s emailSend
events from Iterable’s API in batches of 1,000 records, stores them as CSV files in a Google Cloud Storage bucket, and then automatically loads them into a BigQuery table.
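For clarity, "previous day" is computed as a UTC window from midnight yesterday to midnight today, mirroring the get_previous_day_range helper in the code further below:

from datetime import datetime, timedelta

today = datetime.utcnow().date()
start = datetime.combine(today - timedelta(days=1), datetime.min.time())  # yesterday 00:00:00 UTC
end = datetime.combine(today, datetime.min.time())                        # today 00:00:00 UTC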
Architecture Flow
Cloud Scheduler triggers a Pub/Sub topic daily (a setup sketch follows this list).
Cloud Run Function (subscriber) calls Iterable’s API, paginates through the results, and writes the CSV output to a GCS bucket.
Cloud Storage Trigger invokes a second Cloud Run function when a file is saved.
That function loads the CSV file into the specified BigQuery dataset and table.
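If you prefer to script the scheduling side of this flow, here is a minimal sketch using the google-cloud-pubsub and google-cloud-scheduler client libraries. The project ID, region, topic name, job name, and cron schedule below are placeholders; the same resources can just as easily be created in the console or with gcloud.

from google.cloud import pubsub_v1, scheduler_v1

PROJECT = "your-project-id"      # placeholder
REGION = "us-central1"           # placeholder
TOPIC = "iterable-daily-export"  # placeholder topic name

# Create the Pub/Sub topic that Cloud Scheduler will publish to.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, TOPIC)
publisher.create_topic(request={"name": topic_path})

# Create a daily Cloud Scheduler job that publishes a message to the topic.
scheduler = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT}/locations/{REGION}"
job = scheduler_v1.Job(
    name=f"{parent}/jobs/iterable-daily-export",
    schedule="0 6 * * *",  # every day at 06:00
    time_zone="Etc/UTC",
    pubsub_target=scheduler_v1.PubsubTarget(
        topic_name=topic_path,
        data=b"run",  # payload is ignored by the export function
    ),
)
scheduler.create_job(parent=parent, job=job)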
Configuration Points
Replace placeholders for API key, GCS bucket name, BigQuery dataset, and table name.
Add columns that match the extracted file to your BigQuery table (see the table-creation sketch after this list).
Iterable endpoint:
/api/export/data.json?dataTypeName=emailSend
Pagination: 1,000 records per page until no more pages remain.
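If the destination table does not exist yet, it can be created up front with the BigQuery client library. In the sketch below, rowid and user_id mirror the load schema used in the second function; the other columns are illustrative placeholders and must be replaced with the fields actually present in your extract.

from google.cloud import bigquery

client = bigquery.Client()

# rowid and user_id match the load schema below; the rest are examples only.
schema = [
    bigquery.SchemaField("rowid", "INT64"),
    bigquery.SchemaField("campaignId", "INT64"),     # assumed column
    bigquery.SchemaField("createdAt", "TIMESTAMP"),  # assumed column
    bigquery.SchemaField("email", "STRING"),         # assumed column
    bigquery.SchemaField("user_id", "STRING"),
]

table = bigquery.Table("your-project.your-data-set.your-table-name", schema=schema)
client.create_table(table, exists_ok=True)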
Pro Tip
When using Pub/Sub as a trigger, review the acknowledgement deadline configuration. If it is set too low, Pub/Sub will redeliver the message and the function may be re-triggered multiple times before the first run completes.
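If you create and manage the subscription yourself (rather than letting the trigger provision it), the acknowledgement deadline can be raised at creation time. A minimal sketch, assuming placeholder project, topic, and subscription names:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path("your-project-id", "iterable-daily-export")
subscription_path = subscriber.subscription_path(
    "your-project-id", "iterable-daily-export-sub"
)

# 600 seconds is the maximum; give the export plenty of time to finish
# before Pub/Sub considers the message unacknowledged and redelivers it.
subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "ack_deadline_seconds": 600,
    }
)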
This setup can be easily adapted to pull other Iterable event types or connect different sources and sinks within GCP.
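One simple way to adapt it, for example, is to read the event type from an environment variable instead of hard-coding emailSend; the variable name below is just an illustration.

import os

# Hypothetical env var; defaults to the emailSend export used in this project.
DATA_TYPE_NAME = os.environ.get("ITERABLE_DATA_TYPE", "emailSend")

params = {
    "dataTypeName": DATA_TYPE_NAME,
    # startDateTime, endDateTime, limit as in upload_data below
}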
#--requirements google-cloud-storage requests pandas
# This version extracts the data as CSV: it pulls yesterday's emailSend
# events from Iterable and writes them to a GCS bucket as a single CSV file.
import requests
import pandas as pd
from datetime import datetime, timedelta
from google.cloud import storage
from io import StringIO  # For in-memory CSV buffer

# --- CONFIG ---
API_KEY = "your-api-key"
EXPORT_URL = "https://api.iterable.com/api/export/data.csv"  # CSV endpoint
HEADERS = {
    "Api-Key": API_KEY,
    "Content-Type": "application/json"
}

# --- Date formatter ---
def get_previous_day_range():
    # Previous day as a UTC window: 00:00:00 yesterday to 00:00:00 today.
    today = datetime.utcnow().date()
    start = datetime.combine(today - timedelta(days=1), datetime.min.time())
    end = datetime.combine(today, datetime.min.time())
    date_format = "%Y-%m-%d %H:%M:%S"
    return start.strftime(date_format), end.strftime(date_format)


def upload_data(request):
    # Entry point; the trigger payload is not used -- the function always
    # exports yesterday's data.
    start_date, end_date = get_previous_day_range()

    # --- Base params ---
    params = {
        "dataTypeName": "emailSend",  # Or emailOpen, etc.
        "startDateTime": start_date,
        "endDateTime": end_date,
        "limit": 1000  # 1000 records per page
    }

    # --- Pagination loop ---
    all_chunks = []
    next_token = None

    while True:
        if next_token:
            params["pageToken"] = next_token
        else:
            params.pop("pageToken", None)  # Clean up if first request

        response = requests.get(EXPORT_URL, headers=HEADERS, params=params)
        print("Status Code:", response.status_code)

        if response.status_code != 200:
            print("Error:", response.text)
            break

        # Wrap the text content in StringIO for pandas
        csv_buffer = StringIO(response.text)
        chunk_df = pd.read_csv(csv_buffer)
        all_chunks.append(chunk_df)

        # Check for pagination
        next_token = response.headers.get("X-Iterable-Next-Page-Token")
        if not next_token:
            print("No more pages.")
            break

    # --- Combine all chunks ---
    if all_chunks:
        df = pd.concat(all_chunks, ignore_index=True)
        print(f"Retrieved {len(df)} total records.")
    else:
        df = pd.DataFrame()
        print("No data retrieved.")

    # --- Use today's date as the file name ---
    file_today = datetime.utcnow().date()
    file_date_format = "%Y-%m-%d"
    file_name = file_today.strftime(file_date_format) + ".csv"

    # Note: df.to_csv() writes the pandas index as the first (unnamed) column;
    # the leading "rowid" field in the load schema below presumably maps to it.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('your-storage-bucket')
    bucket.blob(file_name).upload_from_string(df.to_csv(), 'text/csv')

    return "CSV file written successfully", 200
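Because the request argument is never read, the export function can be smoke-tested locally with a plain call, assuming application default credentials with access to the bucket:

if __name__ == "__main__":
    # Local smoke test: the trigger payload is unused, so None is enough.
    print(upload_data(None))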
#--requirements functions-framework==3.* google-cloud-bigquery google-cloud-storage cloudevents
# Loads a newly written CSV file from GCS into BigQuery.
import functions_framework
from google.cloud import storage, bigquery
from cloudevents.http import CloudEvent


@functions_framework.cloud_event
def iterable_import(cloud_event: CloudEvent):
    data = cloud_event.data
    file_name = data["name"]
    bucket_name = data["bucket"]

    if not file_name.endswith('.csv'):
        print(f"Skipping non-data file: {file_name}")
        return

    # BigQuery details (you can set these as env vars too)
    dataset_id = "your-data-set"
    table_id = "your-table-name"

    # GCS URI
    gcs_uri = f"gs://{bucket_name}/{file_name}"

    # Define the schema explicitly -- add a SchemaField for every column
    # in the extract file.
    schema = [
        bigquery.SchemaField("rowid", "INT64"),
        # ... add your remaining columns here ...
        bigquery.SchemaField("user_id", "STRING"),
    ]

    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        skip_leading_rows=1,  # Set to 1 if there's a header row
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=",",
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
    )

    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)

    load_job = client.load_table_from_uri(
        gcs_uri, table_ref, job_config=job_config
    )
    load_job.result()  # Waits for the job to complete so output_rows is populated

    print(f"Loaded {load_job.output_rows} rows into {dataset_id}.{table_id} from {file_name}")
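For a local test of the loader, a CloudEvent mimicking the storage-finalize payload can be built by hand; the bucket and file names are placeholders, and working credentials plus an existing dataset and table are assumed.

from cloudevents.http import CloudEvent

# Minimal stand-in for the event sent when an object is finalized in the bucket.
attributes = {
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/your-storage-bucket",
}
data = {"bucket": "your-storage-bucket", "name": "2025-01-01.csv"}

iterable_import(CloudEvent(attributes, data))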