跳至主要內容
ESC
ACE 服務實戰 — 第 9/11 篇

ACE-214:Dataflow 與 Dataproc 深度解析——GCP 資料處理管線完全指南

ACE-214

前言

在 GCP 的資料生態系中,DataflowDataproc 是兩大資料處理引擎。一個全託管、自動擴縮、用 Apache Beam;另一個跑 Spark/Hadoop 叢集、支援開源生態系。ACE 考試幾乎必考:什麼時候用哪一個?

這篇文章是 ACE 進階系列第 14 課,帶你一次搞懂兩者的差異和選型策略。


第一部分:Dataflow

什麼是 Dataflow?

Dataflow 是 GCP 的全託管資料處理服務,基於 Apache Beam 開源框架:

資料來源                    Dataflow                    資料目標
├── Pub/Sub(串流)   →    ┌──────────────┐    →    ├── BigQuery
├── Cloud Storage     →    │ Apache Beam  │    →    ├── Bigtable
├── Bigtable          →    │  Pipeline    │    →    ├── Cloud Storage
├── BigQuery          →    │              │    →    ├── Pub/Sub
└── Kafka             →    │ 自動擴縮     │    →    └── Datastore
                           └──────────────┘

核心特性

  • 統一模型:同一份程式碼同時支援批次和串流處理
  • 全託管:不用管 Worker 數量,自動擴縮
  • Exactly-once 語義:預設保證每筆資料精確處理一次
  • SDK 支援:Java、Python、Go

Apache Beam 程式設計模型

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--project=my-project",
    "--region=asia-east1",
    "--temp_location=gs://my-bucket/temp",
])

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read" >> beam.io.ReadFromText("gs://my-bucket/input.csv")
        | "Parse" >> beam.Map(lambda line: line.split(","))
        | "Filter" >> beam.Filter(lambda row: float(row[2]) > 100)
        | "Format" >> beam.Map(lambda row: {"name": row[0], "value": float(row[2])})
        | "Write" >> beam.io.WriteToBigQuery(
            "my-project:dataset.table",
            schema="name:STRING,value:FLOAT"
        )
    )

核心概念

概念說明
Pipeline整個資料處理流程
PCollection資料集合(不可變、可分散)
Transform資料轉換操作(Map、Filter、GroupByKey)
Runner執行引擎(DataflowRunner、DirectRunner)
Window時間視窗,將無限串流切成有限區間
Trigger何時輸出視窗結果

Windowing(時間視窗)

串流處理的核心就在這裡:把無限的資料流切成一段一段有限的區間。

import apache_beam as beam
from apache_beam import window

# 固定視窗:每 5 分鐘一個區間
events | beam.WindowInto(window.FixedWindows(300))

# 滑動視窗:30 分鐘大小,每 5 分鐘滑動一次
events | beam.WindowInto(window.SlidingWindows(1800, 300))

# Session 視窗:10 分鐘無活動就結束 session
events | beam.WindowInto(window.Sessions(600))

# Global 視窗:所有資料在同一個視窗(批次模式預設)
events | beam.WindowInto(window.GlobalWindows())

視窗類型比較

視窗大小重疊適合場景
Fixed固定不重疊每 N 分鐘聚合
Sliding固定重疊移動平均、趨勢
Session動態不重疊用戶行為分析
Global無限批次處理

Exactly-once 處理

Dataflow 預設就是 Exactly-once 語義:

At-least-once:保證處理,但可能重複
At-most-once:保證不重複,但可能遺失
Exactly-once:每筆資料精確處理一次(Dataflow 預設)

如果不需要 Exactly-once(例如日誌聚合允許少量誤差),可以切換到 At-least-once 模式以獲得更低延遲和更低成本。


Flex Templates vs Classic Templates

特性Classic TemplatesFlex Templates(推薦)
打包方式GCS 上的 JSONDocker Image
依賴管理打包時固定執行時解析
執行參數部分限制完全靈活
自訂環境不支援Docker 自訂
DAG 形狀打包時固定執行時動態
# 建立 Flex Template
gcloud dataflow flex-template build \
  gs://my-bucket/templates/my-pipeline.json \
  --image-gcr-path=asia-east1-docker.pkg.dev/my-project/my-repo/my-pipeline:latest \
  --sdk-language=PYTHON \
  --flex-template-base-image=PYTHON3

