美文网首页Thinking In Data
(五)apache ignite-Persistence 持久化

(五)apache ignite-Persistence 持久化

作者: 席梦思 | 来源:发表于2018-11-04 22:21 被阅读118次

In-memory方法可以通过将数据集合放入系统内存来实现惊人的速度。当所有数据都保存在内存中时,就不再需要处理使用传统旋转磁盘所产生的问题。例如,这意味着不需要维护数据的其他缓存副本并管理它们之间的同步。但是这种方法也有一个缺点,因为数据仅在内存中,如果整个集群终止,它将丢失。因此,这种类型的数据存储根本不考虑持久性。
在大多数情况下,不能(不应该)将整个数据集存储在应用程序的内存中,大多数情况下,应该存储相对较小的热数据或活动数据子集,以提高应用程序的性能。其余的数据应该存储在低成本的磁盘或磁带中,以便存档。有两个主要的内存数据库存储需求:

  • 持久性的数据存储媒体(例如一些数据库或文件系统),用于存储已提交的事务,因此,如果in-memory数据库需要重新加载到内存中,则可以实现维护数据的持久性和达到恢复数据的目的。
  • 永久存储,用于保存整个in-memory数据库的备份副本。

永久存储或媒体可以是任何分布式或本地文件系统、SAN、NoSQL数据库甚至RDBMS(如Postgres或Oracle)。Apache Ignite提供了一种简洁的方式连接持久数据存储,例如RDBMS或NoSQL DB(如mongo DB或Cassandra)。

对于任何企业级系统,另一个突出的必备特性是事务。支持ACID事务,保证您一旦更改了记录集,数据是正确的,并且没有丢失数据。由于具有事务处理能力,现代企业通常需要在单个系统上具有分析能力的低延迟事务系统。这种混合事务处理通常称为HTAP(混合事务处理和分析处理)。HTAP允许您处理最新数据的实时分析。在HTAP中,数据不需要从操作数据库移动到单独的数据集市来支持分析。在这种系统中,SQL支持起着至关重要的作用。事实上,SQL是用于数据分析的自然语言,也是用于编写查询的生产性语言。因此,本章将探讨以下内容:

  • 持久化Ignite数据网格缓存。
  • Apache Ignite的事务
  • 对Ignite缓存运行SQL查询。
  • 使用JPA和Apache ignite。
  • 更新Ignite缓存、回收和过期策略的方法。

Persistence Ignite’s cache 持久化Ignite的缓存

在开始之前,明确单词持久性的概念是很重要的。在计算机系统中存储数据的情况下,这意味着数据在创建过程结束后仍然存在。换句话说,为了将数据存储视为持久性,它必须写入非易失性存储。在使用内存数据库存储数据时,您可能会考虑以下几点:

  • 系统中的每一块数据都有自己的生命周期。数据可以是热门的、活跃的或历史的。无需在内存中存储大量的历史数据。在内存中存储历史数据将非常昂贵,而且会浪费资源。
  • 在任何节点失败的情况下,它可能导致该节点中的任何数据丢失。

让我们考虑一个来自银行系统的示例。银行系统可以将其客户资料和相关产品信息作为数据的热门子集存储在内存中。客户端将最后执行的100个事务可以看作是活动数据,也可以放在内存中计算动态定价或类似的东西。与客户端相关的其他数据子集(如客户端审计数据和超过一个月的事务)可以被认为是冷数据或历史数据。冷数据的一个子集可以存储到任何非易失性存储中,比如磁盘,如图所示。


image.png

与二级缓存不同,in-memory数据存储做为应用程序的主要数据存储。为了支持持久化数据存储,Apache Ignite实现了针对CacheLoader和CacheWriter的JCache接口,这些接口用于对任何基于持久化的媒体进行读写。为了简单起见,Ignite提供org.apache.ignite.cache.store.CacheStore接口,它扩展了CacheLoader和CacheWriter接口。Apache ignite out-of- box支持外部持久性存储,如Oracle、Postgres和NoSQL数据库,如MongoDB或Cassandra。
另外一点,Ignite CacheStore是完全事务性的,可以自动合并到正在进行的缓存事务中。
在使用write-through方法时需要进行一些考虑。对于每个事务,Ignite都试图将数据保存或更新到持久性存储库或磁盘上的数据库中,因此缓存更新时间的总体持续时间可能相对较高。在这种情况下,Ignite与磁盘上的数据库相同。


