官方在线文档,本文基于5.1版本:https://docs.hazelcast.com/hazelcast/5.1/data-structures/distributed-data-structures
Hazelcast的分布式数据结构有很多,比较常用的有:
-
Map
-->java.util.Map
的分布式实现。 -
Set
-->java.util.Set
的分布式并发实现。 -
List
-->java.util.List
的分布式并发实现。 -
Queue
-->java.util.concurrent.BlockingQueue
的分布式实现。 - ...
本文重点介绍Map。
官方文档:https://docs.hazelcast.com/hazelcast/5.1/data-structures/map
跟Java的Map结构类似,以key-value的形式存储,value可以是String, json格式的String或是序列化过的对象等。
1. Map的数据是如何分布式存储的?
Map的元素会均匀的分布在Hazelcast的节点中,比如一个Map有1000个元素,而此时集群中有两个节点,那么每个节点都会存放500个元素。
详见官网:Data Partitioning and Replication
或是我之前的文章:https://www.jianshu.com/p/e2b276204b6a
在CAP原则中,分布式Map选择的是AP,即会优先确保可用性,而不是一致性。
2 过期策略
默认情况下,Map中的数据并不会过期(没有上限),即会一直留在集群中直到客户端手动移动数据。当然我们也可以通过配置让Map的数据过期,详见地址。
-
元素过期原则
:可以定义Map元素的过期时间。-
time-to-live-seconds
:默认是永不过期。如果配置了60,那么表示一个元素超过60s没有被再次更新(有写操作),那么就会被删除。 -
max-idle-seconds
:默认是永不过期。从最近一次方法调用开始算,方法包括get(),put(),EntryProcessor.process(),containsKey()。如配置为60表示从上述方法最后一次被调用开始算,60秒后元素会被删除。
-
-
元素淘汰原则
:可以配Map最大的size,超过的部分会按某种方式淘汰。-
size
:默认是size没有限制。 -
eviction-policy
:默认是NONE,即没有策略(即便是配置了size,也会被忽略)。除了NONE,还可配置:LRU(最近最少使用会被删除-last recently used)、LFU(最不经常使用会被删除-last frequently used)。
-
示例:
hazelcast:
map:
default:
time-to-live-seconds: 60
max-idle-seconds: 60
eviction:
eviction-policy: LRU
max-size-policy: PER_NODE
size: 5000
3 Map创建、存放数据
IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
employeeMap.put(1, new Employee("aa", 18));
如果Value是对象,需要实现序列化(实现Serializable接口),否则会报错:
com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.distributedData.Employee'
4. 读操作,和Java原生的方法很像
参考文档:https://docs.hazelcast.com/hazelcast/5.1/data-structures/reading-a-map
4.1 基本操作
- map.get(K)
- map.values()
- map.keySet()
- map.entrySet()
- map.containsKey() and map.containsValue()
4.2 分布式查询(Distributed Queries)
分布式查询可以按某些条件在请求的时候就进行过滤,而不是先把数据都load到本地后再进行过滤。有两种方式:
SQL
Predicates API
4.2.1 通过SQL查询Map数据
参考文档:https://docs.hazelcast.com/hazelcast/5.1/sql/get-started-sql
感觉和关系型数据库有点像。官网还提供了在线的命令行工具,以供学习:
https://docs.hazelcast.com/hazelcast/5.1/sql/interactive-sql-maps
新建Mapping:
CREATE MAPPING cities (
__key INT,
countries VARCHAR,
cities VARCHAR)
TYPE IMap
OPTIONS('keyFormat'='int', 'valueFormat'='json-flat');
插入数据:
INSERT INTO cities VALUES
(1, 'United Kingdom','London'),
(2, 'United Kingdom','Manchester'),
(3, 'United States', 'New York');
查询数据:
SELECT cities AS City
FROM cities
WHERE cities LIKE 'L%' AND LENGTH(cities) > 6;
用Java程序查询:
https://docs.hazelcast.com/hazelcast/5.1/sql/parameterized-queries
sql.execute("SELECT * FROM employees WHERE employee.status = ?", EmployeeStatus.VACATION);
4.2.2 通过Predicates API查询数据
Predicate如何查询?
首先predicate语句会发送给Hazelcast集群中的每一个节点。每个节点会根据predicate语句查找符合条件的本地数据,在这个过程中key/value会被反序列化。最终各个节点的数据会被汇总成一个集合。
好处有:
- 各个节点查找数据的过程是多线程完成的(相互独立不开扰),耗时少。
- 可以有效的减数IO开销(因为只有过滤后的数据才会被返回)。
示例-1:
先创建一个Employee对象:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Employee implements Serializable{
private String name;
private int age;
private boolean active;
}
测试:
首先创建predicate:active=true and age < 30
然后将这个传入values()方法,返回的就是按条件过滤后的Collection集合。
@SpringBootTest
public class DistributeMapTest {
@Autowired
private HazelcastInstance hazelcastInstance;
@Test
public void test() {
IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
employeeMap.put(1, new Employee("aa", 18, true));
employeeMap.put(2, new Employee("bb", 19, false));
employeeMap.put(3, new Employee("cc", 31, true));
// start to search:
PredicateBuilder.EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
Predicate predicate = e.is("active").and(e.get("age").lessThan(30));
Collection<Employee> result = employeeMap.values(predicate);
Assertions.assertEquals(1, result.size());
result.forEach(x -> Assertions.assertEquals("aa", x.getName()));
}
}
注:Predicates除了可以放在values()
中,还可放在方法keySet()
、entrySet()
、localKeySet()
中。
关于更多Predicate条件的构建,参考:https://docs.hazelcast.com/hazelcast/5.1/query/predicate-overview#predicates-class-operators
示例-2:类似SQL的写法查询:
可以直接通过SQL构建Predicate,然后再传入values()方法:
@Test
public void testSQLLike() {
IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
// map put 略,同示例-1
// start to search:
Predicate predicate = Predicates.sql("active AND age < 30");
Collection<Employee> result = employeeMap.values(predicate);
Assertions.assertEquals(1, result.size());
result.forEach(x -> Assertions.assertEquals("aa", x.getName()));
}
示例-3:Json格式的对象查询:
如果value存的是JSON格式的String,那么如何通过Predicate查询?
需要在存入的时候,封装一层HazelcastJsonValue,HazelcastJsonValue对象可以放入Hazelcast Map的key或value中,封装后的对象,即可使用Predicate查询了。(并不仅仅是扁平化的String了)。
注:HazelcastJsonValue接收的参数是String,所以它本身并不会验证String对象是否是正确格式的JSON。
@Test
public void testJson() {
JsonMapper jsonMapper = new JsonMapper();
IMap<Integer, HazelcastJsonValue> employeeMap = hazelcastInstance.getMap("employeeMap");
employeeMap.put(1, new HazelcastJsonValue("{\"name\":\"aa\",\"age\":18,\"active\":true}"));
employeeMap.put(2, new HazelcastJsonValue("{\"name\":\"bb\",\"age\":19,\"active\":false}"));
employeeMap.put(3, new HazelcastJsonValue("{\"name\":\"cc\",\"age\":31,\"active\":true}"));
// start to search:
Predicate predicate = Predicates.sql("active AND age < 30");
Collection<HazelcastJsonValue> result = employeeMap.values(predicate);
Assertions.assertEquals(1, result.size());
System.out.println(result);
}
示例-4:分页
分页的对象需要实现Comparable接口。
@Test
public void testPage() {
IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
// map put 略,同示例-1
// start to search:
Predicate greaterEqual = Predicates.greaterEqual( "age", 18 );
PagingPredicate pagingPredicate = Predicates.pagingPredicate(greaterEqual, 2);
Collection<Employee> firstPage = employeeMap.values(pagingPredicate);
System.out.println("page-1:" + firstPage);
// 翻页:
pagingPredicate.nextPage();
Collection<Employee> nextPage = employeeMap.values(pagingPredicate);
System.out.println("page-2:" + nextPage);
}
测试结果:
测试结果
当然也可以直接使用setPage()来指定页码:
页码从0开始,所以第2页的数据,需要setPage(1);
@Test
public void testPageSetPage() {
IMap<Integer, Employee> employeeMap = hazelcastInstance.getMap("employeeMap");
// map put 略,同示例-1
// start to search:
Predicate greaterEqual = Predicates.greaterEqual( "age", 18 );
PagingPredicate pagingPredicate = Predicates.pagingPredicate(greaterEqual, 2);
pagingPredicate.setPage(1); // page starts from 0
Collection<Employee> page2 = employeeMap.values(pagingPredicate);
System.out.println("page-2:" + page2);
}
Predicate更高级的用法:
比如可以按分区(Partition)ID进行查询
参考:https://docs.hazelcast.com/hazelcast/5.1/query/predicate-overview#filtering-with-partition-predicate
除此之外,还可以配置查询的线程池大小
https://docs.hazelcast.com/hazelcast/5.1/query/predicate-overview#configuring-the-query-thread-pool
hazelcast:
...
executor-service:
"hz:query":
pool-size: 100
更多关于Predicate的高级用法,参考文档吧,功能非常非常多。
4.3 读操作的优化
如果你经常使用map.get()
方法,从一个相当静态的map中读数据,那么可以配置Near Cache
的功能来提高效率。
Near Cache会在客户端本地创建一个map的副本,map.get()方法在调用的时候不会去各个Hazelcast节点中查询数据并汇总,而是直接读取客户端本地的map副本。
客户端会更期的从集群中同步数据。可以配置同步的时间间隔。
这个方案比较适用于你的map数据是静态的——即不经常改变。
更多关于Near Cache,看Near Cache。
网友评论