# 執行 Flex Template
gcloud dataflow flex-template run my-job \
  --template-file-gcs-location=gs://my-bucket/templates/my-pipeline.json \
  --region=asia-east1 \
  --parameters input=gs://my-bucket/input.csv

FlexRS(彈性資源排程)

這是批次作業省錢的招數,做法是混合用一般 VM 和 Spot VM:

# FlexRS 透過 Flex Template 執行時以 flexRSGoal 參數設定
gcloud dataflow flex-template run my-batch-job \
  --template-file-gcs-location=gs://my-bucket/templates/my-pipeline.json \
  --region=asia-east1 \
  --parameters flexRSGoal=COST_OPTIMIZED  # 成本優化(或 SPEED_OPTIMIZED)

# 或在 SDK pipeline options 中設定:--flexRSGoal=COST_OPTIMIZED
  • 比一般批次便宜約 40%
  • 延遲啟動(最長 6 小時內開始)
  • 適合非即時的批次 ETL

Dataflow 定價

資源BatchStreaming
vCPU$0.056 / hr$0.069 / hr
記憶體$0.003557 / GB-hr$0.003557 / GB-hr
磁碟(HDD)$0.000054 / GB-hr$0.000054 / GB-hr
磁碟(SSD)$0.000298 / GB-hr$0.000298 / GB-hr
Shuffle$0.011 / GB
Streaming Engine$0.018 / GB

Dataflow 沒有免費層,但 Google Cloud 新帳號有 $300 免費試用額度。


第二部分:Dataproc

什麼是 Dataproc?

Dataproc 是 GCP 的託管 Spark/Hadoop 叢集服務,適合已有大數據生態系的團隊:

Dataproc 叢集
├── Master Node(1-3 個)
│   ├── YARN Resource Manager
│   ├── HDFS NameNode
│   └── Hive Metastore

├── Worker Nodes(N 個)
│   ├── Spark Executor
│   ├── HDFS DataNode
│   └── YARN NodeManager

└── Secondary Workers(可選,可用 Spot/Preemptible VM)
    └── 純運算,不存 HDFS

支援的框架

框架用途
Apache Spark批次/串流處理、ML
Apache HadoopMapReduce 批次處理
Apache HiveSQL-on-Hadoop
Apache Pig資料轉換腳本
Trino互動式 SQL 查詢
Apache Flink串流處理

Dataproc Serverless

Google Cloud Serverless for Apache Spark(GA),叢集完全不用管,直接把 Spark Job 丟上去就好:

# 提交 Serverless Spark 作業
gcloud dataproc batches submit spark \
  --region=asia-east1 \
  --jars=gs://my-bucket/my-app.jar \
  --class=com.example.MySparkJob \
  -- arg1 arg2
特性Dataproc 叢集Dataproc Serverless
管理自建叢集全託管
啟動時間分鐘級秒級
計費叢集存續期間作業執行期間
自訂完全自訂限 Spark
適合長時間運行叢集短期批次作業

自動擴縮(Autoscaling)

# 建立自動擴縮策略
gcloud dataproc autoscaling-policies create my-policy \
  --region=asia-east1 \
  --max-instances=20 \
  --min-instances=2 \
  --scale-up-factor=1.0 \
  --scale-down-factor=0.5 \
  --cooldown-period=120s

# 套用到叢集
gcloud dataproc clusters create my-cluster \
  --region=asia-east1 \
  --autoscaling-policy=my-policy \
  --num-workers=3

Dataproc 的自動擴縮目前只支援 YARN-based 的應用(Spark、Hadoop、Hive)。


Spot VM 支援

Secondary Workers 可以使用 Spot VM 來降低成本:

gcloud dataproc clusters create my-cluster \
  --region=asia-east1 \
  --num-workers=3 \
  --num-secondary-workers=5 \
  --secondary-worker-type=spot
  • Spot VM 最多可省 60-91% 的運算費用
  • Secondary Workers 不儲存 HDFS 資料,被回收不影響資料

Component Gateway

Component Gateway 讓你安全存取 Web UI(無需 SSH Tunnel):

gcloud dataproc clusters create my-cluster \
  --region=asia-east1 \
  --enable-component-gateway \
  --optional-components=JUPYTER

支援的 Web UI:

  • Spark UI
  • YARN Resource Manager
  • Jupyter Notebook
  • Zeppelin

Dataproc 定價

Dataproc 費用 = Compute Engine VM 費用 + Dataproc 附加費

Dataproc 附加費:$0.01 / vCPU / 小時

