成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

在SQLite中插入10億條Python VS Rust

開發 后端
寫腳本來進行數據處理,比如說給數據庫導入導出數據,這種任務一般來說最方便的方法是用python腳本,但是如果數據量比較大時候(比如上億條)時候Python就會超級慢,看到無法忍受。

在實際生活中,市場有這樣的案例:寫腳本來進行數據處理,比如說給數據庫導入導出數據,這種任務一般來說最方便的方法是用python腳本,但是如果數據量比較大時候(比如上億條)時候Python就會超級慢,看到無法忍受。在這種案例時候該怎么做呢,有一個外國老哥分享了自己的實踐經歷,并且對比了Python和Rust語言給SQLite插入十一條數據的情況,最后用Rust實現了在一分鐘來完成任務。我們在此分享一下該實踐過程,希望能對大家有所啟迪,大家也可以嘗試自己最拿手方法來實現該例子,并對比一下具體性能。

概述

案例中的任務是SQLite數據庫插入10億條的數據。表(user)數據結構和約束如下:

  1. create table IF NOT EXISTS user 
  2. id INTEGER not null primary key, 
  3. area CHAR(6), 
  4. age INTEGER not null, 
  5. active INTEGER not null 
  6. ); 

隨機生成數據。其中are列為六位數的區號(任何六位數字)。age將是5、10 或15中的一個數字。Active為0或1。

  • 實驗環境硬件配置為:MacBook Pro,2019(2.4 GHz 四核i5,8GB內存,256GB SSD硬盤,Big Sur 11.1)。
  • 任務前提:任務無需保持程序穩健性,如果進程崩潰并且所有數據都丟失了也沒關系。可以再次運行腳本。
  • 需要充分利用我的機器資源:100% CPU、8GB 內存和千兆字節的SSD空間。

無需使用真正的隨機方法,stdlib偽隨機方法即可。

Python

