书接上文 微服务实战之 Cassandra 之一, Cassandra 的优点说了不少,它的缺点大家也有所耳闻。
作为一个 NoSQL 存储系统,它不支持多表连接,不支持外键约束,所以也不需要遵循数据库的经典范式。
虽然 CQL 看起来很像 SQL,其实还是有些差别的:它的 Insert/Update 其实都是 Set(即 upsert,记录不存在就插入,已存在就覆盖);它不支持行级锁,也不支持传统意义上的事务(它有一种轻量级的事务,但由于其分布式系统的特点,与传统的事务大相径庭)。
它的客户端驱动是很丰富的, 下面我们以 Java 举一个实际应用实例:
创建 Keyspace 和 Tables
- 创建一个 key space: walter_apjc
采用网络拓扑策略,两个DC: HF1 和 SZ1
-- Keyspace replicated by NetworkTopologyStrategy with 3 replicas in each of the HF1 and SZ1 data centers.
CREATE KEYSPACE walter_apjc
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'HF1': '3',
    'SZ1': '3'
}
AND durable_writes = true;
- 创建两张表 inventory 和 person
-- Inventory items: user_id is the partition key, inventory_id the clustering column.
CREATE TABLE inventory (
    user_id uuid,
    inventory_id uuid,
    inventory_name text,
    name text,
    tags text,
    create_time timestamp,
    last_modified_time timestamp,
    PRIMARY KEY (user_id, inventory_id)
);
-- People, keyed by a single text id column.
CREATE TABLE person (
    id text,
    name text,
    age int,
    PRIMARY KEY (id)
);
- 创建一个索引
-- Secondary index (name left to Cassandra) so that inventory rows can be queried by the name column.
CREATE INDEX ON inventory (name);
使用 Cassandra 的 Java Driver 来存取数据
创建一个基础类CassandraClient 来连接 Cassandra Cluster, 这里有很多关键属性需要设置
package com.github.walterfan.hellocassandra;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.*;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
/**
 * Holds the connection settings for a Cassandra cluster and lazily builds the
 * DataStax {@link Cluster} from them.
 *
 * <p>Instances are created through the Lombok-generated builder. The cluster is
 * built on the first call to {@link #connect(String)} (or an explicit
 * {@link #init()}) and released with {@link #close()}.
 */
@Data
@Builder
public class CassandraClient {
    /** Comma-separated contact points; whitespace around each entry is ignored. */
    private String contactPoints;
    /** Port passed to {@code Cluster.Builder#withPort}. */
    private int port;
    /** Name of the local data center used by the DC-aware load-balancing policy. */
    private String localDC;
    /** Credentials; only applied when both user name and password are non-empty. */
    private String username;
    private String password;
    /** Upper bound of pooled connections per host, applied to LOCAL and REMOTE hosts alike. */
    private int maxConnectionsPerHost;
    /** Number of hosts per remote data center the load-balancing policy may use. */
    private int usedHostsPerRemote;
    /** Base and maximum delay (ms) of the exponential reconnection policy. */
    private long reconnectBaseDelayMs;
    private long reconnectMaxDelayMs;
    /** Built lazily; volatile so the unsynchronized read in connect() sees a fully published value. */
    private volatile Cluster cluster;

