ACE-214:Dataflow 與 Dataproc 深度解析——GCP 資料處理管線完全指南
前言
在 GCP 的資料生態系中,Dataflow 和 Dataproc 是兩大資料處理引擎。一個全託管、自動擴縮、用 Apache Beam;另一個跑 Spark/Hadoop 叢集、支援開源生態系。ACE 考試幾乎必考:什麼時候用哪一個?
這篇文章是 ACE 進階系列第 14 課,帶你一次搞懂兩者的差異和選型策略。
第一部分:Dataflow
什麼是 Dataflow?
Dataflow 是 GCP 的全託管資料處理服務,基於 Apache Beam 開源框架:
資料來源 Dataflow 資料目標
├── Pub/Sub(串流) → ┌──────────────┐ → ├── BigQuery
├── Cloud Storage → │ Apache Beam │ → ├── Bigtable
├── Bigtable → │ Pipeline │ → ├── Cloud Storage
├── BigQuery → │ │ → ├── Pub/Sub
└── Kafka → │ 自動擴縮 │ → └── Datastore
└──────────────┘
核心特性
- 統一模型:同一份程式碼同時支援批次和串流處理
- 全託管:不用管 Worker 數量,自動擴縮
- Exactly-once 語義:預設保證每筆資料精確處理一次
- SDK 支援:Java、Python、Go
Apache Beam 程式設計模型
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions([
"--runner=DataflowRunner",
"--project=my-project",
"--region=asia-east1",
"--temp_location=gs://my-bucket/temp",
])
with beam.Pipeline(options=options) as p:
(
p
| "Read" >> beam.io.ReadFromText("gs://my-bucket/input.csv")
| "Parse" >> beam.Map(lambda line: line.split(","))
| "Filter" >> beam.Filter(lambda row: float(row[2]) > 100)
| "Format" >> beam.Map(lambda row: {"name": row[0], "value": float(row[2])})
| "Write" >> beam.io.WriteToBigQuery(
"my-project:dataset.table",
schema="name:STRING,value:FLOAT"
)
)
核心概念
| 概念 | 說明 |
|---|---|
| Pipeline | 整個資料處理流程 |
| PCollection | 資料集合(不可變、可分散) |
| Transform | 資料轉換操作(Map、Filter、GroupByKey) |
| Runner | 執行引擎(DataflowRunner、DirectRunner) |
| Window | 時間視窗,將無限串流切成有限區間 |
| Trigger | 何時輸出視窗結果 |
Windowing(時間視窗)
串流處理的核心就在這裡:把無限的資料流切成一段一段有限的區間。
import apache_beam as beam
from apache_beam import window
# 固定視窗:每 5 分鐘一個區間
events | beam.WindowInto(window.FixedWindows(300))
# 滑動視窗:30 分鐘大小,每 5 分鐘滑動一次
events | beam.WindowInto(window.SlidingWindows(1800, 300))
# Session 視窗:10 分鐘無活動就結束 session
events | beam.WindowInto(window.Sessions(600))
# Global 視窗:所有資料在同一個視窗(批次模式預設)
events | beam.WindowInto(window.GlobalWindows())
視窗類型比較
| 視窗 | 大小 | 重疊 | 適合場景 |
|---|---|---|---|
| Fixed | 固定 | 不重疊 | 每 N 分鐘聚合 |
| Sliding | 固定 | 重疊 | 移動平均、趨勢 |
| Session | 動態 | 不重疊 | 用戶行為分析 |
| Global | 無限 | — | 批次處理 |
Exactly-once 處理
Dataflow 預設就是 Exactly-once 語義:
At-least-once:保證處理,但可能重複
At-most-once:保證不重複,但可能遺失
Exactly-once:每筆資料精確處理一次(Dataflow 預設)
如果不需要 Exactly-once(例如日誌聚合允許少量誤差),可以切換到 At-least-once 模式以獲得更低延遲和更低成本。
Flex Templates vs Classic Templates
| 特性 | Classic Templates | Flex Templates(推薦) |
|---|---|---|
| 打包方式 | GCS 上的 JSON | Docker Image |
| 依賴管理 | 打包時固定 | 執行時解析 |
| 執行參數 | 部分限制 | 完全靈活 |
| 自訂環境 | 不支援 | Docker 自訂 |
| DAG 形狀 | 打包時固定 | 執行時動態 |
# 建立 Flex Template
gcloud dataflow flex-template build \
gs://my-bucket/templates/my-pipeline.json \
--image-gcr-path=asia-east1-docker.pkg.dev/my-project/my-repo/my-pipeline:latest \
--sdk-language=PYTHON \
--flex-template-base-image=PYTHON3
# 執行 Flex Template
gcloud dataflow flex-template run my-job \
--template-file-gcs-location=gs://my-bucket/templates/my-pipeline.json \
--region=asia-east1 \
--parameters input=gs://my-bucket/input.csv
FlexRS(彈性資源排程)
這是批次作業省錢的招數,做法是混合用一般 VM 和 Spot VM:
# FlexRS 透過 Flex Template 執行時以 flexRSGoal 參數設定
gcloud dataflow flex-template run my-batch-job \
--template-file-gcs-location=gs://my-bucket/templates/my-pipeline.json \
--region=asia-east1 \
--parameters flexRSGoal=COST_OPTIMIZED # 成本優化(或 SPEED_OPTIMIZED)
# 或在 SDK pipeline options 中設定:--flexRSGoal=COST_OPTIMIZED
- 比一般批次便宜約 40%
- 延遲啟動(最長 6 小時內開始)
- 適合非即時的批次 ETL
Dataflow 定價
| 資源 | Batch | Streaming |
|---|---|---|
| vCPU | $0.056 / hr | $0.069 / hr |
| 記憶體 | $0.003557 / GB-hr | $0.003557 / GB-hr |
| 磁碟(HDD) | $0.000054 / GB-hr | $0.000054 / GB-hr |
| 磁碟(SSD) | $0.000298 / GB-hr | $0.000298 / GB-hr |
| Shuffle | $0.011 / GB | — |
| Streaming Engine | — | $0.018 / GB |
Dataflow 沒有免費層,但 Google Cloud 新帳號有 $300 免費試用額度。
第二部分:Dataproc
什麼是 Dataproc?
Dataproc 是 GCP 的託管 Spark/Hadoop 叢集服務,適合已有大數據生態系的團隊:
Dataproc 叢集
├── Master Node(1-3 個)
│ ├── YARN Resource Manager
│ ├── HDFS NameNode
│ └── Hive Metastore
│
├── Worker Nodes(N 個)
│ ├── Spark Executor
│ ├── HDFS DataNode
│ └── YARN NodeManager
│
└── Secondary Workers(可選,可用 Spot/Preemptible VM)
└── 純運算,不存 HDFS
支援的框架
| 框架 | 用途 |
|---|---|
| Apache Spark | 批次/串流處理、ML |
| Apache Hadoop | MapReduce 批次處理 |
| Apache Hive | SQL-on-Hadoop |
| Apache Pig | 資料轉換腳本 |
| Trino | 互動式 SQL 查詢 |
| Apache Flink | 串流處理 |
Dataproc Serverless
Google Cloud Serverless for Apache Spark(GA),叢集完全不用管,直接把 Spark Job 丟上去就好:
# 提交 Serverless Spark 作業
gcloud dataproc batches submit spark \
--region=asia-east1 \
--jars=gs://my-bucket/my-app.jar \
--class=com.example.MySparkJob \
-- arg1 arg2
| 特性 | Dataproc 叢集 | Dataproc Serverless |
|---|---|---|
| 管理 | 自建叢集 | 全託管 |
| 啟動時間 | 分鐘級 | 秒級 |
| 計費 | 叢集存續期間 | 作業執行期間 |
| 自訂 | 完全自訂 | 限 Spark |
| 適合 | 長時間運行叢集 | 短期批次作業 |
自動擴縮(Autoscaling)
# 建立自動擴縮策略
gcloud dataproc autoscaling-policies create my-policy \
--region=asia-east1 \
--max-instances=20 \
--min-instances=2 \
--scale-up-factor=1.0 \
--scale-down-factor=0.5 \
--cooldown-period=120s
# 套用到叢集
gcloud dataproc clusters create my-cluster \
--region=asia-east1 \
--autoscaling-policy=my-policy \
--num-workers=3
Dataproc 的自動擴縮目前只支援 YARN-based 的應用(Spark、Hadoop、Hive)。
Spot VM 支援
Secondary Workers 可以使用 Spot VM 來降低成本:
gcloud dataproc clusters create my-cluster \
--region=asia-east1 \
--num-workers=3 \
--num-secondary-workers=5 \
--secondary-worker-type=spot
- Spot VM 最多可省 60-91% 的運算費用
- Secondary Workers 不儲存 HDFS 資料,被回收不影響資料
Component Gateway
Component Gateway 讓你安全存取 Web UI(無需 SSH Tunnel):
gcloud dataproc clusters create my-cluster \
--region=asia-east1 \
--enable-component-gateway \
--optional-components=JUPYTER
支援的 Web UI:
- Spark UI
- YARN Resource Manager
- Jupyter Notebook
- Zeppelin
Dataproc 定價
Dataproc 費用 = Compute Engine VM 費用 + Dataproc 附加費
Dataproc 附加費:$0.01 / vCPU / 小時
以 3 Node 叢集(每 Node 4 vCPU)為例:
Dataproc 附加費:12 vCPU × $0.01 = $0.12/hr
Compute Engine:12 vCPU × ~$0.03 = ~$0.36/hr
總計:~$0.48/hr(Dataproc 只佔約 25%)
Dataflow vs Dataproc 選型
核心差異
| 特性 | Dataflow | Dataproc |
|---|---|---|
| 基礎 | Apache Beam | Spark / Hadoop |
| 管理 | 全託管 | 自建叢集 |
| 擴縮 | 完全自動 | 需設定策略 |
| 語言 | Java, Python, Go | Java, Scala, Python, SQL(R 透過 SparkR) |
| 串流 | 原生支援 | Spark Structured Streaming |
| 精確度 | Exactly-once(預設) | At-least-once(預設) |
| 延遲 | 秒級 | 秒~分鐘級 |
| 生態系 | Beam I/O Connectors | 完整 Hadoop/Spark 生態 |
| GPU | 支援 | 支援 |
| 最適合 | 新建 ETL / 串流 | 已有 Spark/Hadoop 程式 |
選型公式
新建 ETL Pipeline、需要串流處理?
→ Dataflow ✅
已有 Spark/Hadoop 程式碼要上雲?
→ Dataproc ✅
需要 Exactly-once 串流處理?
→ Dataflow ✅
需要跑 Hive SQL、Pig、Trino?
→ Dataproc ✅
短期批次作業、不想管叢集?
→ Dataproc Serverless ✅ 或 Dataflow ✅
需要互動式 Jupyter Notebook + Spark?
→ Dataproc ✅
與其他服務的關係
Pub/Sub(訊息佇列)
│
▼
Dataflow(處理管線)──→ BigQuery(分析)
│ │
▼ ▼
Bigtable(即時查詢) Looker(報表)
Pub/Sub 是「管道」,Dataflow 是「處理引擎」
Pub/Sub 負責接收和緩衝訊息
Dataflow 負責轉換、聚合、寫入目標
ACE 考試重點整理
必背知識點
- Dataflow = 全託管 + Apache Beam + 自動擴縮 + Exactly-once
- Dataproc = 託管 Spark/Hadoop 叢集 + 開源生態系
- 新建管線選 Dataflow,遷移 Spark 選 Dataproc
- Windowing:Fixed(定時聚合)、Sliding(移動平均)、Session(用戶行為)
- Dataproc Serverless(正式名稱 Google Cloud Serverless for Apache Spark):不用管叢集的 Spark
- FlexRS:Dataflow 批次的成本優化,省約 40%
- Dataflow Flex Templates:Docker 打包,取代 Classic Templates
常見陷阱題
Q:需要即時處理 Pub/Sub 訊息流並寫入 BigQuery,選什麼? A:Dataflow。全託管、原生串流支援、Exactly-once 保證。
Q:已有 Spark ML Pipeline 要上雲,選什麼? A:Dataproc。直接跑既有 Spark 程式碼,不需要改寫成 Beam。
Q:Dataflow 和 Pub/Sub 有什麼不同? A:Pub/Sub 是訊息佇列(接收和分發訊息),Dataflow 是處理引擎(轉換、聚合、寫入)。兩者通常搭配使用。
Q:需要跑 Hive SQL 查詢大量歷史資料,選什麼? A:Dataproc。Hive 是 Hadoop 生態系的工具,在 GCP 上由 Dataproc 託管支援。(補充:BigQuery 也能透過外部資料表查詢 Hive 分區資料。)
Q:Dataflow 的 Exactly-once 和 Pub/Sub 的 At-least-once 有衝突嗎? A:不衝突。Pub/Sub 可能重複投遞,但 Dataflow 會在內部去重,保證下游結果是 Exactly-once。
總結
GCP 資料處理的雙引擎,核心要點:
Dataflow:
- 全託管 Apache Beam、自動擴縮、Exactly-once
- 統一批次/串流模型、Windowing(Fixed/Sliding/Session)
- Flex Templates(Docker 打包)、FlexRS(省 40%)
Dataproc:
- 託管 Spark/Hadoop/Hive/Trino 叢集
- Google Cloud Serverless for Apache Spark(無叢集管理)
- Spot VM 省 60-91%、Component Gateway 提供 Web UI
選型:新建 → Dataflow;遷移 Spark → Dataproc;短期 Spark → Serverless
下一課 GCP-114:Cloud DNS 入門,來看看 GCP 的全託管 DNS 服務怎麼用。