image.png
此外,高强度的缓存更新速率会导致磁盘上数据库的存储负载非常高。对于这种情况,Ignite提供了一个批量更新选项,以异步方式进行更新,名为write-behind,如上图右边所示。这种方法背后的主要思想是聚合更新,并将其异步地刷新到持久性存储中。Ignite write-behind实现非常类似之前描述的JCache entry processor。Ignite可以用不同的标准刷新聚合数据:
  • 基于时间的事件 Time-based events:数据条目可以驻留在队列中的最长时间。
  • 队列大小事件 Queue-size events:当队列的大小达到某个特定点时,会刷新队列。
  • 两者结合。

除了JCache cache storage和loading方法之外,Ignite CacheStore接口还提供了从持久性存储库进行批量存储和加载缓存的能力。此外,Ignite还提供了名为CacheStoreAdapter的缓存存储便利适配器,它实现CacheStore接口,并为批量操作提供默认实现。抽象类CacheStoreAdapter有以下方法,您必须实现这些方法才能使用持久化存储:

方法名称 描述
load(), write(), delete() 从接口JCache继承的方法。CacheLoader 和JCache。CacheWrite允许 put, get 或者delete持久化存储缓存中的条目。这些方法用于在处理单个缓存条目时启用read-through和write-through行为。
loadAll(), writeAll(), deleteAll() 从接口JCache继承的方法。CacheLoader和JCache。CacheWriter允许执行批量操作。这些方法用于在处理多个缓存项时启用read-through和write-through行为。为了获得更好的性能,应该对持久性存储的批处理操作实现这些方法。
loadCache() 方法继承Ignite。CacheStore接口,用于从底层持久性存储加载所有值。它通常用于启动时对缓存进行热加载,但也可以在启动缓存后的任何时候调用。
sessionEnd() 可选方法,为结束事务提供默认的空实现。Ignite使用此方法存储可能跨越多个缓存存储操作的会话。这种方法在处理事务时非常有用。

类CacheConfiguration定义了“Ignite”网格缓存配置,并提供了一组方法,用于启用 write/read-through和write-behind caching。

方法名称 描述 默认值
setCacheStoreFactory() 设置持久性存储工厂。缓存存储工厂应该实现的CacheStoreAdapter抽象类。
setReadThrough(boolean) 支持read-through持久性方法。 False
setWriteThrough(boolean) 启用write-through持久化方法。 False
setWriteBehindEnabled(boolean) 启用write-behind持久性方法。 False
setWriteBehindFlushSize(int) 写后缓存的最大大小。如果缓存大小超过此值,则所有缓存项都是刷新到缓存存储并写入缓存清除。如果此值为0,则根据刷新频率间隔执行刷新。注意,不能同时将刷新大小和刷新频率设置为0。 1024
setWriteBehindFlushFrequency(long) 写入后缓存刷新到缓存存储的频率(以毫秒为单位)。这个值定义了从缓存中插入/删除对象到应用相应操作的时刻之间的最大时间间隔到缓存存储。如果此值为0,则根据刷新大小执行刷新。注意,不能同时将刷新大小和刷新频率设置为0。 5 seconds
setWriteBehindFlushThreadCount(int) 执行缓存刷新的线程数。 1
setWriteBehindBatchSize(int) 写后缓存存储操作的最大批处理大小。 512

Persistence in RDBMS (PostgreSQL) 持久化到RDBMS数据库中

到目前为止,我们已经介绍了为什么要使用持久性存储,并探索了一些快速启动的基础知识。在本节中,我们将把Apache Ignite cache条目存储到关系数据库中,比如PostgreSQL。如前所述,您可以使用任何RDBMS,如Oracle、MySQL甚至Teradata来存储Ignite缓存条目;对于所有RDBMS,持久化过程是相同的。


image.png

在我们的例子中,持久化存储将是PostgreSQL,这是一个非常流行的开源企业级RDBMS。
在本节中,我们将实现以下场景:

  • Write-through: 将缓存条目写入PostgreSQL。
  • Read-through: 当缓存项在缓存中找不到时,从持久化存储中读取。
  • Write-behind: 将更新聚合到单个操作中以更新存储。