    /**
     * Builds the {@link Cluster} from the configured settings and stores it in
     * {@code cluster}, replacing any previously stored instance.
     *
     * @throws IllegalStateException if no contact points have been configured
     */
    public synchronized void init() {
        if (contactPoints == null || contactPoints.trim().isEmpty()) {
            throw new IllegalStateException("contactPoints must be set before the cluster can be built");
        }

        DCAwareRoundRobinPolicy loadBalancePolicy = DCAwareRoundRobinPolicy.builder()
                .withLocalDc(localDC)
                .withUsedHostsPerRemoteDc(usedHostsPerRemote)
                .allowRemoteDCsForLocalConsistencyLevel()
                .build();

        PoolingOptions poolingOptions = new PoolingOptions();
        poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost);
        poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, maxConnectionsPerHost);

        // Tolerate "host1, host2": strip whitespace around the separators so no
        // contact point carries a leading or trailing blank.
        String[] hosts = contactPoints.trim().split("\\s*,\\s*");

        Cluster.Builder clusterBuilder = Cluster.builder()
                .withReconnectionPolicy(new ExponentialReconnectionPolicy(reconnectBaseDelayMs, reconnectMaxDelayMs))
                .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
                .withPoolingOptions(poolingOptions)
                .withLoadBalancingPolicy(new TokenAwarePolicy(loadBalancePolicy))
                .withPort(port)
                .addContactPoints(hosts)
                .withoutJMXReporting();

        if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
            clusterBuilder.withCredentials(username, password);
        }
        cluster = clusterBuilder.build();
    }

    /**
     * Opens a session on the given keyspace, building the cluster first if needed.
     *
     * @param keyspace keyspace the returned session is bound to
     * @return a new session created by the underlying cluster
     */
    public Session connect(String keyspace) {
        if (cluster == null) {
            // Re-check under the lock so that concurrent first callers build
            // exactly one Cluster instead of each building (and leaking) their own.
            synchronized (this) {
                if (cluster == null) {
                    init();
                }
            }
        }
        return cluster.connect(keyspace);
    }

    /** Closes the underlying cluster; does nothing if it was never built. */
    public void close() {
        Cluster current = cluster;
        if (current != null) {
            current.close();
        }
    }
}
利用 Spring 提供的 CassandraTemplate 可以很方便地存取数据
在读写数据时,有几个很重要的选项需要设置
- ConsistencyLevel
- RetryPolicy
- FetchSize
- ReadTimeout
一致性级别(Consistency Level)是 Cassandra 所特有的,多数情况下我们会选择 LOCAL_QUORUM,也就是本地 DC 中必须有达到法定数量(Quorum)的副本节点成功响应,这次读写才算成功。
代码示例如下:
package com.github.walterfan.hellocassandra;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cassandra.core.RowMapper;
import org.springframework.cassandra.core.WriteOptions;
import org.springframework.cassandra.support.exception.CassandraTypeMismatchException;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.data.cassandra.mapping.Table;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Walk-through of Spring's {@link CassandraTemplate}: a simple insert/query round
 * trip, conditional (lightweight-transaction) writes, and a range query on a
 * clustering column.
 *
 * Created by yafan on 15/11/2017.
 */
@Slf4j
@Setter
public class CassandraTemplateExample {
    static final String AGE_COLUMN_NAME = "age";
    static final String ID_COLUMN_NAME = "id";
    static final String NAME_COLUMN_NAME = "name";

    private String keyspace;
    private CassandraTemplate template;
    private CassandraClient client;
    private QueryOptions queryOptions;
    private WriteOptions writeOptions;

    /**
     * Prepares (but does not yet open) a client for the given cluster.
     *
     * @param hostnames comma-separated contact points
     * @param port      native-protocol port
     * @param localDC   name of the local data center
     * @param username  user name, may be empty to skip authentication
     * @param password  password, may be empty to skip authentication
     * @param keysapce  keyspace the demo session connects to
     */
    public CassandraTemplateExample(String hostnames, int port, String localDC, String username, String password, String keysapce) {
        client = CassandraClient.builder()
                .contactPoints(hostnames)
                .port(port)
                .localDC(localDC)
                .username(username)
                .password(password)
                .maxConnectionsPerHost(2048)
                .reconnectBaseDelayMs(1000)
                .reconnectMaxDelayMs(600_000)
                .build();
        this.keyspace = keysapce;
        queryOptions = this.getQueryOptions();
        writeOptions = this.getWriteOptions(null);
    }

    /** @return query options with consistency level LOCAL_QUORUM */
    protected QueryOptions getQueryOptions() {
        QueryOptions queryOptions = new QueryOptions();
        queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        return queryOptions;
    }

    /**
     * @param ttl time-to-live handed to {@code WriteOptions#setTtl}; may be null
     * @return write options with consistency level LOCAL_QUORUM and the given TTL
     */
    protected WriteOptions getWriteOptions(Integer ttl) {
        WriteOptions writeOptions = new WriteOptions();
        writeOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        writeOptions.setTtl(ttl);
        return writeOptions;
    }

