美文网首页
使用Berkeley DB实现LinkedBlockingQueue

使用Berkeley DB实现LinkedBlockingQueue

作者: engineer_tang | 来源:发表于2023-08-10 17:05 被阅读0次

    Berkeley DB是一个开放源代码的内嵌式数据库管理系统,能够为应用程序提供高性能的数据管理服务。使用它,程序员只需要调用一些简单的API就可以完成对数据的访问和管理。与常用的数据库管理系统(如MySQL和Oracle等)有所不同,在Berkeley DB中并没有数据库服务器的概念。应用程序不需要事先同数据库服务器建立起网络连接,而是通过内嵌在程序中的Berkeley DB函数库来完成对数据的保存、查询、修改和删除等操作。

    1. 引入Maven依赖

            <!-- Berkeley DB Java Edition: supplies the com.sleepycat.je / com.sleepycat.collections / com.sleepycat.bind classes imported by the code below. -->
            <dependency>
                <groupId>com.sleepycat</groupId>
                <artifactId>je</artifactId>
                <version>18.3.12</version>
            </dependency>
    
            <!-- FST serialization: supplies the org.nustaq.serialization classes used by FstObjectSerializeUtil.
                 The commons-logging and json-io transitive dependencies are excluded. -->
            <dependency>
                <groupId>de.ruedigermoeller</groupId>
                <artifactId>fst</artifactId>
                <version>2.48-jdk-6</version>
                <exclusions>
                    <exclusion>
                        <groupId>commons-logging</groupId>
                        <artifactId>commons-logging</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.cedarsoftware</groupId>
                        <artifactId>json-io</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    

    2. 具体实现

    BdbInitService文件

    package com.joe.heightconcurrency.demo.service;
    
    import com.joe.heightconcurrency.demo.config.BdbPropertyConfig;
    import com.joe.heightconcurrency.demo.entity.User;
    import com.joe.heightconcurrency.demo.util.FstSerialBinding;
    import com.sleepycat.collections.StoredMap;
    import com.sleepycat.je.*;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.io.File;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Service
    @Slf4j
    public class BdbInitService {
    
        private static Environment env;
    
        private static DatabaseConfig dbConfig;
    
        @Autowired
        private BdbPropertyConfig bdbPropertyConfig;
    
        private ConcurrentHashMap<String, Database> queueDbMap = new ConcurrentHashMap<>();
    
        private ConcurrentHashMap<String, StoredMap<String, User>> storedMaps = new ConcurrentHashMap<>();
    
        static {
            EnvironmentConfig envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            envConfig.setTransactional(true);
            envConfig.setCacheSize(10000000);
            try {
                env = new Environment(new File("E://develop//bdb"),envConfig);
            } catch (DatabaseException e) {
                e.printStackTrace();
            }
            dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            dbConfig.setTransactional(true);
            dbConfig.setSortedDuplicates(false);
        }
    
        private Database open(String dbName) {
            Database database = queueDbMap.get(dbName);
            if (Objects.nonNull(database)) {
                return database;
            }
            try {
                database = env.openDatabase(null, dbName, dbConfig);
                Database olddb = queueDbMap.putIfAbsent(dbName, database);
                return Objects.isNull(olddb) ? database : olddb;
            } catch (DatabaseException e) {
                e.printStackTrace();
                return null;
            }
        }
    
        public StoredMap<String, User> getStoredMap(String queueName) {
            FstSerialBinding<String> messageKeyBinding = new FstSerialBinding<>();
            FstSerialBinding<User> messageValueBinding = new FstSerialBinding<>();
            Database database = open(queueName);
            StoredMap<String, User> tmpMap = new StoredMap<>(database, messageKeyBinding, messageValueBinding, true);
            StoredMap<String, User> oldMap = storedMaps.putIfAbsent(queueName, tmpMap);
            return Objects.isNull(oldMap) ? tmpMap : oldMap;
        }
    }
    

    UserQueueService文件

    package com.joe.heightconcurrency.demo.service;
    
    import com.joe.heightconcurrency.demo.entity.User;
    import com.joe.heightconcurrency.demo.queue.BdbBlockingQueue;
    import com.sleepycat.collections.StoredMap;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.Objects;
    
    
    /**
     * Service facade over a {@link BdbBlockingQueue} of {@link User} entries
     * that is backed by the stored map named "user".
     */
    @Service
    @Slf4j
    public class UserQueueService {

        BdbBlockingQueue<User> bdbBlockingQueue;

        @Autowired
        private BdbInitService bdbInitService;

        /** Builds the queue from the "user" stored map once dependencies are injected. */
        @PostConstruct
        public void init() {
            StoredMap<String, User> backingMap = bdbInitService.getStoredMap("user");
            bdbBlockingQueue = new BdbBlockingQueue<>(backingMap);
        }

        /**
         * Offers a user to the queue and logs the outcome and the queue size.
         *
         * @param user the user to enqueue; a {@code null} value is logged and ignored
         */
        public void addUser(User user) {
            if (user == null) {
                log.error("用户信息不存在");
                return;
            }
            if (bdbBlockingQueue.offer(user)) {
                log.info("用户信息放入队列成功");
            } else {
                log.error("用户信息放入队列失败");
            }
            // The size is reported after either outcome.
            log.info("当前队列大小:{}", bdbBlockingQueue.size());
        }

        /**
         * @return the number of users currently in the queue
         */
        public Integer getSize() {
            int queued = bdbBlockingQueue.size();
            return queued;
        }

        /**
         * Removes and returns the head of the queue.
         *
         * @return the next user, or {@code null} when the queue is empty
         */
        public User getUser() {
            User head = bdbBlockingQueue.poll();
            return head;
        }
    }
    
    

    controller文件

        /**
         * Hands the bound user to the queue service.
         *
         * @param user user populated from the request
         * @return always the literal "SUCCESS"
         */
        @GetMapping("/addUser")
        public String addUser(User user) {
            final String reply = "SUCCESS";
            userQueueService.addUser(user);
            return reply;
        }
    
        /**
         * @return the current queue size as reported by the queue service
         */
        @GetMapping("/getUserSize")
        public Integer getUserSize() {
            Integer queued = userQueueService.getSize();
            return queued;
        }
    
        /**
         * Takes the next user off the queue and returns its string form.
         *
         * @return {@code user.toString()}, or the text "null" when the queue is empty
         */
        @GetMapping("/getUser")
        public String getUser() {
            User user = userQueueService.getUser();
            // getUser() returns null for an empty queue; String.valueOf avoids the
            // NullPointerException that calling toString() directly would raise.
            return String.valueOf(user);
        }
    

    FstSerialBinding文件

    package com.joe.heightconcurrency.demo.util;
    
    import com.sleepycat.bind.ByteArrayBinding;
    import com.sleepycat.bind.EntryBinding;
    import com.sleepycat.je.DatabaseEntry;
    import com.sleepycat.util.RuntimeExceptionWrapper;
    
    import java.io.Serializable;
    
    /**
     * {@link EntryBinding} that converts objects to and from database entries
     * by way of {@link FstObjectSerializeUtil}. A {@code null} object is stored
     * as a zero-length byte array, and an empty entry is read back as {@code null}.
     *
     * @param <E> serializable type handled by this binding
     */
    public class FstSerialBinding<E extends Serializable> implements EntryBinding<E> {
        private static final byte[] EMPTY_PAYLOAD = new byte[0];
        private final ByteArrayBinding rawBinding = new ByteArrayBinding();

        /**
         * Decodes the bytes held by {@code entry}.
         *
         * @param entry database entry to decode
         * @return the decoded object, or {@code null} when the entry holds no bytes
         */
        @Override
        @SuppressWarnings("unchecked")
        public E entryToObject(DatabaseEntry entry) {
            byte[] payload = bb(entry);
            boolean hasPayload = payload != null && payload.length != 0;
            if (!hasPayload) {
                return null;
            }
            try {
                Serializable decoded = FstObjectSerializeUtil.read(payload);
                return (E) decoded;
            } catch (Exception ex) {
                throw RuntimeExceptionWrapper.wrapIfNeeded(ex);
            }
        }

        /**
         * Encodes {@code object} into {@code entry}.
         *
         * @param object value to store; {@code null} is written as an empty array
         * @param entry  database entry that receives the bytes
         */
        @Override
        public void objectToEntry(E object, DatabaseEntry entry) {
            if (object == null) {
                rawBinding.objectToEntry(EMPTY_PAYLOAD, entry);
                return;
            }
            try {
                byte[] encoded = FstObjectSerializeUtil.write(object);
                rawBinding.objectToEntry(encoded, entry);
            } catch (Exception ex) {
                throw RuntimeExceptionWrapper.wrapIfNeeded(ex);
            }
        }

        /** Extracts the raw bytes of an entry through the byte-array binding. */
        private byte[] bb(DatabaseEntry entry) {
            return rawBinding.entryToObject(entry);
        }
    }
    
    

    FstObjectSerializeUtil文件

    package com.joe.heightconcurrency.demo.util;
    
    import org.nustaq.serialization.FSTConfiguration;
    import org.nustaq.serialization.FSTObjectInput;
    import org.nustaq.serialization.FSTObjectOutput;
    
    import java.io.ByteArrayOutputStream;
    import java.io.Serializable;
    
    /**
     * Static helpers that serialize and deserialize objects with FST.
     *
     * <p>Each thread works with its own {@link FSTConfiguration}, obtained from
     * a {@link ThreadLocal}.
     */
    public abstract class FstObjectSerializeUtil {

        /** Per-thread FST configuration, created lazily on first use. */
        private static final ThreadLocal<FSTConfiguration> conf =
                ThreadLocal.withInitial(FSTConfiguration::createDefaultConfiguration);

        /**
         * Serializes {@code obj} into a byte array.
         *
         * @param obj the object to serialize
         * @return the serialized bytes
         * @throws Exception if FST fails to write the object
         */
        public static byte[] write(Serializable obj) throws Exception {
            // ByteArrayOutputStream holds no external resource, so no close() is needed.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            FSTObjectOutput out = conf.get().getObjectOutput(buffer);
            out.writeObject(obj);
            out.flush();
            return buffer.toByteArray();
        }

        /**
         * Deserializes an object from {@code bytes}.
         *
         * @param bytes serialized form; {@code null} or empty yields {@code null}
         * @return the decoded object, or {@code null} when there is no input or the
         *         decoded value is not {@link Serializable}
         * @throws Exception if FST cannot decode the bytes; the failure is propagated
         *         so callers can tell corrupt data apart from an absent value
         */
        public static Serializable read(byte[] bytes) throws Exception {
            if (bytes == null || bytes.length == 0) {
                return null;
            }
            FSTObjectInput in = conf.get().getObjectInput(bytes);
            Object decoded = in.readObject();
            return decoded instanceof Serializable ? (Serializable) decoded : null;
        }
    }
    
    

    BdbBlockingQueue文件

    package com.joe.heightconcurrency.demo.queue;
    
    import com.joe.heightconcurrency.demo.entity.User;
    import com.sleepycat.collections.StoredMap;
    
    import java.util.Objects;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * {@link LinkedBlockingQueue} that mirrors its elements into a
     * {@link StoredMap} keyed by {@code getId().toString()}.
     *
     * <p>Elements added through {@link #offer(User)}, {@code add} or
     * {@link #put(User)} are written to the map first; elements removed through
     * {@link #poll()} or {@link #take()} are deleted from it. Other inherited
     * mutators (timed {@code offer}/{@code poll}, {@code remove(Object)},
     * {@code drainTo}, {@code clear}, iterator removal) are not mirrored.
     *
     * <p>NOTE(review): the map holds one entry per id, so offering two elements
     * with the same id keeps a single persisted entry — confirm ids are unique.
     *
     * @param <T> element type
     */
    public class BdbBlockingQueue<T extends User> extends LinkedBlockingQueue<T> {

        /** Backing store; {@code null} means the queue is purely in-memory. */
        private final StoredMap<String, T> storedMap;

        /** Creates an in-memory queue with no backing store. */
        public BdbBlockingQueue() {
            this.storedMap = null;
        }

        /**
         * Creates a queue pre-filled with the values currently in {@code storedMap}.
         *
         * @param storedMap backing store; must not be {@code null}
         */
        public BdbBlockingQueue(StoredMap<String, T> storedMap) {
            super(storedMap.values());
            this.storedMap = storedMap;
        }

        /**
         * Persists {@code t} and appends it to the queue.
         *
         * @param t element to add
         * @return {@code false} when {@code t} is {@code null}, otherwise the result
         *         of the underlying queue's {@code offer}
         */
        @Override
        public boolean offer(T t) {
            if (Objects.isNull(t)) {
                return false;
            }
            persist(t);
            return super.offer(t);
        }

        /**
         * Persists {@code t} and appends it to the queue.
         *
         * @param t element to add
         * @throws NullPointerException if {@code t} is {@code null}
         * @throws InterruptedException if interrupted while waiting
         */
        @Override
        public void put(T t) throws InterruptedException {
            Objects.requireNonNull(t, "element must not be null");
            persist(t);
            super.put(t);
        }

        @Override
        public int size() {
            return super.size();
        }

        /**
         * Removes the head of the queue and its persisted entry.
         *
         * @return the head, or {@code null} when the queue is empty
         */
        @Override
        public T poll() {
            T head = super.poll();
            if (Objects.nonNull(head)) {
                forget(head);
            }
            return head;
        }

        /**
         * Waits for and removes the head of the queue, along with its persisted entry.
         *
         * @return the head of the queue
         * @throws InterruptedException if interrupted while waiting
         */
        @Override
        public T take() throws InterruptedException {
            T head = super.take();
            forget(head);
            return head;
        }

        /** Writes the element to the backing store, if there is one. */
        private void persist(T t) {
            if (storedMap != null) {
                storedMap.put(keyOf(t), t);
            }
        }

        /** Deletes the element's entry from the backing store, if there is one. */
        private void forget(T t) {
            if (storedMap != null) {
                storedMap.remove(keyOf(t));
            }
        }

        /** Derives the store key from the element id, failing clearly on a null id. */
        private static String keyOf(User user) {
            return Objects.requireNonNull(user.getId(), "user id must not be null").toString();
        }
    }
    
    

    参考:https://blog.csdn.net/qq_38617531/article/details/83794893/

    相关文章

      网友评论

          本文标题:使用Berkeley DB实现LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/eafzpdtx.html