为了完成以上的场景,我们需要满足几个先决条件:

  1. 安装并且配置PostgreSQL服务器;
  2. 创建新的schema或者使用默认的;
  3. 根据缓存条目创建表。
Step1:

在PostgreSQL公共schema中创建表post,create-db-schema.ddl文件(resources目录中)内容如下所示:

DROP TABLE IF EXISTS POSTS;
CREATE TABLE POSTS (
  id           VARCHAR(150),
  title        VARCHAR(255),
  description  TEXT,
  creationDate DATE,
  author       VARCHAR(150)
);
DROP INDEX IF EXISTS posts_id_uindex;
CREATE UNIQUE INDEX posts_id_uindex ON POSTS (id);
ALTER TABLE POSTS
  ADD CONSTRAINT posts_id_pk PRIMARY KEY (id);
ALTER TABLE POSTS
  ALTER COLUMN id SET NOT NULL;

POSTS表的结构非常简单。该表包含文章的唯一ID、标题名、文章作者和文章创建日期。列ID是表的主键。该表的结构应如下图所示。


image.png
Step2:

在jdbc.properties文件中为PostgreSQL服务器配置JDBC URL。jdbc.properties属性文件也位于与文件create-db-schema相同的目录中。

jdbc.driver=org.postgresql.Driver 

jdbc.url=jdbc:postgresql://localhost:5432/postgres 

jdbc.username=postgres

jdbc.password=postgres

Spring jdbcTemplate 配置文件postgres-context.xml(文件存放目录同上)内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">


    <context:component-scan base-package="com.mycookcode.bigData.ignite"/>
    <context:property-placeholder location="classpath:jdbc.properties" />

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="${jdbc.driver}" />
        <property name="url" value="${jdbc.url}"/>
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate">
        <constructor-arg ref="dataSource" />
    </bean>
 </beans>
Step3:

