MongoDB集群和實(shí)戰(zhàn)詳解
1.概述
最近有同學(xué)和網(wǎng)友私信我,問(wèn)我MongoDB方面的問(wèn)題;這里我整理一篇博客來(lái)贅述下MongoDB供大家學(xué)習(xí)參考,博客的目錄內(nèi)容如下:
- 基本操作
- CRUD
- MapReduce
本篇文章是基于MongoDB集群(Sharding+Replica Sets)上演示的,故操作的內(nèi)容都是集群層面的,所以有些命令和單獨(dú)的使用MongoDB庫(kù)有異樣。
2.基本操作
常用的 Shell 命令如下所示:
- db.help() # 數(shù)據(jù)庫(kù)幫助
- db.collections.help() # 集合幫助
- rs.help() # help on replica set
- show dbs # 展示數(shù)據(jù)庫(kù)名
- show collections # 展示collections在當(dāng)前庫(kù)
- use db_name # 選擇數(shù)據(jù)庫(kù)
查看集合基本信息,內(nèi)容如下所示:
- #查看幫助
- db.yourColl.help();
- #查詢當(dāng)前集合的數(shù)據(jù)條數(shù)
- db.yourColl.count();
- #查看數(shù)據(jù)空間大小
- db.userInfo.dataSize();
- #得到當(dāng)前聚集集合所在的
- db db.userInfo.getDB();
- #得到當(dāng)前聚集的狀態(tài)
- db.userInfo.stats();
- #得到聚集集合總大小
- db.userInfo.totalSize();
- #聚集集合儲(chǔ)存空間大小
- db.userInfo.storageSize();
- #Shard版本信息
- db.userInfo.getShardVersion()
- #聚集集合重命名,將userInfo重命名為users
- db.userInfo.renameCollection("users");
- #刪除當(dāng)前聚集集合
- db.userInfo.drop();
3.CRUD
3.1創(chuàng)建
在集群中,我們?cè)黾右粋€(gè) friends 庫(kù),命令如下所示:
- db.runCommand({enablesharding:"friends"});
在庫(kù)新建后,我們?cè)谠搸?kù)下創(chuàng)建一個(gè)user分片,命令如下:
- db.runCommand( { shardcollection : "friends. user"});
3.2新增
在MongoDB中,save和insert都能達(dá)到新增的效果。但是這兩者是有區(qū)別的,在save函數(shù)中,如果原來(lái)的對(duì)象不存在,那他們都可以向collection里插入數(shù)據(jù);如果已經(jīng)存在,save會(huì)調(diào)用update更新里面的記錄,而insert則會(huì)忽略操作。
另外,在insert中可以一次性插敘一個(gè)列表,而不用遍歷,效率高,save則需要遍歷列表,一個(gè)個(gè)插入,下面我們可以看下兩個(gè)函數(shù)的原型,通過(guò)函數(shù)原型我們可以看出,對(duì)于遠(yuǎn)程調(diào)用來(lái)說(shuō),是一次性將整個(gè)列表post過(guò)來(lái)讓MongoDB去處理,效率會(huì)高些。
Save函數(shù)原型如下所示:
Insert函數(shù)原型(部分代碼)如下所示:
3.3查詢
3.3.1查詢所有記錄
- db. user.find();
默認(rèn)每頁(yè)顯示20條記錄,當(dāng)顯示不下的情況下,可以用it迭代命令查詢下一頁(yè)數(shù)據(jù)。注意:鍵入it命令不能帶“;” 但是你可以設(shè)置每頁(yè)顯示數(shù)據(jù)的大小,用DBQuery.shellBatchSize= 50;這樣每頁(yè)就顯示50條記錄了。
3.3.2查詢?nèi)サ艉蟮漠?dāng)前聚集集合中的某列的重復(fù)數(shù)據(jù)
- db. user.distinct("name");
- #會(huì)過(guò)濾掉name中的相同數(shù)據(jù) 相當(dāng)于:
- select distict name from user;
3.3.3查詢等于條件數(shù)據(jù)
- db.user.find({"age": 24});
- #相當(dāng)于:
- select * from user where age = 24;
3.3.4查詢大于條件數(shù)據(jù)
- db.user.find({age: {$gt: 24}});
- # 相當(dāng)于:
- select * from user where age >24;
3.3.5查詢小于條件數(shù)據(jù)
- db.user.find({age: {$lt: 24}});
- #相當(dāng)于:
- select * from user where age < 24;
3.3.6查詢大于等于條件數(shù)據(jù)
- db.user.find({age: {$gte: 24}});
- #相當(dāng)于:
- select * from user where age >= 24;
3.3.7查詢小于等于條件數(shù)據(jù)
- db.user.find({age: {$lte: 24}});
- #相當(dāng)于:
- select * from user where age <= 24;
3.3.8查詢AND和OR條件數(shù)據(jù)
- AND
- db.user.find({age: {$gte: 23, $lte: 26}});
- #相當(dāng)于
- select * from user where age >=23 and age <= 26;
- OR
- db.user.find({$or: [{age: 22}, {age: 25}]});
- #相當(dāng)于:
- select * from user where age = 22 or age = 25;
3.3.9模糊查詢
- db.user.find({name: /mongo/});
- #相當(dāng)于%%
- select * from user where name like '%mongo%';
3.3.10開(kāi)頭匹配
- db.user.find({name: /^mongo/});
- # 與SQL中得like語(yǔ)法類似
- select * from user where name like 'mongo%';
3.3.11指定列查詢
- db.user.find({}, {name: 1, age: 1});
- #相當(dāng)于:
- select name, age from user;
當(dāng)然name也可以用true或false,當(dāng)用ture的情況下和name:1效果一樣,如果用false就是排除name,顯示name以外的列信息。
3.3.12指定列查詢+條件查詢
- db.user.find({age: {$gt: 25}}, {name: 1, age: 1});
- #相當(dāng)于:
- select name, age from user where age > 25;
- db.user.find({name: 'zhangsan', age: 22});
- #相當(dāng)于:
- select * from user where name = 'zhangsan' and age = 22;
3.3.13排序
- #升序:
- db.user.find().sort({age: 1});
- #降序:
- db.user.find().sort({age: -1});
3.3.14查詢5條數(shù)據(jù)
- db.user.find().limit(5);
- #相當(dāng)于:
- select * from user limit 5;
3.3.15N條以后數(shù)據(jù)
- db.user.find().skip(10);
- #相當(dāng)于:
- select * from user where id not in ( select * from user limit 5 );
3.3.16在一定區(qū)域內(nèi)查詢記錄
- #查詢?cè)?~10之間的數(shù)據(jù)
- db.user.find().limit(10).skip(5);
可用于分頁(yè),limit是pageSize,skip是第幾頁(yè)*pageSize。
3.3.17COUNT
- db.user.find({age: {$gte: 25}}).count();
- #相當(dāng)于:
- select count(*) from user where age >= 20;
3.3.18安裝結(jié)果集排序
- db.userInfo.find({sex: {$exists: true}}).sort();
3.3.19不等于NULL
- db.user.find({sex: {$ne: null}})
- #相當(dāng)于:
- select * from user where sex not null;
3.4索引
創(chuàng)建索引,并指定主鍵字段,命令內(nèi)容如下所示:
- db.epd_favorites_folder.ensureIndex({"id":1},{"unique":true,"dropDups":true})
- db.epd_focus.ensureIndex({"id":1},{"unique":true,"dropDups":true})
3.5更新
update命令格式,如下所示:
- db.collection.update(criteria,objNew,upsert,multi)
參數(shù)說(shuō)明: criteria:
查詢條件 objNew:update對(duì)象和一些更新操作符
upsert:如果不存在update的記錄,是否插入objNew這個(gè)新的文檔,true為插入,默認(rèn)為false,不插入。
multi:默認(rèn)是false,只更新找到的***條記錄。如果為true,把按條件查詢出來(lái)的記錄全部更新。
下面給出一個(gè)示例,更新id為 1 中 price 的值,內(nèi)容如下所示:
- db. user.update({id: 1},{$set:{price:2}});
- #相當(dāng)于:
- update user set price=2 where id=1;
3.6刪除
3.6.1刪除指定記錄
- db. user. remove( { id:1 } );
- #相當(dāng)于:
- delete from user where id=1;
3.6.2刪除所有記錄
- db. user. remove( { } );
- #相當(dāng)于:
- delete from user;
3.6.3DROP
- db. user. drop();
- #相當(dāng)于:
- drop table user;
4.MapReduce
MongoDB中的 MapReduce 是編寫(xiě)JavaScript腳本,然后由MongoDB去解析執(zhí)行對(duì)應(yīng)的腳本,下面給出 Java API 操作MR。代碼如下所示:
MongdbManager類,用來(lái)初始化MongoDB:
- package cn.mongo.util;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.mongodb.DB;
- import com.mongodb.Mongo;
- import com.mongodb.MongoOptions;
- /**
- * @Date Mar 3, 2015
- *
- * @author dengjie
- *
- * @Note mongodb manager
- */
- public class MongdbManager {
- private static final Logger logger = LoggerFactory.getLogger(MongdbManager.class);
- private static Mongo mongo = null;
- private static String tag = SystemConfig.getProperty("dev.tag");
- private MongdbManager() {
- }
- static {
- initClient();
- }
- // get DB object
- public static DB getDB(String dbName) {
- return mongo.getDB(dbName);
- }
- // get DB object without param
- public static DB getDB() {
- String dbName = SystemConfig.getProperty(String.format("%s.mongodb.dbname", tag));
- return mongo.getDB(dbName);
- }
- // init mongodb pool
- private static void initClient() {
- try {
- String[] hosts = SystemConfig.getProperty(String.format("%s.mongodb.host", tag)).split(",");
- for (int i = 0; i < hosts.length; i++) {
- try {
- String host = hosts[i].split(":")[0];
- int port = Integer.parseInt(hosts[i].split(":")[1]);
- mongo = new Mongo(host, port);
- if (mongo.getDatabaseNames().size() > 0) {
- logger.info(String.format("connection success,host=[%s],port=[%d]", host, port));
- break;
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("create connection has error,msg is %s", ex.getMessage()));
- }
- }
- // 設(shè)置連接池的信息
- MongoOptions opt = mongo.getMongoOptions();
- opt.connectionsPerHost = SystemConfig.getIntProperty(String.format("%s.mongodb.poolsize", tag));// poolsize
- opt.threadsAllowedToBlockForConnectionMultiplier = SystemConfig.getIntProperty(String.format(
- "%s.mongodb.blocksize", tag));// blocksize
- opt.socketKeepAlive = true;
- opt.autoConnectRetry = true;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
MongoDBFactory類,用來(lái)封裝操作業(yè)務(wù)代碼,具體內(nèi)容如下所示:
- package cn.mongo.util;
- import java.util.ArrayList;
- import java.util.List;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import cn.diexun.domain.MGDCustomerSchema;
- import com.mongodb.BasicDBList;
- import com.mongodb.DB;
- import com.mongodb.DBCollection;
- import com.mongodb.DBObject;
- import com.mongodb.util.JSON;
- /**
- * @Date Mar 3, 2015
- *
- * @Author dengjie
- */
- public class MongoDBFactory {
- private static Logger logger = LoggerFactory.getLogger(MongoDBFactory.class);
- // save data to mongodb
- public static void save(MGDCustomerSchema mgs, String collName) {
- DB db = null;
- try {
- db = MongdbManager.getDB();
- DBCollection coll = db.getCollection(collName);
- DBObject dbo = (DBObject) JSON.parse(mgs.toString());
- coll.insert(dbo);
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("save object to mongodb has error,msg is %s", ex.getMessage()));
- } finally {
- if (db != null) {
- db.requestDone();
- db = null;
- }
- }
- }
- // batch insert
- public static void save(List<?> mgsList, String collName) {
- DB db = null;
- try {
- db = MongdbManager.getDB();
- DBCollection coll = db.getCollection(collName);
- BasicDBList data = (BasicDBList) JSON.parse(mgsList.toString());
- List<DBObject> list = new ArrayList<DBObject>();
- int commitSize = SystemConfig.getIntProperty("mongo.commit.size");
- int rowCount = 0;
- long start = System.currentTimeMillis();
- for (Object dbo : data) {
- rowCount++;
- list.add((DBObject) dbo);
- if (rowCount % commitSize == 0) {
- try {
- coll.insert(list);
- list.clear();
- logger.info(String.format("current commit rowCount = [%d],commit spent time = [%s]s", rowCount,
- (System.currentTimeMillis() - start) / 1000.0));
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("batch commit data to mongodb has error,msg is %s", ex.getMessage()));
- }
- }
- }
- if (rowCount % commitSize != 0) {
- try {
- coll.insert(list);
- logger.info(String.format("insert data to mongo has spent total time = [%s]s",
- (System.currentTimeMillis() - start) / 1000.0));
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("commit end has error,msg is %s", ex.getMessage()));
- }
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("save object list to mongodb has error,msg is %s", ex.getMessage()));
- } finally {
- if (db != null) {
- db.requestDone();
- db = null;
- }
- }
- }
- }
LoginerAmountMR類,這是一個(gè)統(tǒng)計(jì)登錄用戶數(shù)的MapReduce計(jì)算類,代碼如下:
- package cn.mongo.mapreduce;
- import java.sql.Timestamp;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import org.bson.BSONObject;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import cn.diexun.conf.ConfigureAPI.MR;
- import cn.diexun.conf.ConfigureAPI.PRECISION;
- import cn.diexun.domain.Kpi;
- import cn.diexun.util.CalendarUtil;
- import cn.diexun.util.MongdbManager;
- import cn.diexun.util.MysqlFactory;
- import com.mongodb.DB;
- import com.mongodb.DBCollection;
- import com.mongodb.DBCursor;
- import com.mongodb.DBObject;
- import com.mongodb.MapReduceOutput;
- import com.mongodb.ReadPreference;
- /**
- * @Date Mar 13, 2015
- *
- * @Author dengjie
- *
- * @Note use mr jobs stats user login amount
- */
- public class LoginerAmountMR {
- private static Logger logger = LoggerFactory.getLogger(LoginerAmountMR.class);
- // map 函數(shù)JS字符串拼接
- private static String map() {
- String map = "function(){";
- map += "if(this.userName != \"\"){";
- map += "emit({" + "kpi_code:'login_times',username:this.userName,"
- + "district_id:this.districtId,product_style:this.product_style,"
- + "customer_property:this.customer_property},{count:1});";
- map += "}";
- map += "}";
- return map;
- }
- private static String reduce() {
- String reduce = "function(key,values){";
- reduce += "var total = 0;";
- reduce += "for(var i=0;i<values.length;i++){";
- reduce += "total += values[i].count;}";
- reduce += "return {count:total};";
- reduce += "}";
- return reduce;
- }
- // reduce 函數(shù)字符串拼接
- public static void main(String[] args) {
- loginNumbers("t_login_20150312");
- }
- /**
- * login user amount
- *
- * @param collName
- */
- public static void loginNumbers(String collName) {
- DB db = null;
- try {
- db = MongdbManager.getDB();
- db.setReadPreference(ReadPreference.secondaryPreferred());
- DBCollection coll = db.getCollection(collName);
- String result = MR.COLLNAME_TMP;
- long start = System.currentTimeMillis();
- MapReduceOutput mapRed = coll.mapReduce(map(), reduce(), result, null);
- logger.info(String.format("mr run spent time=%ss", (System.currentTimeMillis() - start) / 1000.0));
- start = System.currentTimeMillis();
- DBCursor cursor = mapRed.getOutputCollection().find();
- List<Kpi> list = new ArrayList<Kpi>();
- while (cursor.hasNext()) {
- DBObject obj = cursor.next();
- BSONObject key = (BSONObject) obj.get("_id");
- BSONObject value = (BSONObject) obj.get("value");
- Object kpiValue = value.get("count");
- Object userName = key.get("username");
- Object districtId = key.get("district_id");
- Object customerProperty = key.get("customer_property");
- Object productStyle = key.get("product_style");
- Kpi kpi = new Kpi();
- try {
- kpi.setUserName(userName == null ? "" : userName.toString());
- kpi.setKpiCode(key.get("kpi_code").toString());
- kpi.setKpiValue(Math.round(Double.parseDouble(kpiValue.toString())));
- kpi.setCustomerProperty(customerProperty == null ? "" : customerProperty.toString());
- kpi.setDistrictId(districtId == "" ? 0 : Integer.parseInt(districtId.toString()));
- kpi.setProductStyle(productStyle == null ? "" : productStyle.toString());
- kpi.setCreateDate(collName.split("_")[2]);
- kpi.setUpdateDate(Timestamp.valueOf(CalendarUtil.formatMap.get(PRECISION.HOUR).format(new Date())));
- list.add(kpi);
- } catch (Exception exx) {
- exx.printStackTrace();
- logger.error(String.format("parse type or get value has error,msg is %s", exx.getMessage()));
- }
- }
- MysqlFactory.insert(list);
- logger.info(String.format("store mysql spent time is %ss", (System.currentTimeMillis() - start) / 1000.0));
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.error(String.format("run map-reduce jobs has error,msg is %s", ex.getMessage()));
- } finally {
- if (db != null) {
- db.requestDone();
- db = null;
- }
- }
- }
- }
5.總結(jié)
在計(jì)算 MongoDB 的MapReduce計(jì)算的時(shí)候,拼接JavaScript字符串時(shí)需要謹(jǐn)慎小心,很容易出錯(cuò),上面給出的代碼只是一部分代碼,供參考學(xué)習(xí)使用;另外,若是要做MapReduce任務(wù)計(jì)算,推薦使用Hadoop的MapReduce計(jì)算框架,MongoDB的MapReduce框架這里僅做介紹學(xué)習(xí)了解。