Pandas 進階秘籍:五招搞定復雜數據處理,效率飆升!
Pandas 是 Python 數據分析領域當之無愧的瑞士軍刀,其強大的數據結構 DataFrame 和 Series 使得數據清洗、轉換、分析變得高效便捷。然而,當面對日益復雜的數據場景和性能要求時,僅僅掌握基礎操作往往捉襟見肘。
本文將深入探討 5 個 Pandas 進階實用技巧,助您優雅地應對復雜數據處理挑戰,顯著提升代碼質量和運行效率。
一、鏈式操作 (Method Chaining):優雅的數據流
鏈式操作,也稱為流式接口 (Fluent Interface),是將一系列 Pandas 方法串聯起來,形成一個清晰、連貫的數據處理管道。這種風格不僅代碼更易讀、易維護,還能有效避免創建不必要的中間變量,優化內存使用。
1. 為什么選擇鏈式操作?
- 可讀性強:代碼從上到下或從左到右,清晰地展示了數據轉換的每一步。
- 簡潔性高:減少了中間變量的聲明,使代碼更加緊湊。
- 易于調試:雖然長鏈可能看起來復雜,但可以通過逐個添加方法或使用 pipe() 方法(見后文)進行調試。
- 性能潛力:在某些情況下,Pandas 內部可能對鏈式操作進行優化。
2. 示例:傳統方式 vs. 鏈式操作
假設我們需要對一個銷售數據 DataFrame (df) 進行如下操作:
- 篩選出 'Category' 為 'Electronics' 的記錄。
- 按 'Sub-Category' 分組。
- 計算每個子類的總銷售額 ('Sales') 和平均利潤 ('Profit')。
- 按總銷售額降序排序。
- 取銷售額最高的前 5 個子類。
(1) 傳統方式 (創建中間變量):
# 假設 df 是已加載的銷售數據 DataFrame
electronics_df = df[df['Category'] == 'Electronics']
grouped_df = electronics_df.groupby('Sub-Category')
agg_df = grouped_df.agg(
TotalSales=('Sales', 'sum'),
AverageProfit=('Profit', 'mean')
)
sorted_df = agg_df.sort_values(by='TotalSales', ascending=False)
top_5_subcategories = sorted_df.head(5)
print(top_5_subcategories)
(2) 鏈式操作:
# 假設 df 是已加載的銷售數據 DataFrame
top_5_subcategories_chained = (
df[df['Category'] == 'Electronics'] # 1. 篩選
.groupby('Sub-Category') # 2. 分組
.agg( # 3. 聚合
TotalSales=('Sales', 'sum'),
AverageProfit=('Profit', 'mean')
)
.sort_values(by='TotalSales', ascending=False) # 4. 排序
.head(5) # 5. 取前5
)
print(top_5_subcategories_chained)
注釋:鏈式操作通過將每個方法調用的結果直接作為下一個方法調用的對象,形成了一個流暢的管道。使用圓括號 () 包裹整個鏈條可以方便地進行多行書寫,提高可讀性。
3. 提升鏈式操作可讀性的技巧
- 適當換行和縮進:如上例所示,每個 .method() 調用占一行。
- 添加注釋:在每個步驟后添加簡短注釋說明其作用。
- 使用 pipe() 方法:對于需要傳遞 DataFrame 給自定義函數或不易直接鏈式調用的函數,pipe() 非常有用(詳見技巧二)。
二、pipe() 方法:自定義函數的無縫融入
當鏈式操作中需要應用一個自定義函數,或者某個庫函數不直接支持在 DataFrame/Series 對象上調用時,pipe() 方法就派上了用場。它允許你將 DataFrame 或 Series 作為第一個參數傳遞給指定的函數,并將函數的返回值作為 pipe() 的結果,從而保持鏈式操作的流暢性。
1. pipe() 的優勢
- 保持鏈式結構:即使需要調用外部函數,也能維持代碼的流式風格。
- 封裝復雜邏輯:可以將復雜的數據處理步驟封裝在自定義函數中,然后通過 pipe() 調用。
- 傳遞額外參數:pipe() 允許向目標函數傳遞額外的參數。
2. 示例:使用 pipe() 進行自定義數據清洗
假設我們有一個自定義函數 clean_text_column(df, column_name) 用于清洗 DataFrame 中的某個文本列(例如轉換為小寫、去除特殊字符)。
import pandas as pd
import re
# 示例 DataFrame
data = {'ID': [1, 2, 3],
'Description': ['Product A - NEW!', 'Item B (Old Model)', 'Widget C*']}
df_text = pd.DataFrame(data)
# 自定義清洗函數
def clean_text_column(dataframe, column_to_clean, remove_chars_pattern=r'[^a-zA-Z0-9\s]'):
"""清洗指定文本列:轉小寫,移除特定字符"""
df_copy = dataframe.copy() # 避免修改原始 DataFrame
df_copy[column_to_clean] = (
df_copy[column_to_clean]
.str.lower() # 轉小寫
.str.replace(remove_chars_pattern, '', regex=True) # 移除特定字符
.str.strip() # 去除首尾空格
)
return df_copy
# 使用 pipe() 調用自定義函數
cleaned_df = (
df_text
.pipe(clean_text_column, column_to_clean='Description') # 將 df_text 作為第一個參數傳遞
# 可以在這里繼續鏈接其他 Pandas 操作
# .assign(DescriptionLength=lambda x: x['Description'].str.len())
)
print(cleaned_df)
注釋:df_text.pipe(clean_text_column, column_to_clean='Description') 實際上等同于 clean_text_column(df_text, column_to_clean='Description'),但它允許我們將其嵌入到鏈式操作中。column_to_clean='Description' 是傳遞給 clean_text_column 函數的額外命名參數。
三、explode() 與 stack()/unstack():處理復雜結構數據
當 DataFrame 中的某一列包含列表、元組或其他可迭代對象,或者當需要重塑多級索引的數據時,explode()、stack() 和 unstack() 方法非常有用。
1. explode():將列表式數據展開為多行
如果 DataFrame 的某一列包含列表或類似列表的條目,而你希望將每個列表元素擴展成單獨的行,保留其他列的值,explode() 是理想選擇。
示例;
data_explode = {'OrderID': [1, 2],
'Products': [['Apple', 'Banana'], ['Orange', 'Grape', 'Apple']]}
df_explode_before = pd.DataFrame(data_explode)
print("原始 DataFrame:\n", df_explode_before)
# 使用 explode() 展開 'Products' 列
df_exploded = df_explode_before.explode('Products').reset_index(drop=True)
print("\nExplode 之后:\n", df_exploded)
注釋:explode('Products') 將 Products 列中的每個列表元素拆分成新行,OrderID 的值會相應復制。reset_index(drop=True) 用于重置索引,避免因展開產生重復索引。
2. stack() 與 unstack():重塑多級索引
stack() 和 unstack() 主要用于處理具有多級索引 (MultiIndex) 的 DataFrame,在寬格式 (wide format) 和長格式 (long format) 數據之間進行轉換。
- stack(): 將 DataFrame 的列“堆疊”到索引中,通常會使 DataFrame 變得更“長”(行數增加,列數減少)。
- unstack(): stack() 的逆操作,將索引的某個級別“解堆”到列中,通常使 DataFrame 變得更“寬”。
示例:
# 創建一個帶有多級列索引的 DataFrame
header = pd.MultiIndex.from_product([['Year1', 'Year2'], ['Sales', 'Profit']])
data_multiindex = [[100, 10, 120, 15], [150, 20, 130, 18]]
df_wide = pd.DataFrame(data_multiindex, index=['StoreA', 'StoreB'], columns=header)
print("原始寬格式 DataFrame (多級列索引):\n", df_wide)
# 使用 stack() 將最內層的列索引級別 (Sales, Profit) 堆疊到行索引
df_long_stacked = df_wide.stack(level=-1) # level=-1 表示最內層列索引
print("\nStack 之后 (長格式):\n", df_long_stacked)
# 使用 unstack() 將剛剛堆疊的級別解堆回列
df_unstacked_back = df_long_stacked.unstack(level=-1) # level=-1 表示最內層行索引
print("\nUnstack 回去:\n", df_unstacked_back)
注釋:stack() 和 unstack() 的 level 參數指定了要操作的索引級別(可以是名稱或整數位置)。這些操作對于時間序列分析、面板數據處理以及準備用于特定可視化或統計模型的數據非常關鍵。
四、assign():動態創建新列的利器
在鏈式操作中,如果需要基于現有列計算并添加新列,assign() 方法提供了一種非常優雅和函數式的方式。它返回一個新的 DataFrame,包含原始列和新添加的列,而不會修改原始 DataFrame。
1. assign() 的特點
- 鏈式友好:完美融入鏈式操作。
- 函數式編程風格:可以使用 lambda 函數動態定義新列的計算邏輯。
- 可讀性高:新列的名稱和計算方式一目了然。
- 創建多個列:可以一次性創建多個新列。
2. 示例:使用 assign() 計算衍生指標
data_assign = {'Item': ['A', 'B'], 'Price': [10, 20], 'Quantity': [5, 3]}
df_sales = pd.DataFrame(data_assign)
df_with_revenue = (
df_sales
.assign(
Revenue=lambda x: x['Price'] * x['Quantity'], # 新列 Revenue
DiscountedPrice=lambda x: x['Price'] * 0.9 # 新列 DiscountedPrice
)
.assign(
# 可以在后續的 assign 中使用前面 assign 創建的列
FinalRevenue=lambda x: x['DiscountedPrice'] * x['Quantity']
)
)
print(df_with_revenue)
注釋:assign() 的參數是 新列名=計算邏輯。計算邏輯可以是一個標量、一個 Series,或者最常用的是一個接受 DataFrame (通常用 x 或 df_ 表示) 并返回新列值的 lambda 函數。Lambda 函數中的 x 代表調用 assign() 的那個時刻的 DataFrame 狀態。
五、優化性能:eval() 與 query(),以及數據類型選擇
當處理大型 DataFrame 時,性能成為關鍵考量。Pandas 提供了一些方法來優化表達式計算和數據篩選,同時合理選擇數據類型也能顯著提升效率和減少內存占用。
1. eval() 和 query():高效表達式計算與篩選
- df.eval(expression_string): 使用 Numexpr 庫(如果已安裝)或 Python 的 eval() 來高效計算字符串形式的列表達式,尤其在涉及多個列的算術運算時,可以避免生成大量中間 Series,從而節省內存和提高速度。
- df.query(expression_string): 同樣使用 Numexpr 進行布爾表達式的求值,用于高效篩選行,語法比傳統的布爾索引更簡潔。
示例:
# 創建一個較大的示例 DataFrame
import numpy as np
size = 100000
df_large = pd.DataFrame({
'A': np.random.rand(size),
'B': np.random.rand(size),
'C': np.random.rand(size),
'D': np.random.randint(0, 10, size)
})
# 使用 eval() 計算新列
df_large['E_eval'] = df_large.eval('A + B * C - D/2')
# 傳統方式計算新列 (可能較慢且占用更多內存)
# df_large['E_traditional'] = df_large['A'] + df_large['B'] * df_large['C'] - df_large['D']/2
# 使用 query() 進行篩選
threshold_a = 0.5
threshold_d = 5
# 可以在表達式字符串中通過 @ 符號引用外部變量
filtered_df_query = df_large.query('A > @threshold_a and D < @threshold_d and B > C')
# 傳統方式篩選 (可能較冗長)
# filtered_df_traditional = df_large[
# (df_large['A'] > threshold_a) &
# (df_large['D'] < threshold_d) &
# (df_large['B'] > df_large['C'])
# ]
print("使用 eval 計算的 E 列的前5行:\n", df_large[['E_eval']].head())
print("\n使用 query 篩選后的 DataFrame 形狀:", filtered_df_query.shape)
注釋:eval() 和 query() 的表達式是字符串。對于 query(),可以使用 @variable_name 的方式在查詢字符串中引用 Python 環境中的變量。這些方法在 DataFrame 較大時性能優勢更明顯。
2. 合理選擇數據類型 (dtypes)
Pandas 默認的數據類型(如 int64, float64, object)可能不是最優的,尤其對于內存有限或追求極致性能的場景。
- 數值類型:如果整數范圍較小,可以使用 int8, int16, int32 代替 int64。浮點數類似,float32 代替 float64。
- **分類類型 (Category)**:對于基數(唯一值數量)遠小于數據總長度的字符串列(如性別、國家、產品類別),將其轉換為 category 類型可以大幅減少內存占用并加快分組等操作。
- **日期時間類型 (Datetime)**:確保日期時間字符串被正確解析為 datetime64[ns] 類型,以便利用 Pandas 強大的日期時間功能。
示例:轉換數據類型
# 假設 df_large 是上面創建的 DataFrame
print("\n原始數據類型及內存占用:")
df_large.info(memory_usage='deep')
# 轉換 'D' 列為更小的整數類型 (如果適用)
# 先檢查 D 的取值范圍
# print(df_large['D'].min(), df_large['D'].max()) # 0-9,int8 足夠
df_large['D_optimized'] = df_large['D'].astype('int8')
# 假設 'Category_String' 是一列基數較低的字符串
df_large['Category_String'] = pd.Series(np.random.choice(['X', 'Y', 'Z'], size=size, p=[0.6, 0.3, 0.1]))
df_large['Category_Optimized'] = df_large['Category_String'].astype('category')
print("\n優化后數據類型及內存占用:")
df_large.info(memory_usage='deep')
注釋:使用 astype() 方法轉換數據類型。在轉換前,最好先了解列中數據的實際范圍和特性。使用 df.info(memory_usage='deep') 可以查看更準確的內存占用情況(特別是對于 object 類型的列)。
總結
掌握 Pandas 的進階技巧對于高效處理日益復雜的數據至關重要。通過運用鏈式操作提升代碼可讀性與流暢性,借助 pipe() 無縫集成自定義函數,利用 explode()、stack()/unstack() 巧妙重塑數據結構,使用 assign() 優雅創建新列,以及通過 eval()/query() 和合理選擇數據類型 來優化性能,你將能夠更自信、更專業地駕馭 Pandas,將數據分析工作提升到新的高度。
不斷實踐這些技巧,并結合具體的數據問題進行思考,你將發現Pandas的強大遠不止于此。