成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

細說Kestrel.scala中的PersistentQueue

開發 后端
本文是Scala代碼實例之Kestrel的一部分,本部分繼續講述PersistentQueue。PersistentQueue有兩個“類”,一個是object PersistentQueue,一個是class PersistentQueue。文章具體介紹了這兩個類的情況。

本文是Scala代碼實例之Kestrel的第五部分,繼續講述PersistentQueue處理消息隊列并發請求的方式。

回顧一下之前我們讀過的兩個文件,Kestrel.scala, QueueCollection.scala。Kestrel.scala是啟動文件,并且通過一個actor,保持整個項目不會因為沒有線程運行而退出,同時注冊了一個acceptor,當建立起新的鏈接的時候,訪問 KestrelHandler.scala(這個稍后我們再讀)。QueueCollection.scala,維護一個PersistentQueue的隊列,如果訪問的queue_name不存在,則創建一個,如果存在,就對相應的QueueCollection進行操作。如果留心的話,我們還可以看到QueueCollection在啟動的時候,queue_name的來源是一個文件目錄。

我們就從這個入口繼續往下,看看PersistentQueue是如何處理消息隊列的并發請求的:

在前幾篇文章里面,我們曾經提到過PersistentQueue有兩個“類”,一個是object PersistentQueue,一個是class PersistentQueue。而object在scala是一個單例模式,也就是singleton。也可以看做是只有static類型的java類。現在讓我們關注一下,看看class PersistentQueue和object Persistent之間的關系是怎樣的。

剛開始的一段代碼有點嚇人:

  1. class OverlaySetting[T](base: => T) {  
  2.   @volatile private var local: Option[T] = None  
  3.   def set(value: Option[T]) = local = value  
  4.   def apply() = local.getOrElse(base)  
  5. }  

我們先跳過去,直接往下看,看到這里:

  1. def overlay[T](base: => T) = new OverlaySetting(base)  
  2. // attempting to add an item after the queue reaches this size (in items) will fail.  
  3. val maxItems = overlay(PersistentQueue.maxItems)  
  4. // attempting to add an item after the queue reaches this size (in bytes) will fail.  
  5. val maxSize = overlay(PersistentQueue.maxSize)  
  6. ……  

