Batch Data Pipeline from Iterable to GCS to BigQuery (via Cloud Run)

This project demonstrates how to extract event data from Iterable (a marketing automation platform) and load it into Google BigQuery using a batch process on GCP.

Overview
The pipeline pulls the previous day’s emailSend events from Iterable’s API in batches of 1,000 records, stores them as CSV files in a Google Cloud Storage bucket, and then automatically loads them into a BigQuery table.

Architecture Flow

  1. Cloud Scheduler triggers a Pub/Sub topic daily (a provisioning sketch follows this list).

  2. Cloud Run Function (subscriber) calls Iterable’s API, paginates through the results, and writes the CSV output to a GCS bucket.

  3. Cloud Storage Trigger invokes a second Cloud Run function when a file is saved.

  4. That function loads the CSV file into the specified BigQuery dataset and table.
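
The scheduler and topic in step 1 are usually created in the Cloud console or with gcloud, but they can also be provisioned from Python. The sketch below is a minimal, hypothetical example using the google-cloud-pubsub and google-cloud-scheduler client libraries; the project, region, topic, and job names are placeholders, not part of the original project.

# Provisioning sketch (all names below are placeholders)
from google.cloud import pubsub_v1, scheduler_v1

PROJECT_ID = "your-project-id"
REGION = "us-central1"                     # Cloud Scheduler location (hypothetical)
TOPIC_ID = "iterable-daily-export"

# 1. Create the Pub/Sub topic that Cloud Scheduler will publish to.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
publisher.create_topic(request={"name": topic_path})

# 2. Create a daily Cloud Scheduler job that publishes to that topic.
scheduler = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT_ID}/locations/{REGION}"
job = scheduler_v1.Job(
    name=f"{parent}/jobs/iterable-daily-export",
    schedule="0 6 * * *",                  # 06:00 UTC every day
    time_zone="Etc/UTC",
    pubsub_target=scheduler_v1.PubsubTarget(
        topic_name=topic_path,
        data=b"run",                       # Payload is ignored by the extract function
    ),
)
scheduler.create_job(parent=parent, job=job)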

Configuration Points

  • Replace placeholders for the API key, GCS bucket name, BigQuery dataset, and table name (or read them from environment variables; see the sketch after this list).

  • Add the columns that match your extract file to the BigQuery table and to the load job's schema in the import function.

  • Iterable endpoint: /api/export/data.csv?dataTypeName=emailSend (the code below uses the CSV export variant).

  • Pagination: 1,000 records per page until no more pages remain.
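
As an alternative to hard-coding these values, the same settings can be read from environment variables set on the Cloud Run services. A minimal sketch, assuming illustrative variable names that are not part of the original code:

import os

# Hypothetical environment-variable configuration (names are illustrative).
API_KEY = os.environ.get("ITERABLE_API_KEY", "your-api-key")
GCS_BUCKET = os.environ.get("GCS_BUCKET", "your-storage-bucket")
BQ_DATASET = os.environ.get("BQ_DATASET", "your-data-set")
BQ_TABLE = os.environ.get("BQ_TABLE", "your-table-name")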

Pro Tip
When using Pub/Sub as a trigger, review the subscription's acknowledgement deadline. If it is shorter than the function's run time, Pub/Sub will redeliver the message and the function can be re-triggered multiple times before the first run completes.
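
The acknowledgement deadline lives on the Pub/Sub subscription rather than on the function. It can be changed in the console or with gcloud; the minimal sketch below uses the google-cloud-pubsub client, and the project and subscription names are placeholders.

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("your-project-id", "your-subscription")

# Raise the acknowledgement deadline to 600 seconds (the maximum) so the message
# is not redelivered while the extract is still running.
subscriber.update_subscription(
    request={
        "subscription": {"name": subscription_path, "ack_deadline_seconds": 600},
        "update_mask": {"paths": ["ack_deadline_seconds"]},
    }
)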

This setup can be easily adapted to pull other Iterable event types or connect different sources and sinks within GCP.

#-- requirements (extract function)
google-cloud-storage
requests
pandas
# This version extracts the export as CSV
import requests
import pandas as pd
from datetime import datetime, timedelta
from google.cloud import storage
from io import StringIO  # For in-memory CSV buffer

# --- CONFIG ---
API_KEY = "your-api-key"
EXPORT_URL = "https://api.iterable.com/api/export/data.csv"  # CSV endpoint
HEADERS = {
    "Api-Key": API_KEY,
    "Content-Type": "application/json"
}

# --- Date formatter ---
def get_previous_day_range():
    today = datetime.utcnow().date()
    start = datetime.combine(today - timedelta(days=1), datetime.min.time())
    end = datetime.combine(today, datetime.min.time())
    date_format = "%Y-%m-%d %H:%M:%S"
    return start.strftime(date_format), end.strftime(date_format)

def upload_data(request):
    # Entry point for the Pub/Sub-triggered Cloud Run function.
    # The message payload itself is not used; the trigger just kicks off the daily extract.
    start_date, end_date = get_previous_day_range()

    # --- Base params ---
    params = {
        "dataTypeName": "emailSend",  # Or emailOpen, etc.
        "startDateTime": start_date,
        "endDateTime": end_date,
        "limit": 1000  # 1000 records per page
    }

    # --- Pagination loop ---
    all_chunks = []
    next_token = None

    while True:
        if next_token:
            params["pageToken"] = next_token
        else:
            params.pop("pageToken", None)  # Clean up if first request

        response = requests.get(EXPORT_URL, headers=HEADERS, params=params)
        print("Status Code:", response.status_code)

        if response.status_code != 200:
            print("Error:", response.text)
            break

        # An empty body means no events for the period; stop before pandas
        # raises EmptyDataError on a zero-byte CSV.
        if not response.text.strip():
            print("Empty response body.")
            break

        # Wrap the text content in StringIO for pandas
        csv_buffer = StringIO(response.text)
        chunk_df = pd.read_csv(csv_buffer)
        all_chunks.append(chunk_df)

        # Check for pagination
        next_token = response.headers.get("X-Iterable-Next-Page-Token")
        if not next_token:
            print("No more pages.")
            break

    # --- Combine all chunks ---
    if all_chunks:
        df = pd.concat(all_chunks, ignore_index=True)
        print(f"Retrieved {len(df)} total records.")
    else:
        df = pd.DataFrame()
        print("No data retrieved.")

    # --- Use today's date (UTC) as the file name ---
    file_name = datetime.utcnow().date().strftime("%Y-%m-%d") + ".csv"

    # --- Upload to Cloud Storage ---
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('your-storage-bucket')

    # Note: to_csv() writes the pandas index as the first (unnamed) CSV column.
    bucket.blob(file_name).upload_from_string(df.to_csv(), 'text/csv')
    return "CSV file written successfully", 200
#-- requirements (import function)
functions-framework==3.*
google-cloud-bigquery
cloudevents
import functions_framework
from google.cloud import bigquery
from cloudevents.http import CloudEvent

@functions_framework.cloud_event
def iterable_import(cloud_event: CloudEvent):
    data = cloud_event.data
    file_name = data["name"]
    bucket_name = data["bucket"]

    if not file_name.endswith('.csv'):
        print(f"Skipping non-data file: {file_name}")
        return

    # BigQuery details (you can set as env vars too)
    dataset_id = "your-data-set"
    table_id = "your-table-name"

    # GCS URI
    gcs_uri = f"gs://{bucket_name}/{file_name}"

    # Define the schema explicitly (it must match the columns of the CSV extract)
    schema = [
        bigquery.SchemaField("rowid", "INT64"),
        # ... add a SchemaField for each remaining column in your extract ...
        bigquery.SchemaField("user_id", "STRING")
    ]

    # Configure load job
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        skip_leading_rows=1,  # The extract's CSV includes a header row, so skip it
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=",",
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
    )

    client = bigquery.Client()
    table_ref = f"{client.project}.{dataset_id}.{table_id}"  # Fully-qualified table ID (project.dataset.table)

    load_job = client.load_table_from_uri(
        gcs_uri,
        table_ref,
        job_config=job_config
    )

    load_job.result()  # Wait for the load job to complete so output_rows is populated

    print(f"Loaded {load_job.output_rows} rows into {dataset_id}.{table_id} from {file_name}")