以 3 Node 叢集(每 Node 4 vCPU)為例:

Dataproc 附加費:12 vCPU × $0.01 = $0.12/hr
Compute Engine:12 vCPU × ~$0.03 = ~$0.36/hr
總計:~$0.48/hr(Dataproc 只佔約 25%)

Dataflow vs Dataproc 選型

核心差異

特性DataflowDataproc
基礎Apache BeamSpark / Hadoop
管理全託管自建叢集
擴縮完全自動需設定策略
語言Java, Python, GoJava, Scala, Python, SQL(R 透過 SparkR)
串流原生支援Spark Structured Streaming
精確度Exactly-once(預設)At-least-once(預設)
延遲秒級秒~分鐘級
生態系Beam I/O Connectors完整 Hadoop/Spark 生態
GPU支援支援
最適合新建 ETL / 串流已有 Spark/Hadoop 程式

選型公式

新建 ETL Pipeline、需要串流處理?
  → Dataflow ✅

已有 Spark/Hadoop 程式碼要上雲?
  → Dataproc ✅

需要 Exactly-once 串流處理?
  → Dataflow ✅

需要跑 Hive SQL、Pig、Trino?
  → Dataproc ✅

短期批次作業、不想管叢集?
  → Dataproc Serverless ✅ 或 Dataflow ✅

需要互動式 Jupyter Notebook + Spark?
  → Dataproc ✅

與其他服務的關係

Pub/Sub(訊息佇列)


Dataflow(處理管線)──→ BigQuery(分析)
    │                         │
    ▼                         ▼
Bigtable(即時查詢)    Looker(報表)

Pub/Sub 是「管道」,Dataflow 是「處理引擎」
Pub/Sub 負責接收和緩衝訊息
Dataflow 負責轉換、聚合、寫入目標

ACE 考試重點整理

必背知識點

  1. Dataflow = 全託管 + Apache Beam + 自動擴縮 + Exactly-once
  2. Dataproc = 託管 Spark/Hadoop 叢集 + 開源生態系
  3. 新建管線選 Dataflow,遷移 Spark 選 Dataproc
  4. Windowing:Fixed(定時聚合)、Sliding(移動平均)、Session(用戶行為)
  5. Dataproc Serverless(正式名稱 Google Cloud Serverless for Apache Spark):不用管叢集的 Spark
  6. FlexRS:Dataflow 批次的成本優化,省約 40%
  7. Dataflow Flex Templates:Docker 打包,取代 Classic Templates

常見陷阱題

Q:需要即時處理 Pub/Sub 訊息流並寫入 BigQuery,選什麼? A:Dataflow。全託管、原生串流支援、Exactly-once 保證。

Q:已有 Spark ML Pipeline 要上雲,選什麼? A:Dataproc。直接跑既有 Spark 程式碼,不需要改寫成 Beam。

Q:Dataflow 和 Pub/Sub 有什麼不同? A:Pub/Sub 是訊息佇列(接收和分發訊息),Dataflow 是處理引擎(轉換、聚合、寫入)。兩者通常搭配使用。

Q:需要跑 Hive SQL 查詢大量歷史資料,選什麼? A:Dataproc。Hive 是 Hadoop 生態系的工具,在 GCP 上由 Dataproc 託管支援。(補充:BigQuery 也能透過外部資料表查詢 Hive 分區資料。)

Q:Dataflow 的 Exactly-once 和 Pub/Sub 的 At-least-once 有衝突嗎? A:不衝突。Pub/Sub 可能重複投遞,但 Dataflow 會在內部去重,保證下游結果是 Exactly-once。


總結

GCP 資料處理的雙引擎,核心要點:

Dataflow:

  • 全託管 Apache Beam、自動擴縮、Exactly-once
  • 統一批次/串流模型、Windowing(Fixed/Sliding/Session)
  • Flex Templates(Docker 打包)、FlexRS(省 40%)

Dataproc:

  • 託管 Spark/Hadoop/Hive/Trino 叢集
  • Google Cloud Serverless for Apache Spark(無叢集管理)
  • Spot VM 省 60-91%、Component Gateway 提供 Web UI

選型:新建 → Dataflow;遷移 Spark → Dataproc;短期 Spark → Serverless

下一課 GCP-114:Cloud DNS 入門,來看看 GCP 的全託管 DNS 服務怎麼用。

ACE 服務實戰 — 9/11 完成 查看系列全覽 →

留言討論

徽章解鎖!