成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

并行計算框架Polars、Dask的數據處理性能對比

開發 測試
本文我們使用兩個類似的腳本來執行提取、轉換和加載(ETL)過程。

在Pandas 2.0發布以后,我們發布過一些評測的文章,這次我們看看,除了Pandas以外,常用的兩個都是為了大數據處理的并行數據框架的對比測試。

本文我們使用兩個類似的腳本來執行提取、轉換和加載(ETL)過程。

測試內容

這兩個腳本主要功能包括:

從兩個parquet 文件中提取數據,對于小型數據集,變量path1將為“yellow_tripdata/ yellow_tripdata_2014-01”,對于中等大小的數據集,變量path1將是“yellow_tripdata/yellow_tripdata”。對于大數據集,變量path1將是“yellow_tripdata/yellow_tripdata*.parquet”;

進行數據轉換:a)連接兩個DF,b)根據PULocationID計算行程距離的平均值,c)只選擇某些條件的行,d)將步驟b的值四舍五入為2位小數,e)將列“trip_distance”重命名為“mean_trip_distance”,f)對列“mean_trip_distance”進行排序。

將最終的結果保存到新的文件。

腳本

1、Polars

數據加載讀取

def extraction():
    """
    Extract two datasets from parquet files
    """
    path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips= pl_read_parquet(path1,)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2,)
 
    return df_trips, df_zone
 
 def pl_read_parquet(path, ):
    """
    Converting parquet file into Polars dataframe
    """
    df= pl.scan_parquet(path,)
    return df

轉換函數