在maven项目中指定以下maven依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycookcode.bigData.ignite</groupId>
  <artifactId>ignite-persistence</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>ignite-persistence</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <ignite.version>2.6.0</ignite.version>
    <spring.version>5.1.1.RELEASE</spring.version>
  </properties>


  <dependencies>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-core</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-spring</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-indexing</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-slf4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-log4j</artifactId>
      <version>${ignite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>9.1-901-1.jdbc4</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>

    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>


    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-mongodb</artifactId>
      <version>2.1.1.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-jpa</artifactId>
      <version>2.1.1.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-expression</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-framework-bom</artifactId>
      <version>${spring.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>


  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>com.jolira</groupId>
        <artifactId>onejar-maven-plugin</artifactId>
        <version>1.4.4</version>
        <executions>
          <execution>
            <id>build-query</id>
            <configuration>
              <mainClass>com.mycookcode.bigData.ignite.App</mainClass>
              <attachToBuild>true</attachToBuild>
              <classifier>onejar</classifier>
              <filename>cache-store-runnable.jar</filename>
            </configuration>
            <goals>
              <goal>one-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
Step4:

让我们为表post创建DTO类com.mycookcode.bigData.ignite.jdbc.model.Post将映射表Post的行。类Post将具有以下属性:

package com.mycookcode.bigData.ignite.jdbc.model;

import java.io.Serializable;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Objects;

public class Post implements Serializable {

    private static final long serialVersionUID = 0L;

    private String id;
    private String title;
    private String description;
    private LocalDate creationDate;
    private String author;

    public Post() {
    }

    public Post(String id, String title, String description, LocalDate creationDate, String author) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.creationDate = creationDate;
        this.author = author;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public Date getCreationDate() {
        return localDate2Date(creationDate);
    }

    public void setCreationDate(LocalDate creationDate) {
        this.creationDate = creationDate;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    @Override public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Post post = (Post)o;
        return Objects.equals(id, post.id) &&
                Objects.equals(title, post.title) &&
                Objects.equals(description, post.description) &&
                Objects.equals(creationDate, post.creationDate) &&
                Objects.equals(author, post.author);
    }

    @Override public int hashCode() {
        return Objects.hash(id, title, description, creationDate, author);
    }

    @Override public String toString() {
        return "Post{" +
                "id='" + id + '\'' +
                ", title='" + title + '\'' +
                ", description='" + description + '\'' +
                ", creationDate=" + creationDate +
                ", author='" + author + '\'' +
                '}';
    }

    /**
     * LocalDate转Date
     * @param localDate
     * @return
     */
    public static Date localDate2Date(LocalDate localDate) {
        if(null == localDate) {
            return null;
        }
        ZonedDateTime zonedDateTime = localDate.atStartOfDay(ZoneId.systemDefault());
        return Date.from(zonedDateTime.toInstant());
    }
}
Step5:

在这里,我们将创建一个Ignite CacheStore的简单实现。com.mycookcode.bigData.ignite.jdbc.PostgresDBStore类继承了Ignite的抽象类CacheStoreAdapter并且实现了LifecycleAware接口。如果组件实现了接口LifecycleAware,那么在节点启动期间将调用start方法。
我们还使用Spring JdbcTemplete来简化PostgreSQL原生JDBC
调用。

package com.mycookcode.bigData.ignite.jdbc;


import com.mycookcode.bigData.ignite.jdbc.model.Post;

import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class PostgresDBStore  extends CacheStoreAdapter<String,Post> implements LifecycleAware{


    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public Post load(String key)throws CacheLoaderException
    {
        Map<String, Object> inputParam = new HashMap<>();
        inputParam.put("id", key);
        return jdbcTemplate.queryForObject("SELECT * FROM POSTS WHERE id=?",inputParam,new RowMapper<Post>(){
            @Override
            public Post mapRow(ResultSet rs, int i)throws SQLException
            {
                return new Post(rs.getString(1), rs.getString(2), rs.getString(3), rs.getDate(4).toLocalDate(), rs.getString(5));
            }
        });
    }


    @Override
    public void write(Cache.Entry<? extends String, ? extends Post> entry)throws CacheWriterException
    {
        Post post = entry.getValue();
        Map<String,Object> parameterMap = new HashMap<>();
        parameterMap.put("ID", post.getId());
        parameterMap.put("TITLE", post.getTitle());
        parameterMap.put("DESCRIPTION", post.getDescription());
        parameterMap.put("CREATIONDATE", post.getCreationDate());
        parameterMap.put("AUTHOR", post.getAuthor());

        jdbcTemplate.update("INSERT INTO POSTS(ID,TITLE,DESCRIPTION,CREATIONDATE,AUTHOR) VALUES (:ID,:TITLE,:DESCRIPTION,:CREATIONDATE,:AUTHOR);",parameterMap);
    }

    @Override
    public void delete(Object key)throws CacheWriterException
    {
        Map<String, String> deleteMap = new HashMap<>();
        deleteMap.put("ID", (String) key);
        jdbcTemplate.update("DELETE FROM POSTS WHERE ID= ?", deleteMap);
    }

    @Override
    public void start() throws IgniteException {
        ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("postgres-context.xml");
        jdbcTemplate = context.getBean(NamedParameterJdbcTemplate.class);
    }

    @Override
    public void stop() throws IgniteException {

    }
}

load()方法通过id (post)查询表post,并在找到时将条目加载到缓存中。方法write()用于将条目存储到表post中。该方法获取作为输入参数的DTO post实例,并将post数据插入表posts。在方法delete()中,我们通过键从表中删除条目。当节点第一次启动时,将调用方法start()。在start()方法中,我们从类路径初始化spring上下文,并从spring上下文获取JdbcTemplete。

Step6:

为了存储和从PostgreSQL加载数据,我们以编程的方式配置了Ignite cacheConfiguration。另一种方法是使用Spring XML配置来配置Ignite cacheConfiguration。我们提供一个com.mycookcode.bigData.ignite.App实例用于从缓存中存储和加载条目。给定的类用于PostgreSQL和MongoDB。现在,让我们首先仔细看看PostgreSQL的缓存配置。

package com.mycookcode.bigData.ignite;

import com.mycookcode.bigData.ignite.jdbc.PostgresDBStore;
import com.mycookcode.bigData.ignite.jdbc.model.Post;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.cache.configuration.FactoryBuilder;

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;


public class App
{

    private static final String POST_CACHE_NAME = App.class.getSimpleName() + "-post";
    private static Logger LOGGER = LoggerFactory.getLogger(App.class);
    private static final String POSTGRESQL = "postgresql";
    private static final String MONGODB = "mongodb";

    public static void main( String[] args ) throws Exception
    {

        jdbcStoreExample();
    }


    private static void jdbcStoreExample() throws Exception
    {
        //构建一个动态缓存,它分布在所有运行的节点上。
        //也可以在xml配置文件中使用相同的配置
        IgniteConfiguration cfg = new IgniteConfiguration();

        CacheConfiguration configuration = new CacheConfiguration();
        configuration.setName("dynamicCache");
        configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        configuration.setCacheStoreFactory(FactoryBuilder.factoryOf(PostgresDBStore.class));
        configuration.setReadThrough(true);
        configuration.setWriteThrough(true);

        configuration.setWriteBehindEnabled(true);

        log("Start. PersistenceStore example.");
        cfg.setCacheConfiguration(configuration);

        try(Ignite ignite = Ignition.start(cfg))
        {
            int count = 10;
            try(IgniteCache<String,Post> igniteCache = ignite.getOrCreateCache(configuration))
            {
                
                try (Transaction tx =  ignite.transactions().txStart(PESSIMISTIC,REPEATABLE_READ)){
                    for(int i = 1;i <= count;i++)
                    {
                        igniteCache.put("_"+i,new Post("_" + i, "title-" + i, "description-" + i, LocalDate.now().plus(i, ChronoUnit.DAYS),"author-" + i));
                    }
                    tx.commit();

                    for (int i = 1;i < count;i+=2)
                    {
                        igniteCache.clear("_"+i);
                        log("Clear every odd key: " + i);
                    }

                    for (long i = 1;i <= count;i++)
                    {
                        log("Local peek at [key=_" + i + ", val=" + igniteCache.localPeek("_" + i) + ']');
                    }

                    for (long i = 1;i <= count;i++)
                    {
                        log("Got [key=_" + i + ", val=" + igniteCache.get("_" + i) + ']');
                    }
                    tx.commit();

                }
            }
            log("PersistenceStore example finished.");
            //ignite.destroyCache("dynamicCache");
            Thread.sleep(Integer.MAX_VALUE);
        }

    }


    private static void log(String msg) {
        LOGGER.info("\t" + msg);
    }
}

首先,我们创建了Ignite CacheConfiguration实例,并将缓存名称设置为dynamicCache。将原子性模式设置为事务性。接下来,我们将PostgresDBStore设置为缓存工厂,并启用读写功能。

Step7:

现在,我们已经完成了项目,可以编译并运行应用程序。使用以下maven命令编译项目。

mvn clean install

上面的命令将编译并创建一个可执行jar来运行应用程序。

Step8:

从命令行启动应用程序。转到文件夹持久存储并运行下面的命令。

java -jar ./target/cache-store-runnable.jar

上面的命令将运行应用程序,并向Ignite缓存中添加10个新的唯一post。
缓存条目也将存储在表post中。
Ignite CacheStore工厂执行SQL语句将条目存储到表中。使用ignitevisior命令行工具,还可以在cache dynamicCache中查找缓存条目。


image.png

让我们通过执行以下SQL查询来查询表post。

select * from posts;

上面的SQL语句应该返回如下图所示。


image.png

这是将数据记录存储到数据库表中的最原始的方法。必须提到的一件事是,每个Ignite节点的每个数据记录都将写入数据库表。如果表中已经存在任何数据记录,它将被更新或转义(取决于配置)。在高强度的情况下,写入Ignite缓存可能会对数据库造成严重的负载。要克服这种情况,应该仔细配置数据库的JDBC连接池。使用RDBMS作为持久化存储有一个缺点:RDBMS很难扩展。但是,您可以使用可伸缩的PostgreSQL-XL来克服这个问题。使用RDBMS还有一些优点:您可以使用任何现有的BI工具(如Oracle BI、Tableau)轻松地分析数据。

为了消除使用Java DTO创建数据库表和映射的复杂性,Apache Ignite提供了数据库模式映射向导应用程序,它提供与持久性存储集成的自动支持。该向导自动连接到底层数据库,并创建所需的所有XML OR-mapping配置和Java域模型pojo。

相关文章

网友评论

  • 北国狼人:抓紧写一下MongoDB的第三方存储方案吧

本文标题:(五)apache ignite-Persistence 持久化

本文链接:https://www.haomeiwen.com/subject/czmoaftx.html