美文网首页
opensearch元数据分析

opensearch元数据分析

作者: 以梦为马驾驾驾 | 来源:发表于2023-05-03 12:05 被阅读0次

OpenSearch存储数据的默认位置是data目录:


data目录

存储的数据的种类

nodes/0下的就是节点存储的数据了, 分为以下几种:

  1. state元信息
  2. index lucene的索引和段文件等
  3. translog 事务日志
image.png image.png

state 元信息

opensearch fork自 elasticsearch7.10, 与它一样, 关于集群状态的元数据, 是写在了lucene的段文件里.
只有具备Master资格的节点和数据节点可以持久化集群状态.

  • GatewayMetaState 负责元数据的接收和持久化
  • GatewayService 负责元数据的恢复(在重启集群)
  1. _state/*.st : 集群层面的元信息: UUID, Settings, template, etc
  2. indices/{index_uuid}/_state/*.st: 索引层面的元信息
  3. indices/{index_uuid}/_state/0/_state/*.st: 分片层面的元信息

关于 st 后缀的文件, 可以下载一个16进制的查看器, 查看文件的内容:
http://hexfiend.com/

image.png

Notice: 持久化的state不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠
gateway的recovery过程重建RoutingTable.

image.png

元数据的持久化与变更

元数据从哪里来: 1. 持久化存储 2. ClusterState

GatewayMetaState的内部类: GatewayClusterApplier extents ClusterStateApplier 会将接受到的ClusterState的元信息持久化. applyClusterState 的统一调用入口在ClusterApplierService#callClusterStateAppliers

        public void applyClusterState(ClusterChangedEvent event) {
            if (event.state().blocks().disableStatePersistence()) {
                incrementalClusterStateWriter.setIncrementalWrite(false);
                return;
            }

            try {
                // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
                // that's higher than the last accepted term.
                // TODO: can we get rid of this hack?
                if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
                    incrementalClusterStateWriter.setCurrentTerm(event.state().term());
                }

                incrementalClusterStateWriter.updateClusterState(event.state());
                incrementalClusterStateWriter.setIncrementalWrite(true);
            } catch (WriteStateException e) {
                logger.warn("Exception occurred when storing new meta data", e);
            }
        }

org.opensearch.gateway.IncrementalClusterStateWriter#updateClusterState 会负责持久化元信息(实际的写入动作还是委托给了MetaStateService以及MetadataStateFormat, 处理失败等等:

写入全局状态long globalStateGeneration = writeGlobalState(writer, newMetadata);
然后写入索引状态: Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
最后写入Manifest: writeManifest(writer, manifest);Manifest 文件是持久化元信息的入口.

举例, 写入全局状态

        long writeGlobalState(String reason, Metadata metadata) throws WriteStateException {
            assert finished == false : FINISHED_MSG;
            try { // 1. 设置回滚清理 2. 写入 3. 设置提交清理  两个清理的区别是: 1 会删除previousManifest之前的提交记录 3. 是提交成功的,会删除此次提交之前的记录, 包括previousManifest
                rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
                long generation = metaStateService.writeGlobalState(reason, metadata);
                commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
                return generation;
            } catch (WriteStateException e) {
                rollback();
                throw e;
            }
        }

  1. 设置回滚清理
  2. 写入
  3. 设置提交清理 两个清理的区别是: 1 会删除previousManifest之前的提交记录 3. 是提交成功的,会删除此次提交之前的记录, 包括previousManifest.

所谓提交记录, 是提交点. _state 目录下的带有对应前缀的都是提交点, 数字后缀是版本. 比如node-130就是node的元信息, 版本130.

而底层执行读写的逻辑都被封装在了MetaDataStateFormat<T>中, 主要由write 方法调用写(且写都遵循同样的逻辑 : 写临时文件 -> 刷盘 -> rename/move 原子操作 ), 不同的元数据作为子类, 需要实现抽象方法:

    /**
     * Writes the given state to the given XContentBuilder
     * Subclasses need to implement this class for theirs specific state.
     */
    public abstract void toXContent(XContentBuilder builder, T state) throws IOException;

    /**
     * Reads a new instance of the state from the given XContentParser
     * Subclasses need to implement this class for theirs specific state.
     */
    public abstract T fromXContent(XContentParser parser) throws IOException;
image.png

元数据的恢复