首先是原始版本的Python方法。Python標準庫提供了一個SQLite模塊,首先使用它編寫了第一個版本。代碼如下:

  1. import sqlite3 
  2. from commons import get_random_age, get_random_active, get_random_bool, get_random_area_code, create_table 
  3. DB_NAME = "naive.db" 
  4. def faker(con: sqlite3.Connection, count=100_000): 
  5. for _ in range(count): 
  6. age = get_random_age() 
  7. active = get_random_active() 
  8. # switch for area code 
  9. if get_random_bool(): 
  10. # random 6 digit number 
  11. area = get_random_area_code() 
  12. con.execute('INSERT INTO user VALUES (NULL,?,?,?)', (area, age, active)) 
  13. else: 
  14. con.execute('INSERT INTO user VALUES (NULL,NULL,?,?)', (age, active)) 
  15. con.commit() 
  16. def main(): 
  17. con = sqlite3.connect(DB_NAME, isolation_level=None
  18. con.execute('PRAGMA journal_mode = WAL;') 
  19. create_table(con) 
  20. faker(con, count=10_000_000
  21. if __name__ == '__main__': 
  22. main() 

在該腳本中,通for循環中一一插入1000萬條數據。執行花了將近15分鐘。基于此進行優化迭代,提高性能。

SQLite中,每次插入都是原子性的并且為一個事務。每個事務都需要保證寫入磁盤(涉及IO操作),因此可能會很慢。為了優化,可以嘗試通過不同大小的批量插入,對比發現,100000是最佳選擇。通過這個簡單的更改,運行時間減少到了10分鐘,優化了3分之一,但是仍然非常耗時。優化后,批量插入版本源碼:

SQLite庫優化

除了在代碼層優化外,如果對于單純的數據寫入,對數據庫本身搞的優化也是非常重要的。對于SQLite優化,可以做如下配置:

  1. PRAGMA journal_mode = OFF
  2. PRAGMA synchronous = 0
  3. PRAGMA cache_size = 1000000
  4. PRAGMA locking_mode = EXCLUSIVE
  5. PRAGMA temp_store = MEMORY

具體解釋:

首先,journal_mode設置為OFF,將會關閉回滾日志,禁用 SQLite 的原子提交和回滾功能,這樣在事務失敗情況下,無法恢復,基于例子實例穩健性要求可以設置,但是嚴禁在生產環境中使用。

其次,關閉synchronous,SQLite可以不再校驗磁盤寫入的數據可靠性。寫入SQLite可能并不意味著它已刷新到磁盤。同樣,嚴禁在生產環境中啟用。

cache_size用戶指定SQLite允許在內存中保留多少內存頁。不要在生產中分配太高的的數值。

使用在EXCLUSIVE鎖定模式,SQLite連接持有的鎖永遠不會被釋放。

設置temp_store到MEMOR將使其表現得像一個內存數據庫。

優化性能

對上面的兩個腳本,添加 SQLite優化參數,然后重新運行:

  1. def main():     
  2. con = sqlite3.connect(DB_NAME, isolation_level=None)     
  3. con.execute('PRAGMA journal_mode = OFF;')     
  4. con.execute('PRAGMA synchronous = 0;')     
  5. con.execute('PRAGMA cache_size = 1000000;') # give it a GB     
  6. con.execute('PRAGMA locking_mode = EXCLUSIVE;')     
  7. con.execute('PRAGMA temp_store = MEMORY;')     
  8. create_table(con)     

faker(con, count=100_000_000)

優化后版本,原始版本,插入1億行數據,大概花了10分鐘;對比批量插入版本大概花了8.5分鐘。

pypy版本

對比CPython PyPy在數據處理中可以提高性能,據說可以提高4倍以上的性能。本實驗中也嘗試編譯PyPy解釋器,運行腳本(代碼無需修改)。

使用pypy解釋器,批處理版本,插入1億行數據只需2.5分鐘。性能大概是Cpython的3.5倍,可見傳說的4倍性能提高確實是真的,誠不我欺也!。同時,為了測試在純循環插入中消耗的時間,在腳本中刪除SQL指令并運行:

以上腳本在CPython中耗時5.5分鐘 。PyPy執行耗時1.5分鐘(同樣提高了3.5倍)。

Rust

在完成Python各種優化折騰。又嘗試了Rust版本的插入,對比也有個原始版本和批量插入版本。原始版本,也是每行插入:

  1. use rusqlite::{params, Connection}; 
  2. mod common; 
  3. fn faker(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. for _ in 0..count { 
  6. let with_area = common::get_random_bool(); 
  7. let age = common::get_random_age(); 
  8. let is_active = common::get_random_active(); 
  9. if with_area { 
  10. let area_code = common::get_random_area_code(); 
  11. tx.execute( 
  12. "INSERT INTO user VALUES (NULL, ?, ?, ?)", 
  13. params![area_code, age, is_active], 
  14. .unwrap(); 
  15. } else { 
  16. tx.execute( 
  17. "INSERT INTO user VALUES (NULL, NULL, ?, ?)", 
  18. params![age, is_active], 
  19. .unwrap(); 
  20. tx.commit().unwrap(); 
  21. fn main() { 
  22. let conn = Connection::open("basic.db").unwrap(); 
  23. conn.execute_batch( 
  24. "PRAGMA journal_mode = OFF
  25. PRAGMA synchronous = 0
  26. PRAGMA cache_size = 1000000
  27. PRAGMA locking_mode = EXCLUSIVE
  28. PRAGMA temp_store = MEMORY;", 
  29. .expect("PRAGMA"); 
  30. conn.execute( 
  31. "CREATE TABLE IF NOT EXISTS user ( 
  32. id INTEGER not null primary key, 
  33. area CHAR(6), 
  34. age INTEGER not null, 
  35. active INTEGER not null)", 
  36. [], 
  37. .unwrap(); 
  38. faker(conn, 100_000_000) 

該版執行,大概用時3分鐘。然后我做了進一步的實驗:

將rusqlite,換成sqlx異步運行。

  1. use std::str::FromStr;     
  2.  
  3. use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};     
  4. use sqlx::{ConnectOptions, Connection, Executor, SqliteConnection, Statement};     
  5.  
  6. mod common;     
  7.  
  8. async fn faker(mut conn: SqliteConnection, count: i64) -> Result<(), sqlx::Error> {     
  9. let mut tx = conn.begin().await?;     
  10. let stmt_with_area = tx     
  11. .prepare("INSERT INTO user VALUES (NULL, ?, ?, ?)")     
  12. .await?;     
  13. let stmt = tx     
  14. .prepare("INSERT INTO user VALUES (NULL, NULL, ?, ?)")     
  15. .await?;     
  16. for _ in 0..count {     
  17. let with_area = common::get_random_bool();     
  18. let age = common::get_random_age();     
  19. let is_active = common::get_random_active();     
  20. if with_area {     
  21. let area_code = common::get_random_area_code();     
  22. stmt_with_area     
  23. .query()     
  24. .bind(area_code)     
  25. .bind(age)     
  26. .bind(is_active)     
  27. .execute(&mut tx)     
  28. .await?;     
  29. } else {     
  30. stmt.query()     
  31. .bind(age)     
  32. .bind(is_active)     
  33. .execute(&mut tx)     
  34. .await?;     
  35. }     
  36. }     
  37. tx.commit().await?;     
  38. Ok(())     
  39. }     
  40.  
  41. #[tokio::main]     
  42. async fn main() -> Result<(), sqlx::Error> {     
  43. let mut conn = SqliteConnectOptions::from_str("basic_async.db")     
  44. .unwrap()     
  45. .create_if_missing(true)     
  46. .journal_mode(SqliteJournalMode::Off)     
  47. .synchronous(SqliteSynchronous::Off)     
  48. .connect()     
  49. .await?;     
  50. conn.execute("PRAGMA cache_size = 1000000;").await?;     
  51. conn.execute("PRAGMA locking_mode = EXCLUSIVE;").await?;     
  52. conn.execute("PRAGMA temp_store = MEMORY;").await?;     
  53. conn.execute(     
  54. "CREATE TABLE IF NOT EXISTS user (     
  55. id INTEGER not null primary key,     
  56. area CHAR(6),     
  57. age INTEGER not null,     
  58. active INTEGER not null);",     
  59. )     
  60. .await?;     
  61. faker(conn, 100_000_000).await?;     
  62. Ok(())     

這個版本花了大約14分鐘。性能反而下降下降了。比Python版本還要差(原因值得深析)。

對執行的原始SQL語句,切換到準備好的語句并在循環中插入行,但重用了準備好的語句。該版本只用了大約一分鐘。

使用準備好的語句并將它們插入到50行的批次中,插入10億條,耗時34.3 秒。

  1. use rusqlite::{Connection, ToSql, Transaction}; 
  2. mod common; 
  3. fn faker_wrapper(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. faker(&tx, count); 
  6. tx.commit().unwrap(); 
  7. fn faker(tx: &Transaction, count: i64) { 
  8. // that is, we will batch 50 inserts of rows at once 
  9. let min_batch_size: i64 = 50
  10. if count < min_batch_size { 
  11. panic!("count cant be less than min batch size"); 
  12. // jeez, refactor this! 
  13. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(min_batch_size as usize); 
  14. with_area_params.pop(); 
  15. let with_area_paramswith_area_params = with_area_params.as_str(); 
  16. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(min_batch_size as usize); 
  17. without_area_params.pop(); 
  18. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  19. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  20. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  21. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  22. let mut stmt = tx.prepare_cached(st2.as_str()).unwrap(); 
  23. for _ in 0..(count / min_batch_size) { 
  24. let with_area = common::get_random_bool(); 
  25. let age = common::get_random_age(); 
  26. let is_active = common::get_random_active(); 
  27. let mut param_values: Vec<_> = Vec::new(); 
  28. if with_area { 
  29. // lets prepare the batch 
  30. let mut vector = Vec::<(String, i8, i8)>::new(); 
  31. for _ in 0..min_batch_size { 
  32. let area_code = common::get_random_area_code(); 
  33. vector.push((area_code, age, is_active)); 
  34. for batch in vector.iter() { 
  35. param_values.push(&batch.0 as &dyn ToSql); 
  36. param_values.push(&batch.1 as &dyn ToSql); 
  37. param_values.push(&batch.2 as &dyn ToSql); 
  38. stmt_with_area.execute(&*param_values).unwrap(); 
  39. } else { 
  40. // lets prepare the batch 
  41. let mut vector = Vec::<(i8, i8)>::new(); 
  42. for _ in 0..min_batch_size { 
  43. vector.push((age, is_active)); 
  44. for batch in vector.iter() { 
  45. param_values.push(&batch.0 as &dyn ToSql); 
  46. param_values.push(&batch.1 as &dyn ToSql); 
  47. stmt.execute(&*param_values).unwrap(); 
  48. fn main() { 
  49. let conn = Connection::open("basic_batched.db").unwrap(); 
  50. conn.execute_batch( 
  51. "PRAGMA journal_mode = OFF
  52. PRAGMA synchronous = 0
  53. PRAGMA cache_size = 1000000
  54. PRAGMA locking_mode = EXCLUSIVE
  55. PRAGMA temp_store = MEMORY;", 
  56. .expect("PRAGMA"); 
  57. conn.execute( 
  58. "CREATE TABLE IF NOT EXISTS user ( 
  59. id INTEGER not null primary key, 
  60. area CHAR(6), 
  61. age INTEGER not null, 
  62. active INTEGER not null)", 
  63. [], 
  64. .unwrap(); 
  65. faker_wrapper(conn, 100_000_000) 
  66. 創建了一個線程版本,其中有一個從通道接收數據的寫入線程和四個將數據推送到管道其他線程。 
  67. use rusqlite::{Connection, ToSql}; 
  68. use std::sync::mpsc; 
  69. use std::sync::mpsc::{Receiver, Sender}; 
  70. use std::thread; 
  71. mod common; 
  72. static MIN_BATCH_SIZE: i64 = 50
  73. enum ParamValues { 
  74. WithArea(Vec<(String, i8, i8)>), 
  75. WithoutArea(Vec<(i8, i8)>), 
  76. fn consumer(rx: Receiver<ParamValues>) { 
  77. let mut conn = Connection::open("threaded_batched.db").unwrap(); 
  78. conn.execute_batch( 
  79. "PRAGMA journal_mode = OFF
  80. PRAGMA synchronous = 0
  81. PRAGMA cache_size = 1000000
  82. PRAGMA locking_mode = EXCLUSIVE
  83. PRAGMA temp_store = MEMORY;", 
  84. .expect("PRAGMA"); 
  85. conn.execute( 
  86. "CREATE TABLE IF NOT EXISTS user ( 
  87. id INTEGER not null primary key, 
  88. area CHAR(6), 
  89. age INTEGER not null, 
  90. active INTEGER not null)", 
  91. [], 
  92. .unwrap(); 
  93. let tx = conn.transaction().unwrap(); 
  94. // jeez, refactor this! 
  95. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  96. with_area_params.pop(); 
  97. let with_area_paramswith_area_params = with_area_params.as_str(); 
  98. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  99. without_area_params.pop(); 
  100. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  101. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  102. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  103. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  104. let mut stmt_without_area = tx.prepare_cached(st2.as_str()).unwrap(); 
  105. for param_values in rx { 
  106. let mut row_values: Vec<&dyn ToSql> = Vec::new(); 
  107. match param_values { 
  108. ParamValues::WithArea(values) => { 
  109. for batch in values.iter() { 
  110. row_values.push(&batch.0 as &dyn ToSql); 
  111. row_values.push(&batch.1 as &dyn ToSql); 
  112. row_values.push(&batch.2 as &dyn ToSql); 
  113. stmt_with_area.execute(&*row_values).unwrap(); 
  114. ParamValues::WithoutArea(values) => { 
  115. for batch in values.iter() { 
  116. row_values.push(&batch.0 as &dyn ToSql); 
  117. row_values.push(&batch.1 as &dyn ToSql); 
  118. stmt_without_area.execute(&*row_values).unwrap(); 
  119. tx.commit().unwrap(); 
  120. fn producer(tx: Sender<ParamValues>, count: i64) { 
  121. if count < MIN_BATCH_SIZE { 
  122. panic!("count cant be less than min batch size"); 
  123. for _ in 0..(count / MIN_BATCH_SIZE) { 
  124. let with_area = common::get_random_bool(); 
  125. let age = common::get_random_age(); 
  126. let is_active = common::get_random_active(); 
  127. let mut param_values: Vec<_> = Vec::new(); 
  128. if with_area { 
  129. // lets prepare the batch 
  130. let mut vector = Vec::<(String, i8, i8)>::new(); 
  131. for _ in 0..MIN_BATCH_SIZE { 
  132. let area_code = common::get_random_area_code(); 
  133. vector.push((area_code, age, is_active)); 
  134. for batch in vector.iter() { 
  135. param_values.push(&batch.0 as &dyn ToSql); 
  136. param_values.push(&batch.1 as &dyn ToSql); 
  137. param_values.push(&batch.2 as &dyn ToSql); 
  138. // send the values 
  139. tx.send(ParamValues::WithArea(vector)).unwrap(); 
  140. } else { 
  141. // lets prepare the batch 
  142. let mut vector = Vec::<(i8, i8)>::new(); 
  143. for _ in 0..MIN_BATCH_SIZE { 
  144. vector.push((age, is_active)); 
  145. for batch in vector.iter() { 
  146. param_values.push(&batch.0 as &dyn ToSql); 
  147. param_values.push(&batch.1 as &dyn ToSql); 
  148. // send the values 
  149. tx.send(ParamValues::WithoutArea(vector)).unwrap(); 
  150. fn main() { 
  151. // setup the DB and tables 
  152. let (tx, rx): (Sender<ParamValues>, Receiver<ParamValues>) = mpsc::channel(); 
  153. // lets launch the consumer 
  154. let consumer_handle = thread::spawn(|| consumer(rx)); 
  155. let cpu_count = num_cpus::get(); 
  156. let total_rows = 100_000_000
  157. let each_producer_count = (total_rows / cpu_count) as i64; 
  158. let mut handles = Vec::with_capacity(cpu_count); 
  159. for _ in 0..cpu_count { 
  160. let thread_tx = tx.clone(); 
  161. handles.push(thread::spawn(move || { 
  162. producer(thread_tx, each_producer_count.clone()) 
  163. })) 
  164. for t in handles { 
  165. t.join().unwrap(); 
  166. drop(tx); 
  167. // wait till consumer is exited 
  168. consumer_handle.join().unwrap(); 

這是性能最好的版本,耗時約32.37秒。

基準測試對比:

總結

通過案例不同任務實驗,總體上可以得到:

  • 通過SQLite PRAGMA語句優化設置可以提高插入性能。
  • 使用準備好的語句可以提高性能
  • 進行批量插入可以提高性能。
  • PyPy 實際上比CPython快4倍
  • 線程/異步不一定能提高性能。

 

責任編輯:趙寧寧 來源: 蟲蟲搜奇
相關推薦

2021-07-19 15:33:27

編程Rust開發

2011-05-25 10:32:19

SQLite

2024-04-15 08:30:53

MySQLORM框架

2024-07-04 13:42:12

2025-05-12 01:55:00

MySQL存儲數據

2025-06-26 08:22:03

2024-05-21 11:34:03

RustPython編譯器

2015-03-03 09:52:02

2024-06-24 07:00:00

C++RustGo

2018-06-21 09:12:01

編程語言Python數據分析

2022-04-06 14:15:10

Python數據

2022-11-17 10:23:13

VS CodeCodiumPython

2024-01-08 13:31:00

Rust自動化測試

2024-06-04 10:49:05

Rust插件開發工具

2023-03-07 17:50:03

2021-08-22 17:22:31

VS Code容器開發人員

2024-04-02 08:30:40

RustUnix信號服務器

2013-04-22 11:12:35

Facebook美國數據中心

2023-06-15 17:00:11

Rust循環

2024-09-20 18:02:42

C#數據庫SQLite
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 99精品电影 | 久久久久久久99 | 一区二区三区免费观看 | 国产乱精品一区二区三区 | 亚洲最大福利网 | 国产欧美在线播放 | 午夜无码国产理论在线 | 日韩成人一区二区 | 2021天天干夜夜爽 | 成人做爰www免费看视频网站 | 欧美一二精品 | 久久的色 | 日韩在线欧美 | 香蕉视频一区二区 | 久久久久久久久久影视 | gav成人免费播放视频 | 亚洲女优在线播放 | 久久99精品久久久久久国产越南 | 国产一级片av | 国产亚洲成av人片在线观看桃 | 激情毛片 | 中文字幕1区2区3区 日韩在线视频免费观看 | 亚洲精品成人在线 | 久久国产激情视频 | 成人做爰www免费看 午夜精品久久久久久久久久久久 | 精品视频导航 | a级毛片毛片免费观看久潮喷 | 人人做人人澡人人爽欧美 | 最新中文字幕在线 | 久久久久久久97 | 最新国产精品精品视频 | 欧美a∨ | 成人做爰www免费看 午夜精品久久久久久久久久久久 | 激情六月天 | 日韩午夜影院 | 国产日韩精品一区二区三区 | 日韩久久精品电影 | 久久男人 | 国产日韩欧美一区二区在线播放 | 亚洲+变态+欧美+另类+精品 | 麻豆视频在线看 |