美文网首页
8,使用Java Reactive反应式方式操作POJO对象进行

8,使用Java Reactive反应式方式操作POJO对象进行CRUD操作

作者: lcjyzm | 来源:发表于2021-07-20 08:23 被阅读0次

    1,创建maven项目,并引入以下依赖:

    <!-- JUnit 4: provides the @Test / @BeforeClass / @AfterClass annotations used by the CRUD test class -->
    <!-- NOTE(review): scope is "compile"; "test" is the usual scope when the tests live under src/test - confirm -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>compile</scope>
    </dependency>
    
    <!-- MongoDB Reactive Streams driver -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-reactivestreams</artifactId>
        <version>4.3.0</version>
    </dependency>
    
    <!-- Hutool utilities: the CRUD test class uses its Console and JSONUtil helpers for output -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.3.9</version>
    </dependency>
    

    2,启动MongoDB服务实例

    mongod.exe --dbpath D:\UserData\mongodb --auth
    

    3,获取MongoClient对象

    import com.mongodb.MongoClientSettings;
    import com.mongodb.MongoCredential;
    import com.mongodb.ServerAddress;
    import com.mongodb.reactivestreams.client.MongoClient;
    import com.mongodb.reactivestreams.client.MongoClients;
    import org.bson.codecs.configuration.CodecRegistries;
    import org.bson.codecs.configuration.CodecRegistry;
    import org.bson.codecs.pojo.PojoCodecProvider;
    
    import java.util.Arrays;
    
    /**
     * Static helpers that create reactive-streams {@link MongoClient} instances for the MongoDB
     * server at 127.0.0.1:27017 and build the codec registry used for POJO mapping.
     */
    public class MongoUtil {

        /**
         * Builds the codec registry that handles conversion between POJOs and BSON.
         *
         * <p>The result combines the driver's default codec registry with a
         * {@link PojoCodecProvider} that has automatic mode switched on.
         *
         * @return the combined CodecRegistry
         */
        public static CodecRegistry codecRegistry(){
            final CodecRegistry driverDefaults = MongoClientSettings.getDefaultCodecRegistry();
            final PojoCodecProvider pojoProvider = PojoCodecProvider.builder().automatic(true).build();
            final CodecRegistry pojoRegistry = CodecRegistries.fromProviders(pojoProvider);
            return CodecRegistries.fromRegistries(driverDefaults, pojoRegistry);
        }

        /**
         * Opens a client from a connection string that carries the credentials, the
         * authentication source and the authentication mechanism.
         *
         * @return MongoClient
         */
        public static MongoClient getInstance2(){
            final String connectionString =
                    "mongodb://admin:123456@127.0.0.1:27017/?authSource=admin&authMechanism=SCRAM-SHA-256";
            return MongoClients.create(connectionString);
        }

        /**
         * Opens a client from explicitly assembled settings: a single host/port pair plus a
         * SCRAM-SHA-256 credential.
         *
         * @return MongoClient
         */
        public static MongoClient getInstance1(){
            // Authentication details for the server started with --auth
            final MongoCredential adminCredential =
                    MongoCredential.createScramSha256Credential("admin", "admin", "123456".toCharArray());
            final ServerAddress localServer = new ServerAddress("127.0.0.1", 27017);

            final MongoClientSettings settings = MongoClientSettings.builder()
                    .applyToClusterSettings(cluster -> cluster.hosts(Arrays.asList(localServer)))
                    .credential(adminCredential)
                    .build();
            return MongoClients.create(settings);
        }

    }
    
    

    4,SubscriberHelpers工具类

    import com.mongodb.MongoInterruptedException;
    import com.mongodb.MongoTimeoutException;
    import org.bson.Document;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    
    import static java.lang.String.format;
    
    /**
     * Subscriber helper implementations for the Quick Tour.
     *
     * <p>Every helper builds on {@link ObservableSubscriber}, which records what a publisher emits
     * and lets the calling thread block until the publisher has finished.
     */
    public final class SubscriberHelpers {

        /**
         * A Subscriber that stores the publisher's results and provides a latch so callers can
         * block on completion.
         *
         * @param <T> The publisher's result type
         */
        public abstract static class ObservableSubscriber<T> implements Subscriber<T> {
            private final List<T> received;
            private final List<RuntimeException> errors;
            private final CountDownLatch latch;
            private volatile Subscription subscription;
            private volatile boolean completed;

            /**
             * Construct an instance
             */
            public ObservableSubscriber() {
                this.received = new ArrayList<>();
                this.errors = new ArrayList<>();
                this.latch = new CountDownLatch(1);
            }

            /**
             * Stores the subscription; no items are requested here.
             *
             * @param s the subscription handed over by the publisher
             */
            @Override
            public void onSubscribe(final Subscription s) {
                subscription = s;
            }

            /**
             * Records an emitted element.
             *
             * @param t the emitted element
             */
            @Override
            public void onNext(final T t) {
                received.add(t);
            }

            /**
             * Records the failure and then releases waiting threads through {@link #onComplete()}.
             * Non-runtime throwables are wrapped in a {@link RuntimeException}.
             *
             * @param t the failure signalled by the publisher
             */
            @Override
            public void onError(final Throwable t) {
                if (t instanceof RuntimeException) {
                    errors.add((RuntimeException) t);
                } else {
                    errors.add(new RuntimeException("Unexpected exception", t));
                }
                // Also invoked on failure so that await() callers are released; subclasses that
                // override onComplete() therefore run on the error path as well.
                onComplete();
            }

            /**
             * Marks the subscriber as completed and opens the latch.
             */
            @Override
            public void onComplete() {
                completed = true;
                latch.countDown();
            }

            /**
             * Gets the subscription
             *
             * @return the subscription, or null if {@link #onSubscribe(Subscription)} has not run yet
             */
            public Subscription getSubscription() {
                return subscription;
            }

            /**
             * Get received elements
             *
             * @return the list of received elements
             */
            public List<T> getReceived() {
                return received;
            }

            /**
             * Get error from subscription
             *
             * @return the first recorded error, which may be null
             */
            public RuntimeException getError() {
                if (errors.size() > 0) {
                    return errors.get(0);
                }
                return null;
            }

            /**
             * Tells whether {@link #onComplete()} has run (directly or via {@link #onError(Throwable)}).
             *
             * @return true once the completion signal was processed
             */
            public boolean isCompleted() {
                return completed;
            }

            /**
             * Get received elements, waiting with the default timeout of {@link #await()}.
             *
             * @return the list of received elements
             */
            public List<T> get() {
                return await().getReceived();
            }

            /**
             * Get received elements.
             *
             * @param timeout how long to wait
             * @param unit the time unit
             * @return the list of received elements
             */
            public List<T> get(final long timeout, final TimeUnit unit) {
                return await(timeout, unit).getReceived();
            }

            /**
             * Await completion or error, waiting at most 60 seconds.
             *
             * @return this
             */
            public ObservableSubscriber<T> await() {
                return await(60, TimeUnit.SECONDS);
            }

            /**
             * Await completion or error
             *
             * <p>Requests {@code Integer.MAX_VALUE} items from the stored subscription before
             * blocking on the latch.
             *
             * @param timeout how long to wait
             * @param unit the time unit
             * @return this
             * @throws IllegalStateException if no subscription has been received yet
             * @throws MongoTimeoutException if the latch is not released within the timeout
             * @throws MongoInterruptedException if the waiting thread is interrupted
             * @throws RuntimeException the first error recorded by {@link #onError(Throwable)}, if any
             */
            public ObservableSubscriber<T> await(final long timeout, final TimeUnit unit) {
                // Read the volatile field once; fail with a clear message instead of a bare NPE
                // when await() is called before the subscriber was passed to subscribe().
                final Subscription current = subscription;
                if (current == null) {
                    throw new IllegalStateException(
                            "await() called before the subscriber received a subscription");
                }
                current.request(Integer.MAX_VALUE);
                try {
                    if (!latch.await(timeout, unit)) {
                        throw new MongoTimeoutException("Publisher onComplete timed out");
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still see it.
                    Thread.currentThread().interrupt();
                    throw new MongoInterruptedException("Interrupted waiting for observeration", e);
                }
                if (!errors.isEmpty()) {
                    throw errors.get(0);
                }
                return this;
            }
        }

        /**
         * A Subscriber that immediately requests Integer.MAX_VALUE onSubscribe.
         * Intended for insert, update and delete operations.
         *
         * @param <T> The publisher's result type
         */
        public static class OperationSubscriber<T> extends ObservableSubscriber<T> {

            @Override
            public void onSubscribe(final Subscription s) {
                super.onSubscribe(s);
                s.request(Integer.MAX_VALUE);
            }
        }

        /**
         * A Subscriber that prints a message including the received items on completion.
         * Used in the tutorial to print the total document count.
         *
         * @param <T> The publisher's result type
         */
        public static class PrintSubscriber<T> extends OperationSubscriber<T> {
            private final String message;

            /**
             * A Subscriber that outputs a message onComplete.
             *
             * @param message the format string to output onComplete; the list of received items is
             *                passed as its single format argument
             */
            public PrintSubscriber(final String message) {
                this.message = message;
            }

            @Override
            public void onComplete() {
                System.out.println(format(message, getReceived()));
                super.onComplete();
            }
        }

        /**
         * A Subscriber that prints the json version of each document.
         * Intended for collection queries.
         */
        public static class PrintDocumentSubscriber extends ConsumerSubscriber<Document> {
            /**
             * Construct a new instance
             */
            public PrintDocumentSubscriber() {
                super(t -> System.out.println(t.toJson()));
            }
        }

        /**
         * A Subscriber that prints the toString version of each element
         * @param <T> the type of the element
         */
        public static class PrintToStringSubscriber<T> extends ConsumerSubscriber<T> {
            /**
             * Construct a new instance
             */
            public PrintToStringSubscriber() {
                super(System.out::println);
            }
        }

        /**
         * A Subscriber that processes a consumer for each element
         * @param <T> the type of the element
         */
        public static class ConsumerSubscriber<T> extends OperationSubscriber<T> {
            private final Consumer<T> consumer;

            /**
             * Construct a new instance
             * @param consumer the consumer
             */
            public ConsumerSubscriber(final Consumer<T> consumer) {
                this.consumer = consumer;
            }


            /**
             * Stores the element first, then hands it to the consumer.
             *
             * @param document the emitted element
             */
            @Override
            public void onNext(final T document) {
                super.onNext(document);
                consumer.accept(document);
            }
        }

        // Static holder for the nested subscriber types; not meant to be instantiated.
        private SubscriberHelpers() {
        }
    }
    
    

    5,创建POJO类

    package com.lcj.mongodb.model;
    
    import org.bson.types.ObjectId;
    
    /**
     * Student entity stored in the MongoDB collection as a POJO.
     * Carries a no-argument constructor plus a getter/setter pair for every field.
     */
    public class Student {
        // Student id
        private ObjectId id;
        // Name
        private String name;
        // Age
        private int age;
        // Sex
        private char sex;
        // Home address
        private Address address;

        /**
         * Creates an empty student; all fields keep their Java default values.
         */
        public Student() {
        }

        /**
         * Creates a student without an id.
         *
         * @param name    the name
         * @param age     the age
         * @param sex     the sex
         * @param address the home address
         */
        public Student(String name, int age, char sex, Address address) {
            this.address = address;
            this.sex = sex;
            this.age = age;
            this.name = name;
        }

        // ----- accessors -----

        public ObjectId getId() {
            return this.id;
        }

        public String getName() {
            return this.name;
        }

        public int getAge() {
            return this.age;
        }

        public char getSex() {
            return this.sex;
        }

        public Address getAddress() {
            return this.address;
        }

        // ----- mutators -----

        public void setId(ObjectId id) {
            this.id = id;
        }

        public void setName(String name) {
            this.name = name;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public void setSex(char sex) {
            this.sex = sex;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        /**
         * Renders all fields in the form {@code Student{id=..., name='...', age=..., sex=..., address=...}}.
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("Student{");
            text.append("id=").append(id);
            text.append(", name='").append(name).append('\'');
            text.append(", age=").append(age);
            text.append(", sex=").append(sex);
            text.append(", address=").append(address);
            text.append('}');
            return text.toString();
        }
    }
    
    
    package com.lcj.mongodb.model;
    
    /**
     * Postal address value object embedded in a student document.
     * Carries a no-argument constructor plus a getter/setter pair for every field.
     */
    public class Address {
        // Country
        private String country;
        // Province
        private String province;
        // City
        private String city;
        // Street
        private String street;

        /**
         * Creates an empty address; every field starts as {@code null}.
         */
        public Address() {
        }

        /**
         * Creates a fully populated address.
         *
         * @param country  the country
         * @param province the province
         * @param city     the city
         * @param street   the street
         */
        public Address(String country, String province, String city, String street) {
            this.street = street;
            this.city = city;
            this.province = province;
            this.country = country;
        }

        // ----- accessors -----

        public String getCountry() {
            return this.country;
        }

        public String getProvince() {
            return this.province;
        }

        public String getCity() {
            return this.city;
        }

        public String getStreet() {
            return this.street;
        }

        // ----- mutators -----

        public void setCountry(String country) {
            this.country = country;
        }

        public void setProvince(String province) {
            this.province = province;
        }

        public void setCity(String city) {
            this.city = city;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        /**
         * Renders all fields in the form
         * {@code Address{country='...', province='...', city='...', street='...'}}.
         */
        @Override
        public String toString() {
            return String.format(
                    "Address{country='%s', province='%s', city='%s', street='%s'}",
                    country, province, city, street);
        }
    }
    
    

    6,使用POJO对象进行CRUD操作

    package com.lcj.mongodb.reactive.pojo;
    
    import cn.hutool.core.lang.Console;
    import cn.hutool.json.JSONUtil;
    import com.lcj.mongodb.model.Address;
    import com.lcj.mongodb.model.Student;
    import com.lcj.mongodb.reactive.MongoUtil;
    import com.lcj.mongodb.reactive.SubscriberHelpers;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.Projections;
    import com.mongodb.client.model.Sorts;
    import com.mongodb.client.model.Updates;
    import com.mongodb.client.result.DeleteResult;
    import com.mongodb.client.result.InsertManyResult;
    import com.mongodb.client.result.InsertOneResult;
    import com.mongodb.client.result.UpdateResult;
    import com.mongodb.reactivestreams.client.MongoClient;
    import com.mongodb.reactivestreams.client.MongoCollection;
    import com.mongodb.reactivestreams.client.MongoDatabase;
    import org.bson.Document;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * CRUD walkthrough for the reactive-streams driver using the {@link Student} POJO.
     *
     * <p>Every test subscribes a {@link SubscriberHelpers} subscriber to the publisher returned by
     * the collection and blocks on it until the operation has finished. The query tests use
     * {@code PrintToStringSubscriber}, which prints each received element as it arrives.
     */
    public class UsePOJOCRUDTest {
        // Shared client connection
        private static MongoClient client;
        // Shared handle on the "student" collection
        private static MongoCollection<Student> student;

        /**
         * Runs once before all tests: opens the client and resolves the collection.
         */
        @BeforeClass
        public static void setUp() {
            client = MongoUtil.getInstance1();
            // Access the given database; it is created automatically if it does not exist
            MongoDatabase mongoDatabase = client.getDatabase("test").withCodecRegistry(MongoUtil.codecRegistry());
            // Access the given collection; if it does not exist it is created on the first write
            student = mongoDatabase.getCollection("student",Student.class);
        }

        /**
         * Runs once after all tests: releases the client.
         */
        @AfterClass
        public static void close() {
            //student.drop();  // drop the collection
            client.close();
        }

        /**
         * Inserts a single document.
         */
        @Test
        public void insertOne() throws InterruptedException {
            // Build the document to store
            Student s = new Student("张三",18,'男',new Address("中国","湖北","十堰市","北京路38号"));
            // Insert one document; an _id is added automatically when none is set
            SubscriberHelpers.ObservableSubscriber<InsertOneResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            student.insertOne(s).subscribe(subscriber);
            subscriber.await(); // block until the insert has finished
        }


        /**
         * Inserts several documents.
         */
        @Test
        public void insertMany() {
            List<Student> students = Arrays.asList(
                    new Student("李四",34,'女',new Address("中国","河北","合肥市","北京路38号"))
                    ,new Student("王五",10,'女',new Address("中国","湖北","襄樊市","大连路67号"))
                    ,new Student("赵六",58,'男',new Address("中国","湖北","宜昌市","北京路38号"))
            );
            // Insert the documents; an _id is added automatically when none is set
            SubscriberHelpers.ObservableSubscriber<InsertManyResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            student.insertMany(students).subscribe(subscriber);
            subscriber.await();
        }

        /**
         * Counts the documents in the collection.
         */
        @Test
        public void countNumbers(){
            SubscriberHelpers.PrintSubscriber<Long> subscriber = new SubscriberHelpers.PrintSubscriber<>("student集合文档总数: %s");
            student.countDocuments().subscribe(subscriber);
            // get() already blocks until completion, so no separate await() is needed
            Console.log("文档总数:{}",subscriber.get().get(0));
        }


        /**
         * Queries the first document of the collection.
         */
        @Test
        public void findFirst() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<>();
            student.find().first().subscribe(subscriber);
            // Fail with an explicit message when the collection is empty instead of an unchecked
            // Optional.get(); the local is named "first" so it does not shadow the collection field.
            Student first = subscriber.get().stream()
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("student collection is empty"));
            Console.log("查询第一条文档:{}", JSONUtil.toJsonStr(first));
        }

        /**
         * Queries all documents of the collection.
         */
        @Test
        public void findAll() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<>();
            student.find().subscribe(subscriber);
            // The subscriber prints every element as it arrives; only completion is awaited here
            subscriber.await();
        }


        /**
         * Queries the first document whose address.city equals 宜昌市.
         */
        @Test
        public void findFirstByFilter() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<Student>();
            student.find(Filters.eq("address.city","宜昌市")).first().subscribe(subscriber);
            subscriber.await();
        }


        /**
         * Queries the students older than 30.
         */
        @Test
        public void findAllByFilterGt() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<Student>();
            student.find(Filters.gt("age",30)).subscribe(subscriber);
            subscriber.await();
        }


        /**
         * Queries the students older than 30 whose sex is 男.
         */
        @Test
        public void findAllByFilterAnd() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<Student>();
            student.find(Filters.and(Filters.gt("age",30),Filters.eq("sex",'男'))).subscribe(subscriber);
            subscriber.await();
        }

        /**
         * Queries the documents that have an age field, sorted by age in descending order.
         */
        @Test
        public void sort() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<Student>();
            student.find(Filters.exists("age")).sort(Sorts.descending("age")).subscribe(subscriber);
            subscriber.await();
        }

        /**
         * Projection: excludes the document id from the result.
         */
        @Test
        public void projection() {
            SubscriberHelpers.PrintToStringSubscriber<Student>  subscriber = new SubscriberHelpers.PrintToStringSubscriber<Student>();
            student.find().projection(Projections.excludeId()).subscribe(subscriber);
            subscriber.await();
        }

        /**
         * Updates one document whose address.city equals 宜昌市, increasing age by 2.
         */
        @Test
        public void updateOne() {
            SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            student.updateOne(Filters.eq("address.city","宜昌市"), Updates.inc("age", 2)).subscribe(subscriber);
            UpdateResult updateResult = subscriber.get().get(0);
            Console.log("更新数量:{}",updateResult.getModifiedCount());
        }

        /**
         * Updates every document whose address.city equals 合肥市, decreasing age by 4.
         */
        @Test
        public void updateMany() {
            SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            student.updateMany(Filters.eq("address.city", "合肥市"), Updates.inc("age", -4)).subscribe(subscriber);
            UpdateResult updateResult = subscriber.get().get(0);
            Console.log("更新数量:{}",updateResult.getModifiedCount());
        }


        /**
         * Deletes one document whose address.city equals 宜昌市.
         */
        @Test
        public void deleteOne() {
            SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            student.deleteOne(Filters.eq("address.city","宜昌市")).subscribe(subscriber);
            Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
        }


        /**
         * Deletes every document whose address.city equals 合肥市.
         */
        @Test
        public void deleteMany() {
            SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            student.deleteMany(Filters.eq("address.city","合肥市")).subscribe(subscriber);
            Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
        }

    }
    
    

    7,单测结果

    image-20210719093217640

    相关文章

      网友评论

          本文标题:8,使用Java Reactive反应式方式操作POJO对象进行

          本文链接:https://www.haomeiwen.com/subject/bijcmltx.html