    /** Opens a session on the keyspace, runs the three demos, and closes the session again. */
    private void testCql() {
        try (Session session = client.connect(keyspace)) {
            template = new CassandraTemplate(session);
            testCrud();
            testTransaction();
            testPagination();
        }
    }

    /**
     * Runs an arbitrary CQL statement: SELECT statements are queried and their
     * rows printed, everything else is simply executed.
     */
    private void execute(String cql) {
        log.info("execute {}" , cql);
        // Case-insensitive and whitespace-tolerant, so "SELECT ..." is not
        // mistaken for a write and its result silently discarded.
        if (cql.trim().regionMatches(true, 0, "select", 0, "select".length())) {
            List<?> aList = template.select(cql, List.class);
            aList.forEach(System.out::println);
        } else {
            template.execute(cql);
        }
    }

    /** Inserts one person and verifies that querying by id yields an equal but distinct object. */
    private void testCrud() {
        log.info("--------- testCrud ----------------");
        Person thePerson = template.insert(Person.create("Walter Fan", 37));
        log.info("Inserted [{}]", thePerson);
        Person queriedPerson = queryPersonById(thePerson.getId());
        assertThat(queriedPerson).isNotSameAs(thePerson);
        assertThat(queriedPerson).isEqualTo(thePerson);
    }

    /** Looks up a single person by primary key and logs both the statement and the result. */
    private Person queryPersonById(String id) {
        Select personQuery = selectPerson(id);
        log.info("CQL SELECT [{}]", personQuery);
        Person queriedPerson = template.queryForObject(personQuery, personRowMapper());
        log.info("* Query Result [{}]", queriedPerson);
        return queriedPerson;
    }

    /**
     * Mixes plain writes with conditional ({@code IF ...}) writes and checks which
     * of them took effect, then removes the test rows.
     */
    private void testTransaction() {
        System.out.println("--------- testTransaction ----------------");
        template.execute("insert into person(id, name, age) values ('a', 'alice', 20)");
        template.execute("update person set name = 'ada' where id='a' ");
        // Condition is expected to fail: the name is 'ada' by now, not 'alice'.
        template.execute("update person set name = 'adam' where id='a' IF name ='alice'");
        template.execute("insert into person(id, name, age) values ('b', 'bob', 21)");
        template.execute("insert into person(id, name, age) values ('c', 'carl', 22)");
        template.execute("insert into person(id, name, age) values ('d', 'david', 31)");
        // A plain insert on an existing key overwrites the row (upsert) ...
        template.execute("insert into person(id, name, age) values ('d', 'dean', 32) ");
        // ... whereas IF NOT EXISTS is expected to leave the existing row untouched.
        template.execute("insert into person(id, name, age) values ('d', 'dog', 33) IF NOT EXISTS");

        Person a = queryPersonById( "a");
        Person d = queryPersonById( "d");
        // assertThat(boolean) on its own verifies nothing; assert on the values
        // themselves, and check row 'd' (not 'a') for the name 'dean'.
        assertThat(a.getName()).isEqualTo("ada");
        assertThat(a.getAge()).isEqualTo(20);
        assertThat(d.getName()).isEqualTo("dean");

        template.execute("delete from person where id in ('a','b','c','d')");
    }

