成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kafka Java客戶端代碼示例

開發 后端 Kafka
kafka是linkedin用于日志處理的分布式消息隊列,linkedin的日志數據容量大,但對可靠性要求不高,其日志數據主要包括用戶行為(登錄、瀏覽、點擊、分享、喜歡)以及系統運行日志(CPU、內存、磁盤、網絡、系統及進程狀態)……

介紹      http://kafka.apache.org

kafka是一種高吞吐量的分布式發布訂閱消息系統

kafka是linkedin用于日志處理的分布式消息隊列,linkedin的日志數據容量大,但對可靠性要求不高,其日志數據主要包括用戶行為(登錄、瀏覽、點擊、分享、喜歡)以及系統運行日志(CPU、內存、磁盤、網絡、系統及進程狀態)

 當前很多的消息隊列服務提供可靠交付保證,并默認是即時消費(不適合離線)。

高可靠交付對linkedin的日志不是必須的,故可通過降低可靠性來提高性能,同時通過構建分布式的集群,允許消息在系統中累積,使得kafka同時支持離線和在線日志處理

測試環境

kafka_2.10-0.8.1.1 3個節點做的集群

zookeeper-3.4.5 一個實例節點

代碼示例

消息生產者代碼示例

  1. import java.util.Collections;  
  2. import java.util.Date;  
  3. import java.util.Properties;  
  4. import java.util.Random;  
  5.    
  6. import kafka.javaapi.producer.Producer;  
  7. import kafka.producer.KeyedMessage;  
  8. import kafka.producer.ProducerConfig;  
  9.    
  10. /**  
  11.  * 詳細可以參考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example  
  12.  * @author Fung  
  13.  *  
  14.  */ 
  15. public class ProducerDemo {  
  16.     public static void main(String[] args) {  
  17.         Random rnd = new Random();  
  18.         int events=100;  
  19.    
  20.         // 設置配置屬性  
  21.         Properties props = new Properties();  
  22.         props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");  
  23.         props.put("serializer.class""kafka.serializer.StringEncoder");  
  24.         // key.serializer.class默認為serializer.class  
  25.         props.put("key.serializer.class""kafka.serializer.StringEncoder");  
  26.         // 可選配置,如果不配置,則使用默認的partitioner  
  27.         props.put("partitioner.class""com.catt.kafka.demo.PartitionerDemo");  
  28.         // 觸發acknowledgement機制,否則是fire and forget,可能會引起數據丟失  
  29.         // 值為0,1,-1,可以參考  
  30.         // http://kafka.apache.org/08/configuration.html  
  31.         props.put("request.required.acks""1");  
  32.         ProducerConfig config = new ProducerConfig(props);  
  33.    
  34.         // 創建producer  
  35.         Producer<String, String> producer = new Producer<String, String>(config);  
  36.         // 產生并發送消息  
  37.         long start=System.currentTimeMillis();  
  38.         for (long i = 0; i < events; i++) {  
  39.             long runtime = new Date().getTime();  
  40.             String ip = "192.168.2." + i;//rnd.nextInt(255);  
  41.             String msg = runtime + ",www.example.com," + ip;  
  42.             //如果topic不存在,則會自動創建,默認replication-factor為1,partitions為0  
  43.             KeyedMessage<String, String> data = new KeyedMessage<String, String>(  
  44.                     "page_visits", ip, msg);  
  45.             producer.send(data);  
  46.         }  
  47.         System.out.println("耗時:" + (System.currentTimeMillis() - start));  
  48.         // 關閉producer  
  49.         producer.close();  
  50.     }  

消息消費者代碼示例

  1. import java.util.HashMap;  
  2. import java.util.List;  
  3. import java.util.Map;  
  4. import java.util.Properties;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7.    
  8. import kafka.consumer.Consumer;  
  9. import kafka.consumer.ConsumerConfig;  
  10. import kafka.consumer.KafkaStream;  
  11. import kafka.javaapi.consumer.ConsumerConnector;  
  12.    
  13. /**  
  14.  * 詳細可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example  
  15.  *   
  16.  * @author Fung  
  17.  *  
  18.  */ 
  19. public class ConsumerDemo {  
  20.     private final ConsumerConnector consumer;  
  21.     private final String topic;  
  22.     private ExecutorService executor;  
  23.    
  24.     public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {  
  25.         consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));  
  26.         this.topic = a_topic;  
  27.     }  
  28.    
  29.     public void shutdown() {  
  30.         if (consumer != null)  
  31.             consumer.shutdown();  
  32.         if (executor != null)  
  33.             executor.shutdown();  
  34.     }  
  35.    
  36.     public void run(int numThreads) {  
  37.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  38.         topicCountMap.put(topic, new Integer(numThreads));  
  39.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  
  40.                 .createMessageStreams(topicCountMap);  
  41.         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  
  42.    
  43.         // now launch all the threads  
  44.         executor = Executors.newFixedThreadPool(numThreads);  
  45.    
  46.         // now create an object to consume the messages  
  47.         //  
  48.         int threadNumber = 0;  
  49.         for (final KafkaStream stream : streams) {  
  50.             executor.submit(new ConsumerMsgTask(stream, threadNumber));  
  51.             threadNumber++;  
  52.         }  
  53.     }  
  54.    
  55.     private static ConsumerConfig createConsumerConfig(String a_zookeeper,  
  56.             String a_groupId) {  
  57.         Properties props = new Properties();  
  58.         props.put("zookeeper.connect", a_zookeeper);  
  59.         props.put("group.id", a_groupId);  
  60.         props.put("zookeeper.session.timeout.ms""400");  
  61.         props.put("zookeeper.sync.time.ms""200");  
  62.         props.put("auto.commit.interval.ms""1000");  
  63.    
  64.         return new ConsumerConfig(props);  
  65.     }  
  66.    
  67.     public static void main(String[] arg) {  
  68.         String[] args = { "172.168.63.221:2188""group-1""page_visits""12" };  
  69.         String zooKeeper = args[0];  
  70.         String groupId = args[1];  
  71.         String topic = args[2];  
  72.         int threads = Integer.parseInt(args[3]);  
  73.    
  74.         ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);  
  75.         demo.run(threads);  
  76.    
  77.         try {  
  78.             Thread.sleep(10000);  
  79.         } catch (InterruptedException ie) {  
  80.    
  81.         }  
  82.         demo.shutdown();  
  83.     }  

