美文网首页
spring boot 整合xmemched和工具类

spring boot 整合xmemched和工具类

作者: thinking2019 | 来源:发表于2020-07-16 23:22 被阅读0次
<dependency>
    <groupId>com.googlecode.xmemcached</groupId>
    <artifactId>xmemcached</artifactId>
    <version>2.4.6</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

application.yml

spring:
  memcache:
    # memcached服务器节点
    servers: 127.0.0.1:11211 127.0.0.1:11212
    # nio连接池的数量
    poolSize: 10
    # 设置默认操作超时
    opTimeout: 3000
    # 是否启用url encode机制
    sanitizeKeys: false

XMemcachedProperties

/**
 * XMemcached 配置属性,读取的是 yml 文件中 spring.memcached 开头的属性
 */
@ConfigurationProperties(prefix = "spring.memcached")
@Configuration
@Data
public class XMemcachedProperties {

    /**
     * memcached服务器节点
     */
    private String servers;

    /**
     * nio连接池的数量
     */
    private Integer poolSize;

    /**
     * 设置默认操作超时
     */
    private Long opTimeout;

    /**
     * 是否启用url encode机制
     */
    private Boolean sanitizeKeys;
}

MemcachedConfig

@Configuration
@Slf4j
public class MemcachedConfig {

    @Autowired
    private XMemcachedProperties xMemcachedProperties;

    @Bean
    public MemcachedClient getMemcachedClinet(){
        MemcachedClient memcachedClient = null;
        try {
            MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(xMemcachedProperties.getServers()));
            // 宕机报警
            builder.setFailureMode(false);
            // 是否启用url encode机制
            builder.setSanitizeKeys(xMemcachedProperties.getSanitizeKeys());
            //设置连接数
            builder.setConnectionPoolSize(xMemcachedProperties.getPoolSize());
            //设置缓存服务器连接时间
            builder.setOpTimeout(xMemcachedProperties.getOpTimeout());
            // 使用一致性哈希算法(Consistent Hash Strategy)
            builder.setSessionLocator(new KetamaMemcachedSessionLocator());
            // 使用二进制文件
            builder.setCommandFactory(new BinaryCommandFactory());
            // 使用序列化传输编码
            builder.setTranscoder(new SerializingTranscoder());
            // 设置接收缓存区为32K,默认16K
            builder.setSocketOption(StandardSocketOption.SO_RCVBUF, 32* 1024);
            // 设置发送缓冲区为16K,默认为8K
            builder.setSocketOption(StandardSocketOption.SO_SNDBUF,16 *1024);
            // 启用nagle算法,提高吞吐量,默认关闭
            builder.setSocketOption(StandardSocketOption.TCP_NODELAY,false);
            // 心跳检测不在意,也可以关闭心跳检测,减小系统开销
            memcachedClient.setEnableHeartBeat(false);
            // 默认如果连接超过5秒没有任何IO操作发生即认为空闲并发起心跳检测,可以调长这个时间为10秒;
            builder.getConfiguration().setSessionIdleTimeout(10000);
            // 进行数据压缩,大于1KB时进行压缩
            builder.getTranscoder().setCompressionThreshold(1024);
            memcachedClient = builder.build();
        }catch (IOException e){
            log.error("init MemcachedClient failed:", e);
        }
        return memcachedClient;
    }
}

XMemcachedUtils

@Slf4j
@Configuration
public class XMemcachedUtils {
    //
    private MemcachedClient memcachedClient;

    @Autowired(required = false)
    public void setMemcachedClient(MemcachedClient memcachedClient) {
        this.memcachedClient = memcachedClient;
    }

    /**
     * Get方法, 转换结果类型,失败时屏蔽异常只返回null.
     */
    public <T> T get(String key) {
        try {
            return (T) memcachedClient.get(key);
        } catch (Exception e) {
            handleException(e, key);
            return null;
        }
    }

    /**
     * Get方法,同时更新过期时间, 转换结果类型,失败时屏蔽异常只返回null.
     * @param exp 过期秒数
     */
    public <T> T get(String key,int exp) {
        try {
            return (T) memcachedClient.getAndTouch(key,exp);
        } catch (Exception e) {
            handleException(e, key);
            return null;
        }
    }

