如何使用Python將MySQL表數據遷移到MongoDB集合
譯文【51CTO.com快譯】介紹
MySQL 是一個 RDBMS 平臺,它以規范化的方式以表格格式存儲數據,而 MongoDB 是一個 NoSQL 數據庫,它以無模式的方式將信息存儲為按集合分組的文檔。數據的表示方式完全不同,因此將 MySQL 表數據遷移到 MongoDB 集合聽起來可能是一項艱巨的任務。但是,Python 憑借其強大的連接性和數據處理能力讓這一切變得輕而易舉。
在本文中,將詳細的講解如何使用簡單的 Python 腳本將 MySQL 表數據遷移到 MongoDB 集合所需的步驟。這些腳本是在 Windows 上使用 Python 3.9.5 開發的。但是,它應該適用于任何平臺上的任何 Python 3+ 版本一起使用。
步驟 1:安裝所需的模塊
第一步是安裝連接MySQL 和 MongoDB 數據庫實例所需的模塊。我們將使用mysql.connector連接到 MySQL 數據庫。對于 MongoDB,使用pymongo,這是從 Python 連接到 MongoDB 的推薦模塊。
如果所需模塊尚未安裝,請運行以下PIP命令來安裝它們。
- pip 安裝 mysql 連接器 pip 安裝 pymongo
PIP 是 Python 包或模塊的包管理器。
步驟2:從MySQL表中讀取數據
第一步是從源 MySQL 表中讀取數據,并以可用于將數據加載到目標 MongoDB 數據庫中的格式進行準備。MongoDB 是一個 NoSQL 數據庫,它將數據存儲為 JSON 文檔,因此最好以 JSON 格式生成源數據。值得一提的是,Python 具有強大的數據處理能力,可以輕松地將數據轉換為 JSON 格式。
- import mysql.connector
- mysqldb = mysql.connector.connect( host="localhost", database="employees", user="root", password="" )
- mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall()
- print(myresult)
當腳本在沒有任何錯誤的情況下完成時,輸出的結果如下:
- [
- {
- "id":4,
- "name":"Medicine",
- "description":"<p>Medicine<br></p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":6,
- "name":"Food",
- "description":"<p>Food</p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":8,
- "name":"Groceries",
- "description":"<p>Groceries<br></p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":9,
- "name":"Cakes & Bakes",
- "description":"<p>Cakes & Bakes<br></p>",
- "created_at":d"",
- "updated_at":""
- }
- ]
請注意,輸出的是一個 JSON 數組,因為我們將dictionary=True參數傳遞給了游標。否則,結果將采用列表格式。現在有了 JSON 格式的源數據,就可以遷移到 MongoDB 集合。
步驟3:寫入 MongoDB 集合
獲得 JSON 格式的源數據后,下一步是將數據插入到 MongoDB 集合中。集合是一組文檔,相當于 RDBMS 中表(或關系)。我可以通過調用insert_many()集合類的方法來實現,該方法返回插入文檔的對象 ID 列表。請注意,當作為參數傳遞空列表時,此方法將引發異常,因此在方法調用之前進行長度檢查。
- import pymongo
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- mycol = mydb["categories"]
- if len(myresult) > 0:
- x = mycol.insert_many(myresult) #myresult comes from mysql cursor
- print(len(x.inserted_ids))
完成此步驟后,檢查一下 MongoDB 實例,以驗證數據庫和集合是否已創建,文檔是否已插入。注意MongoDB 是無模式的,這就意味著不必定義模式來插入文檔,模式是動態推斷并自動創建的。MongoDB 還可以創建代碼中引用的數據庫和集合(如果它們還不存在的話)。
步驟4:把數據放在一起
下面是從 MySQL 中讀取表并將其插入到 MongoDB 中集合的完整腳本。
- import mysql.connector
- import pymongo
- delete_existing_documents = True
- mysql_host="localhost"
- mysql_database="mydatabase"
- mysql_schema = "myschema"
- mysql_user="myuser"
- mysql_password="********"
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- mysqldb = mysql.connector.connect(
- host=mysql_host,
- database=mysql_database,
- user=mysql_user,
- password=mysql_password
- )
- mycursor = mysqldb.cursor(dictionary=True)
- mycursor.execute("SELECT * from categories;")
- myresult = mycursor.fetchall()
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- mycol = mydb["categories"]
- if len(myresult) > 0:
- x = mycol.insert_many(myresult) #myresult comes from mysql cursor
- print(len(x.inserted_ids))
步驟 5:增強腳本以加載 MySQL 架構中的所有表
該腳本從 MySQL 中讀取一個表,并將結果加載到 MongoDB 集合中。然后,下一步是遍歷源數據庫中所有表的列表,并將結果加載到新的 MySQL 集合中。我們可以通過查詢information_schema.tables元數據表來實現這一點,該表提供給定模式中的表列表。然后可以遍歷結果并調用上面的腳本來遷移每個表的數據。
- #Iterate through the list of tables in the schema
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,))
- tables = table_list_cursor.fetchall()
- for table in tables:
- #Execute the migration script for 'table'
也可以通過將遷移邏輯抽象為一個函數來實現這一點。
- #Function migrate_table
- def migrate_table(db, col_name):
- mycursor = db.cursor(dictionary=True)
- mycursor.execute("SELECT * FROM " + col_name + ";")
- myresult = mycursor.fetchall()
- mycol = mydb[col_name]
- if delete_existing_documents:
- #delete all documents in the collection
- mycol.delete_many({})
- #insert the documents
- if len(myresult) > 0:
- x = mycol.insert_many(myresult)
- return len(x.inserted_ids)
- else:
- return 0
步驟6:輸出腳本進度并使其可讀
腳本的進度是通過使用print 語句來傳達的。通過顏色編碼使輸出易于閱讀。例如,以將成功語句打印為綠色,將失敗語句打印為紅色。
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}")
- print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}")
- print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}")
最終結果
- 源 MySQL 數據庫
- 遷移后的目標 MongoDB 數據庫
- 在VSCode 中的 Python 腳本和輸出
完整的腳本
- import mysql.connector
- import pymongo
- import datetime
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- begin_time = datetime.datetime.now()
- print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}")
- delete_existing_documents = True;
- mysql_host="localhost"
- mysql_database="mydatabase"
- mysql_schema = "myschhema"
- mysql_user="root"
- mysql_password=""
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}")
- print("")
- #MySQL connection
- print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}")
- mysqldb = mysql.connector.connect(
- host=mysql_host,
- database=mysql_database,
- user=mysql_user,
- password=mysql_password
- )
- print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}")
- #MongoDB connection
- print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}")
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}")
- print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}")
- #Start migration
- print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}")
- dblist = myclient.list_database_names()
- if mongodb_dbname in dblist:
- print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}")
- else:
- print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}")
- #Function migrate_table
- def migrate_table(db, col_name):
- mycursor = db.cursor(dictionary=True)
- mycursor.execute("SELECT * FROM " + col_name + ";")
- myresult = mycursor.fetchall()
- mycol = mydb[col_name]
- if delete_existing_documents:
- #delete all documents in the collection
- mycol.delete_many({})
- #insert the documents
- if len(myresult) > 0:
- x = mycol.insert_many(myresult)
- return len(x.inserted_ids)
- else:
- return 0
- #Iterate through the list of tables in the schema
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,))
- tables = table_list_cursor.fetchall()
- total_count = len(tables)
- success_count = 0
- fail_count = 0
- for table in tables:
- try:
- print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}")
- inserted_count = migrate_table(mysqldb, table[0])
- print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}")
- success_count += 1
- except Exception as e:
- print(f"{bcolors.FAIL} {e} {bcolors.ENDC}")
- fail_count += 1
- print("")
- print("Migration completed.")
- print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}")
- if fail_count > 0:
- print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}")
- end_time = datetime.datetime.now()
- print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}")
- print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}")
警告
該腳本適用于中小型 MySQL 數據庫,它有幾百個表,每個表有幾千行。對于具有數百萬行的大型數據庫,性能可能會受到影響。在開始實際遷移之前,請在表列表查詢和實際表select查詢上使用 LIMIT 關鍵字對有限行進行檢測。
下載
帶點擊此處從 GitHub 下載整個腳本:
https://github.com/zshameel/MySQL2MongoDB
【51CTO譯稿,合作站點轉載請注明原文譯者和出處為51CTO.com】