成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Chronicle Queue入門

譯文 精選
開發
一文了解Chronicle Queue

  作者丨Shaolang Ai

  譯者 | 楊曉娟

  用Chronicle Queue構建的應用程序不會讓生產者放慢將消息放入隊列的速度(沒有背壓機制)。

  Chronicle Queue(編年史隊列)是低延遲、無代理、持久的消息隊列。

  與其最相近的是0MQ,但0MQ不存儲發布的消息。Chronicle Queue的開源版本不支持跨機器通信。Chronicle Queue最與眾不同之處在于它使用RandomAccessFile做堆外存儲因而不會產生垃圾。

  Chronicle Queue是以生產者為中心的,也就是說,用它構建的應用程序不會讓生產者放慢將消息放入隊列的速度(沒有背壓機制)。這種設計在對生產者的生產能力幾乎不可控的情況下非常有用,例如外匯價格更新。

術語

  大多數消息隊列使用術語Producer(生產者)和Consumer(消費者),Chronicle Queue使用Appender(附加器)和Tailer(零售商),用于區分它總是將消息附加到隊列中,并且零售商從隊列中讀取消息之后,從不“銷毀/丟棄”任何消息。與Message(消息)相比,Chronicle Queue更喜歡使用術語Excerpt(摘錄),因為寫入Chronicle Queue的blob可以是字節數組、字符串以及域模型。

Hello, World!

  我們用傳統的“Hello, World!”來演示基本用法。如果您使用的是Gradle,將以下內容添加到build.gradle.kts:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile  // line 1

plugins {
id("org.jetbrains.kotlin.jvm") version "1.3.71"
application
}

repositories {
mavenCentral()
mavenLocal()
}

dependencies {
implementation("org.jetbrains.kotlin:kotlin-bom")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("net.openhft.chronicle:chronicle-queue:5.19.8") // line 17

implementation("org.apache.logging.log4j:log4j-sl4fj18-impl:2.13.1")
}
application {
mainClass = "hello.AppKt"
}

tasks.withType<KotlinCompile> { // line 25
kotlinOptions.jvmTarget = "1.8"
}

  導入KotlinCompile(第1行)允許將Java 1.8指定為編譯目標(第25-27行)。第17-18行顯示了開始使用Chronicle Queue所需的其它依賴項。請注意build.gradle.kts假定要使用的包是hello。接下來看看演示Chronicle Queue用法的代碼:

package hello

import net.openhft.chronicle.queue.ChronicleQueue

fun main(args: Array<String>) {
val q: ChronicleQueue = ChronicleQueue.single("./build/hello-world")

try {
val appender: ExcerptAppender = q.acquireAppender()
appender.writeText("Hello, World!")

val tailer: ExcerptTailer = q.createTailer()
println(tailer.readText())
} finally {
q.close()
}
}

  ChronicleQueue.single()返回一個新建的使用給定的路徑存儲摘錄的ChronicleQueue。其余的代碼幾乎是不言自明的:獲得的appender把摘錄“Hello, World!”追加到排隊中;tailer從隊列中讀取并將摘錄打印到標準輸出。程序結束時一定要關閉隊列。

  還記得Chronicle Queue是持久的嗎?注釋掉兩個appender行,然后再用gradle run執行程序。您將看到程序還是在標準輸出上打印了Hello, World!:tailer讀取的是上次運行時寫入到隊列中的數據。它的持久性允許在tailers崩潰時重放收到的摘錄。

便道:摘錄類型

  Chronicle Queue僅接受以下類型的摘要:

  1. Serializable對象:請注意,由于依賴于反射,序列化類對象的效率很低

  2. Externalizable對象:如果與Java的兼容性很重要,但以犧牲手寫邏輯為代價

  3. net.openhft.chronicle.wire.Marshallable對象:使用二進制格式的高性能數據交換

  4. net.openhft.chronicle.bytes.BytesMarshallable對象:底層二進制或文本編碼

  “Hello, World!”演示了字符串,我們順便看一個使用Chronicle Wire庫中Marshallable的例子。

package types

import net.openhft.chronicle.wire.Marshallable
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(val name: String, val age: Int): SelfDescribingMarshallable()

