基于Ignite的ContinuousQuery

作者: SofiyaJ | 来源:发表于2018-03-27 15:43 被阅读31次

    知识准备

    持续查询语言(CQL, continuous query language)类似于:
    内存数据库+视图+触发器 的解决方案。
    简单来说,一有符合条件的对象进入查询结果集,就执行一次回调函数。

    本文的实现是基于C/S模式的,即Client端先按照一定规则从Server端查询数据,返回结果集后,Server端继续添加符合条件的数据,Client端仍然可以实时查询返回结果。
    持续查询可以监听缓存中数据的变更。持续查询一旦启动,如果有,就会收到符合查询条件的数据变化的通知。

    主要maven依赖:

    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-core</artifactId>
                <version>${ignite.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-spring</artifactId>
                <version>${ignite.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-indexing</artifactId>
                <version>${ignite.version}</version>
            </dependency>
        <properties>
            <ignite.version>2.4.0</ignite.version>
        </properties>
    

    TIPS:本工程使用的ignite的版本是2.4.0,ignite更新迭代较快,版本见得差异还是很大的。

    主要代码实现

    Server端实现:

    github源代码

    package xx.xx.searchengine;
    
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @Author: wangjie
     * @Description:
     * @Date: Created in 10:13 2018/3/27
     */
    
    @SpringBootApplication
    @RestController
    public class ServerApplication {
    
        //cache name
        private static final String CACHE_NAME = "serverCache";
    
        private static Ignite ignite = Ignition.start("example-cache.xml");
    
        public static void main(String[] args) throws InterruptedException {
    
            SpringApplication.run(ServerApplication.class,args);
    
        }
    
        @RequestMapping(value = "/testIgnite",method = RequestMethod.GET)
        public String testIgnite(Integer key,String value) throws InterruptedException{
            ignite.active(true);
            System.out.println("*******insert data begins*********");
    
            try(IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)){
                cache.put(key,value);
                Thread.sleep(2000);
            }
            return "*******insert data succeed*********";
        }
    }
    
    

    example-cache.xml主要配置(在初始化文件之后添加的):

     <property name="clientMode" value="false"/>
     <property name="peerClassLoadingEnabled" value="true"/>
    

    Client端实现:

    github源代码

    package xx.xx.searchengine;
    
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.query.ContinuousQuery;
    import org.apache.ignite.cache.query.QueryCursor;
    import org.apache.ignite.cache.query.ScanQuery;
    import org.apache.ignite.lang.IgniteBiPredicate;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import javax.cache.Cache;
    import javax.cache.event.CacheEntryEvent;
    import javax.cache.event.CacheEntryUpdatedListener;
    
    /**
     * @Author: wangjie
     * @Description:
     * @Date: Created in 10:26 2018/3/27
     */
    @SpringBootApplication
    public class ClientApplication {
    
        //cache name
        private static final String CACHE_NAME = "serverCache";
    
        public static void main(String[] args) throws InterruptedException {
    
            SpringApplication.run(ClientApplication.class, args);
    
    
            try (Ignite ignite = Ignition.start("example-cache.xml")) {
                ignite.active(true);
                System.out.println("**********Cache continuous query example started**********");
    
                try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {
    
                    // Create new continuous query.
                    ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
                    //init query
                    qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
                        @Override
                        public boolean apply(Integer key, String val) {
                            return key > 0;
                        }
                    }));
                     //set local listener
                    qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
                        @Override
                        public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                            for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) {
                                System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                            }
                        }
                    });
    
    
                    try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
                        // Iterate through existing data.
                        for (Cache.Entry<Integer, String> e : cur) {
                            System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                            Thread.sleep(2000000000);
                        }
                    } finally {
                        ignite.destroyCache(CACHE_NAME);
                    }
                }
            }
        }
    }
    
    • 初始化查询
      当要执行持续查询时,在将持续查询注册在集群中以及开始接收更新之前,可以有选择地指定一个初始化查询。
      初始化查询可以通过ContinuousQuery.setInitialQuery(Query)方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。
    • 远程过滤器
      这个过滤器在给定键对应的主和备节点上执行,然后评估更新是否需要作为一个事件传播给该查询的本地监听器。
      如果过滤器返回true,那么本地监听器就会收到通知,否则事件会被忽略。产生更新的特定主和备节点,会在主/备节点以及应用端执行的本地监听器之间,减少不必要的网络流量。
      远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)方法进行设置。
    • 本地监听器
      当缓存被修改时(一个条目被插入、更新或者删除),更新对应的事件就会发送给持续查询的本地监听器,之后应用就可以做出对应的反应。
      当事件通过了远程过滤器,他们就会被发送给客户端,通知哪里的本地监听器。
      本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)方法设置的。

    example-cache.xml主要配置(在初始化文件之后添加的):

      <property name="clientMode" value="true"/>
      <property name="peerClassLoadingEnabled" value="true"/>
    

    启动程序,测试连续查询

    启动Server端:

    Server.png

    启动Client端:

    Client.png

    在postman中发送get请求:

    http://localhost:8080/testIgnite?key=26&value="hahahah"

    postman.png

    查看Client端控制台的输出信息:

    Client-updata.png

    关于ignite的其它文章:
    Ignite CS 模式 java初探
    Ignite 之计算运用的 Hello world

    程序媛小白一枚,如有错误,烦请批评指正!(#.#)

    相关文章

      网友评论

      本文标题:基于Ignite的ContinuousQuery

      本文链接:https://www.haomeiwen.com/subject/jscscftx.html