Kafka-connect-hdfs Source Code Analysis

Author: xhh199090 | Published 2020-07-10 13:59

    Write data flow analysis

    Task startup: the start method in HdfsSinkTask.java

    @Override
    public void start(Map<String, String> props) {
      Set<TopicPartition> assignment = context.assignment();
      try {
        HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
        boolean hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
        if (hiveIntegration) {
          StorageSchemaCompatibility compatibility = StorageSchemaCompatibility.getCompatibility(
              connectorConfig.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG)
          );
          if (compatibility == StorageSchemaCompatibility.NONE) {
            throw new ConfigException(
                "Hive Integration requires schema compatibility to be BACKWARD, FORWARD or FULL"
            );
          }
        }
    
        // check that the timezone is set up correctly in case of scheduled rotation
        if (connectorConfig.getLong(HdfsSinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG) > 0) {
          String timeZoneString = connectorConfig.getString(PartitionerConfig.TIMEZONE_CONFIG);
          if (timeZoneString.equals("")) {
            throw new ConfigException(PartitionerConfig.TIMEZONE_CONFIG,
                timeZoneString, "Timezone cannot be empty when using scheduled file rotation."
            );
          }
          DateTimeZone.forID(timeZoneString);
        }
    
        int schemaCacheSize = connectorConfig.getInt(
            HdfsSinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG
        );
        avroData = new AvroData(schemaCacheSize);
        hdfsWriter = new DataWriter(connectorConfig, context, avroData);   // initialize the DataWriter
        recover(assignment);
        if (hiveIntegration) {
          syncWithHive();
        }
      } catch (ConfigException e) {
        throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
      } catch (ConnectException e) {
        // Log at info level to help explain reason, but Connect logs the actual exception at ERROR
        log.info("Couldn't start HdfsSinkConnector:", e);
        log.info("Shutting down HdfsSinkConnector.");
        if (hdfsWriter != null) {
          try {
            try {
              log.debug("Closing data writer due to task start failure.");
              hdfsWriter.close();
            } finally {
              log.debug("Stopping data writer due to task start failure.");
              hdfsWriter.stop();
            }
          } catch (Throwable t) {
            log.debug("Error closing and stopping data writer: {}", t.getMessage(), t);
          }
        }
        // Always throw the original exception that prevent us from starting
        throw e;
      }
    
      log.info("The connector relies on offsets in HDFS filenames, but does commit these offsets to "
          + "Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS "
          + "Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, "
          + "the connector will attempt to find offsets for its consumer group in the "
          + "'__consumer_offsets' topic. If offsets are not found, the consumer will "
          + "rely on the reset policy specified in the 'consumer.auto.offset.reset' property to "
          + "start exporting data to HDFS.");
    }
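
    For reference, the validation in start() maps onto a handful of connector properties. The sketch below is a property map that would pass every branch above: Hive integration paired with a non-NONE compatibility, and scheduled rotation paired with an explicit timezone. The key strings are assumed to be the documented values behind the config constants referenced in the code; the values themselves are illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    public class HdfsSinkPropsSketch {
      // Illustrative property map for HdfsSinkTask.start(); key strings assumed to match the
      // constants used above (HIVE_INTEGRATION_CONFIG, SCHEMA_COMPATIBILITY_CONFIG,
      // ROTATE_SCHEDULE_INTERVAL_MS_CONFIG, TIMEZONE_CONFIG, SCHEMA_CACHE_SIZE_CONFIG).
      public static Map<String, String> exampleProps() {
        Map<String, String> props = new HashMap<>();
        props.put("hdfs.url", "hdfs://namenode:8020");
        props.put("flush.size", "1000");
        props.put("hive.integration", "true");              // requires a non-NONE compatibility
        props.put("schema.compatibility", "BACKWARD");
        props.put("rotate.schedule.interval.ms", "600000"); // rotate files every 10 minutes
        props.put("timezone", "UTC");                       // must be non-empty with scheduled rotation
        props.put("schema.cache.size", "1000");
        return props;
      }
    }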
    

    Initializing the DataWriter: the DataWriter constructor in DataWriter.java (the three-argument call in start() goes through a convenience overload that supplies the Time argument and delegates to the constructor shown here)

    @SuppressWarnings("unchecked")
    public DataWriter(
        HdfsSinkConnectorConfig connectorConfig,
        SinkTaskContext context,
        AvroData avroData,
        Time time
    ) {
      this.time = time;
      try {
        String hadoopHome = connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_HOME_CONFIG);
        System.setProperty("hadoop.home.dir", hadoopHome);
    
        this.connectorConfig = connectorConfig;
        this.avroData = avroData;
        this.context = context;
    
        String hadoopConfDir = connectorConfig.getString(
            HdfsSinkConnectorConfig.HADOOP_CONF_DIR_CONFIG
        );
        log.info("Hadoop configuration directory {}", hadoopConfDir);
        Configuration conf = connectorConfig.getHadoopConfiguration();
        if (!hadoopConfDir.equals("")) {
          conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
          conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        }
    
        boolean secureHadoop = connectorConfig.getBoolean(
            HdfsSinkConnectorConfig.HDFS_AUTHENTICATION_KERBEROS_CONFIG
        );
        if (secureHadoop) {
          SecurityUtil.setAuthenticationMethod(
              UserGroupInformation.AuthenticationMethod.KERBEROS,
              conf
          );
          String principalConfig = connectorConfig.getString(
              HdfsSinkConnectorConfig.CONNECT_HDFS_PRINCIPAL_CONFIG
          );
          String keytab = connectorConfig.getString(
              HdfsSinkConnectorConfig.CONNECT_HDFS_KEYTAB_CONFIG
          );
    
          if (principalConfig == null || keytab == null) {
            throw new ConfigException(
                "Hadoop is using Kerberos for authentication, you need to provide both a connect "
                    + "principal and the path to the keytab of the principal.");
          }
    
          conf.set("hadoop.security.authentication", "kerberos");
          conf.set("hadoop.security.authorization", "true");
          String hostname = InetAddress.getLocalHost().getCanonicalHostName();
          String namenodePrincipalConfig = connectorConfig.getString(
              HdfsSinkConnectorConfig.HDFS_NAMENODE_PRINCIPAL_CONFIG
          );
    
          String namenodePrincipal = SecurityUtil.getServerPrincipal(
              namenodePrincipalConfig,
              hostname
          );
          // namenode principal is needed for multi-node hadoop cluster
          if (conf.get("dfs.namenode.kerberos.principal") == null) {
            conf.set("dfs.namenode.kerberos.principal", namenodePrincipal);
          }
          log.info("Hadoop namenode principal: " + conf.get("dfs.namenode.kerberos.principal"));
    
          UserGroupInformation.setConfiguration(conf);
          // replace the _HOST specified in the principal config to the actual host
          String principal = SecurityUtil.getServerPrincipal(principalConfig, hostname);
          UserGroupInformation.loginUserFromKeytab(principal, keytab);
          final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
          log.info("Login as: " + ugi.getUserName());
    
          final long renewPeriod = connectorConfig.getLong(
              HdfsSinkConnectorConfig.KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG
          );
    
          isRunning = true;
          ticketRenewThread = new Thread(new Runnable() {
            @Override
            public void run() {
              synchronized (DataWriter.this) {
                while (isRunning) {
                  try {
                    DataWriter.this.wait(renewPeriod);
                    if (isRunning) {
                      ugi.reloginFromKeytab();
                    }
                  } catch (IOException e) {
                    // We ignore this exception during relogin as each successful relogin gives
                    // additional 24 hours of authentication in the default config. In normal
                    // situations, the probability of failing relogin 24 times is low and if
                    // that happens, the task will fail eventually.
                    log.error("Error renewing the ticket", e);
                  } catch (InterruptedException e) {
                    // ignored
                  }
                }
              }
            }
          });
          log.info("Starting the Kerberos ticket renew thread with period {}ms.", renewPeriod);
          ticketRenewThread.start();
        }
    
        url = connectorConfig.getUrl();
        topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG);
    
        @SuppressWarnings("unchecked")
        Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>) connectorConfig
            .getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
        storage = io.confluent.connect.storage.StorageFactory.createStorage(
            storageClass,
            HdfsSinkConnectorConfig.class,
            connectorConfig,
            url
        );
    
        createDir(topicsDir);
        createDir(topicsDir + HdfsSinkConnectorConstants.TEMPFILE_DIRECTORY);
        String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);
        createDir(logsDir);
    
        // Try to instantiate as a new-style storage-common type class, then fall back to old-style
        // with no parameters
        try {
          Class<io.confluent.connect.storage.format.Format> formatClass =
              (Class<io.confluent.connect.storage.format.Format>)
                  connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
          newFormat = formatClass.getConstructor(HdfsStorage.class).newInstance(storage);
          newWriterProvider = newFormat.getRecordWriterProvider();
          schemaFileReader = newFormat.getSchemaFileReader();
        } catch (NoSuchMethodException e) {
          Class<Format> formatClass =
              (Class<Format>) connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
          format = formatClass.getConstructor().newInstance();
          writerProvider = format.getRecordWriterProvider();
          final io.confluent.connect.hdfs.SchemaFileReader oldReader
              = format.getSchemaFileReader(avroData);
          schemaFileReader = new SchemaFileReader<HdfsSinkConnectorConfig, Path>() {
            @Override
            public Schema getSchema(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, Path path) {
              try {
                return oldReader.getSchema(hdfsSinkConnectorConfig.getHadoopConfiguration(), path);
              } catch (IOException e) {
                throw new ConnectException("Failed to get schema", e);
              }
            }
    
            @Override
            public Iterator<Object> iterator() {
              throw new UnsupportedOperationException();
            }
    
            @Override
            public boolean hasNext() {
              throw new UnsupportedOperationException();
            }
    
            @Override
            public Object next() {
              throw new UnsupportedOperationException();
            }
    
            @Override
            public void remove() {
              throw new UnsupportedOperationException();
            }
    
            @Override
            public void close() throws IOException {
    
            }
          };
        }
    
        partitioner = newPartitioner(connectorConfig);
    
        assignment = new HashSet<>(context.assignment());
    
        hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
        if (hiveIntegration) {
          hiveDatabase = connectorConfig.getString(HiveConfig.HIVE_DATABASE_CONFIG);
          hiveMetaStore = new HiveMetaStore(conf, connectorConfig);
          if (format != null) {
            hive = format.getHiveUtil(connectorConfig, hiveMetaStore);
          } else if (newFormat != null) {
            final io.confluent.connect.storage.hive.HiveUtil newHiveUtil
                = ((HiveFactory) newFormat.getHiveFactory())
                .createHiveUtil(connectorConfig, hiveMetaStore);
            hive = new HiveUtil(connectorConfig, hiveMetaStore) {
              @Override
              public void createTable(
                  String database, String tableName, Schema schema,
                  Partitioner partitioner
              ) {
                newHiveUtil.createTable(database, tableName, schema, partitioner);
              }
    
              @Override
              public void alterSchema(String database, String tableName, Schema schema) {
                newHiveUtil.alterSchema(database, tableName, schema);
              }
            };
          } else {
            throw new ConnectException("One of old or new format classes must be provided");
          }
          executorService = Executors.newSingleThreadExecutor();
          hiveUpdateFutures = new LinkedList<>();
        }
    
        topicPartitionWriters = new HashMap<>();
        for (TopicPartition tp : assignment) {
          TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
              tp,
              storage,
              writerProvider,
              newWriterProvider,
              partitioner,
              connectorConfig,
              context,
              avroData,
              hiveMetaStore,
              hive,
              schemaFileReader,
              executorService,
              hiveUpdateFutures,
              time
          );
          topicPartitionWriters.put(tp, topicPartitionWriter);
        }
      } catch (ClassNotFoundException
              | IllegalAccessException
              | InstantiationException
              | InvocationTargetException
              | NoSuchMethodException e
      ) {
        throw new ConnectException("Reflection exception: ", e);
      } catch (IOException e) {
        throw new ConnectException(e);
      }
    }
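
    One detail worth calling out in the constructor above is the try/catch around FORMAT_CLASS_CONFIG: it first probes for the new-style storage-common format, whose constructor takes the storage object, and only falls back to the legacy no-arg constructor when that probe throws NoSuchMethodException. A stripped-down sketch of just that probing pattern, using a hypothetical helper rather than the connector's own code (it would be invoked as instantiate(formatClass, HdfsStorage.class, storage)):

    public class ConstructorProbe {
      /**
       * Instantiate clazz through a one-argument constructor of type paramType when it exists,
       * otherwise fall back to the no-arg constructor, mirroring the probing the DataWriter
       * constructor performs for new- versus old-style Format classes.
       */
      public static <T> T instantiate(Class<T> clazz, Class<?> paramType, Object arg)
          throws ReflectiveOperationException {
        try {
          return clazz.getConstructor(paramType).newInstance(arg);
        } catch (NoSuchMethodException e) {
          return clazz.getConstructor().newInstance();
        }
      }
    }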
    

    After the DataWriter is initialized, start() calls the recover method:

    private void recover(Set<TopicPartition> assignment) {
      for (TopicPartition tp : assignment) {
        hdfsWriter.recover(tp);
      }
    }
    

    hdfsWriter.recover(tp) dispatches to the recover method of DataWriter (DataWriter.java):

    public void recover(TopicPartition tp) {
      topicPartitionWriters.get(tp).recover();
    }
    

    The actual recovery for each partition runs in TopicPartitionWriter.java:

    @SuppressWarnings("fallthrough")
    public boolean recover() {
      try {
        switch (state) {
          case RECOVERY_STARTED:
            log.info("Started recovery for topic partition {}", tp);
            pause();
            nextState();
          case RECOVERY_PARTITION_PAUSED:
            log.debug("Start recovery state: Apply WAL for topic partition {}", tp);
            applyWAL();
            nextState();
          case WAL_APPLIED:
            log.debug("Start recovery state: Truncate WAL for topic partition {}", tp);
            truncateWAL();
            nextState();
          case WAL_TRUNCATED:
            log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
            resetOffsets();
            nextState();
          case OFFSET_RESET:
            log.debug("Start recovery state: Resume for topic partition {}", tp);
            resume();
            nextState();
            log.info("Finished recovery for topic partition {}", tp);
            break;
          default:
            log.error(
                "{} is not a valid state to perform recovery for topic partition {}.",
                state,
                tp
            );
        }
      } catch (ConnectException e) {
        log.error("Recovery failed at state {}", state, e);
        setRetryTimeout(timeoutMs);
        return false;
      }
      return true;
    }
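
    The @SuppressWarnings("fallthrough") on recover() is deliberate: recovery is a small state machine, and each case intentionally falls through into the next so that a single call performs every remaining step. A trimmed-down sketch of that ordering and of the action each state triggers (a hypothetical enum; the real State enum in TopicPartitionWriter also carries the normal write-path states):

    // Recovery-only view of the state machine driven by recover() above.
    enum RecoverySketchState {
      RECOVERY_STARTED,          // pause() the partition so no new records arrive mid-recovery
      RECOVERY_PARTITION_PAUSED, // applyWAL(): replay the write-ahead log kept in HDFS
      WAL_APPLIED,               // truncateWAL(): drop the entries that were just replayed
      WAL_TRUNCATED,             // resetOffsets(): restore offsets from committed file names
      OFFSET_RESET               // resume() consumption; recovery is finished
    }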
    

    Once the task thread is running, WorkerSinkTask drives the sink by repeatedly calling its iteration() method:

    protected void iteration() {
        final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
    
        try {
            long now = time.milliseconds();
    
            // Maybe commit
            if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
                commitOffsets(now, false);
                nextCommit = now + offsetCommitIntervalMs;
                context.clearCommitRequest();
            }
    
            final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
    
            // Check for timed out commits
            if (committing && now >= commitTimeoutMs) {
                log.warn("{} Commit of offsets timed out", this);
                commitFailures++;
                committing = false;
            }
    
            // And process messages
            long timeoutMs = Math.max(nextCommit - now, 0);
            poll(timeoutMs);
        } catch (WakeupException we) {
            log.trace("{} Consumer woken up", this);
    
            if (isStopping())
                return;
    
            if (shouldPause()) {
                pauseAll();
                onPause();
                context.requestCommit();
            } else if (!pausedForRedelivery) {
                resumeAll();
                onResume();
            }
        }
    }
    
    /**
     * Poll for new messages with the given timeout. Should only be invoked by the worker thread.
     */
    protected void poll(long timeoutMs) {
        rewind();
        long retryTimeout = context.timeout();
        if (retryTimeout > 0) {
            timeoutMs = Math.min(timeoutMs, retryTimeout);
            context.timeout(-1L);
        }
    
        log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs);
        ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
        assert messageBatch.isEmpty() || msgs.isEmpty();
        log.trace("{} Polling returned {} messages", this, msgs.count());
    
        convertMessages(msgs);
        deliverMessages();
    }
    
    private void deliverMessages() {
        // Finally, deliver this batch to the sink
        try {
            // Since we reuse the messageBatch buffer, ensure we give the task its own copy
            log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
            long start = time.milliseconds();
            task.put(new ArrayList<>(messageBatch));
            recordBatch(messageBatch.size());
            sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
            currentOffsets.putAll(origOffsets);
            messageBatch.clear();
            // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
            // the task had not explicitly paused
            if (pausedForRedelivery) {
                if (!shouldPause())
                    resumeAll();
                pausedForRedelivery = false;
            }
        } catch (RetriableException e) {
            log.error("{} RetriableException from SinkTask:", this, e);
            // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
            // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
            pausedForRedelivery = true;
            pauseAll();
            // Let this exit normally, the batch will be reprocessed on the next loop.
        } catch (Throwable t) {
            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
                    + "recover until manually restarted. Error: {}", this, t.getMessage(), t);
            throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
        }
    }
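
    Two details of this loop connect back to the connector code above. First, the retryTimeout read in poll() comes from SinkTaskContext.timeout(), which is the same channel the setRetryTimeout(timeoutMs) call in recover() above relies on to schedule a retry. Second, deliverMessages() treats RetriableException specially: the batch is kept, all partitions are paused, and the same records are redelivered on the next iteration, while any other exception kills the task. A hypothetical minimal sink task illustrating that contract (writeBatch and TransientStoreException are invented for the example):

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.connect.errors.RetriableException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public abstract class RetryingSinkTask extends SinkTask {
      @Override
      public void start(Map<String, String> props) {
        // configuration omitted in this sketch
      }

      @Override
      public void put(Collection<SinkRecord> records) {
        try {
          writeBatch(records);                 // hypothetical downstream write
        } catch (TransientStoreException e) {
          // WorkerSinkTask.deliverMessages() pauses all partitions and redelivers
          // this exact batch on the next iteration().
          throw new RetriableException("Temporary failure, batch will be redelivered", e);
        }
      }

      @Override
      public void stop() {
        // resource cleanup omitted in this sketch
      }

      protected abstract void writeBatch(Collection<SinkRecord> records);

      /** Invented exception type standing in for any transient downstream failure. */
      public static class TransientStoreException extends RuntimeException {
        public TransientStoreException(String message, Throwable cause) {
          super(message, cause);
        }
      }
    }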
    

    The call task.put(new ArrayList<>(messageBatch)) is implemented by the put method of HdfsSinkTask:

    @Override
    public void put(Collection<SinkRecord> records) throws ConnectException {
      if (log.isDebugEnabled()) {
        log.debug("Read {} records from Kafka", records.size());
      }
      try {
        hdfsWriter.write(records);
      } catch (ConnectException e) {
        throw new ConnectException(e);
      }
    }
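
    put() itself is thin: it hands the whole batch to hdfsWriter.write(records). What happens there is essentially a fan-out to the per-partition writers created in the DataWriter constructor: every record is routed to the writer for its TopicPartition and buffered, then each assigned partition's writer is asked to write. The sketch below is illustrative rather than the verbatim source (the real DataWriter.write also handles Hive bookkeeping when Hive integration is on).

    import java.util.Collection;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Illustrative fan-out of a batch to per-partition writers; PartitionWriter is a stand-in
    // for the connector's TopicPartitionWriter.
    public class BatchFanOutSketch {
      interface PartitionWriter {
        void buffer(SinkRecord record); // queue the record in memory
        void write();                   // flush buffered records and commit files as needed
      }

      private final Map<TopicPartition, PartitionWriter> writers;
      private final Set<TopicPartition> assignment;

      public BatchFanOutSketch(Map<TopicPartition, PartitionWriter> writers,
                               Set<TopicPartition> assignment) {
        this.writers = writers;
        this.assignment = assignment;
      }

      public void write(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
          TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
          writers.get(tp).buffer(record);
        }
        for (TopicPartition tp : assignment) {
          writers.get(tp).write();
        }
      }
    }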
    