fun main(args: Array<String>) {
    val person = Person("Shaolang", 3)
    val outputString = """
!types.Person {
name: Shaolang
age: 3
}
    """.trimIndent()

    println(person.toString() == outputString)

    val p = Marshallable.fromString<Person>(outputString)
    println(person == p)
    println(person.hashCode() == p.hashCode())
}

  運行上面的代碼片段會看到標準輸出上打印了三個true。SelfDescribtingMarshallable可以輕松持久化Chronicle Queue 中的Marshallable類。

寫入和讀取域對象

  有了從上面小便道得來的經驗,下面將演示向Chronicle Queue寫入和讀取Marshallable對象:

package docs

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

class Food(var name: String? = null): SelfDescribingMarshallable()

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/documents").use { q ->
        val appender = q.acquireAppender()
        appender.writeDocument(Person("Shaolang", 3))
        appender.writeText("Hello, World!")
        appender.writeDocument(Food("Burger"))

        val tailer = q.createTailer()

        val person = Person()
        tailer.readDocument(person)
        println(person)
        println("${tailer.readText()}\n")

        val food = Food()
        tailer.readDocument(food)
        println(food)
    }
}

  盡管在不同的VM進程中運行appender和tailer會更有意義,但將兩者保持在同一個VM中可以更容易理解討論,不必篩選無關的代碼。運行上面的代碼會看到如下輸出:

!docs.Person {
  name: Shaolang,
  age: 3
}
Hello, World!
!docs.Food {
  name: Burger,
}

  有幾點需要注意:

  1. 由于Chronicle Queue的目標是不產生垃圾,因而要求域模型是可變對象;這就是為什么兩個類在構造器中使用var而不是val。

  2. Chronicle Queue允許appender將不同的內容寫入同一隊列。

  3. tailer需要知道它應該讀什么才能得到正確的結果。

  如果我們把最后一個tailer.readDocument(food)改成tailer.readDocument(person)然后打印person,將看到以下打印內容(至少在Chronicle Queue 5.19.x中,它不會崩潰/拋出任何異常):

!docs.Person {
  name: Burger,
  age: !!null ""
}

  因為Person和Food有一個同名的屬性,Chronicle Queue會盡可能匹配Person,不能匹配的置為空。

  上面注意事項中的最后一點“關于tailer需要知道他們在讀什么”會有點麻煩:它們(tailer)現在背負著過濾的重擔,要從生產者不斷扔來的雪崩一樣的數據中獲得它們想要的信息。為了保持代碼庫穩健,我們需要使用觀察者模式.

(有點)只聽感興趣的東西

  除了直接使用摘錄附加器,另一種方法是使它具體化為傳給methodWriter方法的第一類。下面的片段重點介紹指定偵聽器的具體化:

package listener

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ChronicleReaderMain
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

fun main(args: Array<String>) {
    val directory = "./build/listener"

ChronicleQueue.single(directory).use { q ->
val observable: PersonListener = q.acquireAppender()
.methodWriter(PersonListener::class.java)
        observable.onPerson(Person("Shaolang", 3))
        observable.onPerson(Person("Elliot", 4))
    }

     ChronicleReaderMain.main(arrayOf("-d", directory))
}

  第17-18行用指定的PersonListener調用methodWriter獲得附加器。請注意,賦予observable的類型是PersonListener,不是ExcerptAppender。現在,任何對PersonListener的方法調用都會把給定的參數寫入隊列。但是,直接使用附加器寫入隊列和使用具體化的類寫入隊列是有區別的。為了看出區別,我們使用ChronicleReaderMain檢驗隊列:

0x47c900000000:
onPerson {
  name: Shaolang,
  age: 3
}
0x47c900000001:
onPerson {
  name: Elliot,
  age: 4
}

  注意,具體化類寫入隊列的摘錄用的是onPerson { ...} 而不是!listener.Person { ... }。 這種差異允許實現了PersonListener的tailer收到寫入隊列的新Person對象的通知并忽略它們不感興趣的對象。

  是的,你沒看錯:實現了PersonListener的tailer。不幸的是,Chronicle Queue(有點)將被觀察者和觀察者混為一談,因此很難區分它們。我認為區分差異的最簡單方法是使用以下片段注釋中所示的啟發式方法:

interface PersonListener {
onPerson(person: Person)
}
// this is an observer because it implements the listener interface
class PersonRegistry: PersonListener {
override fun onPerson(person: Person) {
// code omitted for brevity
}
}
fun main(args: Array<String>) {
// code omitted for brevity
val observable: PersonListener = q.acquireAppender() // this is an
.methodWriter(PersonListener::class.java) // observable
// another way to differentiate: the observer will never call the
// listener method, only observables do.
observable.onPerson(Person("Shaolang", 3))
// code omitted for brevity
}

  再來看一下tailer。盡管Chronicle Queue確保每個tailer能看到每一條摘錄,通過實現偵聽器類/接口并用已實現的偵聽器創建net.openhft.chronicle.bytes.MethodReader, tailer可以僅過濾出它想看到的摘錄:

