GCP-Native · BigQuery · Python ETL · Cloud Composer

CME Group — GCP Architecture Overview

Public CMEGroup.com pages → governed ingestion → layered BigQuery → marts. MIT-Labs tone, production-minded.

1) System Diagram

[Diagram nodes: CMEGroup.com, GCS (raw_html / json), Pub/Sub, Dataflow / Beam, BQ Bronze, BQ Silver, BQ Gold, Cloud Composer]

2) Layered Data Model

Bronze

  • Normalized JSON + lineage (source_url, crawl_ts, hash_key).
  • Partition by DATE(crawl_ts); cluster by instrument_code, content_type.

Silver

  • Entities: products, contract_specs, calendar_events, education_articles.
  • Typed columns; dedupe by hash_key; business keys: instrument_code, as_of_date.
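
The Silver rules above (drop repeats by hash_key, keep one row per business key) can be sketched in plain Python. This is only an illustration under stated assumptions: the function name dedupe_latest and the sample rows are hypothetical, and each row is assumed to carry hash_key, instrument_code, as_of_date, and an ISO-8601 UTC crawl_ts string.

```python
from typing import Iterable

# Made-up sample rows; instrument codes and timestamps are placeholders.
SAMPLE_ROWS = [
    {"hash_key": "a1", "instrument_code": "XYZ", "as_of_date": "2024-01-02",
     "crawl_ts": "2024-01-02T10:00:00Z"},
    {"hash_key": "a1", "instrument_code": "XYZ", "as_of_date": "2024-01-02",
     "crawl_ts": "2024-01-02T10:00:00Z"},  # exact repeat of the row above
    {"hash_key": "b2", "instrument_code": "XYZ", "as_of_date": "2024-01-02",
     "crawl_ts": "2024-01-02T11:00:00Z"},  # newer content, same business key
    {"hash_key": "c3", "instrument_code": "ABC", "as_of_date": "2024-01-02",
     "crawl_ts": "2024-01-02T10:30:00Z"},
]


def dedupe_latest(rows: Iterable[dict]) -> list:
    """Skip rows whose hash_key was already seen, then keep the newest
    row for each (instrument_code, as_of_date) business key."""
    seen_hashes = set()
    latest = {}
    for row in rows:
        if row["hash_key"] in seen_hashes:
            continue  # identical content already processed
        seen_hashes.add(row["hash_key"])
        key = (row["instrument_code"], row["as_of_date"])
        # Same-format ISO-8601 UTC strings sort chronologically.
        if key not in latest or row["crawl_ts"] > latest[key]["crawl_ts"]:
            latest[key] = row
    return list(latest.values())


if __name__ == "__main__":
    for row in dedupe_latest(SAMPLE_ROWS):
        print(row["instrument_code"], row["hash_key"])
```

In BigQuery the same idea is usually expressed with a window function over the business key; the Python version is here only to make the rule concrete.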

Gold

  • Analytics views & materialized views; freshness indicators.
  • Optional row-level security per audience.
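
A freshness indicator can be as simple as the age of the newest crawl compared with a threshold. The sketch below assumes the indicator is derived from the latest crawl_ts; the function name freshness_status is hypothetical, and the 2-hour default mirrors the freshness SLA stated in section 4.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(last_crawl_ts: datetime, now: datetime,
                     sla: timedelta = timedelta(hours=2)) -> dict:
    """Return the data age in whole minutes and whether it is within the SLA."""
    age = now - last_crawl_ts
    return {
        "age_minutes": int(age.total_seconds() // 60),
        "is_fresh": age <= sla,
    }


if __name__ == "__main__":
    now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
    last = datetime(2024, 1, 2, 10, 30, tzinfo=timezone.utc)
    print(freshness_status(last, now))
```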

DDL Sketch

CREATE TABLE IF NOT EXISTS `proj.ds_bronze.cme_pages` (
  instrument_code STRING,
  content_type    STRING,
  source_url      STRING,
  crawl_ts        TIMESTAMP,
  hash_key        STRING,
  payload_json    JSON  -- parsed page content, read with JSON_VALUE downstream
)
PARTITION BY DATE(crawl_ts)
CLUSTER BY instrument_code, content_type;

3) ETL & Incremental Merge

Extractor

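import hashlib
from datetime import datetime, timezone

HEADERS = {"User-Agent": "ResearchBot/1.0 (+contact@example.com)"}

def make_record(url, html):
    payload = parse(html)  # parse(): page-specific HTML parser, defined elsewhere
    # Hash URL + HTML (capped at 1M characters) so unchanged pages get the same key.
    h = hashlib.sha256((url + html)[:1_000_000].encode()).hexdigest()
    return {"source_url": url, "crawl_ts": datetime.now(timezone.utc).isoformat(),
            "hash_key": h, "payload_json": payload}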
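
To see how hash_key behaves as a change detector, here is a small self-contained sketch. The helper names content_hash and has_changed are hypothetical; the hashing expression is the one used in make_record above, and the URL is a placeholder.

```python
import hashlib

MAX_CHARS = 1_000_000  # same cap as in make_record


def content_hash(url: str, html: str) -> str:
    """SHA-256 hex digest of URL + HTML, truncated to MAX_CHARS characters."""
    return hashlib.sha256((url + html)[:MAX_CHARS].encode()).hexdigest()


def has_changed(previous_hash: str, url: str, html: str) -> bool:
    """True when the page content no longer matches the stored hash."""
    return previous_hash != content_hash(url, html)


if __name__ == "__main__":
    url = "https://example.com/page"  # placeholder URL
    first = content_hash(url, "<html>v1</html>")
    print(has_changed(first, url, "<html>v1</html>"))
    print(has_changed(first, url, "<html>v2</html>"))
```

One consequence of the cap: anything past the first 1,000,000 characters of URL + HTML is ignored, so a change that occurs only beyond that point would not produce a new hash_key.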

Merge

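MERGE `proj.ds_silver.contract_specs` T
USING (
  -- One row per hash_key, so repeated crawls of an unchanged page insert once.
  SELECT * FROM `proj.ds_bronze.cme_pages`
  WHERE content_type = 'contract_specs'
  QUALIFY ROW_NUMBER() OVER (PARTITION BY hash_key ORDER BY crawl_ts) = 1
) S
ON T.hash_key = S.hash_key
WHEN NOT MATCHED THEN
  -- JSON_VALUE returns STRING; cast here if the Silver columns are typed differently.
  INSERT (instrument_code, as_of_ts, tick_size, contract_unit, hash_key)
  VALUES (JSON_VALUE(S.payload_json,'$.instrument_code'), S.crawl_ts,
          JSON_VALUE(S.payload_json,'$.attributes.tick_size'),
          JSON_VALUE(S.payload_json,'$.attributes.contract_unit'), S.hash_key);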
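
The point of merging on hash_key is that re-running the job is harmless: rows already in Silver are not inserted again. The in-memory sketch below illustrates that insert-if-not-matched behaviour. It is not BigQuery code; the function name merge_contract_specs and the sample batch (including its field values) are made up for illustration, and payload_json is assumed to be an already-parsed dict.

```python
def merge_contract_specs(target: dict, bronze_rows: list) -> int:
    """Insert contract_specs rows whose hash_key is not yet in target.

    target maps hash_key -> Silver row. Returns the number of rows inserted.
    """
    inserted = 0
    for row in bronze_rows:
        if row["content_type"] != "contract_specs" or row["hash_key"] in target:
            continue  # wrong entity, or already merged
        payload = row["payload_json"]
        attrs = payload.get("attributes", {})
        target[row["hash_key"]] = {
            "instrument_code": payload.get("instrument_code"),
            "as_of_ts": row["crawl_ts"],
            "tick_size": attrs.get("tick_size"),
            "contract_unit": attrs.get("contract_unit"),
            "hash_key": row["hash_key"],
        }
        inserted += 1
    return inserted


# Made-up batch; values are placeholders, not real contract data.
SAMPLE_BATCH = [
    {"content_type": "contract_specs", "hash_key": "h1",
     "crawl_ts": "2024-01-02T10:00:00Z",
     "payload_json": {"instrument_code": "XYZ",
                      "attributes": {"tick_size": "0.01",
                                     "contract_unit": "100 units"}}},
    {"content_type": "education_articles", "hash_key": "h2",
     "crawl_ts": "2024-01-02T10:05:00Z", "payload_json": {}},
]

if __name__ == "__main__":
    silver = {}
    print(merge_contract_specs(silver, SAMPLE_BATCH))  # first run
    print(merge_contract_specs(silver, SAMPLE_BATCH))  # re-run of the same batch
```

Because the target here is a dict keyed by hash_key, repeated hash_key values inside one batch are also inserted only once.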

4) Operations: Governance, Orchestration, IaC

Validation & Lineage

for col in ("source_url", "hash_key", "crawl_ts"):
    expect_column_values_to_not_be_null(col)
expect_column_values_to_match_regex("instrument_code", "^[A-Z0-9_-]{1,20}$")
  • _LINEAGE tables: source_url, crawl_ts, code_version.
  • Labels on datasets: layer, owner, domain.
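
The two expectations above boil down to a null check on the lineage fields and a pattern check on instrument_code. A standard-library sketch of the same checks follows; it does not use any validation framework's API, the function name validate_record is hypothetical, and the pattern is written with the hyphen last in the character class so that it is unambiguously a literal.

```python
import re

REQUIRED = ("source_url", "hash_key", "crawl_ts")
INSTRUMENT_RE = re.compile(r"^[A-Z0-9_-]{1,20}$")


def validate_record(rec: dict) -> list:
    """Return a list of human-readable validation errors (empty if valid)."""
    errors = [f"{col} is null" for col in REQUIRED if rec.get(col) is None]
    code = rec.get("instrument_code")
    # A missing instrument_code is not a pattern failure; only check real values.
    if code is not None and not INSTRUMENT_RE.fullmatch(code):
        errors.append(f"instrument_code {code!r} does not match pattern")
    return errors


if __name__ == "__main__":
    good = {"source_url": "https://example.com/p", "hash_key": "h",
            "crawl_ts": "2024-01-02T10:00:00Z", "instrument_code": "XYZ-1"}
    print(validate_record(good))
    print(validate_record({**good, "hash_key": None, "instrument_code": "xyz"}))
```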

Composer & Terraform

with DAG("cme_public_pipeline", schedule="@hourly", catchup=False): ...
resource "google_bigquery_dataset" "bronze" {
  dataset_id = "ds_bronze"
  location   = "US"
}
  • Freshness SLA ≤ 2h; exponential retries.
  • Slack alerts on failure; escalate on repeated SLA breach.
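
"Exponential retries" means the wait between attempts grows geometrically up to a ceiling. The sketch below shows one such delay schedule. The function name and the base, factor, and cap values are assumptions chosen for illustration, and this is a simplified formula rather than the exact one any scheduler uses; in Composer the corresponding Airflow task settings are retries, retry_delay, retry_exponential_backoff, and max_retry_delay.

```python
def backoff_delays(retries: int, base_s: float = 60.0,
                   factor: float = 2.0, cap_s: float = 900.0) -> list:
    """Delay in seconds before each retry: base * factor**attempt, capped at cap_s."""
    return [min(base_s * factor ** attempt, cap_s) for attempt in range(retries)]


if __name__ == "__main__":
    print(backoff_delays(5))  # [60.0, 120.0, 240.0, 480.0, 900.0]
```

With these placeholder values, five retries wait about 30 minutes in total, which leaves room inside a 2-hour freshness SLA before an alert needs to escalate.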

5) Notes & Attribution

This page illustrates a conceptual design using public CMEGroup.com pages. Review Terms of Use and robots.txt. Do not store or redistribute proprietary market data. Educational demo only.
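
The robots.txt review can be partly automated before any crawl. The following is a minimal sketch using Python's standard urllib.robotparser; the robots.txt content and URLs shown are invented placeholders (not the rules of any real site), and the helper name is_allowed is hypothetical. A check like this does not replace reading the site's Terms of Use.

```python
from urllib.robotparser import RobotFileParser

# Invented example rules, for illustration only.
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
"""


def is_allowed(robots_txt: str, user_agent: str, url: str) -> bool:
    """Parse robots.txt text and report whether user_agent may fetch url."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)


if __name__ == "__main__":
    agent = "ResearchBot"
    print(is_allowed(SAMPLE_ROBOTS_TXT, agent, "https://example.com/public/page"))
    print(is_allowed(SAMPLE_ROBOTS_TXT, agent, "https://example.com/private/x"))
```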