    /**
     * 修改过期时间
     * @param key
     * @param exp 过期秒数  0-永久存储
     * @return
     */
    public boolean expire(String key,int exp) {
        try {
            // OpTimeout 操作超时时间
//            memcachedClient.touch(key,exp,OpTimeout);
            boolean touchResult = false;
            if (exp > 0) {
                touchResult = memcachedClient.touch(key, exp);
            }
            return touchResult;
        } catch (Exception e) {
            handleException(e, key);
            return false;
        }
    }

    /**
     * 功能描述:判断key是否存在
     * @param key
     * @return
     */
    public boolean keyIsExist(String key){
        try {
            if(null == memcachedClient.get(key))
                return false;
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            return false;
        } catch (MemcachedException e) {
            return false;
        }
    }

    /**
     * GetBulk方法, 转换结果类型, 失败时屏蔽异常只返回null.
     * @param keys
     * @param <T>
     * @return
     */
    public <T> Map<String, T> getBulk(Collection<String> keys) {
        try {
            return (Map<String, T>) memcachedClient.get(keys);
        } catch (Exception e) {
            handleException(e, StringUtils.join(keys, ","));
            return null;
        }
    }

    /**
     * Set方法, 不等待操作返回结果, 失败抛出RuntimeException..
     * @param key
     * @param expiredTime
     * @param value
     */
    public void asyncSet(String key, int expiredTime, Object value) {
        try {
            memcachedClient.setWithNoReply(key, expiredTime, value);
        } catch (Exception e) {
            handleException(e, key);
        }
    }

    /**
     * Set方法,等待操作返回结果,失败抛出RuntimeException..
     * @param key
     * @param expiredTime
     * @param value
     * @return
     */
    public boolean set(String key, int expiredTime, Object value) {
        try {
            return memcachedClient.set(key, expiredTime, value);
        } catch (Exception e) {
            throw handleException(e, key);
        }
    }

    /**
     * Delete方法, 失败抛出RuntimeException.
     * @param key
     * @return
     */
    public boolean delete(String key) {
        try {
            return memcachedClient.delete(key);
        } catch (Exception e) {
            throw handleException(e, key);
        }
    }

    /**
     * 递增++ : 永久存储
     *
     * 第一个参数指定递增的key名称
     * 第二个参数指定递增的幅度大小
     * 第三个参数指定当key不存在的情况下的初始值。
     * 两个参数的重载方法省略了第三个参数,默认指定为0。
     */
    public long incr(String key, int by, long defaultValue) {
        try {
            return memcachedClient.incr(key, by, defaultValue);
        } catch (Exception e) {
            throw handleException(e, key);
        }
    }

    /**
     * 递减-- :永久存储,指定key、递减因子、初始值
     *
     * 第一个参数指定递增的key名称
     * 第二个参数指定递增的幅度大小
     * 第三个参数指定当key不存在的情况下的初始值
     * 两个参数的重载方法省略了第三个参数,默认指定为0。
     */
    public long decr(String key, int by, long defaultValue) {
        try {
            return memcachedClient.decr(key, by, defaultValue);
        } catch (Exception e) {
            throw handleException(e, key);
        }
    }