如果我們不細究overlay的內容,這段代碼其實就是把object PersisitentQueue中的變量賦值給class PersistentQueue中,那么overlay究竟做了什么呢?其實,overlay是將變量做了一個封裝,封裝在一個叫做OverlaySetting的類里面。這個類,根據我們之前對scala語法的了解,可以知道,它是一個OverlaySetting[T]的類,并且在創建的時候,需要帶入方法,方法沒有參數,但是有一個返回值,類型就是T。(關于class類的語法規則,可以參考http://programming-scala.labs.oreilly.com/ch05.html#Constructors,不過里面的例子比OverlaySetting還復雜……-_-|||)

這個類在每次創建對象的時候,都會被賦值。我們也看到只有在使用apply方法的時候才會被調用(不過我沒有太想明白,如何通過函數的返回值來確定模板中的類型T,也許這就是Scala這種更加靈活的編譯算法,可以在new對象的時候,通過審查變量類型來獲取T的吧,畢竟Scala是一個靜態語言,如果是動態語言就不太成為一個問題了)。

這里面還存在一個Scala概念,就是方法=變量。當然在很多動態語言里面就已經這么做了。在Scala里面,我們可以把def看作是val的一種特殊寫法,def聲明的方法,也可以用 def func_name() = {} 這樣的語法規則,跟val基本就是一回事了。當然,這一改變在Scala里面并不簡單是一個語法規則的問題,更進一步的,所有的變量也都是類,所以我們可以把一個變量,看做一個類,也可以看做類的建構函數,返回的就是類本身……有點繞,不過這樣理解,就比較好理解為什么可以用常量,當作沒有參數的方法調用了。

說了那么多,結論很簡單,maxSize是一個OverlaySetting[LONG]的類,如果maxSize沒有設置過,那么返回的就是object PersistentQueue里面的maxSize。LONG類型。

在主程序體里面,我們看到了Journal類,然后是調用 configure 方法,這個方法印證了我們的對OverlaySetting的解釋,它從配置文件里面把參數都讀出來賦值給class PersistentQueue里面的那些常量,用的是set。這里是一個Scala的語法細節,它省略了一些不必要的”.”和”()”。

休息一下。我們開始討論在PersistentQueue里面的Actor

……

休息完畢

Scala中,消息傳遞的方式有一個特殊的語法結構:“Object ! MessageType” 就好像在源代碼里面出現的:“w.actor ! ItemArrived。”,(關于Scala的Actor,詳細的語法說明在http://programming-scala.labs.oreilly.com/ch09.html可以看到,建議先看一下,好對actor有一個比較深入的了解)

我們發現PersistentQueue中Actor的實現,跟語法說明里面的很不一樣,在語法說明里面的Actor都是作為一個獨立的線程出現的,而在PersistentQueue中,你甚至看不見一個對Actor的重載,但我們可以發現與Actor相關的幾個地方,一個是Waiter的定義,它是一個case class,并且有一個成員變量叫做actor,類型是Actor:

  1. private case class Waiter(actor: Actor)  
  2. ……  
  3. private val waiters = new mutable.ArrayBuffer[Waiter]  
  4. ……  
  5.     val w = Waiter(Actor.self)  
  6.     waiters += w  
  7. ……  

需要注意:之前我們提過一個Scala的語法規則,那就是類后面的建構函數的參數,就是類中的成員變量!(不過這是在解釋,為什么在建構函數里面會有private關鍵字時提到的……)所以,我們知道了一點,就是每一個Waiter內部都有一個actor,這些actor通過Actor.self共享了一個線程,當然也和其他的PersistentQueue共享了一個Actor。這是有點讓人不習慣,因為這么要緊的一個線程的創建,竟然可以出現得那么隱蔽。甚至連一個大括號都沒有。

接下來,我們來看看Actor是怎么在PersistentQueue里面工作了——這有點難,因為它的機制有點復雜,不是簡單的象語法說明里面的那樣,是一個完整的獨立的函數,而是在一些函數中,突然切入進來,分享了Actor.self的一部分線程資源,就像下面代碼一樣:

  1. ……  
  2. f operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = {  
  3. operateOrWait(op, timeoutAbsolute) match {  
  4.   case (item, None) =>  
  5.     f(item)  
  6.   case (None, Some(w)) =>  
  7.     Actor.self.reactWithin((timeoutAbsolute - Time.now) max 0) {  
  8.       case ItemArrived => operateReact(op, timeoutAbsolute)(f)  
  9.       case TIMEOUT => synchronized {  
  10.         waiters -= w  
  11.         // race: someone could have done an add() between the timeout and grabbing the lock.  
  12.         Actor.self.reactWithin(0) {  
  13.           case ItemArrived => f(op)  
  14.           case TIMEOUT => f(op)  
  15.         }  
  16.       }  
  17.     }  
  18.   case _ => throw new RuntimeException()  
  19. }  
  20.  
  21. ……  

其中:

  1. Actor.self.reactWithin(0) {  
  2.     case ItemArrived => f(op)  
  3.     case TIMEOUT => f(op)  
  4. }  

就是Actor的一個語法,在一段時間里面等待消息,如果有消息就如何……,如果沒有消息(TIMEOUT),就如何……。但是在整個函數里面套用了兩層 Actor.self.reactWithin,有點讓人要暈菜的感覺,再加上之前有一個match…case的結構,調用了operateOrWait(op, timeoutAbsolute)方法。要了解整個消息處理的機制,就需要把這三個部分聯系起來看了。

先簡單看一下operateOrWait函數,比較容易理解:

  1. private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized {  
  2.   val item = op  
  3.   if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) {  
  4.     val w = Waiter(Actor.self)  
  5.     waiters += w  
  6.     (None, Some(w))  
  7.   } else {  
  8.     (item, None)  
  9.   }  
  10. }  

返回值是一個map,包括兩個被Option封裝的類型QItem和Waiter,從QItem.scala中可以知道(代碼很簡單),QItem就是把原始數據打了一個包,而Waiter之前我們也已經說過了。程序體中的判斷是這樣的:如果item,也就是op這個參數沒有定義,并且PersistentQueue也沒有停止,關閉,而且處理時間AbsoluteTime不是0,那么就創建一個Waiter,返回(None, Some[Waiter]);如果不滿足這些條件,那么就直接返回(op, None)。簡單的說,就是如果系統還能等,就讓他等待正常一段時間然后操作,如果不能等,就直接返回操作指令。返回值只有兩種類型。

然后再看operateReact,如果返回的是時間參數是None(詳細的可以參考 actor .. case 的語法,地址是:http://programming-scala.labs.oreilly.com/ch03.html#MatchingOnCaseClasses),那么就直接執行f(op)的函數,把op這個方法,作為參數傳遞給f函數。如果返回的是一個時間戳,Some(w),那么我們就等待AbsoluteTime 到 Time.now()這段時間,如果在這段事件里面有ItemArrived事件發生,那么就處理一下,直到Time.now 等于或者大于 AbsoluteTime,那就會得到一個TIMEOUT,然后就退出了。(有一個異常的情況,需要清空一下事件隊列,通過reactWithin(0){})

這么理解這段actor還是不太清晰,那么讓我們回到上一層的調用。看看這個f(op)到底是什么,然后我們看到了:

  1. def removeReact(timeoutAbsolute: Long, transaction: Boolean)(f: Option[QItem] => Unit): Unit = {  
  2.   operateReact(remove(transaction), timeoutAbsolute)(f)  
  3. }  

我們就知道op其實是一個remove的操作,并且返回remove得到的QItem對象。再往上一層到QueueCollection,我們看到:

  1. q.removeReact(if (timeout == 0) timeout else Time.now + timeout, transaction) {  
  2.   case None =>  
  3.     queueMisses.incr  
  4.     f(None)  
  5.   case Some(item) =>  
  6.     queueHits.incr  
  7.     f(Some(item))  
  8. }  

f方法的操作,如果之前的remove返回的是一個None,則記錄queueMess(未命中)添加1,如果返回的是一個QItem的值,那么就記錄queueHits(命中)添加1,并且,對這個QItem進行操作(注意:這里的f是QueueCollection中remove帶入的那個方法,而不是前面提到的removeReact里面提到的f。

從QueueCollection的remove調用到***層PersistentQueue的operateReact調用,我們大致可以了解這么曲折的調用關系解決了一個什么問題——從消息隊列里面獲取QItem。

回顧一下QueueCollection其他的代碼,我們發現,只有waiter.size > 0的時候,有新的QItem添加,才會發出ItemArrived事件。也就是說,只有有一個獲取消息隊列的進程存在的時候,才會觸發ItemArrived事件。獲取消息隊列,則通過使用reactWithin,允許在一個規定的時間內,連續處理一系列的ItemArrived事件。看QueueCollection的remove方法,我們還可以知道,當啟動q.removeReact之前,首先會調用q.peek來檢查,隊列是不是為空,如果不是空的話,就直接返回隊列里面最前面的那個元素。所以我們可以把這個消息隊列理解成——如果消息隊列為空的情況下,讓獲取消息隊列的Client等待一段時間的機制,以降低反復進行SOCKET連接帶來的不必要的耗損。

這個機制,可以讓我們比較好地理解,為什么Kestrel提示說,如果運行多個獨立的進程來處理消息隊列的時候,會讓這個消息隊列的處理變成一個缺乏時序,但是處理并發能力很強的集群。每個連接對應的是一個Waiter,但是當ItemArrived觸發的時候,只可能有其中的一個reactWithin得到了這個事件,發送給對應的那個線程處理這個消息。

我現在手上的是Kestrel-1.1.2版本的代碼,走讀這部分代碼的時候,其實發現作者在寫這段代碼的時候,多了一些冗余的內容——比如說removeReceive方法,從而看出作者在使用Scala的特性中,也是逐步地把代碼優化成如今的樣子。畢竟Scala和Java之間的差別很大,如果做到Type Less, Do More。是需要一個逐步積累的過程,誰都不是天生就能把Scala寫得很好的,更何況是需要性能非常高的時候。

【相關閱讀】

  1. 從Java走進Scala(Scala經典讀物)
  2. A Scala Tutorial for Java programmers
  3. 專題:Scala編程語言
  4. 從Scala看canEqual與正確的的equals實現
  5. Scala快速入門:從下載安裝到定義方法
責任編輯:yangsai 來源: dingsding
相關推薦

2009-09-22 10:15:42

PersistentQScala

2009-09-22 09:59:40

QueueCollecScala

2009-09-28 11:37:03

Journal.scaKestrel

2009-09-18 11:44:05

Scala實例教程Kestrel

2009-09-28 11:42:21

KestrelScala

2009-09-28 10:26:12

Scala代碼實例Kestrel

2009-09-22 09:42:24

Scala的核心

2022-01-19 09:00:00

Java空指針開發

2009-07-22 07:53:00

Scala擴展類

2009-07-08 15:35:18

Case類Scala

2009-07-22 07:45:00

Scala代碼重復

2009-07-08 12:43:59

Scala ServlScala語言

2023-06-12 15:33:52

Scalafor循環語句

2009-07-21 17:21:57

Scala定義函數

2020-10-31 17:33:18

Scala語言函數

2010-09-14 15:34:41

Scala

2011-05-19 13:32:38

PHPstrlenmb_strlen

2010-01-19 13:43:59

C++函數

2010-01-28 11:08:09

C++變量

2009-07-22 08:57:49

Scalafinal
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文天堂在线一区 | 波多野结衣一区二区三区在线观看 | 色www精品视频在线观看 | 日韩欧美手机在线 | 狠狠骚| 久久伊人久久 | 国产成人精品999在线观看 | 特黄特黄a级毛片免费专区 av网站免费在线观看 | 日本一区二区三区精品视频 | 久久久久久久av | 91福利在线观看 | 亚洲一区视频在线播放 | 97在线播放| 日韩欧美一区二区三区免费观看 | 狠狠色狠狠色综合系列 | 亚洲高清在线观看 | 久久国产综合 | 天堂va在线观看 | 国产在线看片 | 国产成人免费视频网站高清观看视频 | 日韩av一区二区在线 | 国产精品久久久久久吹潮 | 精品少妇一区二区三区日产乱码 | av天天干 | 欧美性生活一区二区三区 | 色妞av| 国产精品久久久久久久久久 | 国产高清久久 | 亚洲欧美日韩电影 | 在线国产视频 | 欧美激情a∨在线视频播放 成人免费共享视频 | 国产精品成人一区 | 九九热精品视频在线观看 | 日韩在线xx | 亚洲欧美日韩精品久久亚洲区 | 少妇午夜一级艳片欧美精品 | 99久久婷婷国产亚洲终合精品 | 自拍 亚洲 欧美 老师 丝袜 | 成人片在线看 | av一区在线观看 | 亚洲一区二区三区免费在线观看 |