生成式人工智能驅動的數據工程:現代數據工程師至少應掌握哪些技能
生成式人工智能 (GenAI) 已從最初的好奇發展成為數據工程工作流程中的日常工具:我們現在只需一個精心設計的指令,就能起草架構、生成模式、啟動基礎設施模板,甚至生成隱私保護數據。本文將我的經驗與當前的行業實踐相結合,提煉成一份循序漸進的參考指南,供想成為數據工程師或處于職業生涯中期的人士參考。
為什么它今天如此重要?
人工智能工作負載的爆炸式增長: Apache Flink 2.0等實時分析平臺現在開箱即用地承擔 LLM 級延遲要求,迫使每個管道在幾毫秒內做出反應。
Lakehouse 成為 AI 原生:Databricks 的 Lakehouse AI統一了數據、ML 和治理,將 SQL、矢量搜索和模型服務放在同一平臺上。
隱私監管趨嚴:歐盟人工智能法案的最終文本要求合成內容生成器具有特殊的透明度,并要求更嚴格的數據治理日志。
一、面向提示的數據架構設計
大型語言模型 (LLM) 的成功秘訣在于,你可以反復練習題目,而不是一次性放棄所有要求。一個行之有效的方法:
1. 基本式提示
為醫院網絡創建詳細的數據架構設計,重點關注 EHR、PACS、LIMS 和安全性。
2.填空式提示
添加 HIPAA 級 RBAC、AES-256 靜態加密和審計跟蹤。
3.模型驗證提示
你做了哪些假設?列出缺失的NFR。
您可以嘗試以下更多操作:
二 LLM 生成的倉庫模式
LLM 可以一次性輸出星型或雪花型模式以及 SQL DDL。
輸入:
為銷售、客戶、庫存、員工和賣家設計一個星型模式。然后生成 DBML 格式的模式,并包含適當的 Table {} 塊以及所有外鍵的 Ref 關系。
輸出:
Table DimCustomer {
customer_id int [pk]
customer_name varchar ( 100 )
customer_email varchar ( 100 )
customer_phone varchar ( 20 )
}
Table DimEmployee {
employee_id int [pk]
employee_name varchar ( 100 )
employee_role varchar ( 50 )
employee_department varchar ( 50 )
}
Table DimSeller {
seller_id int [pk]
seller_name varchar ( 100 )
seller_contact varchar ( 100 )
seller_location varchar ( 100 )
}
Table DimInventory {
inventory_id int [pk]
product_name varchar ( 100 )
product_category varchar ( 50 )
amount_available int
seller_id int
}
Table FactSales {
sale_id int [pk]
sale_date date
sale_amount decimal(12,2)
customer_id int
employee_id int
inventory_id int
}
Ref: DimInventory.seller_id > DimSeller.seller_id
Ref: FactSales.customer_id > DimCustomer.customer_id
Ref: FactSales.employee_id > DimEmployee.employee_id
Ref: FactSales.inventory_id > DimInventory.inventory_id
驗證后,將 DDL 粘貼到dbdiagram.io中即可獲得即時 ERD。
最佳實踐更新:
優先使用Iceberg v1.4或Delta Lake 3.0表來獲得 ACID Lakehouse 保證。
用子句標記每個表和列COMMENT——GenAI 可以自動生成描述。
三、合成數據與增強
合成數據既解決了隱私問題,也解決了數據稀缺問題。常用的工具包括:
使用 ydata-sdk 的示例:
from ydata_sdk.datasets import load_tabular
from ydata_sdk.synth import CTGAN
data = load_tabular('insurance.csv')
model = CTGAN()
model.fit(data, num_epochs=50)
synthetic = model.generate(n_samples=len(data))
synthetic.to_csv('insurance_synth.csv', index=False)
數據匿名化工作
除了合成數據之外,你仍然需要直接匿名化。將簡單的 Python 轉換與差異隱私框架相結合。
導入重新
導入隨機
導入pandas作為pd
# 示例數據
data = [
{ "name" : "Rohit Sharma" , "age" : 36 , "email" : "rohit.sharma@example.com" , "phone" : "9876543210" },
{ "name" : "Anjali Verma" , "age" : 24 , "email" : "anjali.v@example.org" , "phone" : "9123456789" },
{ "name" : "David Johnson" , "age" : 43 , "email" : "david.j@example.net" , "phone" : "9012345678" }
]
# 創建 DataFrame
df = pd.DataFrame(data)
# 幫助器來編輯電子郵件
def redact_email ( email ):
username, domain = email.split( '@' )
return username[: 2 ] + "***@redacted.com"
# 匿名化函數
def anonymize ( row ):
row[ 'name' ] = re.sub( r'[aeiouAEIOU]' , '*' , row[ 'name' ])
row[ 'age' ] = f" {row[ 'age' ]// 10 } 0s"
row[ 'email' ] = redact_email(row[ 'email' ])
row[ 'phone' ] = row[ 'phone' ][:- 5 ] + str (random.randint( 10000 , 99999 ))
return row
# 應用匿名化
df_anonymized = df.apply(anonymize, axis= 1 )
# 顯示前幾行
df_anonymized.head()
對于更嚴謹的使用,OpenDP SmartNoise適用于 DP 保證版本。
監管提示:根據歐盟人工智能法案第 50 條,將合成或大量轉換的數據集標記為人工生成的。
四、GenAI輔助基礎設施規模調整
提示模板讓您可以根據工作負載導出云架構。
輸入:
“建議基礎設施堆棧每秒傳輸 25 萬個事件,每年存儲 3 PB 數據,并提供小于 200 毫秒的分析延遲。
輸出:
? 存儲建議
? 計算層
? 實時引擎
? 成本優化建議?!?/span>
GenAI 通常建議:
存儲— 對象(S3/GCS)+ 開放表格式 Lakehouse
計算——在 Kubernetes 上自動擴展 Spark;Flink 2.0用于流
實時——Kafka + Flink SQL,由低延遲 OLAP(ClickHouse)支持
成本杠桿——分層存儲、Graviton/ARM 節點、現貨隊列
使用云計算器進行驗證,然后通過另一個提示導出 Terraform JSON。示例:
輸入:
根據下面描述的基礎架構堆棧,生成 JSON 格式的 Terraform(*.tf.json)以在 AWS 上配置架構:
要求:
每秒 250K 個事件的流提取存儲 3 PB/年的數據(對象存儲 + lakehouse 格式)支持 <200ms 分析延遲使用自動擴展計算 Kubernetes 上的 Spark使用 Kafka + Flink 2.0 的實時引擎使用ClickHouse 進行低延遲分析
包括:
S3 存儲設置MSK(托管 Kafka)集群用于 Spark/Flink 工作負載的 EKS 集群基于 ClickHouse EC2 的集群IAM 角色、網絡基礎知識(VPC、子網)
成本優化:
分層 S3 存儲、Graviton2 EC2 節點、現貨車隊使用情況
輸出:
{
“provider” : {
“aws” : {
“region” : “us-east-1”
}
} ,
“resource” : {
“aws_s3_bucket” : {
“data_lake” : {
“bucket” : “my-event-data-lake” ,
“force_destroy” : true
}
} ,
“aws_msk_cluster” : {
“event_stream” : {
“cluster_name” : “event-stream-msk” ,
“kafka_version” : “3.4.0” ,
“number_of_broker_nodes” : 3 ,
“broker_node_group_info” : {
“instance_type” : “kafka.m5.large” ,
“client_subnets” : [ “subnet-abc123” , “subnet-def456” ] ,
“security_groups” : [ “sg-0123456789abcdef0” ]
}
}
} ,
“aws_eks_cluster” : {
“analytics_cluster” : {
“name” : “spark-flink-cluster” ,
“role_arn” : “arn:aws:iam :: 123456789012:role / eks-cluster-role” ,
“vpc_config” : {
“subnet_ids” : [ “subnet-abc123” , “subnet-def456” ]
}
}
} ,
“aws_eks_node_group” : {
“analytics_nodes” : {
“cluster_name” : “$ {aws_eks_cluster.analytics_cluster.name}” ,
“node_role_arn” : “arn:aws:iam :: 123456789012:role / eks-node-role” ,
“subnet_ids” : [ “subnet-abc123” , “subnet-def456” ] ,
“instance_types” : [ “m6g.large” ] ,
“scaling_config” : {
“desired_size” : 4 ,
“max_size” : 10 ,
“min_size” : 2
} ,
“capacity_type” : “SPOT”
}
} ,
“aws_security_group” : {
"clickhouse_sg" : {
"name" : "clickhouse-security-group" ,
"description" : "允許內部訪問 ClickHouse 節點" ,
"ingress" : [
{
"from_port" : 8123 ,
"to_port" : 8123 ,
"protocol" : "tcp" ,
"cidr_blocks" : [ "10.0.0.0/16" ]
}
] ,
"egress" : [
{
"from_port" : 0 ,
"to_port" : 0 ,
"protocol" : "-1" ,
"cidr_blocks" : [ "0.0.0.0/0" ]
}
] ,
"vpc_id" : "vpc-0123456789abcdef0"
}
} ,
"aws_instance" : {
"clickhouse_node" : {
“ami” : “ami-0abcdef1234567890” ,
“instance_type” : “m6g.large” ,
“count” : 3 ,
“vpc_security_group_ids” : [ “${aws_security_group.clickhouse_sg.id}” ] ,
“subnet_id” : “subnet-abc123” ,
“tags” : {
“名稱” : “clickhouse-analytics-node”
}
}
}
}
}
提示:
您必須將子網 ID、VPC ID、AMI和IAM 角色替換為實際值。
此配置不包括Flink 或 Spark 部署,這些部署將通過 Helm 圖表或 Kubernetes 清單在 EKS 內處理(Terraform 可以觸發 Helm)。
根據需要添加自動擴展組、數據分層策略和監控(例如 CloudWatch)。
五、LLMOps — 每個數據工程師必備
大型語言模型操作 (LLMOps) 位于數據工程和 MLOps 的交匯處。以下是一些核心最佳實踐:
六、小結
為了在不斷發展的數據和人工智能領域保持競爭力,掌握架構和基礎設施設計的快速工程技術如今已成為一項基礎技能。采用 Lakehouse 架構和開放表格式將簡化 GenAI 集成并簡化數據工作流程。此外,熟練掌握 Mostly.ai 或 YData 等合成數據生成工具以及 SmartNoise 等差異化隱私框架,對于確保數據合規性至關重要。雖然使用 GenAI 實現基礎設施決策自動化可以顯著提高效率,但根據成本、延遲和安全約束驗證這些決策至關重要。此外,隨著數據工程師承擔起維護生產環境中大型語言模型健康的責任,投資 LLMOps 也變得越來越重要。通過掌握上述技能,您將確保您的數據工程技能在 GenAI 持續重塑整個技術棧的背景下依然保持競爭力。