美文网首页Thinking In Data
(五)apache ignite-Persistence 持久化

(五)apache ignite-Persistence 持久化

作者: 席梦思 | 来源:发表于2018-11-08 15:08 被阅读31次

    使用NoSQL或MongoDB作为持久性存储的主要好处是,它们非常适合进行扩展。通过使用MongoDB,您可以将冷数据或历史数据直接以JSON的形式存储到持久层,而无需进行任何数据转换。MongoDB的持久性方法与前一节RDBMS非常相似。


    image.png

    上图显示了使用MongoDB作为持久化存储的概念体系结构。关于MongoDB: MongoDB是一个开源的无模式数据库。MongoDB是一个面向文档的数据库,以BSON或二进制JSON格式存储所有数据。这使MongoDB能够灵活地通过javascript访问它的功能,使特定类型的数据集成更容易、更快。MongoDB还通过共享集群的配置支持数据分片;这意味着可以垂直和水平无缝地扩展MongoDB。通常,MongoDB用于实时分析,在这种情况下,延迟很低,可用性非常高。为了完成MongoDB中缓存数据记录的持久性,我们将首先安装MongoDB并准备maven项目。如果已经在系统中安装和配置了MongoDB数据库,那么可以跳过第一步,从第4步继续。

    Step 1:

    从这里下载MongoDB发行版(https://www.mongodb.com/download-center?jmp=nav#community)。我将使用社区版和下载MacOS的发行版。

    Step 2:

    将发行版解压到操作系统中的某个位置,如下所示。

    tar –xvf mongodb-osx-ssl-x86_64-4.0.3.tgz
    

    创建本地文件目录/data/db

    Step 3:

    在命令行中使用以下命令运行MongoDB服务器。

    ./mongod --dbpath /data/db
    

    MongoDB在27017端口启动并运行。

    Step 4:

    创建一个maven项目,并在pom.xml配置文件中添加所有的依赖项。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.mycookcode.bigData.ignite</groupId>
      <artifactId>ignite-persistence</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>ignite-persistence</name>
      <url>http://maven.apache.org</url>
    
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <ignite.version>2.6.0</ignite.version>
        <spring.version>5.1.1.RELEASE</spring.version>
      </properties>
    
    
      <dependencies>
    
        <dependency>
          <groupId>org.apache.ignite</groupId>
          <artifactId>ignite-core</artifactId>
          <version>${ignite.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.ignite</groupId>
          <artifactId>ignite-spring</artifactId>
          <version>${ignite.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.ignite</groupId>
          <artifactId>ignite-indexing</artifactId>
          <version>${ignite.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.ignite</groupId>
          <artifactId>ignite-slf4j</artifactId>
          <version>${ignite.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.ignite</groupId>
          <artifactId>ignite-log4j</artifactId>
          <version>${ignite.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.25</version>
        </dependency>
    
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
    
        <dependency>
          <groupId>postgresql</groupId>
          <artifactId>postgresql</artifactId>
          <version>9.1-901-1.jdbc4</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${spring.version}</version>
    
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-aop</artifactId>
          <version>${spring.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-beans</artifactId>
          <version>${spring.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${spring.version}</version>
        </dependency>
    
    
        <dependency>
          <groupId>org.springframework.data</groupId>
          <artifactId>spring-data-mongodb</artifactId>
          <version>2.1.1.RELEASE</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.data</groupId>
          <artifactId>spring-data-jpa</artifactId>
          <version>2.1.1.RELEASE</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-tx</artifactId>
          <version>${spring.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-expression</artifactId>
          <version>${spring.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-framework-bom</artifactId>
          <version>${spring.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
    
    
      </dependencies>
    
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
    
          <plugin>
            <groupId>com.jolira</groupId>
            <artifactId>onejar-maven-plugin</artifactId>
            <version>1.4.4</version>
            <executions>
              <execution>
                <id>build-query</id>
                <configuration>
                  <mainClass>com.mycookcode.bigData.ignite.App</mainClass>
                  <attachToBuild>true</attachToBuild>
                  <classifier>onejar</classifier>
                  <filename>cache-store-runnable.jar</filename>
                </configuration>
                <goals>
                  <goal>one-jar</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    
    </project>
    

    Step 5:

    创建com.mycookcode.bigData.ignite.nosql.model.MongoPost类对应MongoDB中的数据模型。

    package com.mycookcode.bigData.ignite.nosql.model;
    
    
    import org.springframework.data.annotation.Id;
    import org.springframework.data.mongodb.core.mapping.Document;
    
    import java.time.LocalDate;
    import java.util.Objects;
    
    @Document
    public class MongoPost {
    
        @Id
        private String id;
    
        private String title;
    
        private String description;
    
        private LocalDate creationDate;
    
        private String author;
    
        public MongoPost() {
        }
    
        public MongoPost(String id, String title, String description, LocalDate creationDate, String author) {
            this.id = id;
            this.title = title;
            this.description = description;
            this.creationDate = creationDate;
            this.author = author;
        }
    
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getTitle() {
            return title;
        }
    
        public void setTitle(String title) {
            this.title = title;
        }
    
        public String getDescription() {
            return description;
        }
    
        public void setDescription(String description) {
            this.description = description;
        }
    
        public LocalDate getCreationDate() {
            return creationDate;
        }
    
        public void setCreationDate(LocalDate creationDate) {
            this.creationDate = creationDate;
        }
    
        public String getAuthor() {
            return author;
        }
    
        public void setAuthor(String author) {
            this.author = author;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o)
                return true;
            if (o == null || getClass() != o.getClass())
                return false;
            MongoPost post = (MongoPost)o;
            return Objects.equals(id, post.id) &&
                    Objects.equals(title, post.title) &&
                    Objects.equals(description, post.description) &&
                    Objects.equals(creationDate, post.creationDate) &&
                    Objects.equals(author, post.author);
        }
    
        @Override
        public int hashCode() {
            return Objects.hash(id, title, description, creationDate, author);
        }
    
        @Override
        public String toString() {
            return "MongoPost{" +
                    "id='" + id + '\'' +
                    ", title='" + title + '\'' +
                    ", description='" + description + '\'' +
                    ", creationDate=" + creationDate +
                    ", author='" + author + '\'' +
                    '}';
        }
    }
    

    Step 6:

    我们将使用spring框架中的spring-data-MongoDB库来处理MongoDB。我们还创建了一个spring上下文文件mongo-context.xml来配置MongoDB工厂。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:mongo="http://www.springframework.org/schema/data/mongo"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <context:annotation-config />
    
        <mongo:mongo-client id="mongo" host="localhost" port="27017"/>
    
        <mongo:db-factory id="mongoDbFactory" dbname="test" mongo-ref="mongo" />
    
        <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
            <constructor-arg ref="mongoDbFactory"/>
        </bean>
    
    
        <mongo:repositories base-package="com.mycookcode.bigData.ignite.nosql"  mongo-template-ref="mongoTemplate"/>
    
        <bean class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor"/>
    
        </beans>
    

    上面的XML文件指示Spring执行以下操作:

    • 指定MongoDB主机和端口。
    • 设置MongoDB工厂数据库名称是test。
    • 转换MongoPost DTO为mongo Bson。
    • 设置MongoDB模版工厂。

    Step 7:

    创建一个名为MongoDBStore的新Java类,它将扩展Ignite CacheStoreAdapter并实现LifecycleAware接口。

    package com.mycookcode.bigData.ignite.nosql;
    
    import com.mycookcode.bigData.ignite.nosql.model.MongoPost;
    import org.apache.ignite.IgniteException;
    import org.apache.ignite.cache.store.CacheStoreAdapter;
    import org.apache.ignite.lifecycle.LifecycleAware;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    
    
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.data.mongodb.core.MongoOperations;
    
    import javax.cache.Cache;
    import javax.cache.integration.CacheLoaderException;
    import javax.cache.integration.CacheWriterException;
    
    
    public class MongoDBStore extends CacheStoreAdapter<String,MongoPost> implements LifecycleAware {
    
        @Autowired
        private PostRepository postRepository;
    
        @Autowired
        private MongoOperations mongoOperations;
    
    
        private static Logger logger = LoggerFactory.getLogger(MongoDBStore.class);
    
        @Override
        public MongoPost load(String key) throws CacheLoaderException
        {
            logger.info(String.valueOf(postRepository));
            return postRepository.findById(key).get();
        }
    
        @Override
        public void write(Cache.Entry<? extends String,? extends MongoPost> entry) throws CacheWriterException
        {
            MongoPost post = entry.getValue();
            logger.info(String.valueOf(postRepository));
            postRepository.save(post);
        }
    
        @Override
        public void delete(Object key) throws CacheWriterException
        {
            logger.info(String.valueOf(postRepository));
            postRepository.deleteById((String) key);
        }
    
        @Override
        public void start() throws IgniteException
        {
            ConfigurableApplicationContext context  = new ClassPathXmlApplicationContext("mongo-context.xml");
            postRepository = context.getBean(PostRepository.class);
            logger.info(String.valueOf(postRepository));
            mongoOperations = context.getBean(MongoOperations.class);
            if (!mongoOperations.collectionExists(MongoPost.class))
                mongoOperations.createCollection(MongoPost.class);
        }
    
        @Override
        public void stop() throws IgniteException
        {
    
        }
    
    }
    

    上面的代码实现并覆写了三个CacheStore方法:load()、write()和delete()。在load()方法中,我们通过给定的键查找MongoDB中的文档,并返回MongoPost实例。Write()方法将单个缓存的数据记录保存到MongoDB test数据库中。Delete()方法通过给定的键从MongoDB删除bson文档。

    Step 8:

    为了存储并从MongoDB数据库加载数据,我们以编程方式配置Ignite cacheConfiguration。我们编写一个com.mycookcode.bigData.ignite.App类用于从缓存中存储和加载数据记录。

    package com.mycookcode.bigData.ignite;
    
    import com.mycookcode.bigData.ignite.jdbc.PostgresDBStore;
    import com.mycookcode.bigData.ignite.jdbc.model.Post;
    import com.mycookcode.bigData.ignite.nosql.MongoDBStore;
    import com.mycookcode.bigData.ignite.nosql.model.MongoPost;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.CacheAtomicityMode;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.transactions.Transaction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.cache.Cache;
    import javax.cache.configuration.FactoryBuilder;
    
    import java.time.LocalDate;
    import java.time.temporal.ChronoUnit;
    
    import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
    import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
    
    
    public class App
    {
    
        private static final String POST_CACHE_NAME = App.class.getSimpleName() + "-post";
        private static Logger LOGGER = LoggerFactory.getLogger(App.class);
        private static final String POSTGRESQL = "postgresql";
        private static final String MONGODB = "mongodb";
    
        public static void main( String[] args ) throws Exception
        {
    
            if(args.length <= 0 ){
                LOGGER.error("Usages! java -jar .\\target\\cache-store-runnable.jar postgresql|mongodb");
                System.exit(0);
            }
            if(args[0].equalsIgnoreCase(POSTGRESQL)){
                jdbcStoreExample();
            } else if (args[0].equalsIgnoreCase(MONGODB)){
                nosqlStore();
            }
        }
    
    
        private static void nosqlStore() throws Exception
        {
            IgniteConfiguration cfg = new IgniteConfiguration();
    
            CacheConfiguration configuration = new CacheConfiguration();
            configuration.setName("mongoDynamicCache");
            configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    
            configuration.setCacheStoreFactory(FactoryBuilder.factoryOf(MongoDBStore.class));
            configuration.setReadThrough(true);
            configuration.setWriteThrough(true);
            configuration.setWriteBehindEnabled(true);
    
            log("Start. PersistenceStore example.");
            cfg.setCacheConfiguration(configuration);
    
            try (Ignite ignite = Ignition.start(cfg)){
                int count = 10;
                try(IgniteCache<String,MongoPost> igniteCache = ignite.getOrCreateCache(configuration))
                {
                    try(Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ))
                    {
                        for(int i = 0;i <= count;i++)
                        {
                            igniteCache.put("_"+i,new MongoPost("_" + i, "title-" + i, "description-" + i, LocalDate.now().plus(i, ChronoUnit.DAYS), "author-" + i));
                        }
                        for (int i = 1; i < count; i += 2) {
                            igniteCache.clear("_" + i);
                            log("Clear every odd key: " + i);
                        }
    
                        for (long i = 1; i <= count; i++)
                            log("Local peek at [key=_" + i + ", val=" + igniteCache.localPeek("_" + i) + ']');
    
                        for (long i = 1; i <= count; i++)
                            log("Got [key=_" + i + ", val=" + igniteCache.get("_" + i) + ']');
                        tx.commit();
                    }
                }
                log("PersistenceStore example finished.");
                ignite.destroyCache("mongoDynamicCache");
            }
        }
    
    
        private static void jdbcStoreExample() throws Exception
        {
            //构建一个动态缓存,它分布在所有运行的节点上。
            //也可以在xml配置文件中使用相同的配置
            IgniteConfiguration cfg = new IgniteConfiguration();
    
            CacheConfiguration configuration = new CacheConfiguration();
            configuration.setName("dynamicCache");
            configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    
            configuration.setCacheStoreFactory(FactoryBuilder.factoryOf(PostgresDBStore.class));
            configuration.setReadThrough(true);
            configuration.setWriteThrough(true);
    
            configuration.setWriteBehindEnabled(true);
    
            log("Start. PersistenceStore example.");
            cfg.setCacheConfiguration(configuration);
    
            try(Ignite ignite = Ignition.start(cfg))
            {
                int count = 10;
                try(IgniteCache<String,Post> igniteCache = ignite.getOrCreateCache(configuration))
                {
    
                    try (Transaction tx =  ignite.transactions().txStart(PESSIMISTIC,REPEATABLE_READ)){
                        for(int i = 1;i <= count;i++)
                        {
                            igniteCache.put("_"+i,new Post("_" + i, "title-" + i, "description-" + i, LocalDate.now().plus(i, ChronoUnit.DAYS),"author-" + i));
                        }
                        tx.commit();
    
                        for (int i = 1;i < count;i+=2)
                        {
                            igniteCache.clear("_"+i);
                            log("Clear every odd key: " + i);
                        }
    
                        for (long i = 1;i <= count;i++)
                        {
                            log("Local peek at [key=_" + i + ", val=" + igniteCache.localPeek("_" + i) + ']');
                        }
    
                        for (long i = 1;i <= count;i++)
                        {
                            log("Got [key=_" + i + ", val=" + igniteCache.get("_" + i) + ']');
                        }
                        tx.commit();
    
                    }
                }
                log("PersistenceStore example finished.");
                //ignite.destroyCache("dynamicCache");
                Thread.sleep(Integer.MAX_VALUE);
            }
    
        }
    
    
        private static void log(String msg) {
            LOGGER.info("\t" + msg);
        }
    }
    

    首先创建Ignite CacheConfiguration实例,并将缓存名称设置为mongoDynamicCache。将原子性模式设置为事务性。然后将MongoDBStore设置为缓存工厂,并启用读写功能。

    Step 9:

    通过maven编译这个项目,并运行应用程序将一些条目从Ignite缓存存储到MongoDB中。

    mvn clean install
    

    使用以下命令运行应用程序。

    java -jar ./target/cache-store-runnable.jar mongodb
    

    相关文章

      网友评论

        本文标题:(五)apache ignite-Persistence 持久化

        本文链接:https://www.haomeiwen.com/subject/syadxqtx.html