apitest/util/tools/redisData.py

110 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:utf-8
"""
@author: jing
@contact: 529548204@qq.com
@file: redisData.py
@time: 2022/5/12 14:18
"""
import json
from typing import Optional

import redis

from config.confManage import db_manage
class RedisHandler(object):
    """Thin wrapper around a redis-py client for reading and writing cache entries.

    Connection settings are fetched with ``db_manage(<placeholder>, "redis")``
    when the handler is constructed.
    """

    def __init__(self):
        self.host = db_manage("${host}$", "redis")
        self.port = db_manage("${port}$", "redis")
        self.db = db_manage("${db}$", "redis")
        self.password = db_manage("${password}$", "redis")
        # Stored for callers that want it; it is not handed to the client below.
        self.charset = db_manage("${charset}$", "redis")
        # decode_responses=True makes the client return str instead of bytes.
        self.redis = redis.Redis(
            self.host,
            port=self.port,
            password=self.password,
            decode_responses=True,
            db=self.db,
        )

    def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
        """Write a single string value.

        :param name: key to write
        :param value: value to store
        :param ex: expiry in seconds
        :param px: expiry in milliseconds
        :param nx: when True, only write if ``name`` does not exist yet (insert)
        :param xx: when True, only write if ``name`` already exists (update)
        :return: None
        """
        self.redis.set(name, value, ex=ex, px=px, nx=nx, xx=xx)

    def key_exit(self, key):
        """Report whether ``key`` exists.

        :param key: key to look up
        :return: the result of the client's ``exists`` call, passed through
            unchanged (truthy when the key is present)
        """
        return self.redis.exists(key)

    def incr(self, key):
        """Atomically add 1 to the counter stored at ``key``.

        A missing key is treated as 0 before the increment. The value after
        the increment is returned so callers do not need a separate (and
        racy) read to learn the counter's state.

        :param key: counter key
        :return: the counter value after the increment
        """
        return self.redis.incr(key)

    def get_key(self, name) -> Optional[str]:
        """Read the value stored at ``name``.

        :param name: key to read
        :return: the stored value, or None when the key does not exist
        """
        return self.redis.get(name)

    def set_many(self, *args, **kwargs):
        """Write several keys in one MSET call.

        Both call styles are supported and may be combined::

            set_many({'k1': 'v1', 'k2': 'v2'})
            set_many(k1="v1", k2="v2")

        :param args: mappings of key -> value
        :param kwargs: additional key=value pairs
        :return: None
        """
        # The client's mset() takes exactly one mapping, so fold every
        # positional mapping and the keyword pairs into a single dict first.
        merged = {}
        for mapping in args:
            merged.update(mapping)
        merged.update(kwargs)
        if not merged:
            # MSET without any pairs is rejected by the server; nothing to do.
            return
        self.redis.mset(merged)

    def get_many(self, *args):
        """Fetch the values of several keys.

        :param args: arguments forwarded unchanged to the client's ``mget``
        :return: the list returned by ``mget``
        """
        return self.redis.mget(*args)

    def del_all_cache(self):
        """Delete every key that currently exists in the selected database."""
        # SCAN walks the keyspace incrementally, unlike KEYS which blocks the
        # server while it builds the complete key list.
        for key in self.redis.scan_iter():
            self.del_cache(key)

    def del_cache(self, name):
        """Delete a single key.

        :param name: key to remove
        :return: None
        """
        self.redis.delete(name)

    def clear(self):
        """Empty the currently selected database by issuing FLUSHDB."""
        self.redis.execute_command('FLUSHDB')
# execute_command
if __name__ == '__main__':
    # Manual smoke run: store one JSON-encoded test-suite definition in Redis
    # under its suite name.
    login_case = {
        'address': '/v1/oauth/login/password/',
        'assert': {'jsonpath': None, 'sqlassert': None, 'time': 2, 'code': 200},
        'data': {
            'file': None,
            'param': {'username': 'dl010', 'password': 123456},
            'urlparam': None,
        },
        'headers': {'Content-Type': 'application/json'},
        'host': 'HB',
        'info': '代理账号登录',
        'method': 'POST',
        'cookie': True,
        'cache': None,
        'relevance': None,
        'teardown': None,
    }
    suites = {
        'login': {
            'name': '登录',
            'order': 1,
            'token': False,
            'file': False,
            'case': [login_case],
        },
    }
    handler = RedisHandler()
    # Redis values must be flat, so each suite is serialised to a JSON string.
    encoded = {suite: json.dumps(spec) for suite, spec in suites.items()}
    handler.set_many(encoded)
    # To reset or read the entry back afterwards:
    # handler.clear()
    # print(json.loads(handler.get_key("login")))