    /**
     * Inserts five inventory rows for one user, then prints all of that user's
     * rows and the rows whose inventory_id is greater than the third id.
     */
    private void testPagination() {
        System.out.println("--------- testPagination ----------------");
        String user_id = "e7d6038e-7a07-4dca-a98f-939428ded582";
        String[] inventoryIDs = {"01786fd5-92ef-491c-b64e-7d83a624b95d",
                "12a7cd81-d384-44d4-8919-5fa092f6427f",
                "1cd52315-fba7-4461-99b3-8e44b7b589e8",
                "2a7a3b39-f080-40c1-b5c8-08b743d66076",
                "2b2b458e-75e4-4709-9853-8491cfeb13e9"};
        int i = 0;
        for (String inventory_id : inventoryIDs) {
            // The uuid values are written unquoted, the text and timestamp values quoted.
            String cql = String.format("insert into inventory(user_id, inventory_id, inventory_name, name, tags, create_time, last_modified_time) " +
                            "values (%s, %s, '%s','%s', '%s', '%s', '%s')",
                    user_id, inventory_id, "book", "posa" + (++i), "tech", Instant.now().toString(), Instant.now().toString());
            log.info("execute {}", cql);
            template.execute(cql);
        }
        List<Inventory> inventories0 = template.select(String.format("select * from inventory WHERE user_id=%s", user_id), Inventory.class);
        List<Inventory> inventories2 = template.select(String.format("select * from inventory WHERE user_id=%s and inventory_id > %s ALLOW FILTERING", user_id, inventoryIDs[2]), Inventory.class);
        System.out.println("---------all inventories----------------");
        inventories0.forEach(System.out::println);
        System.out.println("---------filterd inventories----------------");
        inventories2.forEach(System.out::println);
    }

    /** @return a socket address for the given host and port */
    public InetSocketAddress newSocketAddress(String hostname, int port) {
        return new InetSocketAddress(hostname, port);
    }

    /** Closes the underlying Cassandra client. */
    public void close() {
        this.client.close();
    }

    /**
     * @return a mapper that turns a row with id/name/age columns into a {@link Person};
     *         any failure is rethrown as {@link CassandraTypeMismatchException}
     */
    protected static RowMapper<Person> personRowMapper() {
        return new RowMapper<Person>() {
            @Override
            public Person mapRow(Row row, int rowNum) throws DriverException {
                try {
                    log.debug("row [{}] @ index [{}]", row, rowNum);
                    Person person = Person.create(row.getString(ID_COLUMN_NAME),
                            row.getString(NAME_COLUMN_NAME), row.getInt(AGE_COLUMN_NAME));
                    log.debug("person [{}]", person);
                    return person;
                }
                catch (Exception e) {
                    // "%1$s": the first specifier needs a conversion, otherwise
                    // String.format itself throws and hides the original cause.
                    throw new CassandraTypeMismatchException(String.format(
                            "failed to map row [%1$s] @ index [%2$d] to object of type [%3$s]",
                            row, rowNum, Person.class.getName()), e);
                }
            }
        };
    }

    /** Builds {@code SELECT * FROM <person table> WHERE id = personId}. */
    protected static Select selectPerson(String personId) {
        Select selectStatement = QueryBuilder.select().from(toTableName(Person.class));
        selectStatement.where(QueryBuilder.eq(ID_COLUMN_NAME, personId));
        return selectStatement;
    }

    /** Resolves the table name for the runtime class of the given object. */
    @SuppressWarnings("unused")
    protected static String toTableName(Object obj) {
        return toTableName(obj.getClass());
    }

    /**
     * @return the value of the type's {@link Table} annotation when present and
     *         non-empty, otherwise the type's simple class name
     */
    protected static String toTableName(Class<?> type) {
        Table tableAnnotation = type.getAnnotation(Table.class);
        return (tableAnnotation != null && StringUtils.isNotEmpty(tableAnnotation.value())
                ? tableAnnotation.value() : type.getSimpleName());
    }

    public static void main(String[] args) throws Exception {
        CassandraTemplateExample exam = new CassandraTemplateExample("10.224.38.139", 9042, "HF1","test", "pass","walter_apjc");
        try {
            exam.testCql();
        } finally {
            // Release the cluster even when one of the demos fails.
            exam.close();
        }
    }
}
Spring Data Cassandra 项目
Spring Data 项目为 Cassandra 也创建了一个子项目,我们可以用它来大大简化我们的代码。
1) 先加入相应的 dependency
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the hellocassandra demo: a jar project that inherits from spring-boot-starter-parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.walterfan</groupId>
<artifactId>hellocassandra</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hellocassandra</name>
<description>Demo project for Spring Boot</description>
<!-- Dependencies and plugins declared below without a version rely on this parent to supply one. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Data Cassandra starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
<!-- Spring Boot test starter, test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- AssertJ at default (compile) scope. NOTE(review): presumably because the example code calls assertThat outside of tests; confirm. -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
<!-- commons-lang3, version pinned explicitly. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<!-- Lombok, version pinned explicitly. NOTE(review): Lombok is normally only needed at compile time; consider provided scope. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin; no version or configuration is given here. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
再分别为这两张表创建相应的实体对象
- 清单类 Inventory
package com.github.walterfan.hellocassandra;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.cassandra.core.PrimaryKeyType;
import org.springframework.data.cassandra.mapping.Column;
import org.springframework.data.cassandra.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.mapping.Table;
import java.time.Instant;
import java.util.UUID;
/**
 * Entity mapped to the {@code inventory} table, whose primary key is
 * {@code (user_id, inventory_id)}: {@code user_id} is the partition key and
 * {@code inventory_id} the clustering column.
 */