消息處理類

  1. import kafka.consumer.ConsumerIterator;  
  2. import kafka.consumer.KafkaStream;  
  3.    
  4. public class ConsumerMsgTask implements Runnable {  
  5.     private KafkaStream m_stream;  
  6.     private int m_threadNumber;  
  7.    
  8.     public ConsumerMsgTask(KafkaStream stream, int threadNumber) {  
  9.         m_threadNumber = threadNumber;  
  10.         m_stream = stream;  
  11.     }  
  12.    
  13.     public void run() {  
  14.         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  
  15.         while (it.hasNext())  
  16.             System.out.println("Thread " + m_threadNumber + ": " 
  17.                     + new String(it.next().message()));  
  18.         System.out.println("Shutting down Thread: " + m_threadNumber);  
  19.     }  

Partitioner類示例

  1. import kafka.producer.Partitioner;  
  2. import kafka.utils.VerifiableProperties;  
  3.    
  4. public class PartitionerDemo implements Partitioner {  
  5.     public PartitionerDemo(VerifiableProperties props) {  
  6.    
  7.     }  
  8.    
  9.     @Override 
  10.     public int partition(Object obj, int numPartitions) {  
  11.         int partition = 0;  
  12.         if (obj instanceof String) {  
  13.             String key=(String)obj;  
  14.             int offset = key.lastIndexOf('.');  
  15.             if (offset > 0) {  
  16.                 partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;  
  17.             }  
  18.         }else{  
  19.             partition = obj.toString().length() % numPartitions;  
  20.         }  
  21.            
  22.         return partition;  
  23.     }  
  24.    

參考

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

原文鏈接:http://my.oschina.net/cloudcoder/blog/299215

責任編輯:林師授 來源: cloud-coder的博客
相關推薦

2010-03-18 16:49:43

Java Socket

2010-03-18 17:30:46

Java Socket

2021-05-07 15:28:03

Kafka客戶端Sarama

2010-03-18 17:47:07

Java 多客戶端通信

2017-01-11 10:38:17

MySQL客戶端代碼

2010-04-21 12:57:33

RAC負載均衡配置

2021-09-22 15:46:29

虛擬桌面瘦客戶端胖客戶端

2011-08-17 10:10:59

2022-09-23 08:02:42

Kafka消息緩存

2022-08-01 08:04:58

MySQL客戶端字符

2011-03-24 13:00:31

配置nagios客戶端

2011-03-02 14:36:24

Filezilla客戶端

2010-12-21 11:03:15

獲取客戶端證書

2011-10-26 13:17:05

2010-05-31 10:11:32

瘦客戶端

2009-03-04 10:27:50

客戶端組件桌面虛擬化Xendesktop

2011-03-21 14:53:36

Nagios監控Linux

2011-04-06 14:24:20

Nagios監控Linux

2013-05-09 09:33:59

2010-02-24 16:17:09

WCF獲取客戶端IP
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品视频观看 | 欧美午夜影院 | 一级黄a | 成年人网站免费 | 精品一区在线 | 国产一区二区在线免费观看 | 日本在线视频中文字幕 | 伊人狠狠 | 在线看黄免费 | av片免费 | 看特级黄色片 | 日韩手机在线视频 | 在线成人免费视频 | 日本免费在线观看视频 | 欧美日韩一本 | 日本电影免费完整观看 | 超碰男人天堂 | 欧美福利在线 | 久久成人精品视频 | 中文字幕一区二区在线观看 | 欧美性受xxxx白人性爽 | 国产精品一区二区av | 成人在线播放网站 | 亚洲免费视频一区 | 天天摸天天干 | 免费看黄色片 | 涩涩视频在线观看 | 亚洲精品乱码久久久久久按摩观 | 观看av | 亚洲一区二区三区四区五区中文 | 欧美福利视频 | 中文字幕一区二区三区四区五区 | 伊人伊人| 99reav| 亚洲欧美日韩电影 | www久久久 | 日韩一区二区在线观看 | 日韩中文字幕 | 日本一区二区三区四区 | 一区二区三区观看视频 | 天天澡天天操 |