distributeTemplate分布式配置,上下文的同步維護(hù)
我們實(shí)現(xiàn)分布式 也是按照 這樣一步一步來(lái)的,首先是公用的資源對(duì)象,***個(gè)必須的公用資源 對(duì)象是配置,這配置交給用戶外部填寫 手動(dòng)配置 并能維持同步 更新 所以需要一個(gè)配置對(duì)象 維護(hù)在內(nèi)存里面,需要事件驅(qū)動(dòng)監(jiān)聽配置文件的變化情況
ok下面來(lái) 看看代碼是怎么做的 ,首先 配置 有多重配置方式 ini conf prop xml 各種方式本質(zhì)上 我們分布式就是要各臺(tái)主機(jī) 自己所在的節(jié)點(diǎn) 都能知道我在網(wǎng)絡(luò)上情況,或者所可以找到像zookeeper 只要知道或者找到了 才能進(jìn)行以后的通訊同步
我們?yōu)榱四軌蛞院笾С侄嘀嘏渲?,所以先把配置定義為一個(gè)接口
- public interface IConfig {
- public static final String DEFAULTFILE="distribute.conf";
- /**
- *
- * 從文件讀取配置初始化
- * @param file
- * 添加(修改)人:zhuyuping
- */
- public void readConfigFormFile(String file);
- /**
- * 從用戶提供的json字符串中初始化
- * @param json
- * 添加(修改)人:zhuyuping
- */
- public void readConfigFormString(String json);
- /**
- * 獲得系統(tǒng)內(nèi)存中維護(hù)的內(nèi)存表
- *
- * 添加(修改)人:zhuyuping
- */
- public Table<InetSocketAddress, String, Object> getConfig();
- /**
- * 攝入context上下文
- * @param context
- * 添加(修改)人:zhuyuping
- */
- public void setContext(Context context);
- /**
- * 獲得上下文context
- * @return
- * 添加(修改)人:zhuyuping
- */
- public Context getContext();
- public BiMap<String, InetSocketAddress> getAlias();
- }
主要 是3個(gè)方法 ,一個(gè)是讀配置文件到內(nèi)存對(duì)象中,擰一個(gè)就是內(nèi)存對(duì)象寫到文件中,對(duì)于分布式還需要一個(gè)同步
有時(shí)候 我們可能需要?dú)v史版本 能夠容錯(cuò)還原 所以可能需要一個(gè)版本號(hào) 記錄當(dāng)前版本,然后保持有可能多個(gè)節(jié)點(diǎn)上配置文件更改后,發(fā)送請(qǐng)求時(shí)候能夠保持順序的更新,這后面我會(huì)處理 加入一個(gè)分布式先進(jìn)先出的隊(duì)列的,這里暫時(shí)未加 后面 加了會(huì)補(bǔ)上的,還有上下文context 對(duì)象,有人會(huì)說(shuō) 你為什么需要上下文對(duì)象去保存變量啊,好多框架 都有上下文這個(gè)對(duì)象,也許是攔截器模式 也許是門面模式 等等其他模式,但是 這里我都不是,其實(shí)上下文只是為了本地節(jié)點(diǎn)的各個(gè)功能代碼段之間的一個(gè)橋梁,有句話叫做 兩情若是久長(zhǎng)時(shí) 又豈在朝朝暮暮 我們有鵲橋,他就是上下文最重要的本質(zhì) 就是維護(hù)本地節(jié)點(diǎn)上下文貫穿 如果上下文 里面保存著門面 那么他就有門面的功能 便于用戶隨時(shí)獲取門面對(duì)象 進(jìn)行操作,ok我們來(lái)看看 context是怎么定義的
- /**
- *
- *
- *
- * @author zhuyuping
- * @version 1.0
- * @created 2014-7-9
- * @function:上下文接口 他只是存儲(chǔ)用戶上面類所有過(guò)程中的變量不是config配置而且分布式中不會(huì)同步的只會(huì)在單點(diǎn)上有效 切記 、、后期如果想支持xml 配置或者其他配置 可以添加策略模式
- */
- public interface Context {
- public final static String defaultConfig="distribute.conf";//默認(rèn)配置名
- public void putValue(String key,Object value);
- public Object getValue(String key);
- public void setCurrHost(String host,int port);
- public InetSocketAddress getCurrHost();
- /**
- * 獲得默認(rèn)配置文件
- * @return
- * 添加(修改)人:zhuyuping
- */
- public String getDefaultFc();
- // /**
- // * 設(shè)置默認(rèn)屬性文件的名稱
- // * @param pfile
- // * 添加(修改)人:zhuyuping
- // */
- // public void setDefaultFc(String pfile);
- // /**
- // * 注入template 門面 便于后面直接通過(guò)上下文來(lái)使用 如果要整合spring ApplicationContextAware
- // * @param distributedTemplate
- // * 添加(修改)人:zhuyuping
- // */
- // public void setTemplate(DistributedOperations distributedTemplate);
- //
- // public DistributedOperations getTemplate();
- }
這里其實(shí) 就是一個(gè)map 保存屬性key value 而常用的就取出作為一個(gè)方法了
這個(gè)context 因?yàn)楹竺嫖覀兘o用戶一個(gè)繼承的 這樣便于 用戶實(shí)現(xiàn)自己的上下文 或交給其他框架上下文 以及整合所以我們實(shí)現(xiàn)了一個(gè)抽象的 默認(rèn)實(shí)現(xiàn)
- /**
- *
- *
- *
- * @author zhuyuping
- * @version 1.0
- * @created 2014-7-9 下午5:58:37
- * @function:抽象的上下文 主要是 管理context的資源 還有就是提供用戶自定義 整合spring使用該類 //這里后期需整合策略 實(shí)現(xiàn)
- */
- public abstract class AbstractContext implements Context{
- //?也可以使用LocalThread 也可以
- private Map<String,Object> context=Maps.newConcurrentMap();
- private InetSocketAddress currHost;//當(dāng)前主機(jī) 比如192.168.0.1 8888
- private String dfConfig;//默認(rèn)讀取的配置文件 當(dāng)用戶 提供就修改 沒(méi)有提供就默認(rèn)
- public AbstractContext(String dfConfig) {
- super();
- this.dfConfig = dfConfig;
- //當(dāng)前classes 下的文件
- //currentPort
- this.currHost=new InetSocketAddress(ConfigFactory.load(dfConfig).getString("client.currentHost"),ConfigFactory.load(dfConfig).getInt("client.currentPort"));
- }
- @Override
- public InetSocketAddress getCurrHost() {
- return currHost;
- }
- @Override
- public void setCurrHost(String host,int port) {
- this.currHost=new InetSocketAddress(host, port);
- }
- @Override
- public String getDefaultFc() {
- return dfConfig!=null?dfConfig:defaultConfig;
- }
- public AbstractContext() {
- super();
- this.dfConfig=defaultConfig;
- this.currHost=new InetSocketAddress(ConfigFactory.load(defaultConfig).getString("client.currentHost"),ConfigFactory.load(defaultConfig).getInt("client.currentPort"));
- }
- @Override
- public void putValue(String key, Object value) {
- context.put(key, value);
- }
- @Override
- public Object getValue(String key) {
- return context.get(key);
- }
- }
ok 很簡(jiǎn)單的 維護(hù)者
然后回到剛才的配置 ,首先 我們配置文件 需要從文件讀取 到配置對(duì)象中 ,這是為了用戶更改 或者初始化時(shí)候 吧配置文件初始化到配置內(nèi)存對(duì)象中 然后這個(gè)內(nèi)存對(duì)象將會(huì)在同步 配置文件 更改 網(wǎng)絡(luò)通訊時(shí)候用到,在對(duì)于全局的所有節(jié)點(diǎn)的淪陷時(shí)候 單純的context只維護(hù)本節(jié)點(diǎn)橋梁信息的 已經(jīng)不夠用了 因?yàn)樗粫?huì)同步的,這就是為什么需要他的原因,我這里采用的配置文件時(shí)conf 也就是configLoad方式,后面我會(huì)逐步添加更多的支持方式 無(wú)非是xml 等讀取問(wèn)題,這并不重要 思想才是重要的
- /**
- *
- *
- *
- * @author zhuyuping
- * @version 1.0
- * @created 2014-7-9 下午4:07:00 如果后期需要支持xml 其他 只需要使用策略模式
- * @function:基類config 主要維持 配置基本信息 以及配置信息的同步 備份 同時(shí)內(nèi)存中維持一張內(nèi)存表table
- */
- public abstract class AbstractConfig implements IConfig{
- /**
- * 當(dāng)前的配置表 行 為主機(jī)別名 其中一定有一列為版本號(hào) AotmicLong 以及配置的相關(guān)字段 值為相關(guān)的對(duì)象
- */
- protected Table<InetSocketAddress, String, Object> config=HashBasedTable.create();//table信息 table 信息 這里無(wú)需要用到 哪個(gè)ConcurrentHashMap<K, V> 因?yàn)檫@個(gè)只會(huì)加載讀取 加載
- //不會(huì)修改,因?yàn)檫@個(gè)table當(dāng)用戶真的更新config后 會(huì)同步并同時(shí)刷到更新到文件中的 ,而且每次用戶提供查詢的配置
- //是不會(huì)更新到文件里面的
- protected AtomicLong version=new AtomicLong(0);//初始化版本為0;//判斷當(dāng)前的版本 是佛在config 里面是否存在
- protected BiMap<String,InetSocketAddress> alias=HashBiMap.create();
- protected Context context;
- public BiMap<String, InetSocketAddress> getAlias() {
- return alias;
- }
- /**
- * context 需要提供當(dāng)前主機(jī) 以及
- * @param context
- */
- public AbstractConfig(Context context) {
- super();
- this.context = context;
- wrapConfig(ConfigFactory.load(context.getDefaultFc()));
- }
- @Override
- public void setContext(Context context) {
- this.context=context;
- }
- @Override
- public Context getContext() {
- return context;
- }
- @Override
- public void readConfigFormFile(String file) {
- Config config=TypeSafeConfigLoadUtils.loadFromFile(new File(file));
- wrapConfig(config);
- }
- @Override
- public void readConfigFormString(String json) {
- Config config=TypeSafeConfigLoadUtils.loadFromString(json);
- wrapConfig(config);
- }
- /**
- * 對(duì)config進(jìn)行初始化 table
- * @param config
- * 添加(修改)人:zhuyuping
- */
- protected abstract void wrapConfig(Config config);
- /**
- * 把table 從內(nèi)存中讀取從新寫入到配置文件中
- * @param config
- * 添加(修改)人:zhuyuping
- */
- protected abstract String wrapTable(Table<String, String, Object> config);
- /**
- * 只保留最近的5個(gè)版本 可以回滾 更新***的
- *
- * 添加(修改)人:zhuyuping
- */
- public void updateVersion(Long version){
- }
- /**
- * 版本數(shù)更新
- * 更新完后 需要
- * 添加(修改)人:zhuyuping
- */
- public void addVersion(){
- Long v=version.getAndIncrement();
- //TODO 需要通知所有節(jié)點(diǎn) 我要修改版本了 如果同時(shí)有幾個(gè)人也這樣 那么接受該節(jié)點(diǎn)下次更新的版本號(hào),
- //在回調(diào)函數(shù)中 更新配置 隨后同步 只保留5個(gè)版本
- }
- @Override
- public Table<InetSocketAddress, String, Object> getConfig() {
- return config;
- }
- /**
- *
- * 提交對(duì)文件配置做出的修改
- * 添加(修改)人:zhuyuping
- */
- protected abstract void commit();
- /**
- *
- * 提交對(duì)配置的修改 如果一個(gè)人在一個(gè)節(jié)點(diǎn)上 更改了配置 需要核對(duì)版本 并從新更新本地的配置文件
- * 添加(修改)人:zhuyuping
- */
- protected abstract void sync();
- }
這里 我為了好維護(hù) 就直接使用guava的table ,其實(shí)你可以用List< map> 實(shí)現(xiàn),這里 重要是獲取所有主機(jī)列表的方法
然后就是配置文件讀取后 寫入context 對(duì)象 當(dāng)然 上面說(shuō)的讀取配置到內(nèi)存對(duì)象 ,內(nèi)存對(duì)象寫入到配置文件是基礎(chǔ)
然后看看怎么寫入的 我為了以后支持xml 所以這讀取方式 寫入方式 交給后面的子類實(shí)現(xiàn)類去實(shí)現(xiàn) 這樣實(shí)現(xiàn)xml只要實(shí)現(xiàn)這2個(gè)方法即可
- /**
- *
- *
- *
- * @author zhuyuping
- * @version 1.0
- * @created 2014-7-9 下午10:38:05
- * @function:默認(rèn)的配置 允許用戶實(shí)現(xiàn)自定義的配置規(guī)則 只需要繼承 AbstractConfig
- */
- public class SimpleDistributeConfig extends AbstractConfig{
- public SimpleDistributeConfig(Context context) {
- super(context);
- }
- @Override
- protected void wrapConfig(Config configObj) {
- //得到所有的節(jié)點(diǎn)
- List<? extends ConfigObject> nodes=configObj.getObjectList("server.hosts");
- int i=0;
- for (ConfigObject node : nodes) {
- i++;
- //如果后期添加其他mysql 等支持 這里就需要添加判斷
- //Integer localport=node.containsKey("localPort")?Integer.parseInt(node.get("localPort").render()):LOCALPORT;//Integer.parseInt(node.get("localPort").render());
- Integer remotePort=Integer.parseInt(node.get("remotePort").render());
- String remoteIp=node.get("remoteHost").unwrapped().toString();//遠(yuǎn)程主機(jī)的ip
- //開始初始化配置table
- String name=node.containsKey("name")?node.get("name").unwrapped().toString():remoteIp;//主機(jī)別名
- InetSocketAddress remoteHost=new InetSocketAddress(remoteIp, remotePort);
- super.alias.put(name, remoteHost);
- super.config.put(remoteHost,"version", super.version.incrementAndGet());
- super.config.put(remoteHost, "remoteHost", remoteHost);
- //super.config.put(remoteHost, "localPort", localport);
- super.config.put(remoteHost, "remotePort", remotePort);
- super.config.put(remoteHost, "name", name);
- if(node.containsKey("file")){
- HashMap fcs=(HashMap) node.get("file").unwrapped();
- String syncPath=fcs.get("syncPath").toString();//文件同步的路徑
- // System.out.println("SimpleDistributeConfig.wrapConfig() "+syncPath);
- super.config.put(remoteHost, "file", syncPath);//以后配置多的時(shí)候 分裝成一個(gè)bean存入
- }
- }
- String chost=configObj.getString("client.currentHost");
- int port=configObj.getInt("client.currentPort");
- super.context.setCurrHost(chost, port);
- //config.root().containsKey(key)
- //config.getString(path);
- }
- @Override
- protected String wrapTable(Table<String, String, Object> table) {
- StringBuilder sb=new StringBuilder("server{");
- sb.append("\n\t").append("hosts=[");
- Set<String> rows=table.rowKeySet();
- int size=rows.size();
- int i=0;
- for (String row : rows) {
- i++;
- Map<String,Object> map=table.row(row);
- if(!map.containsKey("remoteHost")) continue;
- sb.append("\t {");
- Object remoteHost=map.get("remoteHost");
- Object remotePort=map.get("remotePort");
- //Object localPort=map.get("localPort");
- Object name=map.get("name");
- sb.append("name=").append(name).append("\n\t");
- //sb.append("localPort=").append(localPort).append("\n\t");
- sb.append("remoteHost=").append(remoteHost).append("\n\t");
- sb.append("remotePort=").append(remotePort).append("\n\t");
- if(map.containsKey("file")){
- sb.append("file{").append("\n\t").append("syncPath=").append(map.get("syncPath")).append("}");
- }
- sb.append("\t }");
- if(i!=size){
- sb.append(",");
- }
- }
- sb.append("]").append("\n\t").append("}");
- //繼續(xù) 保存client
- sb.append("\n\t").append("client{").append("\n\t").append("currentHost=").append(context.getCurrHost().getHostString()).append("\n\t");
- sb.append("\n\t").append("currentPort=").append(context.getCurrHost().getPort()).append("\n\t");
- sb.append("}");
- return sb.toString();
- }
- @Override
- protected void commit() {
- }
- @Override
- protected void sync() {
- }
- public static void main(String[] args) {
- }
- }
ok 基本的簡(jiǎn)單的 配置 以及 上下文 這些用來(lái)同步的 用來(lái)做橋梁的都已經(jīng)做好了 下面就是怎么監(jiān)聽配置文件的更改,有人說(shuō)怎么監(jiān)聽一個(gè)配置文件更改啊,其實(shí) 文件有個(gè)屬性 ***更改的時(shí)間,只要監(jiān)聽這個(gè)就可以 初級(jí)版本沒(méi)有加上更改,這個(gè)后面可以隨時(shí)加上 而且為了以后好更改 只寫了相關(guān)方式?jīng)]有 加上
- import java.io.File;
- public interface FileListener {
- public void fileChanged(File file);
- }
然后用定時(shí)器 timeer 或者線程池定時(shí)線程去輪訓(xùn)他 ,
這里 注意弱應(yīng)用 ,其實(shí) 對(duì)于這些時(shí)間 我建議用LinkTransferQueue 他是FIFO 先進(jìn)先出的無(wú)阻曬的隊(duì)列 然后隊(duì)列 加上若引用,保證時(shí)間的一些順序,
然后 通知了接口 在接口里面我們?cè)谧鱿鄳?yīng)的更改配置 同步配置 的相關(guān)操作,這一章基本思想就是
同步 資源 配置 上下文為橋梁 事件驅(qū)動(dòng)為引擎
還有支出的是 timer excutors.newSchulerXXXX 還有很多方式實(shí)現(xiàn)輪訓(xùn)的方式 這種方式 也可以實(shí)現(xiàn)心跳線
同時(shí) 告訴大家的事 Apache、 camel| 里面的事件驅(qū)動(dòng)文件部分 核心代碼就是上面 這一小部分
代碼 沒(méi)有貼完整 ,但是代碼已經(jīng)托管到 http://git.oschina.net/zhuyuping/distributeTemplate
本文鏈接:http://my.oschina.net/yilian/blog/295667