@Data
@NoArgsConstructor
@Table
public class Inventory {
    /** Partition key: the owner of the inventory item. */
    @PrimaryKeyColumn(name = "user_id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private UUID userId;
    /**
     * Clustering column. The table is declared with PRIMARY KEY(user_id, inventory_id),
     * so only user_id is part of the partition key; mapping this column as
     * PARTITIONED would describe a different key layout than the real table.
     */
    @PrimaryKeyColumn(name = "inventory_id", ordinal = 1, type = PrimaryKeyType.CLUSTERED)
    private UUID inventoryId;
    @Column("inventory_name")
    private String inventoryName;
    @Column("name")
    private String name;
    @Column("tags")
    private String tags;
    @Column("create_time")
    private Instant createTime;
    @Column("last_modified_time")
    private Instant lastmodifiedTime;

    /**
     * Creates a new item whose creation and last-modified timestamps are both set
     * to the current instant.
     *
     * @param userId        owner of the item (partition key)
     * @param inventoryId   id of the item (clustering column)
     * @param inventoryName value for the inventory_name column
     * @param name          value for the name column
     * @param tags          value for the tags column
     */
    public Inventory(UUID userId, UUID inventoryId, String inventoryName, String name, String tags) {
        this.userId = userId;
        this.inventoryId = inventoryId;
        this.inventoryName = inventoryName;
        this.name = name;
        this.tags = tags;
        // Read the clock once so a freshly created item has identical timestamps.
        Instant now = Instant.now();
        this.createTime = now;
        this.lastmodifiedTime = now;
    }
}
- 人员类 Person
package com.github.walterfan.hellocassandra;
import java.util.UUID;
import lombok.Data;
import org.springframework.data.annotation.PersistenceConstructor;
import org.springframework.data.cassandra.mapping.PrimaryKey;
import org.springframework.data.cassandra.mapping.Table;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
 * Entity mapped to the {@code person} table. Id and name are fixed at
 * construction time; all three values are validated by the constructor.
 */
@Data
@Table("person")
public class Person {
    @PrimaryKey
    private final String id;
    private final String name;
    private int age;

    /**
     * Validating constructor, also used when materializing persisted rows.
     *
     * @throws IllegalArgumentException if id or name has no text, or age is not positive
     */
    @PersistenceConstructor
    public Person(String id, String name, int age) {
        Assert.hasText(id, "'id' must be set");
        Assert.hasText(name, "'name' must be set");
        this.id = id;
        this.name = name;
        Assert.isTrue(age > 0, "age must be greater than 0");
        this.age = age;
    }

    /** Creates a person with the given id, name and age. */
    public static Person create(String id, String name, int age) {
        return new Person(id, name, age);
    }

    /** Creates a person with a freshly generated random UUID string as id. */
    public static Person create(String name, int age) {
        String generatedId = UUID.randomUUID().toString();
        return new Person(generatedId, name, age);
    }
}
就像大多数 Spring Data JPA 项目一样,它的存取仓库类也是声明式的,非常简单
package com.github.walterfan.hellocassandra;
import org.springframework.data.cassandra.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.UUID;
/**
 * Declarative repository for {@link Inventory} rows; the finder methods are
 * backed by the CQL given in their {@code @Query} annotations.
 *
 * NOTE(review): Inventory's key is the (user_id, inventory_id) UUID pair, so the
 * String id type parameter looks wrong — confirm which id type this Spring Data
 * Cassandra version expects for an in-entity composite key.
 */
@Repository
public interface InventoryRepository extends CrudRepository<Inventory, String> {