package listener

import net.openhft.chronicle.bytes.MethodReader
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()

class Food(var name: String? = null): SelfDescribingMarshallable()

interface PersonListener {
    fun onPerson(person: Person)
}

class PersonRegistry: PersonListener {
    override fun onPerson(person: Person) {
        println("in registry: ${person.name}")
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/listener2").use { q ->
        val appender = q.acquireAppender()
        val writer: PersonListener = appender.methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))
        appender.writeDocument(Food("Burger"))
        writer.onPerson(Person("Elliot", 4))

        val registry: PersonRegistry = PersonRegistry()
val reader: MethodReader = q.createTailer().methodReader(registry)
        reader.readOne()
        reader.readOne()
        reader.readOne()
    }
}

  這里的主要新內容是PersonRegistry的實現,它簡單地打印出所給的person的name。 代碼片段并沒直接用ExcerptTailer從隊列中讀取而是用給定的PersonRegistry由tailer創建了一個MethodReader。

  .methodWriter接受Class參數,而.methodReader接受的是對象。appender向隊列寫入三個摘錄:person(通過調用onPerson)、food(通過.writeDocument)和person。 因為tailer可以看到每一個摘錄,所以閱讀者也會調用三次“讀取”所有摘錄,但卻只會看到兩個輸出:

  in registry:Shaolang

  in registry:Elliot

  如果代碼片段只有兩個.readOne()調用而不是三個,那么輸出中就不會包含in registry:Elliot.

MethodReader使用鴨子類型

  還記得我們檢驗由具體化的PersonListener填充隊列時ChronicleReaderMain的輸出嗎?輸出的不是類名而是類似于onPerson { ... }。這表明MethodReader過濾與方法簽名匹配的摘錄,即它不關心包含方法簽名的接口/類;或者簡單地說,鴨子類型:

package duck

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable

class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallabl()

interface PersonListener {
    fun onPerson(person: Person)
}

interface VIPListener {
    fun onPerson(person: Person)
}

class VIPClub: VIPListener {
override fun onPerson(person: Person) {
        println("Welcome to the club, ${person.name}!")
    }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/duck").use { q ->
        val writer = q.acquireAppender().methodWriter(PersonListener::class.java)
        writer.onPerson(Person("Shaolang", 3))

        val club = VIPClub()
        val reader = q.createTailer().methodReader(club)
        reader.readOne()
    }
}

  注意,VIPClub實現了VIPListener,碰巧與PersonListener有相同的onPerson方法簽名。運行上面的代碼,你會看到打印的Welcome to the club, Shaolang!

命名tailer

  到目前為止,在所有的演示中,我們一直創建的都是匿名的tailer。因為它們是匿名的,所以每次(重新)運行都會讀取隊列中的所有摘錄。有時,這樣的行為是可接受的,甚至是可取的,但有時卻不是。只需命名tailer就可以從上次停止的位置繼續讀取:

package restartable

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ExcerptTailer

fun readQueue(tailerName: String, times: Int) {
    ChronicleQueue.single("./build/restartable").use { q ->
       val tailer = q.createTailer(tailerName)       // tailer name given
       for (_n in 1..times) {
          println("$tailerName: ${tailer.readText()}")
        }

        println()       // to separate outputs for easier visualization
  }
}

fun main(args: Array<String>) {
    ChronicleQueue.single("./build/restartable").use { q ->
        val appender = q.acquireAppender()
        appender.writeText("Test Message 1")
        appender.writeText("Test Message 2")
        appender.writeText("Test Message 3")
        appender.writeText("Test Message 4")
}

    readQueue("foo", 1)
    readQueue("bar", 2)
    readQueue("foo", 3)
    readQueue("bar", 1)
}

  注意,tailer的名字是通過createTailer方法指定的。上面的代碼中有兩個tailer(命名為foo和bar)讀取隊列并在運行時輸出以下內容:

foo: Test Message 1

bar: Test Message 1
bar: Test Message 2

foo: Test Message 2
foo: Test Message 3
foo: Test Message 4