    /**
     * 清理对象
     *
     * @param key
     */
    public void flushObject(String key) {
        try {
            memcachedClient.deleteWithNoReply(key);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        log.info("Flush Object: [" + key + "]");
    }

    /**
     * 替换已存在的key的value,注意类型是Object(统计切勿使用replace)
     * @param key
     * @param expiry(单位秒),超过这个时间,memcached将这个数据替换出去,0表示永久存储(默认是一个月)
     * @param value
     * @return
     */
    public boolean replace(String key, int expiry, Object value){
        boolean flag = false;
        try {
            flag = memcachedClient.replace(key, expiry,value);
        } catch (TimeoutException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (InterruptedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (MemcachedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        }
        return flag;
    }

    /**
     * 替换已存在的key的value,注意类型是Object(统计切勿使用replace)
     * @param key
     * @param expiry(单位秒),超过这个时间,memcached将这个数据替换出去,0表示永久存储(默认是一个月)
     * @param value
     * @param timeout(单位毫秒),设置过期时间,如果expiry还未到期,timeout到期,则该memcached过期
     * @return
     */
    public boolean replace(String key, int expiry, Object value, long timeout){
        boolean flag = false;
        try {
            flag = memcachedClient.replace(key, expiry, value, timeout);
        } catch (TimeoutException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (InterruptedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (MemcachedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        }
        return flag;
    }

    /**
     * append在原有的key的value的末尾追加值,如果key不存在,则追加失败
     * @param key 键
     * @param value 末尾追加的值
     * @param l
     */
    public boolean append(String key,Object value,Long l) throws InterruptedException, MemcachedException, TimeoutException {
        return memcachedClient.append(key,value,l);
    }

    /**
     * prepend在原有的value的头位置添加值,如果key不存在,则添加失败
     * @param key 键
     * @param value 在头位置添加的值
     * @param l 第几个位置添加
     */
    public boolean prepend(String key,Object value,Long l) throws InterruptedException, MemcachedException, TimeoutException {
        return memcachedClient.prepend(key,value,l);
    }

    /**
     * prepend在原有的value的头位置添加值,不返回添加结果,如果key不存在,则添加失败
     * @param key 键
     * @param value 在头位置添加的值
     */
    public void prependWithNoReply(String key,Object value) throws MemcachedException, InterruptedException {
        memcachedClient.prependWithNoReply(key,value);
    }

    /**
     * 计数器累加inc
     * @param key
     * @param inc 递增的幅度大小
     * @return
     */
    public long addStats(String key, long inc){
        long rec = -1;
        Counter counter = memcachedClient.getCounter(key);
        try {
            rec = counter.incrementAndGet();
        } catch (TimeoutException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (InterruptedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (MemcachedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        }
        return rec;
    }

    /**
     * 计数器累加inc
     * @param key
     * @param inc 递增的幅度大小
     * @param original key不存在的情况下的初始值
     * @return
     */
    public long addStats(String key, long inc, long original){
        long rec = -1;
        Counter counter = memcachedClient.getCounter(key, original);
        try {
            rec = counter.incrementAndGet();
        } catch (TimeoutException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (InterruptedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (MemcachedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        }
        return rec;
    }

    /**
     * 获取计数器,key不存在则返回-1
     * @param key
     */
    public long getStats(String key){
        long rec = -1;
        // 第二个参数是计数器的初始值
        Counter counter = memcachedClient.getCounter(key, -1);
        try {
            rec = counter.get();
            //使用count时实质是在创建一个key,因此需要将这个key清除掉
            if(rec == -1)
                delete(key);
        } catch (TimeoutException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (InterruptedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        } catch (MemcachedException e) {
            log.error("error ========== \r\n{}", e.getMessage());
        }
        return rec;
    }

    /**
     * 清空全部缓存 cache ,谨慎使用 真正项目上禁用
     */
    public void flushAll() throws InterruptedException, MemcachedException, TimeoutException {
        memcachedClient.flushAll();
    }

    /**gets : gets除了会返回缓存值外,还会返回当前缓存的版本号,一般是用于协同CAS完成原子操作使用
     * 获取缓存数据:根据key获取 value 值
     * @param key 缓存中的key
     */
    public <T> GetsResponse<T> gets(String key) throws InterruptedException, MemcachedException, TimeoutException {
        return memcachedClient.gets(key);
    }

    /**
     * 每次操作,cas的id都为递增,并且cas的key一定要存在,要不然会执行失败
     * @param key 键
     * @param expiry 时间
     * @param newValue 新值
     * @param cas  cas版本号,通过gets 结果集中获取cas
     */
    public boolean cas(String key, int expiry, Object newValue,long cas) throws InterruptedException, MemcachedException, TimeoutException {
        return memcachedClient.cas(key,expiry,newValue,cas);
    }

    /**
     * 并发时修改值
     * Memcached ⽐对这个 CAS 值与当前存储数据的 CAS 值是否相
     * 等,如果相等就让新的数据覆盖⽼的数据,如果不相等就认为更新失败,这在并发环境下特别有⽤
     * @throws Exception
     */
    public void setByCas(String key, int expiry, Object newValue) throws Exception{
        GetsResponse<Integer> result = this.gets(key);
        long cas = result.getCas();
        //尝试将 a 的值更新为 2
        if (!this.cas(key, expiry, newValue, cas)) {
            System.err.println("cas error");
        }
    }

    private RuntimeException handleException(Exception e, String key) {
        log.warn("xmemcached client receive an exception with key:" + key, e);
        return new RuntimeException(e);
    }

    public MemcachedClient getMemcachedClient() {
        return memcachedClient;
    }

    public void destroy() throws Exception {
        if (memcachedClient != null) {
            memcachedClient.shutdown();
        }
    }
}

长期更新的公众号:Java技术学习笔记

相关文章

网友评论

      本文标题:spring boot 整合xmemched和工具类

      本文链接:https://www.haomeiwen.com/subject/hxsvhktx.html