GatewayService负责元数据的恢复动作, 在集群状态发生变化的时候, 负责元数据的恢复. 调用点: ClusterApplierService在应用集群状态变更的时候. GaewayService是一级的组件, 在 org.opensearch.gateway.GatewayService.doStart 组件启动的时候 ,将自己作为一个集群状态监听器注册到ClusterService中, 这样, ClusterServiceClusterState发生变化的时候, 会调用ClusterApplierServicehandleApplyCommit#onNewClusterState#applyChange 来触发监听器的调用. 在zen2模式下集群级别和索引级别的元数据在集群master选择出来后就已经有了, 而zen1(Gateway)是在更新索引级别的. 当索引级别的元数据也恢复以后, 就需要恢复shard级别的. 从始至终, 任务都在MasterSerivce的一个"masterService#updateTask" 的线程池里执行,这是个单线程的线程池, 也就是说,任务会阻塞执行.

index级别

GatewayService依赖于:

  1. AllocationService shard分配服务, 元数据在cluster和index层面的恢复是由GatewayService操作的, 而shard级别的是由AllocationSerivce , 所以完整的恢复需要依赖AllocationService
  2. ClusterService 集群服务, 负责的是集群状态的协调, 状态变更的发生地, GatewayService要感知到集群状态的变化, 必须依赖他
  3. TransportNodesListGatewayMetaState todo
  4. Discovery 用来判断 集群是用的zen1 还是 zen2(修改过的raft)
  5. ThreadPool and Settings

GatewayService 初始化的时候, 读取元数据恢复的各类配置项.

GatewayService() {
        this.expectedNodes = EXPECTED_NODES_SETTING.get(settings);
        this.expectedDataNodes = EXPECTED_DATA_NODES_SETTING.get(settings);
        this.expectedMasterNodes = EXPECTED_MASTER_NODES_SETTING.get(settings);

        if (RECOVER_AFTER_TIME_SETTING.exists(settings)) {
            recoverAfterTime = RECOVER_AFTER_TIME_SETTING.get(settings);
        } else if (expectedNodes >= 0 || expectedDataNodes >= 0 || expectedMasterNodes >= 0) {
            recoverAfterTime = DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET;
        } else {
            recoverAfterTime = null;
        }
        this.recoverAfterNodes = RECOVER_AFTER_NODES_SETTING.get(settings);
        this.recoverAfterDataNodes = RECOVER_AFTER_DATA_NODES_SETTING.get(settings);
        // default the recover after master nodes to the minimum master nodes in the discovery
        if (RECOVER_AFTER_MASTER_NODES_SETTING.exists(settings)) {
            recoverAfterMasterNodes = RECOVER_AFTER_MASTER_NODES_SETTING.get(settings);
        } else if (discovery instanceof ZenDiscovery) {
            recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
        } else {
            recoverAfterMasterNodes = -1;
        }
}
  • expectedNodes: "gateway.expected_nodes" : data和master-elig节点的个数, 要达到这个数据才会开始恢复
  • expectedDataNodes: "gateway.expected_data_nodes"
  • expectedMasterNodes: "gateway.expected_master_nodes"
  • recoverAfterTime: "gateway.recover_after_time" : 如果没有达到预期的节点数量,则恢复过程将等待配置的时间,再尝试恢复。
  • recoverAfterNodes: "gateway.recover_after_nodes" : 只要配置数量的节点(数据节点或具备Master资格的节点)加入集群就可以开始恢复
  • recoverAfterDataNodes: "gateway.recover_after_data_nodes": 更精细化的配置
  • recoverAfterMasterNodes: "gateway.recover_after_master_nodes" : 更精细化的配置
    expected_nodesrecover_after_timerecover_after_nodes 是 或 的关系, 只要有一个先达到, 就执行.

STATE_NOT_RECOVERED_BLOCK : 阻塞态, 如果有, 则表示集群的元数据还没有恢复, 需要执行GatewayService的恢复. 如果没有, 则已经恢复, 不用执行. 所以, 可以关注哪些地方会生成这个阻塞态:
Discovery的子类zenCoordinatordoStart 方法, 即集群在选主的时候, 选主完成后(找到最大的term, 最大的version), 找到最新的ClusterState, 然后发布出去, 二阶段提交的第二阶段触发集群状态变化. 然后, 进入恢复流程.

根据选主算法的不同, 会采用不同的恢复流程:

        if (discovery instanceof Coordinator) {
            recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
        } else {
            final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
            recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
        }

