美文网首页
7,使用Java Reactive反应式方式操作Document

7,使用Java Reactive反应式方式操作Document

作者: lcjyzm | 来源:发表于2021-07-20 08:23 被阅读0次

    1,创建maven项目,并引入以下依赖:

    <!--测试包-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>compile</scope>
    </dependency>
    
    <!--mongodb reactive驱动包-->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-reactivestreams</artifactId>
        <version>4.3.0</version>
    </dependency>
    
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.3.9</version>
    </dependency>
    

    2,启动MongoDB服务实例

    mongod.exe --dbpath D:\UserData\mongodb --auth
    

    3,获取MongoClient对象

    import com.mongodb.MongoClientSettings;
    import com.mongodb.MongoCredential;
    import com.mongodb.ServerAddress;
    import com.mongodb.reactivestreams.client.MongoClient;
    import com.mongodb.reactivestreams.client.MongoClients;
    
    import java.util.Arrays;
    
    /**
     * @Package: com.lcj.mongodb.sync
     * @ClassName: MongoUtil
     * @Author: Administrator
     * @CreateTime: 2021/7/15 15:10
     * @Description:
     */
    public class MongoUtil {
    
        /**
         * 通过指定host和port获得连接
         * @return MongoClient
         */
        public static MongoClient getInstance1(){
            // 添加认证
            MongoCredential credential = MongoCredential.createScramSha256Credential("admin", "admin", "123456".toCharArray());
            return MongoClients.create(
                    MongoClientSettings.builder()
                            .applyToClusterSettings(builder ->
                                    builder.hosts(Arrays.asList(new ServerAddress("127.0.0.1", 27017))))
                            .credential(credential)
                            .build());
        }
    
        /**
         * 通过连接字符串
         * @return MongoClient
         */
        public static MongoClient getInstance2(){
            return MongoClients.create("mongodb://admin:123456@127.0.0.1:27017/?authSource=admin&authMechanism=SCRAM-SHA-256");
        }
    
    
    }
    
    
    

    4,SubscriberHelpers工具类

    import com.mongodb.MongoInterruptedException;
    import com.mongodb.MongoTimeoutException;
    import org.bson.Document;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    
    import static java.lang.String.format;
    
    /**
     *  Subscriber helper implementations for the Quick Tour.
     */
    public final class SubscriberHelpers {
    
        /**
         * A Subscriber that stores the publishers results and provides a latch so can block on completion.
         *
         * @param <T> The publishers result type
         */
        public abstract static class ObservableSubscriber<T> implements Subscriber<T> {
            private final List<T> received;
            private final List<RuntimeException> errors;
            private final CountDownLatch latch;
            private volatile Subscription subscription;
            private volatile boolean completed;
    
            /**
             * Construct an instance
             */
            public ObservableSubscriber() {
                this.received = new ArrayList<>();
                this.errors = new ArrayList<>();
                this.latch = new CountDownLatch(1);
            }
    
            @Override
            public void onSubscribe(final Subscription s) {
                subscription = s;
            }
    
            @Override
            public void onNext(final T t) {
                received.add(t);
            }
    
            @Override
            public void onError(final Throwable t) {
                if (t instanceof RuntimeException) {
                    errors.add((RuntimeException) t);
                } else {
                    errors.add(new RuntimeException("Unexpected exception", t));
                }
                onComplete();
            }
    
            @Override
            public void onComplete() {
                completed = true;
                latch.countDown();
            }
    
            /**
             * Gets the subscription
             *
             * @return the subscription
             */
            public Subscription getSubscription() {
                return subscription;
            }
    
            /**
             * Get received elements
             *
             * @return the list of received elements
             */
            public List<T> getReceived() {
                return received;
            }
    
            /**
             * Get error from subscription
             *
             * @return the error, which may be null
             */
            public RuntimeException getError() {
                if (errors.size() > 0) {
                    return errors.get(0);
                }
                return null;
            }
    
            /**
             * Get received elements.
             *
             * @return the list of receive elements
             */
            public List<T> get() {
                return await().getReceived();
            }
    
            /**
             * Get received elements.
             *
             * @param timeout how long to wait
             * @param unit the time unit
             * @return the list of receive elements
             */
            public List<T> get(final long timeout, final TimeUnit unit) {
                return await(timeout, unit).getReceived();
            }
    
            /**
             * Await completion or error
             *
             * @return this
             */
            public ObservableSubscriber<T> await() {
                return await(60, TimeUnit.SECONDS);
            }
    
            /**
             * Await completion or error
             *
             * @param timeout how long to wait
             * @param unit the time unit
             * @return this
             */
            public ObservableSubscriber<T> await(final long timeout, final TimeUnit unit) {
                subscription.request(Integer.MAX_VALUE);
                try {
                    if (!latch.await(timeout, unit)) {
                        throw new MongoTimeoutException("Publisher onComplete timed out");
                    }
                } catch (InterruptedException e) {
                    throw new MongoInterruptedException("Interrupted waiting for observeration", e);
                }
                if (!errors.isEmpty()) {
                    throw errors.get(0);
                }
                return this;
            }
        }
    
        /**
         * A Subscriber that immediately requests Integer.MAX_VALUE onSubscribe
         * 添加,更新,删除订阅
         * @param <T> The publishers result type
         */
        public static class OperationSubscriber<T> extends ObservableSubscriber<T> {
    
            @Override
            public void onSubscribe(final Subscription s) {
                super.onSubscribe(s);
                s.request(Integer.MAX_VALUE);
            }
        }
    
        /**
         * A Subscriber that prints a message including the received items on completion
         * 打印总记录数
         * @param <T> The publishers result type
         */
        public static class PrintSubscriber<T> extends OperationSubscriber<T> {
            private final String message;
    
            /**
             * A Subscriber that outputs a message onComplete.
             *
             * @param message the message to output onComplete
             */
            public PrintSubscriber(final String message) {
                this.message = message;
            }
    
            @Override
            public void onComplete() {
                System.out.println(format(message, getReceived()));
                super.onComplete();
            }
        }
    
        /**
         * A Subscriber that prints the json version of each document
         * 查询集合订阅
         */
        public static class PrintDocumentSubscriber extends ConsumerSubscriber<Document> {
            /**
             * Construct a new instance
             */
            public PrintDocumentSubscriber() {
                super(t -> System.out.println(t.toJson()));
            }
        }
    
        /**
         * A Subscriber that prints the toString version of each element
         * @param <T> the type of the element
         */
        public static class PrintToStringSubscriber<T> extends ConsumerSubscriber<T> {
            /**
             * Construct a new instance
             */
            public PrintToStringSubscriber() {
                super(System.out::println);
            }
        }
    
        /**
         * A Subscriber that processes a consumer for each element
         * @param <T> the type of the element
         */
        public static class ConsumerSubscriber<T> extends OperationSubscriber<T> {
            private final Consumer<T> consumer;
    
            /**
             * Construct a new instance
             * @param consumer the consumer
             */
            public ConsumerSubscriber(final Consumer<T> consumer) {
                this.consumer = consumer;
            }
    
    
            @Override
            public void onNext(final T document) {
                super.onNext(document);
                consumer.accept(document);
            }
        }
    
        private SubscriberHelpers() {
        }
    }
    
    

    5,使用Document对象进行CRUD操作

    import cn.hutool.core.lang.Console;
    import com.lcj.mongodb.reactive.MongoUtil;
    import com.lcj.mongodb.reactive.SubscriberHelpers;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.Projections;
    import com.mongodb.client.model.Sorts;
    import com.mongodb.client.model.Updates;
    import com.mongodb.client.result.DeleteResult;
    import com.mongodb.client.result.InsertManyResult;
    import com.mongodb.client.result.InsertOneResult;
    import com.mongodb.client.result.UpdateResult;
    import com.mongodb.reactivestreams.client.MongoClient;
    import com.mongodb.reactivestreams.client.MongoCollection;
    import com.mongodb.reactivestreams.client.MongoDatabase;
    import org.bson.Document;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @Package: com.lcj.mongodb.sync
     * @ClassName: UseDocumentCRUDTest
     * @Author: Administrator
     * @CreateTime: 2021/7/16 8:26
     * @Description:
     */
    public class UseDocumentCRUDTest {
        // 声明连接对象
        private static MongoClient client;
        // 声明集合变量
        private static MongoCollection<Document> book;
    
        // 所有方法执行前,创建集合对象
        @BeforeClass
        public static void setUp() {
            client = MongoUtil.getInstance1();
            // 访问指定的数据库,如果数据库不存在,自动创建
            MongoDatabase mongoDatabase = client.getDatabase("test");
            // 访问指定的集合,如果集合不存在,在第一次存数据是创建
            book = mongoDatabase.getCollection("book");
        }
    
        // 所有方法执行完后,释放资源
        @AfterClass
        public static void close() {
            //book.drop();  // 删除集合
            client.close();
        }
    
        /**
         * 插入单个文档
         */
        @Test
        public void insertOne() throws InterruptedException {
            // 创建如下文档
            Document document = new Document("name", "MongoDB")
                    .append("type", "type")
                    .append("count", 1)
                    .append("versions", Arrays.asList("v3.2", "v3.0", "v2.6"))
                    .append("info", new Document("x", 203).append("y", 102));
            // 插入单个文档,如果未指定_id字段,会自动添加
            SubscriberHelpers.ObservableSubscriber<InsertOneResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            book.insertOne(document).subscribe(subscriber);
            subscriber.await(); // 等待
        }
    
    
        /**
         * 插入多个文档
         */
        @Test
        public void insertMany() {
            List<Document> documents = new ArrayList<>(100);
            for (int i = 1; i <= 100; i++) {
                documents.add(new Document("i", i));
            }
            // 插入文档,如果未指定_id字段,会自动添加
            SubscriberHelpers.ObservableSubscriber<InsertManyResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            book.insertMany(documents).subscribe(subscriber);
            subscriber.await();
        }
    
        /**
         * @desc 统计集合中文档数量
         */
        @Test
        public void countNumbers(){
            SubscriberHelpers.PrintSubscriber<Long> subscriber = new SubscriberHelpers.PrintSubscriber<>("book集合文档总数: %s");
            book.countDocuments().subscribe(subscriber);
            subscriber.await();
            Console.log("文档总数:{}",subscriber.get().get(0));
        }
    
    
        /**
         * 查询集合的第一个文档
         */
        @Test
        public void findFirst() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find().first().subscribe(subscriber);
            subscriber.await();
            Document document = subscriber.get().stream().findFirst().get();
            Console.log("查询第一条文档:{}",document.toJson());
        }
    
        /**
         * 查询集合的所有文档
         */
        @Test
        public void findAll() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find().subscribe(subscriber);
            subscriber.await();
            subscriber.get().forEach(Document::toJson);
        }
    
    
        /**
         * 查询i=71的第一条文档
         */
        @Test
        public void findFirstByFilter() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find(Filters.eq("i",71)).first().subscribe(subscriber);
            subscriber.await();
            subscriber.get().forEach(Document::toJson);
        }
    
    
        /**
         * 查询i>71的所有的文档
         */
        @Test
        public void findAllByFilterGt() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find(Filters.gt("i",71)).subscribe(subscriber);
            subscriber.await();
            subscriber.get().forEach(Document::toJson);
        }
    
    
        /**
         * 查询满足i值在(71,91]之间的文档
         */
        @Test
        public void findAllByFilterAnd() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find(Filters.and(Filters.gt("i",71),Filters.lte("i",91))).subscribe(subscriber);
            subscriber.await();
            subscriber.get().forEach(Document::toJson);
        }
    
        /**
         * 查询存在i字段的文档,并按i的值降序排序
         */
        @Test
        public void sort() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find(Filters.exists("i")).sort(Sorts.descending("i")).subscribe(subscriber);
            subscriber.await();
            subscriber.get().forEach(Document::toJson);
        }
    
        /**
         * 投影--排除文档的id,很好用强大
         */
        @Test
        public void projection() {
            SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
            book.find().projection(Projections.excludeId()).subscribe(subscriber);
            subscriber.await();
            subscriber.get().forEach(Document::toJson);
        }
    
        // 更新i=50的文档,将i修改为500
        @Test
        public void updateOne() {
            SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            book.updateOne(Filters.eq("i", 50), Updates.set("i", 500)).subscribe(subscriber);
            subscriber.await();
            UpdateResult updateResult = subscriber.get().get(0);
            Console.log("更新数量:{}",updateResult.getModifiedCount());
        }
    
        /**
         * 更新字段i小于50的,将匹配的值加上20
         */
        @Test
        public void updateMany() {
            SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            book.updateMany(Filters.lt("i", 50), Updates.inc("i", 20)).subscribe(subscriber);
            subscriber.await();
            UpdateResult updateResult = subscriber.get().get(0);
            Console.log("更新数量:{}",updateResult.getModifiedCount());
        }
    
    
        /**
         * 删除字段i=500的文档
         */
        @Test
        public void deleteOne() {
            SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            book.deleteOne(Filters.eq("i", 500)).subscribe(subscriber);
            subscriber.await();
            Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
        }
    
    
        /**
         * 删除字段i<50的所有的文档
         */
        @Test
        public void deleteMany() {
            SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
            book.deleteMany(Filters.lt("i", 50)).subscribe(subscriber);
            subscriber.await();
            Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
        }
    
    }
    
    

    5,单测结果

    image-20210716170142357.png

    相关文章

      网友评论

          本文标题:7,使用Java Reactive反应式方式操作Document

          本文链接:https://www.haomeiwen.com/subject/gwlcmltx.html