import redis
class RedisContent():
def read(self,key):
try:
'''
**redisinfo
为应用配置文件conf.ini配置关于redis的数据库信息
redisinfo={
"host": 192.168.1.48,
"port": 6379
}
参数key为,redis存储当中的key
'''
r = redis.StrictRedis(decode_responses=True, charset='UTF-8', encoding='UTF-8',**redisinfo)
value = r.get(key)
return value
except Exception as e:
raise e
python操作redis增删改查
python连接redis
1、普通连接
import redis
"""
redis默认可以不设置密码,因此password可为空不传
"""
r=redis.StrictRedis(host="192.168.23.166", port=6379,password="123456",db=0)
r.set("s","hello",ex=5) #ex代表seconds,px代表ms,表示过期时间
2、通过连接池连接
pool=redis.ConnectionPool(host="192.168.23.166", port=6379,password="123456",db=0)
r=redis.StrictRedis(connection_pool=pool)
3、操作redis
(1-1)、result=r.get(key).decode()
通过指定key获取value的值,因为获取的二进制所以需要解码decode(),
如果key不存在,r.get(key)不报错,r.get(key).decode()报错
(1-2)、result=r.hget(key,name).decode()
获取针对存储hash类型的值
(2)、r.delete(key)
通过指定key删除redis中的键值对
(3)、result=r.exists(key)
判断指定key是否存在
(4)、resultList=r.keys()
获取所有的key返回列表
(5)、resultList=r.keys(pattern='正则表达式')
通过key的正则表达式返回key列表
正则支持如下三种:
1. * 任意长度的任意字符
2. ? 任意单一字符
3. [xxx] 匹配方括号中的一个字符
(6)、r.flushdb()
清空redis的所有keys
单机模式
举例:
# -*-coding:utf-8 -*-
# @Time : hudechao
# @Author : 2021/06/09 19:55
import redis
pool=redis.ConnectionPool(host='192.168.1.202', port=6379, db=0)
r = redis.StrictRedis(connection_pool=pool)
#获取redis指定key的值
"""
获取不存在key的值会报错
"""
def getRedis(key):
#result=res.get('account_future:userId:201:currencyId:1').decode()
try:
result = r.get(key).decode()
except Exception as e:
print('{} 不存在'.format(key))
result=None
return result
#删除redis指定key
"""
删除不存在的key不会报错
"""
def delRedis(key):
if isexist(key)==1:
result=r.delete(key)
if result:
print("删除成功:{}".format(key))
else:
print("删除失败:{}".format(key))
else:
print("{}:不存在".format(key))
#判断当前key是否存在
def isexist(key):
result=r.exists(key)
return result
#获取所有的key
def getAllKey():
result=r.keys()
return result
#根据指定模糊keyname过滤需要的keys
def getKey(keyname):
"""
:param keyname: "pattern=*name"
pattern支持持三种:
1. * 任意长度的任意字符
2. ? 任意单一字符
3. [xxx] 匹配方括号中的一个字符
:return:
"""
result=r.keys(pattern=keyname)
return result
#清空所有的keys
def delAllKey():
r.flushdb()
#为redis设置新的键值对
def setRedis():
r.set('hudechao','{"name":"dechaohu","age":"18"}')
if __name__ == '__main__':
#frozen_margin的key列表
keylist_frozen_margin=[]
#position的key列表
keylist_position = []
#order的key列表
keylist_order=[]
#frozen_positio的key列表
keylist_frozen_position=[]
userId_list=[201,301,401,501,502,503,504,505,506,507,508]
for item in userId_list:
#删除account_future的key
key="account_future:userId:{0}:currencyId:1".format(item)
delRedis(key)
#frozen_margin的key列表
key_frozen_margin='frozen_margin:userId:{}:orderNo:*'.format(item)
keylist=getKey(key_frozen_margin)
keylist_frozen_margin.extend(keylist)
#position的key列表
key_position1='position:userId:{}:contractId:5'.format(item)
delRedis(key_position1)
key_position2='position:userId:{}:positionNo:*'.format(item)
keylist_tmp=getKey(key_position2)
keylist_position.extend(keylist_tmp)
#order的key列表
key_order="order:contract:5:user:{}:orderNo:*".format(item)
keyorder_tmp=getKey(key_order)
keylist_order.extend(keyorder_tmp)
#frozen_positio的key列表
key_frozen_position="frozen_position:userId:{}:*".format(item)
key_frozen_position_tmp=getKey(key_frozen_position)
keylist_frozen_position.extend(key_frozen_position_tmp)
#删除frozen_margin的key
if keylist_frozen_margin is not None:
for key in keylist_frozen_margin:
delRedis(key)
else:
print('frozen_margin key is None')
#position的key列表
if keylist_position is not None:
for key in keylist_position:
delRedis(key)
else:
print('position key is None')
#删除order的key
if keylist_order is not None:
for key in keylist_order:
delRedis(key)
else:
print('order key is None')
#删除frozen_position的key
if keylist_frozen_position is not None:
for key in keylist_frozen_position:
delRedis(key)
else:
print('frozen_position key is None')
集群模式
from configparser import ConfigParser
from common import mysql_utils
import json
import paramiko
import os
import socks
import time
current_path=os.path.dirname(__file__)
root_path=os.path.dirname(current_path)
conffile_path=os.path.join(root_path+'/','config/conf.ini')
redis_node = ['fm-test-cache1-0001-001.snbr1u.0001.apne1.cache.amazonaws.com',
'fm-test-cache1-0002-001.snbr1u.0001.apne1.cache.amazonaws.com',
'fm-test-cache1-0003-001.snbr1u.0001.apne1.cache.amazonaws.com']
def conn_try_again(func):
nums = 0
def wrapped(*args,**kwargs):
try:
return func(*args,**kwargs)
except Exception as e:
nonlocal nums
if int(nums) < 10:
nums = nums + 1
time.sleep(1)
return wrapped(*args,**kwargs)
else:
raise Exception(e)
with open(log_path,'a',encoding="utf-8") as f:
f.write(e)
return wrapped
class RedisLinuxUtil:
def __init__(self):
try:
cf=ConfigParser()
cf.read(conffile_path)
hostname=cf.get("Rdislinuxinfo","hostname")
port=cf.get("Rdislinuxinfo","port")
username=cf.get("Rdislinuxinfo","username")
pkey=paramiko.RSAKey.from_private_key_file(cf.get("Rdislinuxinfo","keyfile"))
except Exception as e:
print('读取配置文件conf.ini异常',e)
try:
sock = socks.socksocket()
sock.set_proxy(socks.SOCKS5, "127.0.0.1", 1080)
sock.connect((hostname, int(port)))
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(hostname=hostname,port=int(port),username=username,pkey=pkey,sock=sock)
except Exception as e:
print('连接linux服务器失败',e)
def getKey(self,key):
result = []
for r in redis_node:
# stdin, stdout, stderr = self.client.exec_command('redis-cli -h {} -p 6379 -c scan 0 MATCH {} COUNT 1000'.format(r, key))
stdin, stdout, stderr = self.client.exec_command('redis-cli -h {} -p 6379 -c keys {}'.format(r, key))
msg = stdout.read().decode('utf-8').strip('\n').split('\n')
for v in msg:
if v == '0':
continue
result.append(v)
return result
def delRedis(self,key):
for r in redis_node:
stdin, stdout, stderr = self.client.exec_command('redis-cli -h {} -p 6379 -c del {}'.format(r, key))
result = stdout.read().decode('utf-8')
if result.__eq__(1):
print("{}:删除成功".format(key))
else:
print(key)
print('删除数据失败 异常:{}'.format(result))
# 批量删除key
def delbatchRedis(self,key):
"""
:param key: keyname , 支持模糊匹配, key*
:return:
"""
for r in redis_node:
stdin, stdout, stderr = self.client.exec_command('redis-cli -h {0} -p 6379 -c keys {1} | xargs -r -t -n1 redis-cli -h {2} -p 6379 -c del '.format(r, key, r))
result = stdout.read().decode('utf-8')
if result.__lt__(1):
print("{}:批量删除成功".format(key))
else:
print("{}:批量删除失败".format(key))
@conn_try_again
def clean_redis(self,userId_list):
for item in userId_list:
# 删除account_future的key
key = "account_future:userId:{0}:currencyId:1".format(item)
self.delRedis(key)
# frozen_margin的key列表
key_frozen_margin = 'frozen_margin:userId:{}:orderNo:*'.format(item)
self.delbatchRedis(key_frozen_margin)
# position的key列表
key_position1 = 'position:userId:{}:orderNo:*'.format(item)
key_position2 = 'position:userId:{}:positionNo:*'.format(item)
# 净持仓的position的key
key_p = "position:userId:{}:contractId:*".format(int(item))
self.delbatchRedis(key_position1)
self.delbatchRedis(key_position2)
self.delbatchRedis(key_p)
# order的key列表
key_order = "order:contract:*:user:{}:orderNo:*".format(item)
key_user_order = "order:user:{}:contract:*:orderNo:*".format(item)
self.delbatchRedis(key_order)
self.delbatchRedis(key_user_order)
# frozen_positio的key列表
key_frozen_position = "frozen_position:userId:{}:orderNo:*".format(item)
self.delbatchRedis(key_frozen_position)
# 删除system_takeover_account的key
systemtkey = "system_takeover_account:userId:{}:force_no:*".format(item)
self.delbatchRedis(systemtkey)
# 删除system_takeover_position的key
systempkey_ = "system_takeover_position:userId:{}:positionNo:*".format(item)
self.delbatchRedis(systempkey_)
def clean_redis_new(self):
# 删除force_close_order_matching:contract:7:orderNo:10000000000032101
key_force_close_order_matching = "force_close_order_matching:contract:7:orderNo:*"
self.delbatchRedis(key_force_close_order_matching)
self.delRedis("force_close_order:contract:7")
self.delRedis("order_worker_status:7")
def redis_put(self,userId_list):
"""
初始化账号信息到Redis
:return:
"""
for userId in userId_list:
put_key='account_future:userId:{}:currencyId:1'.format(userId)
#account_future表
table_account_future = mysql_utils.MysqlUtils().select('select * from account_future where user_id = {}'.format(userId))
account = {}
for item in table_account_future:
account['changeTime'] = int(item.get("change_time"))
account['crossFrozenPositionMargin'] = float(item['cross_frozen_position_margin'])
account['crossPositionMargin'] = float(item['cross_position_margin'])
account['currencyId'] = item['currency_id']
account['deleteTime'] = float(item['isolated_frozen_position_margin'])
account['isForbid'] = item['is_forbid']
account['isolatedFrozenPositionMargin'] = float(item['isolated_frozen_position_margin'])
account['isolatedPositionMargin'] = float(item['isolated_position_margin'])
account['totalMoney'] = float(item['total_money'])
account['userId'] = item['user_id']
value = json.dumps(str(account).replace("'", '\\\"'))
cmd = 'redis-cli -h fm-test-cache1-0001-001.snbr1u.0001.apne1.cache.amazonaws.com -p 6379 -c set {0} \\\"{1}\\\"'.format(put_key,value)
stdin, stdout, stderr = self.client.exec_command(cmd)
print(stdout.read().decode("utf-8"))
def setRedisString(self,key,value):
for r in redis_node:
stdin, stdout, stderr = self.client.exec_command('redis-cli -h {0} -p 6379 -c set {1} {2}'.format(r,key, value))
result = stdout.read().decode('utf-8')
print(result)
if __name__ == '__main__':
userId_list = [509]
rl = RedisLinuxUtil()
rl.clean_redis(userId_list)
网友评论