Java 和 Python 是當今最流行的兩種計算機語言。兩者都非常成熟,并提供了工具和技術生態系統,幫助我們解決數據科學領域出現的挑戰性問題。每種語言都各有優勢,我們要知道什么時候應該使用哪種工具,或者什么時候它們應該協同工作相互補充。
Python 是一種動態類型語言,使用起來非常簡單,如果我們不想接觸復雜的程序,它肯定是進行復雜計算的首選語言。Python 提供了優秀的庫(Pandas、NumPy、Matplotlib、ScyPy、PyTorch、TensorFlow 等)來支持對數據結構或數組的邏輯、數學和科學操作。
Java 是一種非常健壯的語言,具有強類型,因此有更嚴格的語法規則,所以不易出現程序錯誤。與Python一樣,它也提供了大量的庫來處理數據結構、線性代數、機器學習和數據處理(ND4J、Mahout、Spark、Deeplearning4J 等)。
本文將介紹如何對大量表格數據進行簡單的數據分析,并使用 Java 和 Python 計算一些統計數據。我們可以看到使用各個平臺進行數據分析的不同技術,對比它們的擴展方式,以及應用并行計算來提高其性能的可行性。
提出問題
我們要對不同州的一大批城市的價格做一個簡單的分析,這里假設有一個包含此信息的 CSV 文件。閱讀文件并繼續過濾掉一些州,并將剩余的州按城市-州分組以進行一些基本統計。希望能夠找到有效執行的解決方案,并且能夠隨著輸入數據規模的增長而有良好的擴展。
數據樣本是:
城市 | 州 | 基本價格 | 實際價格 |
La Jose | PA | 34.17 | 33.19 |
Preachers Slough | WA | 27,46 | 90.17 |
Doonan Corners | NY | 92.0 | 162.46 |
Doonan Corners | NY | 97.45 | 159.46 |
Castle Rock | WA | 162.16 | 943.21 |
Marble Rock | IA | 97.13 | 391.49 |
Mineral | CA | 99.13 | 289.37 |
Blountville | IN | 92.50 | 557.66 |
Blountsville | IN | 122.50 | 557.66 |
Coe | IN | 187.85 | 943.98 |
Cecilia | KY | 92.85 | 273.61 |
目的是展示如何使用 Java 和 Python 解決這些類型的問題。該示例非常簡單且范圍有限,但很容易拓展到更具挑戰性的問題。
Java 的方法
首先定義一個封裝數據元素的 Java 記錄:
record InputEntry(String city, String state, double basePrice, double actualPrice) {}
記錄(record)是 JDK 14 中引入的一種新型類型聲明。它是定義提供構造函數、訪問器、equals 和哈希實現的不可變類的一種簡捷方式。
接下來,讀取 CVS 文件并將它們增加到一個列表中:
List<InputEntry> inputEntries = readRecordEntriesFromCSVFile(recordEntries.csv);
為了按城市和州對輸入的元素進行分組,將其定義:
record CityState(String city, String state) {};
使用以下類來封裝屬于一個組的所有元素的統計信息:
record StatsAggregation(StatsAccumulator basePrice, StatsAccumulator actualPrice) {}
StatsAccumulator是Guava 庫的一部分??梢詫㈦p精度值集合添加到類中,它會計算基本統計數據,例如計數、平均值、方差或標準差??梢允褂肧tatsAccumulator來獲取InputEntry的basePrice和actualPrice的統計數據。
現在我們已經擁有了解決問題的所有材料。Java Streams提供了一個強大的框架來實現數據操作和分析。它的聲明式編程風格,對選擇、過濾、分組和聚合的支持,簡化了數據操作和統計分析。它的框架還提供了一個強大的實現,可以處理大量的(甚至是無限的流),并通過使用并行性、懶惰性和短路操作來高效處理。所有這些特性使Java Streams成為解決這類問題的絕佳選擇。實現非常簡單:
Map<CityState, StatsAggregation> stats = inputEntries.stream().
filter(i -> !(i.state().equals("MN") || i.state().equals("CA"))).collect(
groupingBy(entry -> new CityState(entry.city(), entry.state()),
collectingAndThen(Collectors.toList(),
list -> {StatsAccumulator sac = new StatsAccumulator();
sac.addAll(list.stream().mapToDouble(InputEntry::basePrice));
StatsAccumulator sas = new StatsAccumulator();
sas.addAll(list.stream().mapToDouble(InputEntry::actualPrice));
return new StatsAggregation(sac, sas);}
)));
在代碼的第 2 行,我們使用Stream::filter. 這是一個布爾值函數,用于過濾列表中的元素。可以實現一個 lambda 表達式來刪除任何包含“MN”或“CA”狀態的元素。
然后繼續收集列表的元素并調用Collectors::groupingBy()(第 3 行),它接受兩個參數:
- 一個分類功能,使用CityState記錄來做城市和州的分組(第3行)。
- 下游的收集器,包含屬于同一<城州>的元素。使用Collectors::collectingAndThen(第 4 行),它采用兩個參數分兩步進行歸約:
·我們使用Collectors::toList(第 4 行),它返回一個收集器,它將屬于同一<城州>的所有元素放到一個列表中。
·隨后對這個列表進行了整理轉換。使用一個lambda函數(第5行至第9行)來定義兩個StatsAccumulator(s),在這里分別計算前一個列表中的basePrice和actualPrice元素的統計數據。最后,返回到新創建的包含這些元素的StatsAggregation記錄。
正如前文所述,使用Java Streams的優勢之一是,它提供了一種簡單的機制,可以使用多線程進行并行處理。這允許利用CPU的多核資源,同時執行多個線程。只要在流中添加一個 "parallel":
Map<CityState, StatsAggregation> stats = inputEntries.stream().parallel().
這導致流框架將元素列表細分為多個部分,并同時在單獨的線程中運行它們。隨著所有不同的線程完成它們的計算,框架將它們串行添加到生成的 Map 中。
在第4行中使用Collectors::groupingByConcurrent而不是Collectors:groupingBy。在這種情況下,框架使用并發映射,允許將來自不同線程的元素直接插入到此映射中,而不必串行組合。
有了這三種可能性,可以檢查它們如何執行之前的統計計算(不包括從 CSV 文件加載數據的時間),因為加載量從500萬條翻倍到2000萬條:
串行 | 平行 | 并行 & GroupByConcurrent | |
五百萬個元素 | 3.045 秒 | 1.941 秒 | 1.436 秒 |
一千萬個元素 | 6.405 秒 | 2.876 秒 | 2.785 秒 |
兩千萬個元素 | 8.507 秒 | 4.956 秒 | 4.537 秒 |
可以看到并行運行大大提高了性能;隨著負載的增加,時間幾乎減半。使用 GroupByConcurrent 還可額外獲得 10% 的收益。
最后,得到結果是微不足道的;例如,要獲得印第安納州 Blountsville 的統計數據,我們只需要:
StatsAggregation aggreg = stateAggr.get(new CityState("Blountsville ", "IN"));
System.out.println("Blountsville, IN");
System.out.println("basePrice.mean: " + aggreg.basePrice().mean());
System.out.println("basePrice.populationVariance: " + aggreg.basePrice().populationVariance());
System.out.println("basePrice.populationStandardDeviation: " + aggreg.basePrice().populationStandardDeviation());
System.out.println("actualPrice.mean: " + aggreg.basePrice().mean());
System.out.println("actualPrice.populationVariance: " + aggreg.actualPrice().populationVariance());
System.out.println("actualPrice.populationStandardDeviation: " + aggreg.actualPrice().populationStandardDeviation());
得到的結果:
Blountsville : IN
basePrice.mean: 50.302588996763795
basePrice.sampleVariance: 830.7527439246837
basePrice.sampleStandardDeviation: 28.822781682632293
basePrice.count: 309
basePrice.min: 0.56
basePrice.max: 99.59
actualPrice.mean: 508.8927831715211
actualPrice.sampleVariance: 78883.35878833274
actualPrice.sampleStandardDeviation: 280.86181440048546
actualPrice.count: 309
actualPrice.min: 0.49
actualPrice.max: 999.33
Python的方法
在 Python 中,有幾個庫可以處理數據統計和分析。其中,Pandas 庫非常適合處理大量表格數據,它提供了非常有效的過濾、分組和統計分析方法。
使用 Python 分析以前的數據:
import pandas as pd
def group_aggregations(df_group_by):
df_result = df_group_by.agg(
{'basePrice': ['count', 'min', 'max', 'mean', 'std', 'var'],
'actualPrice': ['count', 'min', 'max', 'mean', 'std', 'var']}
)
return df_result
if __name__ == '__main__':
df = pd.read_csv("recordEntries.csv")
excluded_states = ['MN', 'CA']
df_st = df.loc[~ df['state'].isin(excluded_states)]
group_by = df_st.groupby(['city', 'state'], sort=False)
aggregated_results = group_aggregations(group_by)
在主要部分,先調用pandas.read_csv()(第 11 行)將文件中用逗號分隔的值加載到 PandasDataFrame中。
在第13行,使用~df['state'].isin(excluded_states)來得到一個Pandas系列的布爾值,使用pandas.loc()來過濾其中不包括的州(MN和CA)。
接下來,在第14行使用DataFrame.groupby()來按城市和州進行分組。結果由group_aggregations()處理,保存每個組的basePrice和actualPrice的統計數據。
在Python中打印結果是非常直接的。IN和Blountsville的結果:
print(aggregated_results.loc['Blountsville', 'IN']['basePrice'])
print(aggregated_results.loc['Blountsville', 'IN']['actualPrice'])
統計數據:
base_price:
Name: (Blountsville, IN), dtype: float64
count 309.000000
min 0.560000
max 99.590000
mean 50.302589
std 28.822782
var 830.752744
actual_price:
Name: (Blountsville, IN), dtype: float64
count 309.000000
min 0.490000
max 999.330000
mean 508.892783
std 280.861814
var 78883.358788
為了并行運行前面的代碼,我們必須記住,Python并不像Java那樣支持細粒度的鎖機制。必須解決好與全局解釋器鎖(GIL)的問題,無論你有多少個CPU多核或線程,一次只允許一個線程執行。
為了支持并發,我們必須考慮到有一個CPU 密集型進程,因此,最好的方法是使用multiprocessing。所以需要修改代碼:
from multiprocessing import Pool
import pandas as pd
def aggreg_basePrice(df_group):
ct_st, grp = df_group
return ct_st, grp.basePrice.agg(['count', 'min', 'max', 'mean', 'std', 'var'])
if __name__ == '__main__':
df = pd.read_csv("recordEntries.csv")
start = time.perf_counter()
excluded_states = ['MN', 'CA']
filtr = ~ df['state'].isin(excluded_states)
df_st = df.loc[filtr]
grouped_by_ct_st = df_st.groupby(['city', 'state'], sort=False)
with Pool() as p:
list_parallel = p.map(aggreg_basePrice, [(ct_st, grouped) for ct_st, grouped in grouped_by_ct_st])
print(f'Time elapsed parallel: {round(finish - start, 2)} sec')
和之前一樣,使用Pandas groupby()來獲得按城市和州分組的數據(第14行)。在下一行,使用多進程庫提供的Pool()來映射分組的數據,使用aggreg_basePrice來計算每組的統計數據。Pool()會對數據進行分割,并在幾個平行的獨立進程中進行統計計算。
正如下面的表格中所示,多進程比串行運行進程慢得多。因此,對于這些類型的問題,不值得使用這種方法。
可以使用另一種并發運行代碼 - Modin。Modin提供了一種無縫的方式來并行化你的代碼,當你必須處理大量的數據時是非常有用的。將導入語句從import pandas as pd改為import modin.pandas as pd,可以并行運行代碼,并利用環境中可能存在的內核集群來加速代碼的執行。
下面的表格是剛剛涉及的不同場景的運行時間(和以前一樣,不包括從CSV文件中讀取數據的時間):
串行 | 多進程 | Modin 過程 | |
五百萬個元素 | 1.94 秒 | 20.25 秒 | 6.99 秒 |
一千萬個元素 | 4.07 秒 | 25.1 秒 | 12.88 秒 |
兩千萬個元素 | 7.62 秒 | 36.2 秒 | 25.94 秒 |
根據表格顯示,在Python中串行運行代碼甚至比在Java中更快。然而,使用多進程會大大降低性能。使用Moding可以改善結果,使串行運行進程更有利。值得一提的是,和以前一樣,我們在計算時間時不包括從CSV文件中讀取數據的時間。
可以發現,對于 Pandas 中的 CPU 密集型進程來說,并行化代碼是沒有優勢的。從某種意義上說,這反映了 Pandas 最初的架構方式。Pandas 在串行模式下的運行速度令人印象深刻,而且即使處理大量數據也具有很好的擴展性。
需要指出的是,Python中統計數字的計算速度取決于它的執行方式。為了得到快速的計算,需要應用到統計函數。例如,做一個簡單的pandas.DataFrame.describe()來獲得統計信息,運行速度會非常慢。
Java 的 Streams 或 Python 的 Pandas 是對大量數據進行分析和統計的兩個絕佳選擇。兩者都有非常可靠的框架,以及足夠的支持,能夠實現出色的性能和可擴展性。
Java 提供了非常強大的基礎架構,非常適合處理復雜的程序流。它非常高效,可以有效地并行運行進程。適用于快速獲得結果。
Python 非常適合做數學和統計。它非常簡單,相當快,非常適合進行復雜的計算。
譯者介紹
翟珂,51CTO社區編輯,目前在杭州從事軟件研發工作,做過電商、征信等方面的系統,享受分享知識的過程,充實自己的生活。
原文標題:??Data Statistics and Analysis With Java and Python???,作者:??Manu Barriola??