<dependency>
<groupId>com.googlecode.xmemcached</groupId>
<artifactId>xmemcached</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
application.yml
spring:
memcache:
# memcached服务器节点
servers: 127.0.0.1:11211 127.0.0.1:11212
# nio连接池的数量
poolSize: 10
# 设置默认操作超时
opTimeout: 3000
# 是否启用url encode机制
sanitizeKeys: false
XMemcachedProperties
/**
* XMemcached 配置属性,读取的是 yml 文件中 spring.memcached 开头的属性
*/
@ConfigurationProperties(prefix = "spring.memcached")
@Configuration
@Data
public class XMemcachedProperties {
/**
* memcached服务器节点
*/
private String servers;
/**
* nio连接池的数量
*/
private Integer poolSize;
/**
* 设置默认操作超时
*/
private Long opTimeout;
/**
* 是否启用url encode机制
*/
private Boolean sanitizeKeys;
}
MemcachedConfig
@Configuration
@Slf4j
public class MemcachedConfig {
@Autowired
private XMemcachedProperties xMemcachedProperties;
@Bean
public MemcachedClient getMemcachedClinet(){
MemcachedClient memcachedClient = null;
try {
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(xMemcachedProperties.getServers()));
// 宕机报警
builder.setFailureMode(false);
// 是否启用url encode机制
builder.setSanitizeKeys(xMemcachedProperties.getSanitizeKeys());
//设置连接数
builder.setConnectionPoolSize(xMemcachedProperties.getPoolSize());
//设置缓存服务器连接时间
builder.setOpTimeout(xMemcachedProperties.getOpTimeout());
// 使用一致性哈希算法(Consistent Hash Strategy)
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
// 使用二进制文件
builder.setCommandFactory(new BinaryCommandFactory());
// 使用序列化传输编码
builder.setTranscoder(new SerializingTranscoder());
// 设置接收缓存区为32K,默认16K
builder.setSocketOption(StandardSocketOption.SO_RCVBUF, 32* 1024);
// 设置发送缓冲区为16K,默认为8K
builder.setSocketOption(StandardSocketOption.SO_SNDBUF,16 *1024);
// 启用nagle算法,提高吞吐量,默认关闭
builder.setSocketOption(StandardSocketOption.TCP_NODELAY,false);
// 心跳检测不在意,也可以关闭心跳检测,减小系统开销
memcachedClient.setEnableHeartBeat(false);
// 默认如果连接超过5秒没有任何IO操作发生即认为空闲并发起心跳检测,可以调长这个时间为10秒;
builder.getConfiguration().setSessionIdleTimeout(10000);
// 进行数据压缩,大于1KB时进行压缩
builder.getTranscoder().setCompressionThreshold(1024);
memcachedClient = builder.build();
}catch (IOException e){
log.error("init MemcachedClient failed:", e);
}
return memcachedClient;
}
}
XMemcachedUtils
@Slf4j
@Configuration
public class XMemcachedUtils {
//
private MemcachedClient memcachedClient;
@Autowired(required = false)
public void setMemcachedClient(MemcachedClient memcachedClient) {
this.memcachedClient = memcachedClient;
}
/**
* Get方法, 转换结果类型,失败时屏蔽异常只返回null.
*/
public <T> T get(String key) {
try {
return (T) memcachedClient.get(key);
} catch (Exception e) {
handleException(e, key);
return null;
}
}
/**
* Get方法,同时更新过期时间, 转换结果类型,失败时屏蔽异常只返回null.
* @param exp 过期秒数
*/
public <T> T get(String key,int exp) {
try {
return (T) memcachedClient.getAndTouch(key,exp);
} catch (Exception e) {
handleException(e, key);
return null;
}
}
/**
* 修改过期时间
* @param key
* @param exp 过期秒数 0-永久存储
* @return
*/
public boolean expire(String key,int exp) {
try {
// OpTimeout 操作超时时间
// memcachedClient.touch(key,exp,OpTimeout);
boolean touchResult = false;
if (exp > 0) {
touchResult = memcachedClient.touch(key, exp);
}
return touchResult;
} catch (Exception e) {
handleException(e, key);
return false;
}
}
/**
* 功能描述:判断key是否存在
* @param key
* @return
*/
public boolean keyIsExist(String key){
try {
if(null == memcachedClient.get(key))
return false;
return true;
} catch (TimeoutException e) {
return false;
} catch (InterruptedException e) {
return false;
} catch (MemcachedException e) {
return false;
}
}
/**
* GetBulk方法, 转换结果类型, 失败时屏蔽异常只返回null.
* @param keys
* @param <T>
* @return
*/
public <T> Map<String, T> getBulk(Collection<String> keys) {
try {
return (Map<String, T>) memcachedClient.get(keys);
} catch (Exception e) {
handleException(e, StringUtils.join(keys, ","));
return null;
}
}
/**
* Set方法, 不等待操作返回结果, 失败抛出RuntimeException..
* @param key
* @param expiredTime
* @param value
*/
public void asyncSet(String key, int expiredTime, Object value) {
try {
memcachedClient.setWithNoReply(key, expiredTime, value);
} catch (Exception e) {
handleException(e, key);
}
}
/**
* Set方法,等待操作返回结果,失败抛出RuntimeException..
* @param key
* @param expiredTime
* @param value
* @return
*/
public boolean set(String key, int expiredTime, Object value) {
try {
return memcachedClient.set(key, expiredTime, value);
} catch (Exception e) {
throw handleException(e, key);
}
}
/**
* Delete方法, 失败抛出RuntimeException.
* @param key
* @return
*/
public boolean delete(String key) {
try {
return memcachedClient.delete(key);
} catch (Exception e) {
throw handleException(e, key);
}
}
/**
* 递增++ : 永久存储
*
* 第一个参数指定递增的key名称
* 第二个参数指定递增的幅度大小
* 第三个参数指定当key不存在的情况下的初始值。
* 两个参数的重载方法省略了第三个参数,默认指定为0。
*/
public long incr(String key, int by, long defaultValue) {
try {
return memcachedClient.incr(key, by, defaultValue);
} catch (Exception e) {
throw handleException(e, key);
}
}
/**
* 递减-- :永久存储,指定key、递减因子、初始值
*
* 第一个参数指定递增的key名称
* 第二个参数指定递增的幅度大小
* 第三个参数指定当key不存在的情况下的初始值
* 两个参数的重载方法省略了第三个参数,默认指定为0。
*/
public long decr(String key, int by, long defaultValue) {
try {
return memcachedClient.decr(key, by, defaultValue);
} catch (Exception e) {
throw handleException(e, key);
}
}
/**
* 清理对象
*
* @param key
*/
public void flushObject(String key) {
try {
memcachedClient.deleteWithNoReply(key);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
log.info("Flush Object: [" + key + "]");
}
/**
* 替换已存在的key的value,注意类型是Object(统计切勿使用replace)
* @param key
* @param expiry(单位秒),超过这个时间,memcached将这个数据替换出去,0表示永久存储(默认是一个月)
* @param value
* @return
*/
public boolean replace(String key, int expiry, Object value){
boolean flag = false;
try {
flag = memcachedClient.replace(key, expiry,value);
} catch (TimeoutException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (InterruptedException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (MemcachedException e) {
log.error("error ========== \r\n{}", e.getMessage());
}
return flag;
}
/**
* 替换已存在的key的value,注意类型是Object(统计切勿使用replace)
* @param key
* @param expiry(单位秒),超过这个时间,memcached将这个数据替换出去,0表示永久存储(默认是一个月)
* @param value
* @param timeout(单位毫秒),设置过期时间,如果expiry还未到期,timeout到期,则该memcached过期
* @return
*/
public boolean replace(String key, int expiry, Object value, long timeout){
boolean flag = false;
try {
flag = memcachedClient.replace(key, expiry, value, timeout);
} catch (TimeoutException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (InterruptedException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (MemcachedException e) {
log.error("error ========== \r\n{}", e.getMessage());
}
return flag;
}
/**
* append在原有的key的value的末尾追加值,如果key不存在,则追加失败
* @param key 键
* @param value 末尾追加的值
* @param l
*/
public boolean append(String key,Object value,Long l) throws InterruptedException, MemcachedException, TimeoutException {
return memcachedClient.append(key,value,l);
}
/**
* prepend在原有的value的头位置添加值,如果key不存在,则添加失败
* @param key 键
* @param value 在头位置添加的值
* @param l 第几个位置添加
*/
public boolean prepend(String key,Object value,Long l) throws InterruptedException, MemcachedException, TimeoutException {
return memcachedClient.prepend(key,value,l);
}
/**
* prepend在原有的value的头位置添加值,不返回添加结果,如果key不存在,则添加失败
* @param key 键
* @param value 在头位置添加的值
*/
public void prependWithNoReply(String key,Object value) throws MemcachedException, InterruptedException {
memcachedClient.prependWithNoReply(key,value);
}
/**
* 计数器累加inc
* @param key
* @param inc 递增的幅度大小
* @return
*/
public long addStats(String key, long inc){
long rec = -1;
Counter counter = memcachedClient.getCounter(key);
try {
rec = counter.incrementAndGet();
} catch (TimeoutException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (InterruptedException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (MemcachedException e) {
log.error("error ========== \r\n{}", e.getMessage());
}
return rec;
}
/**
* 计数器累加inc
* @param key
* @param inc 递增的幅度大小
* @param original key不存在的情况下的初始值
* @return
*/
public long addStats(String key, long inc, long original){
long rec = -1;
Counter counter = memcachedClient.getCounter(key, original);
try {
rec = counter.incrementAndGet();
} catch (TimeoutException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (InterruptedException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (MemcachedException e) {
log.error("error ========== \r\n{}", e.getMessage());
}
return rec;
}
/**
* 获取计数器,key不存在则返回-1
* @param key
*/
public long getStats(String key){
long rec = -1;
// 第二个参数是计数器的初始值
Counter counter = memcachedClient.getCounter(key, -1);
try {
rec = counter.get();
//使用count时实质是在创建一个key,因此需要将这个key清除掉
if(rec == -1)
delete(key);
} catch (TimeoutException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (InterruptedException e) {
log.error("error ========== \r\n{}", e.getMessage());
} catch (MemcachedException e) {
log.error("error ========== \r\n{}", e.getMessage());
}
return rec;
}
/**
* 清空全部缓存 cache ,谨慎使用 真正项目上禁用
*/
public void flushAll() throws InterruptedException, MemcachedException, TimeoutException {
memcachedClient.flushAll();
}
/**gets : gets除了会返回缓存值外,还会返回当前缓存的版本号,一般是用于协同CAS完成原子操作使用
* 获取缓存数据:根据key获取 value 值
* @param key 缓存中的key
*/
public <T> GetsResponse<T> gets(String key) throws InterruptedException, MemcachedException, TimeoutException {
return memcachedClient.gets(key);
}
/**
* 每次操作,cas的id都为递增,并且cas的key一定要存在,要不然会执行失败
* @param key 键
* @param expiry 时间
* @param newValue 新值
* @param cas cas版本号,通过gets 结果集中获取cas
*/
public boolean cas(String key, int expiry, Object newValue,long cas) throws InterruptedException, MemcachedException, TimeoutException {
return memcachedClient.cas(key,expiry,newValue,cas);
}
/**
* 并发时修改值
* Memcached ⽐对这个 CAS 值与当前存储数据的 CAS 值是否相
* 等,如果相等就让新的数据覆盖⽼的数据,如果不相等就认为更新失败,这在并发环境下特别有⽤
* @throws Exception
*/
public void setByCas(String key, int expiry, Object newValue) throws Exception{
GetsResponse<Integer> result = this.gets(key);
long cas = result.getCas();
//尝试将 a 的值更新为 2
if (!this.cas(key, expiry, newValue, cas)) {
System.err.println("cas error");
}
}
private RuntimeException handleException(Exception e, String key) {
log.warn("xmemcached client receive an exception with key:" + key, e);
return new RuntimeException(e);
}
public MemcachedClient getMemcachedClient() {
return memcachedClient;
}
public void destroy() throws Exception {
if (memcachedClient != null) {
memcachedClient.shutdown();
}
}
}
网友评论