美文网首页玩转大数据程序员spark
Kafka基础组件和辅助类库简介

Kafka基础组件和辅助类库简介

作者: 扫帚的影子 | 来源:发表于2016-12-26 18:03 被阅读1001次
  • 在正式开始扒代码之前, 先来个开胃菜,简单介绍一下kafka的基础组件和一些代码实现中用到的基础类库

Kafka基础组件概述

  • KafkaServer是整个Kafka的核心组件,里面包含了kafka对外提供功能的所有角色;
  • 一图顶千言:
kafkaserver1.png

Kafka辅助类库简介

KafkaScheduler

  • 所在文件: core/src/main/scala/kafka/utils/KafkaScheduler.scala
  • 功能: 接收需周期性执行的任务和延迟作务的添加, 使用一组thread pool来执行具体的任务;
  • 实现: 封装了 java.util.concurrent.ScheduledThreadPoolExecutor;
  • 接口(原有注释已经很清晰):
/**
   * Initialize this scheduler so it is ready to accept scheduling of tasks
   */
  def startup()
  
  /**
   * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur. 
   * This includes tasks scheduled with a delayed execution.
   */
  def shutdown()
  
  /**
   * Check if the scheduler has been started
   */
  def isStarted: Boolean
  
  /**
   * Schedule a task
   * @param name The name of this task
   * @param delay The amount of time to wait before the first execution
   * @param period The period with which to execute the task. If < 0 the task will execute only once.
   * @param unit The unit for the preceding times.
   */
  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)

ZkUtils

  • 所在文件: core/scr/main/scala/kafka/utils/ZkUtils.scala
  • 功能: 封装了可能用到的对zk上节点的创建,读,写,解析(主要是json)操作;
  • 实现: 使用了一个小众的类库 I0Itec 来操作zk;
  • 涉及到以下zk节点:
  val ConsumersPath = "/consumers"
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"
  val ControllerPath = "/controller"
  val ControllerEpochPath = "/controller_epoch"
  val ReassignPartitionsPath = "/admin/reassign_partitions"
  val DeleteTopicsPath = "/admin/delete_topics"
  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
  val BrokerSequenceIdPath = "/brokers/seqid"
  val IsrChangeNotificationPath = "/isr_change_notification"
  val EntityConfigPath = "/config"
  val EntityConfigChangesPath = "/config/changes"

Pool

  • 所在文件: core/src/main/scala/kafka/utils/Pool.scala
  • 功能: 简单的并发对象池;
  • 实现: 对ConcurrentHashMap的封裝;
  • getAndMaybePut实现小技巧, 使用了double check技术, 在有值的情况下降低锁的开销;
def getAndMaybePut(key: K) = {
    if (valueFactory.isEmpty)
      throw new KafkaException("Empty value factory in pool.")
    val curr = pool.get(key)
    if (curr == null) {
      createLock synchronized {
        val curr = pool.get(key)
        if (curr == null)
          pool.put(key, valueFactory.get(key))
        pool.get(key)
      }
    }
    else
      curr
  }

Logging

  • 所在文件: core/src/main/scala/kafka/utils/Logging.scala
  • 功能: 定义了trait Logging 供其他类继承,方便写日志;
  • 实现: 对org.apache.log4j.Logger的封装;

FileLock

  • 所在文件: core/src/main/scala/kafka/utils/FileLock.scala
  • 功能: 文件锁, 相当于linux的/usr/bin/lockf;
  • 实现: 使用java.nio.channels.FileLock实现;

ByteBounderBlockingQueue

  • 所在文件: core/src/main/scala/kafkak/utils/ByteBoundedBlockingQueue.scala;
  • 功能: 阻塞队列, 队列满的衡量标准有两条: 队列内元素个数达到了上限, 队列内所有元素的size之各达到了上限;
  • 实现: 使用java.util.concurrent.LinkedBlockingQueue实现, 加上了对队列内已有元素size大小的check;
  • 接口:
def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean
def offer(e: E): Boolean
def put(e: E): Boolean
def poll(timeout: Long, unit: TimeUnit)
def poll()
def take(): E
...

DelayedItem

  • 所在文件: core/src/main/scala/kafaka/utils/DelayedItem.scala
  • 功能: 定义了可以放入到DelayQueue队列的对象;
  • 实现: 实现了Delayed接口;

先写这么多吧,其他的遇到的时候再来分析,不得不感叹java的类库真是丰富啊~~~

# 下一篇我们来开始介绍Kafka的Request和Response

Kafka源码分析-汇总

相关文章

  • Kafka基础组件和辅助类库简介

    在正式开始扒代码之前, 先来个开胃菜,简单介绍一下kafka的基础组件和一些代码实现中用到的基础类库 Kafka基...

  • pykafka/kafka-python

    一、简介python连接kafka的标准库,kafka-python和pykafka。kafka-python使用...

  • Vue3 递归组件

    本文简介 点赞 + 关注 + 收藏 = 学会了 在日常 Vue 项目中,大概率会用组件库辅助开发,所以 递归组件 ...

  • Vue3递归组件

    本文简介 点赞 + 关注 + 收藏 = 学会了 在日常 Vue 项目中,大概率会用组件库辅助开发,所以 递归组件 ...

  • 开源组件Kafka Connect推荐

    Kafka Connect简介 Kafka Connect是Kafka的开源组件Confluent提供的功能,用于...

  • Android-JetPack初次尝试

    简介 Jetpack 是一个丰富的组件库,它的组件库按类别分为 4 类,分别是架构(Architecture)、界...

  • 数据同步工具 debezium

    数据同步工具 debezium 简介 Kafka Connect 配置 安装 debezium 插件 分发 辅助脚...

  • 组件13:远程私有库的升级维护-添加文件

    以后组件添加类怎么操作?比如:扩充基础配置、扩充工具类。 一、库增加内容:往本地库中拖入新增的类库,测试工程安装一...

  • Yarn 下一代 Node 包管理器

    简介 在 JavaScript 社区中,工程师们互相分享成千上万的代码,帮助我们节省大量编写基础组件、类库或框架的...

  • 小程序原生高颜值组件库--ColorUI组件库

    简介 ColorUI是一个Css类的UI组件库!不是一个Js框架。相比于同类小程序组件库,ColorUI更注重于视...

网友评论

    本文标题:Kafka基础组件和辅助类库简介

    本文链接:https://www.haomeiwen.com/subject/nxdrvttx.html