    /** Finds items by the {@code name} column. */
    @Query(value="SELECT * FROM inventory WHERE name=?0")
    List<Inventory> findByName(String name);

    /**
     * Finds items by the {@code tags} column. {@code tags} is neither a key column
     * nor indexed by the schema shown for this table, so Cassandra rejects the
     * restriction unless the query opts in with ALLOW FILTERING. This scans
     * partitions and is only suitable for small data sets; create an index on
     * {@code tags} for real workloads.
     */
    @Query("SELECT * FROM inventory WHERE tags = ?0 ALLOW FILTERING")
    List<Inventory> findByTags(String tags);

    /** Returns all items in the partition of the given user. */
    @Query("SELECT * FROM inventory WHERE user_id = ?0")
    List<Inventory> findByUserId(UUID userId);

    /**
     * Deletes the items of the given user; there is no explicit query, so the
     * statement is left to the framework to derive from the method name.
     * NOTE(review): verify that the Spring Data Cassandra version in use supports derived delete methods.
     */
    void deleteAllByUserId(UUID userId);

    /** Looks up items by the full primary key (user id and inventory id). */
    @Query("SELECT * FROM inventory WHERE user_id = ?0 and inventory_id= ?1 ")
    List<Inventory> findByUserAndInventoryId(UUID userId, UUID inventory_id);
}
通过 Spring Boot Command Line Application 来简单演示一下
package com.github.walterfan.hellocassandra;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.List;
import java.util.UUID;
/**
 * Command-line Spring Boot application that clears, writes and then reads back
 * inventory rows through {@link InventoryRepository}.
 *
 * Created by yafan on 14/11/2017.
 */
@SpringBootApplication
public class HelloCassandra implements CommandLineRunner {
    /** Owner of all rows written by this run; a new random id per application instance. */
    private UUID userId = UUID.randomUUID();

    @Autowired
    InventoryRepository inventoryRepository;

    /** Starts the application context and closes it once startup, including the runner, has returned. */
    public static void main(String[] args) {
        SpringApplication.run(HelloCassandra.class, args).close();
    }

    /** Runs the demo steps in order: clear, save, look up. */
    @Override
    public void run(String... args) throws Exception {
        clearData();
        saveData();
        lookup();
    }

    /** Removes any rows stored for this run's user id. */
    public void clearData() {
        inventoryRepository.deleteAllByUserId(userId);
    }

    /** Builds six sample inventory items (three books, three tasks) and saves them one by one. */
    public void saveData() {
        // The banner says "Customers", but the objects stored are Inventory items.
        System.out.println("===================Save Customers to Cassandra===================");
        Inventory[] samples = {
                new Inventory(userId, UUID.randomUUID(), "book", "Web Scalability", "tech"),
                new Inventory(userId, UUID.randomUUID(), "book", "Ansible", "tech"),
                new Inventory(userId, UUID.randomUUID(), "book", "Go in action", "tech"),
                new Inventory(userId, UUID.randomUUID(), "task", "Write diary", "work"),
                new Inventory(userId, UUID.randomUUID(), "task", "Write book", "work"),
                new Inventory(userId, UUID.randomUUID(), "task", "Write reading notes", "work")
        };
        // Persist the items to Cassandra in declaration order.
        for (Inventory sample : samples) {
            inventoryRepository.save(sample);
        }
    }

    /** Prints the items of this run's user, then the items named "Ansible". */
    public void lookup() {
        System.out.println("===================Lookup Inventory from Cassandra by userId===================");
        List<Inventory> byUser = inventoryRepository.findByUserId(userId);
        for (Inventory item : byUser) {
            System.out.println(item);
        }
        System.out.println("===================Lookup Inventory from Cassandra by name ===================");
        List<Inventory> byName = inventoryRepository.findByName("Ansible");
        for (Inventory item : byName) {
            System.out.println(item);
        }
    }
}
大家可以把源代码下载下来运行并查看结果,完整代码参见 https://github.com/walterfan/helloworld/tree/master/hellocassandra
网友评论