使用 Python 配合 Redis 超越緩存
如果你是一位 Python 開發者, 那么你肯定使用過 Redis , 并且認為它是一個很棒的緩存。 雖然你的印象沒有錯, Redis 的確是一個很棒的緩存, 但使用 Redis 能夠解決的問題并不僅限于緩存。
我們將探索 Redis 和 Redis Enterprise 的一些其他用途。 為了找點樂子, 我將使用之前《 使用 Redis 儲存地理位置數據 》一文中的大腳怪(Bigfoot)數據。 此外, 由于這篇文章的讀者都是 Python 開發者, 所以我將使用 Python 來編寫本文的所有代碼!
我在接下來展示的代碼中使用了 aioredis 客戶端庫, 因為它對 async/await
提供了非常棒的支持。 如果你對 async/await
不熟悉的話, 那么可以去看看 這篇文章 , 里面提到了 async/await
對提升性能的幫助。
使用 Redis 構建隊列
Redis 提供了字符串、哈希、集合和列表等多種數據結構可供使用。 這些數據結構都是儲存數據的好幫手, 其中列表就可以用作一個非常棒的隊列(queue)。
為了將列表用作隊列, 我們需要使用 RPUSH
將新項目推送至列表末尾, 然后使用 LPOP
或者 BLPOP
將它們從列表的前面彈出。 由于 Redis 對數據庫的所有修改都是在單個線程里面完成的, 所以這些操作都是原子的。
作為例子, 下面這段在隊列里面添加了一些大腳怪的蹤跡。
- import asyncio
- import aioredis
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- add_to_queue(redis, 'Possible vocalizations east of Makanda'),
- add_to_queue(redis, 'Sighting near the Columbia River'),
- add_to_queue(redis, 'Chased by a tall hairy creature')
- )
- redis.close()
- await redis.wait_closed()
- def add_to_queue(redis, message):
- return redis.rpush('bigfoot:sightings:received', message)
- asyncio.run(main())
import asyncio 這個程序非常直接。 我們只需要在第 18 行調用 redis.rpush
, 就能夠將指定的元素推入到隊列。 接下來是從隊列另一端讀取元素的代碼, 同樣非常簡單。
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- while True:
- sighting = await redis.blpop('bigfoot:sightings:received')
- pp(sighting)
- asyncio.run(main())
Redis 還有 一些同樣很酷的命令 , 它們不僅可以將列表用作隊列甚至堆棧。 我最喜歡的是 BRPOPLPUSH
, 它可以從列表的右側阻塞并彈出一些元素, 然后將被彈出的元素推入到另一個列表。 你可以使用這個命令來將一個隊列中的元素傳遞至另一個隊列, 這是非常棒的一個命令。第 11 行和第 12 行的無限循環將等待并且打印被推入至隊列中的大腳怪蹤跡。 這里使用了 redis.blpop
而不是 redis.lpop
, 因為前者可以阻塞客戶端并等待列表中的元素返回。 比起讓 Redis 和 Python 代碼之間的網絡無休止地輪詢并做無用功, 讓客戶端阻塞并等待元素出現的做法會高效得多。
使用 Redis 訂閱和發送事件
Redis 提供的東西中有些并不是數據結構, 比如訂閱與發布(Pub/Sub)特性就是其中之一。 這個特性就像它的名字一樣, 是一個內置于 Redis 中的發布與訂閱機制。 得益于這個特性, 我們只需要 使用一些命令 就可以在自己的 Python 應用里面添加強大的訂閱與發布機制。
通過執行訂閱操作可以讓我們發現事件, 以下是代碼:
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- [channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')
- while True:
- message = await channel.get()
- pp(message)
- asyncio.run(main())
用于匹配模式的 redis.psubscribe
函數和非模式匹配的 redis.subscribe
函數都返回 Python 列表, 以便包含不定數量的元素。 程序將解構這個列表(Python 的術語是解包)以獲得我想要的通道, 并在之后使用 .get
進行阻塞調用以等待下一條消息。因為我想要接收所有跟大腳獸有關的消息, 所以我在這段代碼的第 10 行使用 redis.psubscribe
訂閱了一個 Glob 風格的模式, 通過使用 bigfoot:broadcast:channel:*
作為模式, 客戶端將接收到所有以 bigfoot:broadcast:channel:
開頭的事件。
發布事件非常簡單, 下面是代碼:
- import asyncio
- import aioredis
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- publish(redis, 1, 'Possible vocalizations east of Makanda'),
- publish(redis, 2, 'Sighting near the Columbia River'),
- publish(redis, 2, 'Chased by a tall hairy creature')
- )
- redis.close()
- await redis.wait_closed()
- def publish(redis, channel, message):
- return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)
- asyncio.run(main())
值得注意的是, 發布與訂閱是一個發送即遺忘機制(fire-and-forget)。 如果代碼發布了一個事件但是卻沒有人監聽, 那么該事件就會消失。 如果你想讓自己的事件持續存在, 那么可以考慮使用前面提到的隊列, 又或者接下來將要介紹的 Redis 流。這段代碼的重點是第 18 行, 它使用了名字非常直接的 redis.publish
來講消息發布至所需的通道。
使用 Redis 儲存數據流
除了發布與訂閱之外, Redis 還可以使用流來發布和訂閱事件。 Redis 流 是一個非常大的話題, 但使用它只需要 掌握少量命令 。 從 Python 來看, 這些命令的用法都是非常簡單的, 我將一一向你說明。
下面的代碼將把三次大腳獸的目擊事件添加到流里面。
- import asyncio
- import aioredis
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
- add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
- add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
- redis.close()
- await redis.wait_closed()
- def add_to_stream(redis, id, title, classification):
- return redis.xadd('bigfoot:sightings:stream', {
- 'id': id, 'title': title, 'classification': classification })
- asyncio.run(main())
每個新添加的流事件都有一個唯一標識符, 其中包含自 1970 年開始的時間戳(毫秒)和一個用破折號連接的序列號。 例如, 當我寫這篇文章的時候, 1970 年 1 月 1 日(Unix紀元)午夜已經過去了 1,593,120,357,193 毫秒(1.59千兆秒)。 因此當我運行上面這段代碼的時候, 命令將創建出 ID 為 1593120357193-0
的事件。這段代碼中最重要的就是第 17 行和第 18 行, 它使用了 redis.xadd
函數將一次目擊事件的字段添加到流里面。
我們在添加事件的時候可以使用 *
來代替具體的 ID , 這樣 Redis 就會根據當前時間來自動生成事件的 ID , 這也是 redis.xadd
函數的默認行為。
正如接下來的代碼所示, 在讀取流元素的時候, 我們需要設置一個起始 ID 。 你可以看到, 在第 10 行, 程序將變量 last_id
設置成了 0-0
, 這個 ID 代表流的起始位置。
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')
- last_id = '0-0'
- while True:
- events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
- for key, id, fields in events:
- pp(fields)
- last_id = id
- asyncio.run(main())
程序的第 12 行使用 redis.xread
函數從流中請求最多 5 個 0-0
之后的事件。 該調用將返回一個列表, 然后程序將對其進行循環和解構, 以獲得事件的字段和標識符。 事件的標識符會被儲存起來, 以便將來調用 redis.xread
時可以獲得新的事件并在有需要時重新讀取之前讀取過的舊事件
。
將 Redis 用作搜索引擎
Redis 可以通過模塊(Module)擴展來增加新的命令和功能。 有 大量的模塊 可以用于 AI 模型服務、圖形數據庫、時間序列數據庫以及本例中的搜索引擎。
RedisSearch 是一個強大的搜索引擎, 它攝取數據的速度快得驚人。 有些人喜歡用它來進行 瞬時搜索 , 但除此之外它也可以用來進行其他搜索。 下面是使用該模塊的一個例子:
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await redis.execute('FT.DROP', 'bigfoot:sightings:search')
- await redis.execute('FT.CREATE', 'bigfoot:sightings:search',
- 'SCHEMA', 'title', 'TEXT', 'classification', 'TEXT')
- await asyncio.gather(
- add_document(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
- add_document(redis, 2, 'Sighting near the Columbia River', 'Class A'),
- add_document(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
- results = await search(redis, 'chase|east')
- pp(results)
- redis.close()
- await redis.wait_closed()
- def add_document(redis, id, title, classification):
- return redis.execute('FT.ADD', 'bigfoot:sightings:search', id, '1.0',
- 'FIELDS', 'title', title, 'classification', classification)
- def search(redis, query):
- return redis.execute('FT.SEARCH', 'bigfoot:sightings:search', query)
- asyncio.run(main())
在擁有了索引之后, 程序就可以向里面添加文檔了, 這一操作發生在程序的第 27 行和第 28 行, 通過 FT.ADD
命令來完成。 每個文檔偶讀需要一個唯一 ID 、一個介于 0.0
和 1.0
之間的權重(rank)以及相應的字段。在第 12 和第 13 行, 程序使用 FT.CREATE
創建了一個索引。 索引需要描述程序將要添加的每個文檔中的字段的模式。 在這個例子中, 程序需要添加大腳獸的目擊事件, 該文檔包含一個標題和一個分類, 并且它們都是文本字段。
正如程序的第 31 行所示, 在索引加載文檔之后, 程序就可以使用 FT.SEARCH
命令和具體的查詢語句來執行查詢操作。 第 20 行的特定查詢指示 RedisSearch 在索引中查找包含這些術語之一的文檔。 在這個例子中, 該查詢將返回兩個文檔。
使用 Redis 作為主數據庫
Redis 可以作為一個速度奇快的內存存儲數據庫來使用。 下面的代碼使用了哈希來演示這種用法。 哈希是一種非常棒的數據結構, 它可以建模你想要儲存的記錄類型, 并且能夠將數據的主鍵用作鍵名的其中一部分。
- import asyncio
- import aioredis
- from pprint import pp
- async def main():
- redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
- await asyncio.gather(
- add_sighting(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
- add_sighting(redis, 2, 'Sighting near the Columbia River', 'Class A'),
- add_sighting(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
- sightings = await asyncio.gather(
- read_sighting(redis, 1),
- read_sighting(redis, 2),
- read_sighting(redis, 3))
- pp(sightings)
- redis.close()
- await redis.wait_closed()
- def add_sighting(redis, id, title, classification):
- return redis.hmset(f'bigfoot:sighting:{id}',
- 'id', id, 'title', title, 'classification', classification)
- def read_sighting(redis, id):
- return redis.hgetall(f'bigfoot:sighting:{id}')
- asyncio.run(main())
你可能會這樣想”如果我把服務器關掉了怎么辦?如果它崩潰了怎么辦?那我就什么數據都沒有了!“ No,不會的! 你可以修改你的 redis.conf
文件, 用幾種不同的方式來持久化內存中的數據 。 此外, 如果你使用的是 Redis Enterprise , 我們也有為你提供 相應的解決方案 , 使得你可以直接使用 Redis 而不必擔心持久化的問題。
為了方便你親手嘗試這些例子, 我把文中涉及的 所有代碼都放到了 GitHub 上面 , 你可以克隆并開始使用它們。 如果你是 Docker 用戶, 項目里面也有一個名為 start-redis.sh
的 shell 腳本, 它可以拉取一個鏡像, 然后啟動一個能夠運行這些例子的 Redis 版本。
如果你在玩耍完畢之后想要認真地構建一些軟件, 那么可以注冊并嘗試 Redis Cloud Essentials 。 它和你所熟悉和喜歡的 Redis 一樣, 唯一的區別就是這種 Redis 由云端進行管理, 所以你只需要專注于構建你的軟件即可。