孔乙己:Kotlin生產者消費者問題的八種解法
本文轉載自微信公眾號「AndroidPub」,作者fundroid。轉載本文請聯系AndroidPub公眾號。
生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個緩沖區(Buffer),生產者往 Buffer 中添加產品,消費者從 Buffer 中取走產品,當 Buffer 為空時,消費者阻塞,當 Buffer 滿時,生產者阻塞。
Kotlin 中有多種方法可以實現多線程的生產/消費模型(大多也適用于Java)
- Synchronized
- ReentrantLock
- BlockingQueue
- Semaphore
- PipedXXXStream
- RxJava
- Coroutine
- Flow
1. Synchronized
Synchronized 是最最基本的線程同步工具,配合 wait/notify 可以實現實現生產消費問題。
- val buffer = LinkedList<Data>()
- val MAX = 5 //buffer最大size
- val lock = Object()
- fun produce(data: Data) {
- sleep(2000) // mock produce
- synchronized(lock) {
- while (buffer.size >= MAX) {
- // 當buffer滿時,停止生產
- // 注意此處使用while不能使用if,因為有可能是被另一個生產線程而非消費線程喚醒,所以要再次檢查buffer狀態
- // 如果生產消費兩把鎖,則不必擔心此問題
- lock.wait()
- }
- buffer.push(data)
- // notify方法只喚醒其中一個線程,選擇哪個線程取決于操作系統對多線程管理的實現。
- // notifyAll會喚醒所有等待中線程,哪一個線程將會第一個處理取決于操作系統的實現,但是都有機會處理。
- // 此處使用notify有可能喚醒的是另一個生產線程從而造成死鎖,所以必須使用notifyAll
- lock.notifyAll()
- }
- }
- fun consume() {
- synchronized(lock) {
- while (buffer.isEmpty())
- lock.wait() // 暫停消費
- buffer.removeFirst()
- lock.notifyAll()
- }
- sleep(2000) // mock consume
- }
- @Test
- fun test() {
- // 同時啟動多個生產、消費線程
- repeat(10) {
- Thread { produce(Data()) }.start()
- }
- repeat(10) {
- Thread { consume() }.start()
- }
- }
2. ReentrantLock
Lock 相對于 Synchronized 好處是當有多個生產線/消費線程時,我們可以通過定義多個 condition 精確指定喚醒哪一個。下面的例子展示 Lock 配合 await/single 替換前面 Synchronized 寫法。
- val buffer = LinkedList<Data>()
- val MAX = 5 //buffer最大size
- val lock = ReentrantLock()
- val condition = lock.newCondition()
- fun produce(data: Data) {
- sleep(2000) // mock produce
- lock.lock()
- while (buffer.size >= 5)
- condition.await()
- buffer.push(data)
- condition.signalAll()
- lock.unlock()
- }
- fun consume() {
- lock.lock()
- while (buffer.isEmpty())
- condition.await()
- buffer.removeFirst()
- condition.singleAll()
- lock.unlock()
- sleep(2000) // mock consume
- }
3. BlockingQueue (阻塞隊列)
BlockingQueue在達到臨界條件時,再進行讀寫會自動阻塞當前線程等待鎖的釋放,天然適合這種生產/消費場景。
- val buffer = LinkedBlockingQueue<Data>(5)
- fun produce(data: Data) {
- sleep(2000) // mock produce
- buffer.put(data) //buffer滿時自動阻塞
- }
- fun consume() {
- buffer.take() // buffer空時自動阻塞
- sleep(2000) // mock consume
- }
注意 BlockingQueue 的有三組讀/寫方法,只有一組有阻塞效果,不要用錯。
方法 | 說明 |
---|---|
add(o)/remove(o) | add 方法在添加元素的時候,若超出了隊列的長度會直接拋出異常 |
offer(o)/poll(o) | offer 在添加元素時,如果發現隊列已滿無法添加的話,會直接返回false |
put(o)/take(o) | put 向隊尾添加元素的時候發現隊列已經滿了會發生阻塞一直等待空間,以加入元素 |
4. Semaphore(信號量)
Semaphore 是 JUC 提供的一種共享鎖機制,可以進行擁塞控制,此特性可用來控制 buffer 的大小。
- // canProduce: 可以生產數量(即buffer可用的數量),生產者調用acquire,減少permit數目
- val canProduce = Semaphore(5)
- // canConsumer:可以消費數量,生產者調用release,增加permit數目
- val canConsume = Semaphore(5)
- // 控制buffer訪問互斥
- val mutex = Semaphore(0)
- val buffer = LinkedList<Data>()
- fun produce(data: Data) {
- if (canProduce.tryAcquire()) {
- sleep(2000) // mock produce
- mutex.acquire()
- buffer.push(data)
- mutex.release()
- canConsume.release() //通知消費端新增加了一個產品
- }
- }
- fun consume() {
- if (canConsume.tryAcquire()) {
- sleep(2000) // mock consume
- mutex.acquire()
- buffer.removeFirst()
- mutex.release()
- canProduce.release() //通知生產端可以再追加生產
- }
- }
5. PipedXXXStream (管道)
Java 里的管道輸入/輸出流 PipedInputStream / PipedOutputStream 實現了類似管道的功能,用于不同線程之間的相互通信,輸入流中有一個緩沖數組,當緩沖數組為空的時候,輸入流 PipedInputStream 所在的線程將阻塞。
- val pis: PipedInputStream = PipedInputStream()
- val pos: PipedOutputStream by lazy {
- PipedOutputStream().apply {
- pis.connect(this) //輸入輸出流之間建立連接
- }
- }
- fun produce(data: ContactsContract.Data) {
- while (true) {
- sleep(2000)
- pos.use { // Kotlin 使用 use 方便的進行資源釋放
- it.write(data.getBytes())
- it.flush()
- }
- }
- }
- fun consume() {
- while (true) {
- sleep(2000)
- pis.use {
- val byteArray = ByteArray(1024)
- it.read(byteArray)
- }
- }
- }
- @Test
- fun Test() {
- repeat(10) {
- Thread { produce(Data()) }.start()
- }
- repeat(10) {
- Thread { consume() }.start()
- }
- }
6. RxJava
RxJava 從概念上,可以將 Observable/Subject 作為生產者, Subscriber 作為消費者, 但是無論 Subject 或是 Observable 都缺少 Buffer 溢出時的阻塞機制,難以獨立實現生產者/消費者模型。
Flowable 的背壓機制,可以用來控制 buffer 數量,并在上下游之間建立通信, 配合 Atomic 可以變向實現單生產者/單消費者場景,(不適用于多生產者/多消費者場景)。
- class Producer : Flowable<Data>() {
- override fun subscribeActual(subscriber: org.reactivestreams.Subscriber<in Data>) {
- subscriber.onSubscribe(object : Subscription {
- override fun cancel() {
- //...
- }
- private val outStandingRequests = AtomicLong(0)
- override fun request(n: Long) { //收到下游通知,開始生產
- outStandingRequests.addAndGet(n)
- while (outStandingRequests.get() > 0) {
- sleep(2000)
- subscriber.onNext(Data())
- outStandingRequests.decrementAndGet()
- }
- }
- })
- }
- }
- class Consumer : DefaultSubscriber<Data>() {
- override fun onStart() {
- request(1)
- }
- override fun onNext(i: Data?) {
- sleep(2000) //mock consume
- request(1) //通知上游可以增加生產
- }
- override fun onError(throwable: Throwable) {
- //...
- }
- override fun onComplete() {
- //...
- }
- }
- @Test
- fun test_rxjava() {
- try {
- val testProducer = Producer)
- val testConsumer = Consumer()
- testProducer
- .subscribeOn(Schedulers.computation())
- .observeOn(Schedulers.single())
- .blockingSubscribe(testConsumer)
- } catch (t: Throwable) {
- t.printStackTrace()
- }
- }
7. Coroutine Channel
協程中的 Channel 具有擁塞控制機制,可以實現生產者消費者之間的通信。可以把 Channel 理解為一個協程版本的阻塞隊列,capacity 指定隊列容量。
- val channel = Channel<Data>(capacity = 5)
- suspend fun produce(data: ContactsContract.Contacts.Data) = run {
- delay(2000) //mock produce
- channel.send(data)
- }
- suspend fun consume() = run {
- delay(2000)//mock consume
- channel.receive()
- }
- @Test
- fun test_channel() {
- repeat(10) {
- GlobalScope.launch {
- produce(Data())
- }
- }
- repeat(10) {
- GlobalScope.launch {
- consume()
- }
- }
- }
此外,Coroutine 提供了 produce 方法,在聲明 Channel 的同時生產數據,寫法上更簡單,適合單消費者單生產者的場景:
- fun CoroutineScope.produce(): ReceiveChannel<Data> = produce {
- repeat(10) {
- delay(2000) //mock produce
- send(Data())
- }
- }
- @Test
- fun test_produce() {
- GlobalScope.launch {
- produce.consumeEach {
- delay(2000) //mock consume
- }
- }
- }
8. Coroutine Flow
Flow 跟 RxJava 一樣,因為缺少 Buffer 溢出時的阻塞機制,不適合處理生產消費問題,其背壓機制也比較簡單,無法像 RxJava 那樣收到下游通知。但是 Flow 后來發布了 SharedFlow, 作為帶緩沖的熱流,提供了 Buffer 溢出策略,可以用作生產者/消費者之間的同步。
- val flow : MutableSharedFlow<Data> = MutableSharedFlow(
- extraBufferCapacity = 5 //緩沖大小
- , onBufferOverflow = BufferOverflow.SUSPEND // 緩沖溢出時的策略:掛起
- )
- @Test
- fun test() {
- GlobalScope.launch {
- repeat(10) {
- delay(2000) //mock produce
- sharedFlow.emit(Data())
- }
- }
- GlobalScope.launch {
- sharedFlow.collect {
- delay(2000) //mock consume
- }
- }
- }
注意 SharedFlow 也只能用在單生產者/單消費者場景。
總結
生產者/消費者問題,其本質核心還是多線程讀寫共享資源(Buffer)時的同步問題,理論上只要具有同步機制的多線程框架,例如線程鎖、信號量、阻塞隊列、協程 Channel等,都是可以實現生產消費模型的。
另外,RxJava 和 Flow 雖然也是多線程框架,但是缺少Buffer溢出時的阻塞機制,不適用于生產/消費場景,更適合在純響應式場景中使用。