拋磚引玉:Redis 與 接口自動化測試框架的結合
接口自動化測試已成為保證軟件質量和穩定性的重要手段。而Redis作為一個高性能的緩存數據庫,具備快速讀寫、多種數據結構等特點,為接口自動化測試提供了強大的支持。勇哥這里粗略介紹如何結合Python操作Redis,并將其應用于接口自動化測試框架中,以提升測試效率和數據管理能力。
Redis 基本操作
(1) Redis的安裝和配置
在開始之前,首先需要安裝Redis并進行相應的配置:
- redis官網:https://redis.io/
- redis中文網:https://www.redis.net.cn/
安裝完成后,確保Redis服務已成功啟動,并正確配置了連接信息(如主機地址、端口號、密碼等),這塊信息就不過多介紹了喲!
(2) Redis與接口自動化測試框架的集成
使用Python操作Redis需要導入相應的客戶端庫,例如:
pip install redis
import redis
(3) 初始化Redis連接
在接口自動化測試框架的初始化過程中,可以添加連接Redis的代碼,確保測試過程中能夠與Redis建立連接。
class TestFramework:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, password='your_password')
操作 Redis 常用命令
(4) 字符串操作
# 設置鍵為"key1"的字符串值為"Hello, Redis!"
r.set('key1', 'Hello, Redis!')
# 獲取鍵為"key1"的字符串值
value = r.get('key1')
print(value) # 輸出: b'Hello, Redis!'
(5) 列表操作
# 向名為"list1"的列表左側插入元素
r.lpush('list1', 'item1')
r.lpush('list1', 'item2')
r.lpush('list1', 'item3')
# 獲取名為"list1"的列表所有元素
items = r.lrange('list1', 0, -1)
print(items) # 輸出: [b'item3', b'item2', b'item1']
(6) 哈希表操作
# 設置名為"hash1"的哈希表字段和值
r.hset('hash1', 'field1', 'value1')
r.hset('hash1', 'field2', 'value2')
# 獲取名為"hash1"的哈希表字段和值
value1 = r.hget('hash1', 'field1')
value2 = r.hget('hash1', 'field2')
print(value1, value2) # 輸出: b'value1' b'value2'
(7) 集合操作
# 向名為"set1"的集合添加元素
r.sadd('set1', 'item1')
r.sadd('set1', 'item2')
r.sadd('set1', 'item3')
# 獲取名為"set1"的集合所有元素
items = r.smembers('set1')
print(items) # 輸出: {b'item1', b'item2', b'item3'}
以上就是 redis 的常見操作,是不是比寫 `sql` 語句簡單,是不是 `so easy!!`
Redis 在接口自動化中的應用
(1) 封裝Redis操作方法
為了方便接口自動化測試框架使用,又要開始封裝了,簡單封裝代碼如下:
class RedisClient:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, password='your_password')
def set_data(self, key, value, expire_time=None):
self.redis.set(key, value)
if expire_time is not None:
self.redis.expire(key, expire_time)
def get_data(self, key):
return self.redis.get(key)
def delete_data(self, key):
self.redis.delete(key)
def hash_set_field(self, key, field, value):
self.redis.hset(key, field, value)
def hash_get_field(self, key, field):
return self.redis.hget(key, field)
def hash_delete_field(self, key, field):
self.redis.hdel(key, field)
接口自動化中比較常用的是字符串了,為了滿足更多場景的需求,我們價格哈希數據結構的封裝操作方法。
接口自動化測試中的常見應用場景
(1) 測試數據管理
接口自動化測試中,將測試數據存儲在Redis中,如用戶信息、配置參數等。通過使用封裝的Redis操作方法,可以方便地進行數據的增、刪、改、查。
redis_client= RedisClient()
redis_client.set_data('user:1', '{"name": "kira", "age": 18}')
user = redis_client.get_data('user:1')
print(user.decode()) # 輸出:{"name": "kira", "age": 18}
(2) 處理接口依賴數據
一般步驟如下:
- 先明確接口的依賴關系: 誰調用誰之前要先調用誰或者誰
- 設置數據到redis:也就是接口B執行成功后,將關鍵數據存redis,可以使用我們封裝的set,健一般是一個標識符,值就是接口的返回值
- 從redis獲取數據:比如接口A執行前,先獲取B數據存Redis,然后調用redis獲取數據給A或者B、C 等等。
上代碼:
redis_client = RedisClient()
# 第一個接口,設置依賴數據
def first_api():
response = requests.get('https://api.example.com/first')
data = response.json()
redis_client.set_data('key', data['value'])
def second_api():
# 獲取依賴數據
dependency_data = redis_client.get_data('key')
response = requests.post('https://api.example.com/second', data={'data': dependency_data})
result = response.json()
# 處理接口響應結果
if __name__ == '__main__':
first_api()
second_api()
(3) 緩存管理
遇到需要頻繁訪問的接口,怎么半?
為了減少接口調用的開銷和提高測試效率,可以使用Redis作為緩存工具,將接口的響應結果緩存起來,以便后續的測試用例重復使用。
redis_client= RedisClient()
def get_user_info(user_id):
cache_key = f'user:{user_id}'
user_info = redis_client.get_data(cache_key)
if not user_info:
# 調用接口獲取用戶信息
user_info = api.get_user_info(user_id)
redis_client.set_data(cache_key, user_info, expire_time=3600)
return user_info
咱們首先檢查Redis緩存中是否已存在對應的用戶信息,如果不存在,則調用接口獲取用戶信息并將其存儲到Redis緩存中,以備后續使用。同時,通過設置expire_time參數,可以為緩存數據設置過期時間,避免過期數據的使用。
(4) 并發測試
在自動化測試中,針對并發場景的測試很重要,我們可以并發模擬一些實際場景,比如:利用redis的原子性和分布式鎖,為每個用戶創建一個唯一的標識,存到redis中,這樣不同用戶請求就可以通過檢查和比對redis的結果來模擬并發訪問了,例如:
# 創建 Redis 客戶端
redis_client = RedisClient()
def get_user_info(user_id):
cache_key = f'user:{user_id}'
user_info = redis_client.get_data(cache_key)
if not user_info:
# 調用接口獲取用戶信息
response = requests.get(f'http://127.0.0.1:5000/?user_id={user_id}')
if response.status_code == 200:
user_info = response.text
print(user_info)
redis_client.set_data(cache_key, user_info, expire_time=3600)
else:
print(f"Failed to retrieve user info for user_id: {user_id}. Status code: {response.status_code}")
return user_info
# 并發測試函數
def run_concurrent_test(user_ids):
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任務到線程池
future_to_user_id = {executor.submit(get_user_info, user_id): user_id for user_id in user_ids}
# 處理返回結果
for future in concurrent.futures.as_completed(future_to_user_id):
user_id = future_to_user_id[future]
try:
user_info = future.result()
print(f"user_id: {user_id}; user_info: {user_info}")
except Exception as e:
print(f"Error occurred for user_id: {user_id}, Error: {str(e)}")
if __name__ == '__main__':
u_ids = [i for i in range(10, 99)]
run_concurrent_test(u_ids)
我們創建線程池,使用submit 將任務(get_user_info)提交到線程池,每個任務一個 user_id,這里簡單打印每個用戶id,對于的信息,通過并發執行多任務,可以同一時間內獲取多個用戶信息,提高測試效率。