bar: Test Message 3 

注意,foo和bar第二次從隊列中讀取數據時,會從之前斷開的位置開始。

滾動文件

  Chronicle Queue根據創建隊列時定義的滾動周期滾動使用的文件;默認情況下,每天滾動文件。要改變滾動周期,就不能使用簡單的ChronicleQueue.single方法:

package roll

import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.RollCycles
import net.openhft.chronicle.impl.single.SingleChronicleQueueBuilder

fun main(args: Array<String>) {
   var qbuilder: SingleChronicleQueueBuilder = ChronicleQueue.singleBuilder("./build/roll")
   qbuilder.rollCycle(RollCycles.HOURLY)
   val q: ChronicleQueue = qbuilder.build()
   // code omitted for brevity
}

  首先,得到一個SingleChronicleQueueBuilder實例,并通過.rollCycle方法設置滾動周期。 上面的代碼段將隊列配置為每小時滾動一次文件。配置好后,調用構造器的.build()獲取ChronicleQueue實例。請注意,appender和tailer(s)在訪問同一個隊列時必須使用相同的滾動周期。

  由于SingleChronicleQueueBuilder支持流式接口,代碼也可以做如下簡化:

val q: ChronicleQueue = ChronicleQueue.singleBuilder("./build/roll")
                                      .rollCycle(RollCycles.HOURLY)
                                      .build()

接下來

  這篇文章介紹了Chronicle Queue的術語和基礎知識。以下網站有更多信息可供挖掘:

  1. Chronicle Queue GitHub repository

  2. Stack Overflow tagged questions

  3. Peter Lawre's Blog


原文鏈接

https://dzone.com/articles/bit-by-bit

譯者介紹  

楊曉娟,51CTO社區編輯,資深研發工程師,信息系統項目管理師,擁有近20年Java開發經驗。

責任編輯:張潔 來源: 51CTO技術棧
相關推薦

2022-03-23 09:00:00

微服務KafkaChronicle

2020-09-24 14:07:31

谷歌Chronicle網絡安全

2018-01-25 11:49:51

谷歌Chronicle安全企業

2024-05-07 09:01:21

Queue 模塊Python線程安全隊列

2023-07-31 11:16:45

Web前端worker服務

2019-05-22 08:11:51

Winnti惡意軟件Linux

2023-09-21 09:00:00

Merge Que開發工具Mergify

2009-08-03 15:06:43

C# Stack對象C# Queue對象

2024-08-19 04:00:00

2013-07-15 15:51:32

iOS多線程GCD基本概念Dispatch Qu

2021-05-20 10:47:58

Resource Qu阿里云PostgreSQL

2021-06-11 06:10:09

Python數據結構算法

2012-11-05 10:33:40

IBMdw

2021-06-16 12:57:27

FreeDOS

2011-02-28 13:34:51

SpringMVC

2022-04-26 16:52:59

漏洞網絡攻擊者谷歌

2021-06-04 14:15:10

鴻蒙HarmonyOS應用

2012-02-29 00:49:06

Linux學習

2025-02-24 10:07:10

2024-02-26 00:26:16

ChatGPTMQQueue
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品一区二区电影 | 亚洲精品久久久久久国产精华液 | 精品美女在线观看视频在线观看 | 成人av免费在线观看 | 福利二区 | 色播久久 | www.操com | 天天色综 | 都市激情亚洲 | 中文字幕一区在线观看视频 | 成人福利电影 | 91精品久久久久久综合五月天 | 精品国产欧美一区二区 | 在线观看中文字幕视频 | 国产精品久久久久久久久久久久午夜片 | 国产欧美一区二区三区国产幕精品 | 碰碰视频 | 伊人久久综合 | 超碰伊人久久 | 中文字幕在线视频免费视频 | 久久久久成人精品 | 国产传媒视频在线观看 | 午夜网| 伊色综合久久之综合久久 | 影音先锋中文字幕在线观看 | 欧产日产国产精品视频 | 国产高清视频一区 | 欧美精品一区在线 | 好婷婷网 | 日日夜夜精品视频 | 久久黄网 | 亚洲一区二区黄 | 亚洲欧洲精品一区 | 国产精品视频在线播放 | 国产一区欧美一区 | 四虎影院欧美 | 99pao成人国产永久免费视频 | 水蜜桃亚洲一二三四在线 | 精品一区二区三区在线观看国产 | 91 视频网站| 天堂va在线 |