tips:
本文简单介绍Flask中使用redis
本文代码基于python3编写
项目场景
在实际项目中,不频繁变化且重复使用的数据、有一定时效的数据等。放入redis中,不仅可以提高查询效率,还能减少维护成本。实际应用比如手机验证码,token验证、任务调度等。
redis
定义
REmote DIctionary Server(Redis) 是一个由Salvatore Sanfilippo写的key-value存储系统。Redis是一个开源的使用ANSI C语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。它通常被称为数据结构服务器,因为值(value)可以是 字符串(String), 哈希(Hash), 列表(list), 集合(sets) 和 有序集合(sorted sets)等类型。
安装
安装过程可参考我的另一篇文章https://www.jianshu.com/p/bfb5e45586c5,也可以在菜鸟教程查看。
使用方法
Python使用redis
1、 安装Python redis包 pip install redis
2、简单使用
import redis
# 获取redis数据库连接
r = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)
# redis存入键值对
r.set(name="key", value="value")
# 读取键值对
print(r.get("key"))
# 删除
print(r.delete("key"))
# redis存入Hash值
r.hset(name="name", key="key1", value="value1")
r.hset(name="name", key="key2", value="value2")
# 获取所有哈希表中的字段
print(r.hgetall("name"))
# 获取所有给定字段的值
print(r.hmget("name", "key1", "key2"))
# 获取存储在哈希表中指定字段的值。
print(r.hmget("name", "key1"))
# 删除一个或多个哈希表字段
print(r.hdel("name", "key1"))
# 过期时间
r.expire("name", 60) # 60秒后过期
# 更多相关内容可以参考菜鸟教程
Flask使用redis
1、封装redis方法(util.py)
from flask import current_app
import redis
class Redis(object):
"""
redis数据库操作
"""
@staticmethod
def _get_r():
host = current_app.config['REDIS_HOST']
port=current_app.config['REDIS_PORT']
db=current_app.config['REDIS_DB']
r = redis.StrictRedis(host, port,db)
return r
@classmethod
def write(cls, key, value, expire=None):
"""
写入键值对
"""
# 判断是否有过期时间,没有就设置默认值
if expire:
expire_in_seconds = expire
else:
expire_in_seconds = current_app.config['REDIS_EXPIRE']
r = cls._get_r()
r.set(key, value, ex=expire_in_seconds)
@classmethod
def read(cls, key):
"""
读取键值对内容
"""
r = cls._get_r()
value = r.get(key)
return value.decode('utf-8') if value else value
@classmethod
def hset(cls, name, key, value):
"""
写入hash表
"""
r = cls._get_r()
r.hset(name, key, value)
@classmethod
def hmset(cls, key, *value):
"""
读取指定hash表的所有给定字段的值
"""
r = cls._get_r()
value = r.hmset(key, *value)
return value
@classmethod
def hget(cls, name, key):
"""
读取指定hash表的键值
"""
r = cls._get_r()
value = r.hget(name, key)
return value.decode('utf-8') if value else value
@classmethod
def hgetall(cls, name):
"""
获取指定hash表所有的值
"""
r = cls._get_r()
return r.hgetall(name)
@classmethod
def delete(cls, *names):
"""
删除一个或者多个
"""
r = cls._get_r()
r.delete(*names)
@classmethod
def hdel(cls, name, key):
"""
删除指定hash表的键值
"""
r = cls._get_r()
r.hdel(name, key)
@classmethod
def expire(cls, name, expire=None):
"""
设置过期时间
"""
if expire:
expire_in_seconds = expire
else:
expire_in_seconds = current_app.config['REDIS_EXPIRE']
r = cls._get_r()
r.expire(name, expire_in_seconds)
2、测试使用(test.py)
from util import Redis
bp = Blueprint(service_name, __name__, url_prefix="/")
@bp.route('/testRedisWrite', methods=['GET'])
def test_redis_write():
"""
测试redis
"""
Redis.write("test_key","test_value",60)
return "ok"
@bp.route('/testRedisRead', methods=['GET'])
def test_redis_read():
"""
测试redis
"""
data = Redis.read("test_key")
return data
3、Flask App(app.py)
from flask import Flask
from test import bp
app = Flask(__name__)
app.config['REDIS_HOST'] = "127.0.0.1" # redis数据库地址
app.config['REDIS_PORT'] = 6379 # redis 端口号
app.config['REDIS_DB'] = 0 # 数据库名
app.config['REDIS_EXPIRE'] = 60 # redis 过期时间60秒
# 注册接口
app.register_blueprint(bp)
if __name__=="__main__":
app.run()
总结
简单的使用了redis以及相关的API封装,方便快捷的使用。
网友评论