def transformation(df_trips, df_zone):
    """
    Proceed to several transformations
    """
    df_trips= mean_test_speed_pl(df_trips, )
     
    df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)
    df = df.select(["Borough","Zone","trip_distance",])
   
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance",2)
    df = rename_column(df, "trip_distance","mean_trip_distance")
 
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df
 
 
 def mean_test_speed_pl(df_pl,):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
    return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
 
    df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
    return df_pl
 
 def round_column(df, column,to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df
 
 def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df
 
 def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df

保存

def loading_into_parquet(df_pl):
    """
    Save dataframe in parquet
    """
    df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代碼

import polars as pl
 import time
 
 def pl_read_parquet(path, ):
    """
    Converting parquet file into Polars dataframe
    """
    df= pl.scan_parquet(path,)
    return df
 
 def mean_test_speed_pl(df_pl,):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
    return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
 
    df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
    return df_pl
 
 def round_column(df, column,to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df
 
 def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df
 
 
 def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df
 
 
 def main():
     
    print(f'Starting ETL for Polars')
    start_time = time.perf_counter()
 
    print('Extracting...')
    df_trips, df_zone =extraction()
        
    end_extract=time.perf_counter() 
    time_extract =end_extract- start_time
 
    print(f'Extraction Parquet end in {round(time_extract,5)} seconds')
    print('Transforming...')
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter() 
    time_transformation =time.perf_counter() - end_extract
    print(f'Transformation end in {round(time_transformation,5)} seconds')
    print('Loading...')
    loading_into_parquet(df,)
    load_transformation =time.perf_counter() - end_transform
    print(f'Loading end in {round(load_transformation,5)} seconds')
    print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")
 
 
 if __name__ == "__main__":
     
    main()

2、Dask

函數功能與上面一樣,所以我們把代碼整合在一起:

import dask.dataframe as dd
 from dask.distributed import Client
 import time
 
 def extraction():
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)
 
    return df_trips, df_zone
 
 def transformation(df_trips, df_zone):
    df_trips = mean_test_speed_dask(df_trips)
    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
 
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
 
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df
 
 def loading_into_parquet(df_dask):
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")
 
 def mean_test_speed_dask(df_dask):
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask
 
 def get_Queens_test_speed_dask(df_dask):
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask
 
 def round_column(df, column, to_round):
    df[column] = df[column].round(to_round)
    return df
 
 def rename_column(df, column_old, column_new):
    df = df.rename(columns={column_old: column_new})
    return df
 
 def sort_by_columns_desc(df, column):
    df = df.sort_values(column, ascending=False)
    return df
 
 
 
 def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()
 
    client = Client() # Start Dask Client
 
    df_trips, df_zone = extraction()
 
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
 
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")
 
    client.close() # Close Dask Client
 
 if __name__ == "__main__":
    main()

測試結果對比

1、小數據集

我們使用164 Mb的數據集,這樣大小的數據集對我們來說比較小,在日常中也時非常常見的。

下面是每個庫運行五次的結果:

Polars

Dask

2、中等數據集

我們使用1.1 Gb的數據集,這種類型的數據集是GB級別,雖然可以完整的加載到內存中,但是數據體量要比小數據集大很多。

Polars

Dask

3、大數據集

我們使用一個8gb的數據集,這樣大的數據集可能一次性加載不到內存中,需要框架的處理。

Polars

Dask

總結

從結果中可以看出,Polars和Dask都可以使用惰性求值。所以讀取和轉換非常快,執行它們的時間幾乎不隨數據集大小而變化;

可以看到這兩個庫都非常擅長處理中等規模的數據集。

由于polar和Dask都是使用惰性運行的,所以下面展示了完整ETL的結果(平均運行5次)。

Polars在小型數據集和中型數據集的測試中都取得了勝利。但是,Dask在大型數據集上的平均時間性能為26秒。

這可能和Dask的并行計算優化有關,因為官方的文檔說“Dask任務的運行速度比Spark ETL查詢快三倍,并且使用更少的CPU資源”。

上面是測試使用的電腦配置,Dask在計算時占用的CPU更多,可以說并行性能更好。

責任編輯:華軒 來源: DeepHub IMBA
相關推薦

2017-11-02 13:20:08

數據處理PythonNumpy

2025-05-06 07:15:00

Dask并行計算大數據

2019-04-18 09:15:05

DaskPython計算

2017-11-21 15:50:09

FlinkStorm性能

2017-11-20 13:54:55

FlinkStorm框架

2012-08-17 09:32:52

Python

2025-06-10 02:33:00

RustPython庫代碼

2018-03-13 12:51:12

Python數據函數

2018-06-07 15:58:52

Python函數數據

2011-04-21 09:13:14

并行計算

2010-03-22 14:45:40

云計算

2024-01-05 08:46:50

ReactVue

2011-04-20 17:15:21

并行計算

2014-04-24 10:25:15

2025-03-26 00:07:00

代碼Polars技巧

2021-06-01 05:51:37

云計算并行計算分布式計算

2014-01-21 11:16:59

MPI并行計算

2020-12-11 18:58:21

Nginx瀏覽器緩存

2022-11-24 12:07:47

英特爾

2017-04-24 12:07:44

Spark大數據并行計算
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 黄色在线免费观看 | 国产成人影院 | 日韩三区 | 成人小视频在线观看 | 国产日韩欧美在线一区 | 澳门永久av免费网站 | 成人国产免费视频 | 国产成人免费视频网站高清观看视频 | 日本淫视频 | 一区二区电影网 | 黄视频网站免费观看 | 性天堂网| www.黄色在线观看 | 国产精品久久久久久久久久久免费看 | 人人草天天草 | 九色网址 | 本道综合精品 | 日本午夜免费福利视频 | 欧美一区二区在线观看 | 欧美一级免费片 | 欧美舔穴 | 97在线播放 | 欧美成人精品一区二区三区 | 人人干超碰 | 黄色在线免费看 | 91大片| 天天天天操 | 欧美精品a∨在线观看不卡 国产精品久久国产精品 | 中文字幕一区在线观看视频 | 日日干天天操 | 亚洲在线免费观看 | 亚洲区一 | 国产一区二区三区久久久久久久久 | 午夜精品久久久久久久99黑人 | 国产精品视频入口 | 日韩欧美在线视频 | 国产69久久精品成人看动漫 | 99综合 | 欧美一区二区三区免费电影 | 宅男伊人 | 日韩视频91 |