mirror of https://gitee.com/a529548204/apitest.git
110 lines
3.2 KiB
Python
110 lines
3.2 KiB
Python
# coding:utf-8
|
||
"""
|
||
@author: jing
|
||
@contact: 529548204@qq.com
|
||
@file: redisData.py
|
||
@time: 2022/5/12 14:18
|
||
"""
|
||
import json
|
||
|
||
import redis
|
||
|
||
from config.confManage import db_manage
|
||
|
||
|
||
class RedisHandler(object):
|
||
""" redis 缓存读取封装 """
|
||
|
||
def __init__(self):
|
||
self.host = db_manage("${host}$", "redis")
|
||
self.port = db_manage("${port}$", "redis")
|
||
self.db = db_manage("${db}$", "redis")
|
||
self.password = db_manage("${password}$", "redis")
|
||
self.charset = db_manage("${charset}$", "redis")
|
||
self.redis = redis.Redis(self.host, port=self.port, password=self.password, decode_responses=True, db=self.db)
|
||
|
||
def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
|
||
"""
|
||
缓存中写入 str(单个)
|
||
:param name: 缓存名称
|
||
:param value: 缓存值
|
||
:param ex: 过期时间(秒)
|
||
:param px: 过期时间(毫秒)
|
||
:param nx: 如果设置为True,则只有name不存在时,当前set操作才执行(新增)
|
||
:param xx: 如果设置为True,则只有name存在时,当前set操作才执行(修改)
|
||
:return:
|
||
"""
|
||
self.redis.set(name, value, ex=ex, px=px, nx=nx, xx=xx)
|
||
|
||
def key_exit(self, key):
|
||
"""
|
||
判断redis中的key是否存在
|
||
:param key:
|
||
:return:
|
||
"""
|
||
|
||
return self.redis.exists(key)
|
||
|
||
def incr(self, key):
|
||
"""
|
||
使用 incr 方法,处理并发问题
|
||
当 key 不存在时,则会先初始为 0, 每次调用,则会 +1
|
||
:return:
|
||
"""
|
||
self.redis.incr(key)
|
||
|
||
def get_key(self, name) -> str:
|
||
"""
|
||
读取缓存
|
||
:param name:
|
||
:return:
|
||
"""
|
||
return self.redis.get(name)
|
||
|
||
def set_many(self, *args, **kwargs):
|
||
"""
|
||
批量设置
|
||
支持如下方式批量设置缓存
|
||
eg: set_many({'k1': 'v1', 'k2': 'v2'})
|
||
set_many(k1="v1", k2="v2")
|
||
:return:
|
||
"""
|
||
self.redis.mset(*args, **kwargs)
|
||
|
||
def get_many(self, *args):
|
||
"""获取多个值"""
|
||
results = self.redis.mget(*args)
|
||
return results
|
||
|
||
def del_all_cache(self):
|
||
"""清理所有现在的数据"""
|
||
for key in self.redis.keys():
|
||
self.del_cache(key)
|
||
|
||
def del_cache(self, name):
|
||
"""
|
||
删除缓存
|
||
:param name:
|
||
:return:
|
||
"""
|
||
self.redis.delete(name)
|
||
|
||
def clear(self):
|
||
# 清空当前库数据
|
||
self.redis.execute_command('FLUSHDB')
|
||
# execute_command
|
||
|
||
|
||
if __name__ == '__main__':
|
||
d = {'login': {'name': '登录', 'order': 1, 'token': False, 'file': False, 'case': [
|
||
{'address': '/v1/oauth/login/password/',
|
||
'assert': {'jsonpath': None, 'sqlassert': None, 'time': 2, 'code': 200},
|
||
'data': {'file': None, 'param': {'username': 'dl010', 'password': 123456}, 'urlparam': None},
|
||
'headers': {'Content-Type': 'application/json'}, 'host': 'HB', 'info': '代理账号登录', 'method': 'POST',
|
||
'cookie': True, 'cache': None, 'relevance': None, 'teardown': None}]}}
|
||
rd = RedisHandler()
|
||
d['login'] = json.dumps(d['login'])
|
||
rd.set_many(d)
|
||
# rd.clear()
|
||
# print(json.loads(rd.get_key("login")))
|