Batch Data Pipeline from Iterable to GCS to BigQuery (via Cloud Run)
This project demonstrates how to extract event data from Iterable (a marketing automation platform) and load it into Google BigQuery using a batch process on GCP.
Overview
The pipeline pulls the previous day’s emailSend events from Iterable’s API in batches of 1,000 records, stores them as CSV files in a Google Cloud Storage bucket, and then automatically loads them into a BigQuery table.
Architecture Flow
Cloud Scheduler triggers a Pub/Sub topic daily.
Cloud Run Function (subscriber) calls Iterable’s API, paginates through the results, and writes the CSV output to a GCS bucket.
Cloud Storage Trigger invokes a second Cloud Run function when a file is saved.
That function loads the CSV file into the specified BigQuery dataset and table.
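For reference, the Cloud Storage event that invokes the loader carries the object metadata the function reads. A rough illustration of the relevant fields (values are hypothetical; the field names come from the standard storage object payload):
# Illustrative shape of the "object finalized" event data consumed by the loader:
example_event_data = {
    "bucket": "your-storage-bucket",   # read as bucket_name
    "name": "2024-01-15.csv",          # read as file_name
    "contentType": "text/csv",
    "timeCreated": "2024-01-16T06:00:02.123Z",
}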
Configuration Points
Replace the placeholders for the API key, GCS bucket name, BigQuery dataset, and table name (one way to read these from environment variables is sketched after this list).
Add columns to your BigQuery table, and to the schema in the load function, that match the fields in the CSV extract.
Iterable endpoint: /api/export/data.csv?dataTypeName=emailSend (this version uses the CSV export endpoint).
Pagination: 1,000 records per page until no more pages remain.
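One way to keep these placeholders out of the source is to read them from environment variables set on the two Cloud Run services. A minimal sketch (the variable names are illustrative; the code below still uses hard-coded placeholders):
import os

# Hypothetical environment variable names; adjust to whatever you configure on
# the Cloud Run services. The defaults mirror the placeholders used below.
API_KEY = os.environ.get("ITERABLE_API_KEY", "your-api-key")
GCS_BUCKET = os.environ.get("GCS_BUCKET", "your-storage-bucket")
BQ_DATASET = os.environ.get("BQ_DATASET", "your-data-set")
BQ_TABLE = os.environ.get("BQ_TABLE", "your-table-name")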
Pro Tip
When using Pub/Sub as a trigger, review the subscription's acknowledgement deadline. If it is shorter than the function's run time, Pub/Sub will redeliver the message and the function can be re-triggered several times before the first run completes.
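If you need a longer deadline, it can be raised on the subscription. A minimal sketch using the Pub/Sub client library (the project and subscription names are placeholders; 600 seconds is the maximum Pub/Sub allows):
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("your-project", "your-subscription")

# Raise the ack deadline so the message is not redelivered while the export
# function is still running.
subscriber.update_subscription(
    request={
        "subscription": {"name": subscription_path, "ack_deadline_seconds": 600},
        "update_mask": {"paths": ["ack_deadline_seconds"]},
    }
)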
This setup can easily be adapted to pull other Iterable event types or to connect different sources and sinks within GCP (see the adaptation note after the code).
#--requirements google-cloud-storage requests pandas
# This version extracts the export as CSV.
import requests
import pandas as pd
from datetime import datetime, timedelta
from google.cloud import storage
from io import StringIO # For in-memory CSV buffer
# --- CONFIG ---
API_KEY = "your-api-key"
EXPORT_URL = "https://api.iterable.com/api/export/data.csv" # CSV endpoint
HEADERS = {
    "Api-Key": API_KEY,
    "Content-Type": "application/json"
}
# --- Date formatter ---
def get_previous_day_range():
    today = datetime.utcnow().date()
    start = datetime.combine(today - timedelta(days=1), datetime.min.time())
    end = datetime.combine(today, datetime.min.time())
    date_format = "%Y-%m-%d %H:%M:%S"
    return start.strftime(date_format), end.strftime(date_format)
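# Example: run on 2024-01-16 (UTC), get_previous_day_range() returns
# ("2024-01-15 00:00:00", "2024-01-16 00:00:00").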
def upload_data(request):
    start_date, end_date = get_previous_day_range()

    # --- Base params ---
    params = {
        "dataTypeName": "emailSend",  # Or emailOpen, etc.
        "startDateTime": start_date,
        "endDateTime": end_date,
        "limit": 1000  # 1000 records per page
    }

    # --- Pagination loop ---
    all_chunks = []
    next_token = None
    while True:
        if next_token:
            params["pageToken"] = next_token
        else:
            params.pop("pageToken", None)  # No stale token on the first request

        response = requests.get(EXPORT_URL, headers=HEADERS, params=params)
        print("Status Code:", response.status_code)
        if response.status_code != 200:
            print("Error:", response.text)
            break

        # Wrap the text content in StringIO for pandas
        csv_buffer = StringIO(response.text)
        chunk_df = pd.read_csv(csv_buffer)
        all_chunks.append(chunk_df)

        # Check for another page
        next_token = response.headers.get("X-Iterable-Next-Page-Token")
        if not next_token:
            print("No more pages.")
            break

    # --- Combine all chunks ---
    if all_chunks:
        df = pd.concat(all_chunks, ignore_index=True)
        print(f"Retrieved {len(df)} total records.")
    else:
        df = pd.DataFrame()
        print("No data retrieved.")

    # --- Use today's date (UTC) as the filename ---
    file_name = datetime.utcnow().date().strftime("%Y-%m-%d") + ".csv"

    # Note: df.to_csv() includes the DataFrame index as an unnamed first column.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("your-storage-bucket")
    bucket.blob(file_name).upload_from_string(df.to_csv(), content_type="text/csv")

    return "CSV file written successfully", 200
#--requirements functions-framework==3.* google-cloud-bigquery cloudevents
import functions_framework
from google.cloud import bigquery
from cloudevents.http import CloudEvent
@functions_framework.cloud_event
def iterable_import(cloud_event: CloudEvent):
    data = cloud_event.data
    file_name = data["name"]
    bucket_name = data["bucket"]

    if not file_name.endswith(".csv"):
        print(f"Skipping non-data file: {file_name}")
        return

    # BigQuery details (these could also come from environment variables)
    dataset_id = "your-data-set"
    table_id = "your-table-name"

    # GCS URI of the file that triggered this event
    gcs_uri = f"gs://{bucket_name}/{file_name}"

    # Define the schema explicitly; it must match the columns in the CSV extract
    schema = [
        bigquery.SchemaField("rowid", "INT64"),
        # ..... add the rest of your columns here .....
        bigquery.SchemaField("user_id", "STRING"),
    ]

    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        skip_leading_rows=1,  # Set to 1 because the CSV extract has a header row
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=",",
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)
    load_job = client.load_table_from_uri(
        gcs_uri,
        table_ref,
        job_config=job_config,
    )
    load_job.result()  # Wait for the job to complete before reporting row counts
    print(f"Loaded {load_job.output_rows} rows into {dataset_id}.{table_id} from {file_name}")