美文网首页
【Hazelcast学习】分布式数据结构Map介绍

【Hazelcast学习】分布式数据结构Map介绍

作者: 伊丽莎白2015 | 来源:发表于2022-06-03 23:17 被阅读0次

    官方在线文档,本文基于5.1版本:https://docs.hazelcast.com/hazelcast/5.1/data-structures/distributed-data-structures

    Hazelcast的分布式数据结构有很多,比较常用的有:

    1. Map --> java.util.Map的分布式实现。
    2. Set --> java.util.Set的分布式并发实现。
    3. List --> java.util.List的分布式并发实现。
    4. Queue --> java.util.concurrent.BlockingQueue的分布式实现。
    5. ...

    本文重点介绍Map。
    官方文档:https://docs.hazelcast.com/hazelcast/5.1/data-structures/map
    跟Java的Map结构类似,以key-value的形式存储,value可以是String, json格式的String或是序列化过的对象等。

    1. Map的数据是如何分布式存储的?

    Map的元素会均匀的分布在Hazelcast的节点中,比如一个Map有1000个元素,而此时集群中有两个节点,那么每个节点都会存放500个元素。

    详见官网:Data Partitioning and Replication
    或是我之前的文章:https://www.jianshu.com/p/e2b276204b6a

    在CAP原则中,分布式Map选择的是AP,即会优先确保可用性,而不是一致性。

    2 过期策略

    默认情况下,Map中的数据并不会过期(没有上限),即会一直留在集群中直到客户端手动移动数据。当然我们也可以通过配置让Map的数据过期,详见地址

    • 元素过期原则:可以定义Map元素的过期时间。
      • time-to-live-seconds:默认是永不过期。如果配置了60,那么表示一个元素超过60s没有被再次更新(有写操作),那么就会被删除。
      • max-idle-seconds:默认是永不过期。从最近一次方法调用开始算,方法包括get(),put(),EntryProcessor.process(),containsKey()。如配置为60表示从上述方法最后一次被调用开始算,60秒后元素会被删除。
    • 元素淘汰原则:可以配Map最大的size,超过的部分会按某种方式淘汰。
      • size:默认是size没有限制。
      • eviction-policy:默认是NONE,即没有策略(即便是配置了size,也会被忽略)。除了NONE,还可配置:LRU(最近最少使用会被删除-last recently used)、LFU(最不经常使用会被删除-last frequently used)。

    示例:

    hazelcast:
      map:
        default:
          time-to-live-seconds: 60
          max-idle-seconds: 60
          eviction:
            eviction-policy: LRU
            max-size-policy: PER_NODE
            size: 5000
    

    3 Map创建、存放数据

    IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
    employeeMap.put(1, new Employee("aa", 18));
    

    如果Value是对象,需要实现序列化(实现Serializable接口),否则会报错:

    com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.distributedData.Employee'

    4. 读操作,和Java原生的方法很像

    参考文档:https://docs.hazelcast.com/hazelcast/5.1/data-structures/reading-a-map

    4.1 基本操作
    • map.get(K)
    • map.values()
    • map.keySet()
    • map.entrySet()
    • map.containsKey() and map.containsValue()
    4.2 分布式查询(Distributed Queries)

    分布式查询可以按某些条件在请求的时候就进行过滤,而不是先把数据都load到本地后再进行过滤。有两种方式:

    • SQL
    • Predicates API
    官网上有两种方式的对比: SQL vs Predicates API
    4.2.1 通过SQL查询Map数据

    参考文档:https://docs.hazelcast.com/hazelcast/5.1/sql/get-started-sql

    感觉和关系型数据库有点像。官网还提供了在线的命令行工具,以供学习:
    https://docs.hazelcast.com/hazelcast/5.1/sql/interactive-sql-maps

    在线学习工具

    新建Mapping:

    CREATE MAPPING cities (
    __key INT,
    countries VARCHAR,
    cities VARCHAR)
    TYPE IMap
    OPTIONS('keyFormat'='int', 'valueFormat'='json-flat');
    

    插入数据:

    INSERT INTO cities VALUES
    (1, 'United Kingdom','London'),
    (2, 'United Kingdom','Manchester'),
    (3, 'United States', 'New York');
    

    查询数据:

    SELECT cities AS City
    FROM cities
    WHERE cities LIKE 'L%' AND LENGTH(cities) > 6;
    

    用Java程序查询:
    https://docs.hazelcast.com/hazelcast/5.1/sql/parameterized-queries

    sql.execute("SELECT * FROM employees WHERE employee.status = ?", EmployeeStatus.VACATION);
    
    4.2.2 通过Predicates API查询数据

    Predicate如何查询?
    首先predicate语句会发送给Hazelcast集群中的每一个节点。每个节点会根据predicate语句查找符合条件的本地数据,在这个过程中key/value会被反序列化。最终各个节点的数据会被汇总成一个集合。

    好处有:

    • 各个节点查找数据的过程是多线程完成的(相互独立不开扰),耗时少。
    • 可以有效的减数IO开销(因为只有过滤后的数据才会被返回)。

    示例-1:
    先创建一个Employee对象:

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Employee implements Serializable{
        private String name;
        private int age;
        private boolean active;
    }
    

    测试:
    首先创建predicate:active=true and age < 30
    然后将这个传入values()方法,返回的就是按条件过滤后的Collection集合。

    @SpringBootTest
    public class DistributeMapTest {
        @Autowired
        private HazelcastInstance hazelcastInstance;
    
        @Test
        public void test() {
            IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
            employeeMap.put(1, new Employee("aa", 18, true));
            employeeMap.put(2, new Employee("bb", 19, false));
            employeeMap.put(3, new Employee("cc", 31, true));
    
            // start to search: 
            PredicateBuilder.EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
            Predicate predicate = e.is("active").and(e.get("age").lessThan(30));
    
            Collection<Employee> result = employeeMap.values(predicate);
            Assertions.assertEquals(1, result.size());
            result.forEach(x -> Assertions.assertEquals("aa", x.getName()));
        }
    }
    

    注:Predicates除了可以放在values()中,还可放在方法keySet()entrySet()localKeySet()中。

    关于更多Predicate条件的构建,参考:https://docs.hazelcast.com/hazelcast/5.1/query/predicate-overview#predicates-class-operators

    示例-2:类似SQL的写法查询:
    可以直接通过SQL构建Predicate,然后再传入values()方法:

        @Test
        public void testSQLLike() {
            IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
            // map put 略,同示例-1
    
            // start to search:
            Predicate predicate = Predicates.sql("active AND age < 30");
            Collection<Employee> result = employeeMap.values(predicate);
            Assertions.assertEquals(1, result.size());
            result.forEach(x -> Assertions.assertEquals("aa", x.getName()));
        }
    

    示例-3:Json格式的对象查询:
    如果value存的是JSON格式的String,那么如何通过Predicate查询?
    需要在存入的时候,封装一层HazelcastJsonValue,HazelcastJsonValue对象可以放入Hazelcast Map的key或value中,封装后的对象,即可使用Predicate查询了。(并不仅仅是扁平化的String了)。

    注:HazelcastJsonValue接收的参数是String,所以它本身并不会验证String对象是否是正确格式的JSON。

        @Test
        public void testJson() {
            JsonMapper jsonMapper = new JsonMapper();
            IMap<Integer, HazelcastJsonValue> employeeMap = hazelcastInstance.getMap("employeeMap");
            employeeMap.put(1, new HazelcastJsonValue("{\"name\":\"aa\",\"age\":18,\"active\":true}"));
            employeeMap.put(2, new HazelcastJsonValue("{\"name\":\"bb\",\"age\":19,\"active\":false}"));
            employeeMap.put(3, new HazelcastJsonValue("{\"name\":\"cc\",\"age\":31,\"active\":true}"));
    
            // start to search:
            Predicate predicate = Predicates.sql("active AND age < 30");
            Collection<HazelcastJsonValue> result = employeeMap.values(predicate);
            Assertions.assertEquals(1, result.size());
            System.out.println(result);
        }
    

    示例-4:分页
    分页的对象需要实现Comparable接口。

        @Test
        public void testPage() {
            IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
            // map put 略,同示例-1
    
            // start to search:
            Predicate greaterEqual = Predicates.greaterEqual( "age", 18 );
            PagingPredicate pagingPredicate = Predicates.pagingPredicate(greaterEqual, 2);
    
            Collection<Employee> firstPage = employeeMap.values(pagingPredicate);
            System.out.println("page-1:" + firstPage);
            
            // 翻页:
            pagingPredicate.nextPage();
            Collection<Employee> nextPage = employeeMap.values(pagingPredicate);
            System.out.println("page-2:" + nextPage);
        }
    
    测试结果: 测试结果

    当然也可以直接使用setPage()来指定页码:
    页码从0开始,所以第2页的数据,需要setPage(1);

        @Test
        public void testPageSetPage() {
            IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
            // map put 略,同示例-1
    
            // start to search:
            Predicate greaterEqual = Predicates.greaterEqual( "age", 18 );
            PagingPredicate pagingPredicate = Predicates.pagingPredicate(greaterEqual, 2);
    
            pagingPredicate.setPage(1); // page starts from 0
            Collection<Employee> page2 = employeeMap.values(pagingPredicate);
            System.out.println("page-2:" + page2);
        }
    

    Predicate更高级的用法:
    比如可以按分区(Partition)ID进行查询
    参考:https://docs.hazelcast.com/hazelcast/5.1/query/predicate-overview#filtering-with-partition-predicate

    除此之外,还可以配置查询的线程池大小
    https://docs.hazelcast.com/hazelcast/5.1/query/predicate-overview#configuring-the-query-thread-pool

    hazelcast:
      ...
      executor-service:
        "hz:query":
          pool-size: 100
    

    更多关于Predicate的高级用法,参考文档吧,功能非常非常多。

    4.3 读操作的优化

    如果你经常使用map.get()方法,从一个相当静态的map中读数据,那么可以配置Near Cache的功能来提高效率。
    Near Cache会在客户端本地创建一个map的副本,map.get()方法在调用的时候不会去各个Hazelcast节点中查询数据并汇总,而是直接读取客户端本地的map副本。

    客户端会更期的从集群中同步数据。可以配置同步的时间间隔。

    这个方案比较适用于你的map数据是静态的——即不经常改变。
    更多关于Near Cache,看Near Cache

    相关文章

      网友评论

          本文标题:【Hazelcast学习】分布式数据结构Map介绍

          本文链接:https://www.haomeiwen.com/subject/ayjhmrtx.html