即, 不同选主算法下, 元数据的恢复流程被封装成了一个 Runnable recoveryRunnable, 在clusterChanged#performStateRecovery的时候,触发调用. 出发前要做一系列的校验, 判断前置条件, 如:
执行时间, 节点个数 等

zen1

入口:
org.opensearch.gateway.Gateway#performStateRecovery

步骤:

  1. final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class); 获取具有Matser资格的节点列表
  2. TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet(); 获取每个节点上的 IndexMetaData, 是同步阻塞调用. TransportNodesAction
  3. final int requiredAllocation = Math.max(1, minimumMasterNodes); zen1模式的缺陷之一: 必须要用户设置了过半的个数
  4. 从各个node拿回来的信息, 比较每个node里version 最大的ClusterState, 取版本最大的. 并获取所有 master-elig节点中存储的所有index信息.
  5. 最大版本号的ClusterState 可以用来做除了index信息之外的其他信息的基础, index信息, 将通过第四步的获取结果重建. 对从各个节点拿回来的信息按照index分组, Index -> List[Node]. 然后找出每个index在各个node中最大版本的元数据作为此index的元信息.
  6. 删除无用信息, 不合法的信息, 最终基于最大版本号的ClusterState(没有index信息)以及第五步中获取的信息构建新的ClusterState
  7. 构建后处理: 调用成功和失败的后处理. 成功: GatewayRecoveryListener , 提交一个"local-gateway-elected-state", new RecoverStateUpdateTask() 的子任务,
    • 任务会混合当前的ClusterState和6得到的恢复了的ClusterState
    • 并且重建阻塞态, Settings中设置的.
    • 最后调用父类 RecoverStateUpdateTask 的execute方法, 从indices重建 routingTable , 取出 Not recovered 的阻塞态.
    • 启动shard的恢复: allocationService.reroute(newState, "state recovered");

zen2 Coordinator

clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask()

与zen1 模式不同的是, Coordinator不需要再向各个master-eligible节点拉取indices的元信息以决定index 的最新元信息是什么, 因为利用了Raft的选主算法, 已经可以保证选完主之后发布的ClusterState就是罪行的. 所以, 直接用他们构建routingTable即可, 然后触发 shard的恢复.

核心逻辑:

        public ClusterState execute(final ClusterState currentState) {
            if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                logger.debug("cluster is already recovered");
                return currentState;
            }

            final ClusterState newState = Function.<ClusterState>identity()
                .andThen(ClusterStateUpdaters::updateRoutingTable)
                .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
                .apply(currentState);

            return allocationService.reroute(newState, "state recovered");
        }

shard级别

见其他章节

相关文章

  • opensearch

    参考文档 opensearch[https://opensearch.org/docs/1.1/troublesh...

  • 阿里云OpenSearch的nodejs接入实现

    阿里云OpenSearch的API 链接 首先说一句,阿里的文档写的真烂 然后说重点 阿里OpenSearch没...

  • Hadoop源码分析-Namenode 元数据管理源码分析

    [TOC] Namenode 元数据管理源码分析 本文讲解 Namenode 元数据管理源码分析,内容包括HDFS...

  • 栅格数据的空间分析——统计分析

    1.局部分析: 局部分析工具组主要用于多层面栅格数据的叠合分析,对栅格数据以像元为单位进行像元统计分析。例如,对同...

  • App 审核分析与解决

    App 审核被拒分析与解决: 1、元数据“元数据与应用内容不符 (贴近自己的软件编写上架内容)* 元数据包含不雅词...

  • JAVA 注解 Annontation

    注解的作用:1) 编写文档:通过代码里标识的元数据生成文档2)代码分析:通过代码里标识的元数据对代码进行分析3)编...

  • ##数据仓库元数据管理

    //数据仓库元数据管理 | 网站数据分析http://webdataanalysis.net/web-data-w...

  • 注解笔记

    元数据 定义:就是对数据进行说明的数据作用:用于生成文档、代码分析、编译检查 注解 注解就属于一种元数据,是对代码...

  • Day 2063:学习

    #数据中台 数据地图是基于元数据中心构建的一站式企业数据资产目录,可以看作是元数据中心的界面。数据开发、分析师、数...

  • OpenSearch:轻松构建大数据搜索服务

    随着互联网数据规模的爆炸式增长,如何从海量的历史、实时 数据中快速获取有用信息,变得越来越具有挑战性。搜索是获取信...

网友评论

      本文标题:opensearch元数据分析

      本文链接:https://www.haomeiwen.com/subject/odpnjdtx.html