
Original: Pushing HBase WAL Log Data to Kafka in Real Time

Author: 无色的叶 | Published 2020-05-25 15:43

    Syncing HBase WAL logs to Kafka in real time. I use this for two scenarios:

    • Joining many streams (more than three): the join key is used as the rowkey and each stream writes to HBase in real time, assembling a wide table. The WAL log is parsed and the rowkeys are pushed to Kafka in real time; Flink then looks the rows back up in HBase and performs real-time statistics and analysis
    • Exploiting HBase's dynamically extensible columns: data is pre-processed in real time and assembled into a wide table, the WAL log is parsed and the rowkeys are pushed to Kafka, Flink looks the rows back up in HBase, and the results are batch-written to ClickHouse for minute-level OLAP analysis and processing

    How it works

    HBase ships with Replication for cross-cluster data synchronization, and a custom Replication Endpoint can redirect the replicated messages to Kafka instead. First, let's look at how HBase Replication copies data between clusters; the overall flow is shown in the figure below:


    (Figure: overall HBase cross-cluster replication data flow)
    • When a replication link to a peer cluster is created, each RegionServer starts a ReplicationSource thread. ReplicationSource first puts the HLogs currently being written into its replication queue, then registers a listener on the RegionServer for HLog roll events. When the RegionServer rolls an HLog, ReplicationSource receives the event, assigns the HLog to the matching walGroup-Queue, and persists the HLog file name to ZooKeeper so that replication of unfinished HLogs can resume after a restart
    • Behind each walGroup-Queue, a ReplicationSourceWALReader thread keeps taking an HLog from the queue, reading its entries one by one, and placing them into a queue named entryBatchQueue
    • Behind each entryBatchQueue, a ReplicationSourceShipper thread keeps reading log entries from the queue and handing them to the peer's ReplicationEndpoint, which packs the entries into a replicationWALEntry request and sends it via RPC to some RegionServer in the peer cluster
    • The RegionServer in the peer cluster unpacks the replicationWALEntry into a number of batch operations and executes them through the batch API. Once the RPC succeeds, ReplicationSourceShipper records the position of the most recently replicated HLog in ZooKeeper, so that after a RegionServer restart replication can resume from the latest position
    (Note: the above is based on the book HBase原理与实践, i.e. HBase: Principles and Practice.)

    From this replication flow it follows that a custom ReplicationEndpoint which parses the entries and sends them to Kafka is all we need to push WAL data to a message system in real time. A minimal skeleton of such an endpoint is sketched below.
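
    The sketch shows just the extension surface on the HBase 2.x API; the class name is illustrative and all parsing and error handling is elided (the full implementation appears later in this post):

    import java.io.IOException;
    import java.util.UUID;

    import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    public class WalToKafkaEndpointSketch extends BaseReplicationEndpoint {

        @Override
        public void init(Context context) throws IOException {
            super.init(context); // read the peer's CONFIG here (broker list, topic, ...)
        }

        @Override
        public UUID getPeerUUID() {
            // No real peer HBase cluster behind this endpoint.
            return UUID.randomUUID();
        }

        @Override
        public boolean replicate(ReplicateContext replicateContext) {
            for (Entry entry : replicateContext.getEntries()) {
                // parse entry.getKey() / entry.getEdit() and send to Kafka
            }
            return true; // report success only once the sink has acknowledged
        }

        @Override
        public void start() { startAsync(); }

        @Override
        public void stop() { stopAsync(); }

        @Override
        protected void doStart() { notifyStarted(); }

        @Override
        protected void doStop() { notifyStopped(); }
    }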

    HBase's default ReplicationEndpoint implementation

    HBase's default implementation is HBaseInterClusterReplicationEndpoint, whose replicateEntries method packs replicationWALEntry requests and ships them to the peer cluster over RPC. Using that class as a template, we can write a custom KafkaInterClusterReplicationEndpoint that rewrites replicateEntries to push the data to Kafka instead:

        protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
            try {
                int entriesHashCode = System.identityHashCode(entries);
                if (LOG.isTraceEnabled()) {
                    long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
                    LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
                            logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
                }
    
                LOG.info("---entries size----" + entries.size());
                ProduceUtils produceUtils = ProduceUtils.getInstance(brokerServers);
                try {
                    entries.forEach(entry -> {
                        TableName table = entry.getKey().getTableName();
                        WALEdit edit = entry.getEdit();
    
                        long origLogSeqNum = entry.getKey().getOrigLogSeqNum();
                        long sequenceId = entry.getKey().getSequenceId();
    
                        long writeTime = entry.getKey().getWriteTime();
                        String nameSpace = Bytes.toString(table.getNamespace());
                        String tableName = Bytes.toString(table.getName());
                        LOG.info("------------------");
                        LOG.info("----namespace---" + nameSpace);
                        LOG.info("----tableName---" + tableName);
                        LOG.info("----brokerServers---" + brokerServers);

                        ArrayList<Cell> cells = edit.getCells();
                        if (cells == null || cells.isEmpty()) {
                            return; // skip entries with an empty WALEdit
                        }
                        // Use the first cell's row as the Kafka message key so all
                        // edits to a row land in the same partition.
                        String rowKey = Bytes.toString(CellUtil.cloneRow(cells.get(0)));

                        // Collect the distinct row keys touched by this entry (a
                        // batched client write can cover several rows).
                        List<String> rowKeys = new ArrayList<>();
                        cells.forEach(cell -> {
                            String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                            if (!rowKeys.contains(rowkey)) {
                                rowKeys.add(rowkey);
                            }
                        });
    
    
                        HashMap<String, Object> map = new HashMap<>();
                        map.put("origLogSeqNum", origLogSeqNum);
                        map.put("sequenceId", sequenceId);
                        map.put("batchIndex", batchIndex);
                        map.put("nameSpace", nameSpace);
                        map.put("tableName", tableName);
                        map.put("rowKeys", rowKeys);
                        map.put("currentTime", System.currentTimeMillis());
                        map.put("writeTime", writeTime);
                        map.put("peerId", peerId);
                        LOG.info("----rowKey---" + CollUtil.join(rowKeys, ","));
                        String jsonStr = JSONUtil.toJsonStr(map);
                        produceUtils.send(topicName, rowKey, jsonStr);
                        LOG.info("------------------");
                    });
    
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
                    }
                } catch (Exception e) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
                    }
                    throw e;
                } finally {
                    produceUtils.flush();
                    produceUtils.close();
                }
            } catch (Exception e) {
                // Rethrow instead of swallowing: returning normally would mark the
                // batch as shipped and silently drop it on a Kafka failure.
                LOG.error("{} Failed replicating batch to kafka", logPeerId(), e);
                throw e instanceof IOException ? (IOException) e : new IOException(e);
            }
            return batchIndex;
        }
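
    With this in place, each WAL entry becomes one JSON message on the topic. An illustrative message (all values invented for this example) looks like:

        {
          "origLogSeqNum": 0,
          "sequenceId": 42,
          "batchIndex": 0,
          "nameSpace": "default",
          "tableName": "person",
          "rowKeys": ["row-1", "row-2"],
          "currentTime": 1590392600000,
          "writeTime": 1590392599123,
          "peerId": "111"
        }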
    

    Note that when the Java client writes to HBase in batches, a batch can correspond to a single WAL entry; this is why replicateEntries collects a list of row keys per entry. A client-side sketch follows.
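
    For example, a batched put like the one below (table, family and values are purely illustrative) may surface as a single WAL entry whose cells span several rows, hence the rowKeys list in the message above:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchPutDemo {
        public static void main(String[] args) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("person"))) {
                List<Put> puts = new ArrayList<>();
                for (String row : Arrays.asList("row-1", "row-2", "row-3")) {
                    puts.add(new Put(Bytes.toBytes(row))
                            .addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("v")));
                }
                // Mutations for the same region can be grouped into one WAL entry,
                // so the endpoint may see all three row keys in a single Entry.
                table.put(puts);
            }
        }
    }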

    How to configure the custom ReplicationEndpoint

    • Create the HBase table
    create 'person', {NAME=>'info',REPLICATION_SCOPE => '1'}
    

    Note that setting the REPLICATION_SCOPE attribute of the column family to 1 enables replication for it.
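
    If the table already exists, replication can be switched on for the column family afterwards; the classic shell sequence (safe on any HBase version) is:

    disable 'person'
    alter 'person', {NAME => 'info', REPLICATION_SCOPE => '1'}
    enable 'person'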

    • Create the replication peer
    add_peer '111', ENDPOINT_CLASSNAME => 'cn.com.legend.hbase.replication.KafkaInterClusterReplicationEndpoint',CONFIG => { "brokerServers" => "192.168.111.129:9092,192.168.111.130:9092,192.168.111.131:9092", "topicName" => "test" },TABLE_CFS => { "person" => ["info"]}
    

    Note the ENDPOINT_CLASSNAME property, which must be changed to the custom ReplicationEndpoint class. The CONFIG property carries arbitrary user-defined parameters, which can be read in the endpoint's init method as follows:

    @Override
    public void init(Context context) throws IOException {
        super.init(context);
        ReplicationPeer replicationPeer = context.getReplicationPeer();
        Configuration configuration = replicationPeer.getConfiguration();
        this.brokerServers = configuration.get("brokerServers");
        this.topicName = configuration.get("topicName");
        peerId = replicationPeer.getId();

        this.conf = HBaseConfiguration.create(ctx.getConfiguration());
        // ... the rest of the initialization is shown in the full class below ...
    }
    
    • Enable serial replication
    set_peer_serial '111',true 
    

    Serial replication guarantees that edits from a given region are shipped strictly in write order, whereas the default parallel mode may deliver batches out of order; see the HBase documentation on serial replication for the details.

    • Remove the replication peer
    remove_peer '111'
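
    At any point the peer can be verified or toggled with the standard shell commands, e.g.:

    list_peers
    disable_peer '111'
    enable_peer '111'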
    

    The complete classes involved are attached below:

    • The KafkaReplicationEndpoint class
    package cn.com.legend.hbase.replication;
    
    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.UUID;
    
    /**
     * A {@link BaseReplicationEndpoint} for replication endpoints whose
     * target is a Kafka cluster rather than a peer HBase cluster.
     */
    public abstract class KafkaReplicationEndpoint extends BaseReplicationEndpoint
            implements Abortable {
    
        private static final Logger LOG = LoggerFactory.getLogger(KafkaReplicationEndpoint.class);
    
    
        @Override
        public void start() {
            startAsync();
        }
    
        @Override
        public void stop() {
            stopAsync();
        }
    
        @Override
        protected void doStart() {
            try {
                notifyStarted();
            } catch (Exception e) {
                notifyFailed(e);
            }
        }
    
        @Override
        protected void doStop() {
            notifyStopped();
        }
    
        @Override
        // There is no peer HBase cluster behind this endpoint, so simply hand
        // the framework a UUID identifying the sink as a distinct "cluster".
        public synchronized UUID getPeerUUID() {
            return UUID.randomUUID();
        }
    
    
        @Override
        public void abort(String why, Throwable e) {
            LOG.error("The KafkaReplicationEndpoint corresponding to peer " + ctx.getPeerId()
                    + " was aborted for the following reason(s):" + why, e);
        }
    
        @Override
        public boolean isAborted() {
            // Currently this is never "Aborted", we just log when the abort method is called.
            return false;
        }
    
    }
    
    
    • The KafkaInterClusterReplicationEndpoint class
    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     * <p>
     * http://www.apache.org/licenses/LICENSE-2.0
     * <p>
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package cn.com.legend.hbase.replication;
    
    import cn.com.legend.hbase.kafka.ProduceUtils;
    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.json.JSONUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.ClusterConnection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.ipc.RpcServer;
    import org.apache.hadoop.hbase.replication.ReplicationPeer;
    import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hbase.util.Threads;
    import org.apache.hadoop.hbase.wal.WAL.Entry;
    import org.apache.hadoop.hbase.wal.WALEdit;
    import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
    import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.apache.yetus.audience.InterfaceAudience;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    /**
     * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
     * implementation that ships WAL entries to Kafka instead of to a peer
     * HBase cluster. It is adapted from HBaseInterClusterReplicationEndpoint:
     * the batching, retry and serial-replication logic is kept, while the RPC
     * shipping step is replaced with a Kafka producer.
     */
    @InterfaceAudience.Private
    public class KafkaInterClusterReplicationEndpoint extends KafkaReplicationEndpoint {
        private static final Logger LOG =
                LoggerFactory.getLogger(KafkaInterClusterReplicationEndpoint.class);
    
        private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
    
        private ClusterConnection conn;
        private Configuration localConf;
        private Configuration conf;
        // How long should we sleep for each retry
        private long sleepForRetries;
        // Maximum number of retries before taking bold actions
        private int maxRetriesMultiplier;
        // Socket timeouts require even bolder actions since we don't want to DDOS
        private int socketTimeoutMultiplier;
        // Amount of time for shutdown to wait for all tasks to complete
        private long maxTerminationWait;
        // Size limit for replication RPCs, in bytes
        private int replicationRpcLimit;
        //Metrics for this source
        private MetricsSource metrics;
        private String replicationClusterId = "";
        private ThreadPoolExecutor exec;
        private int maxThreads;
        private Path baseNamespaceDir;
        private Path hfileArchiveDir;
        private boolean replicationBulkLoadDataEnabled;
        private Abortable abortable;
        private boolean dropOnDeletedTables;
        private boolean isSerial = false;
        // Kafka broker address list
        private String brokerServers;
        // Kafka topic name
        private String topicName;
        // replication peer id
        private String peerId;
    
        @Override
        public void init(Context context) throws IOException {
            super.init(context);
            ReplicationPeer replicationPeer = context.getReplicationPeer();
            Configuration configuration = replicationPeer.getConfiguration();
            this.brokerServers = configuration.get("brokerServers");
            this.topicName = configuration.get("topicName");
            peerId = replicationPeer.getId();
    
            this.conf = HBaseConfiguration.create(ctx.getConfiguration());
            this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
            this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
            this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
                    maxRetriesMultiplier);
            // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
            // tasks to terminate when doStop() is called.
            long maxTerminationWaitMultiplier = this.conf.getLong(
                    "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
            this.maxTerminationWait = maxTerminationWaitMultiplier *
                    this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
            // TODO: This connection is replication specific or we should make it particular to
            // replication and make replication specific settings such as compression or codec to use
            // passing Cells.
            this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
            this.sleepForRetries =
                    this.conf.getLong("replication.source.sleepforretries", 1000);
            this.metrics = context.getMetrics();
            // per sink thread pool
            this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
            this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkKafkaThread-%d").build());
            this.abortable = ctx.getAbortable();
            // Set the size limit for replication RPCs to 95% of the max request size.
            // We could do with less slop if we have an accurate estimate of encoded size. Being
            // conservative for now.
            this.replicationRpcLimit = (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
                    RpcServer.DEFAULT_MAX_REQUEST_SIZE));
            this.dropOnDeletedTables =
                    this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    
            this.replicationBulkLoadDataEnabled =
                    conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false);
            if (this.replicationBulkLoadDataEnabled) {
                replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
            }
            // Construct base namespace directory and hfile archive directory path
            Path rootDir = FSUtils.getRootDir(conf);
            Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
            baseNamespaceDir = new Path(rootDir, baseNSDir);
            hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
            isSerial = context.getPeerConfig().isSerial();
        }
    
    
        /**
         * Do the sleeping logic
         *
         * @param msg             Why we sleep
         * @param sleepMultiplier by how many times the default sleeping time is augmented
         * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
         */
        protected boolean sleepForRetries(String msg, int sleepMultiplier) {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} {}, sleeping {} times {}",
                            logPeerId(), msg, sleepForRetries, sleepMultiplier);
                }
                Thread.sleep(this.sleepForRetries * sleepMultiplier);
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
                }
            }
            return sleepMultiplier < maxRetriesMultiplier;
        }
    
        private int getEstimatedEntrySize(Entry e) {
            long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
            return (int) size;
        }
    
        private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
            int numSinks = 10; // fixed fan-out; there are no real peer sinks when shipping to Kafka
            int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
            List<List<Entry>> entryLists =
                    Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
            int[] sizes = new int[n];
            for (Entry e : entries) {
                int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
                int entrySize = getEstimatedEntrySize(e);
                // If this batch has at least one entry and is over sized, move it to the tail of list and
                // initialize the entryLists[index] to be a empty list.
                if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
                    entryLists.add(entryLists.get(index));
                    entryLists.set(index, new ArrayList<>());
                    sizes[index] = 0;
                }
                entryLists.get(index).add(e);
                sizes[index] += entrySize;
            }
            return entryLists;
        }
    
        private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
            Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            for (Entry e : entries) {
                regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
                        .add(e);
            }
            return new ArrayList<>(regionEntries.values());
        }
    
        /**
         * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
         * concurrently. Note that, for serial replication, we need to make sure that entries from the
         * same region to be replicated serially, so entries from the same region consist of a batch, and
         * we will divide a batch into several batches by replicationRpcLimit in method
         * serialReplicateRegionEntries()
         */
        private List<List<Entry>> createBatches(final List<Entry> entries) {
            if (isSerial) {
                return createSerialBatches(entries);
            } else {
                return createParallelBatches(entries);
            }
        }
    
        private TableName parseTable(String msg) {
            // ... TableNotFoundException: '<table>'/n...
            Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
            Matcher m = p.matcher(msg);
            if (m.find()) {
                String table = m.group(1);
                try {
                    // double check that table is a valid table name
                    TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
                    return TableName.valueOf(table);
                } catch (IllegalArgumentException ignore) {
                }
            }
            return null;
        }
    
        // Filter a set of batches by TableName
        private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
            return oldEntryList
                    .stream().map(entries -> entries.stream()
                            .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
                    .collect(Collectors.toList());
        }
    
    
        private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
                                       List<List<Entry>> batches) throws IOException {
            int futures = 0;
            for (int i = 0; i < batches.size(); i++) {
                List<Entry> entries = batches.get(i);
                if (!entries.isEmpty()) {
    
                    LOG.info("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
                            replicateContext.getSize());
    
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
                                replicateContext.getSize());
                    }
                    // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
                    pool.submit(createReplicator(entries, i));
                    futures++;
                }
            }
    
            IOException iox = null;
            long lastWriteTime = 0;
            for (int i = 0; i < futures; i++) {
                try {
                    // wait for all futures, remove successful parts
                    // (only the remaining parts will be retried)
                    Future<Integer> f = pool.take();
                    int index = f.get();
                    List<Entry> batch = batches.get(index);
                    batches.set(index, Collections.emptyList()); // remove successful batch
                    // Find the most recent write time in the batch
                    long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
                    if (writeTime > lastWriteTime) {
                        lastWriteTime = writeTime;
                    }
                } catch (InterruptedException ie) {
                    iox = new IOException(ie);
                } catch (ExecutionException ee) {
                    // cause must be an IOException
                    iox = (IOException) ee.getCause();
                }
            }
            if (iox != null) {
                // if we had any exceptions, try again
                throw iox;
            }
    
    
            LOG.info("----lastWriteTime----" + lastWriteTime);
            return lastWriteTime;
        }
    
        /**
         * Do the shipping logic
         */
        @Override
        public boolean replicate(ReplicateContext replicateContext) {
            CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
            String walGroupId = replicateContext.getWalGroupId();
            int sleepMultiplier = 1;
    
            List<List<Entry>> batches = createBatches(replicateContext.getEntries());
            while (this.isRunning() && !exec.isShutdown()) {
                if (!isPeerEnabled()) {
                    if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
                        sleepMultiplier++;
                    }
                    continue;
                }
    
                try {
                    long lastWriteTime;
    
                    // replicate the batches to sink side.
                    lastWriteTime = parallelReplicate(pool, replicateContext, batches);
    
                    // update metrics
                    if (lastWriteTime > 0) {
                        this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
                    }
                    return true;
                } catch (IOException ioe) {
                    // Didn't ship anything, but must still age the last time we did
                    this.metrics.refreshAgeOfLastShippedOp(walGroupId);
                    if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                        sleepMultiplier++;
                    }
                }
            }
            return false; // in case we exited before replicating
        }
    
        protected boolean isPeerEnabled() {
            return ctx.getReplicationPeer().isPeerEnabled();
        }
    
        @Override
        protected void doStop() {
            if (this.conn != null) {
                try {
                    this.conn.close();
                    this.conn = null;
                } catch (IOException e) {
                    LOG.warn("{} Failed to close the connection", logPeerId());
                }
            }
            // Allow currently running replication tasks to finish
            exec.shutdown();
            try {
                exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
            // Abort if the tasks did not terminate in time
            if (!exec.isTerminated()) {
                String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
                        "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
                        "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
                abortable.abort(errMsg, new IOException(errMsg));
            }
            notifyStopped();
        }
    
        @VisibleForTesting
        protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
            try {
                int entriesHashCode = System.identityHashCode(entries);
                if (LOG.isTraceEnabled()) {
                    long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
                    LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
                            logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
                }
    
                LOG.info("---entries size----" + entries.size());
                ProduceUtils produceUtils = ProduceUtils.getInstance(brokerServers);
                try {
                    entries.forEach(entry -> {
                        TableName table = entry.getKey().getTableName();
                        WALEdit edit = entry.getEdit();
    
                        long origLogSeqNum = entry.getKey().getOrigLogSeqNum();
                        long sequenceId = entry.getKey().getSequenceId();
    
                        long writeTime = entry.getKey().getWriteTime();
                        String nameSpace = Bytes.toString(table.getNamespace());
                        String tableName = Bytes.toString(table.getName());
                        LOG.info("------------------");
                        LOG.info("----namespace---" + nameSpace);
                        LOG.info("----tableName---" + tableName);
                        LOG.info("----brokerServers---" + brokerServers);

                        ArrayList<Cell> cells = edit.getCells();
                        if (cells == null || cells.isEmpty()) {
                            return; // skip entries with an empty WALEdit
                        }
                        // Use the first cell's row as the Kafka message key so all
                        // edits to a row land in the same partition.
                        String rowKey = Bytes.toString(CellUtil.cloneRow(cells.get(0)));

                        // Collect the distinct row keys touched by this entry (a
                        // batched client write can cover several rows).
                        List<String> rowKeys = new ArrayList<>();
                        cells.forEach(cell -> {
                            String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                            if (!rowKeys.contains(rowkey)) {
                                rowKeys.add(rowkey);
                            }
                        });
    
    
                        HashMap<String, Object> map = new HashMap<>();
                        map.put("origLogSeqNum", origLogSeqNum);
                        map.put("sequenceId", sequenceId);
                        map.put("batchIndex", batchIndex);
                        map.put("nameSpace", nameSpace);
                        map.put("tableName", tableName);
                        map.put("rowKeys", rowKeys);
                        map.put("currentTime", System.currentTimeMillis());
                        map.put("writeTime", writeTime);
                        map.put("peerId", peerId);
                        LOG.info("----rowKey---" + CollUtil.join(rowKeys, ","));
                        String jsonStr = JSONUtil.toJsonStr(map);
                        produceUtils.send(topicName, rowKey, jsonStr);
                        LOG.info("------------------");
                    });
    
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
                    }
                } catch (Exception e) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
                    }
                    throw e;
                } finally {
                    produceUtils.flush();
                    produceUtils.close();
                }
            } catch (Exception e) {
                // Rethrow instead of swallowing: returning normally would mark the
                // batch as shipped and silently drop it on a Kafka failure.
                LOG.error("{} Failed replicating batch to kafka", logPeerId(), e);
                throw e instanceof IOException ? (IOException) e : new IOException(e);
            }
            return batchIndex;
        }
    
        private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
                throws IOException {
            int batchSize = 0, index = 0;
            List<Entry> batch = new ArrayList<>();
            for (Entry entry : entries) {
                int entrySize = getEstimatedEntrySize(entry);
                if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
                    replicateEntries(batch, index++);
                    batch.clear();
                    batchSize = 0;
                }
                batch.add(entry);
                batchSize += entrySize;
            }
            if (batchSize > 0) {
                replicateEntries(batch, index);
            }
            return batchIndex;
        }
    
        @VisibleForTesting
        protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
            return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
                    : () -> replicateEntries(entries, batchIndex);
        }
    
        private String logPeerId() {
            return "[Source for peer " + this.ctx.getPeerId() + "]:";
        }
    
    }
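
    • The ProduceUtils class (not included in the original post; the following is a minimal sketch of what the endpoint needs it to provide, built on the standard Kafka producer API; the configuration shown is an assumption)

    package cn.com.legend.hbase.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    /**
     * Minimal sketch of the Kafka producer wrapper used by
     * KafkaInterClusterReplicationEndpoint. Only the methods the endpoint
     * calls (getInstance/send/flush/close) are covered.
     */
    public class ProduceUtils {

        private final KafkaProducer<String, String> producer;

        private ProduceUtils(String brokerServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Wait for all in-sync replicas so a batch reported as shipped is durable.
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            this.producer = new KafkaProducer<>(props);
        }

        // The endpoint creates an instance per batch and closes it in a finally
        // block, so a fresh producer per call keeps the semantics simple.
        public static ProduceUtils getInstance(String brokerServers) {
            return new ProduceUtils(brokerServers);
        }

        public void send(String topic, String key, String value) {
            // Keyed by rowkey so all edits of a row go to the same partition.
            producer.send(new ProducerRecord<>(topic, key, value));
        }

        public void flush() {
            producer.flush();
        }

        public void close() {
            producer.close();
        }
    }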
    
    
