美文网首页Scala的艺术程序员
Scala精简版Filebeat日志采集器

Scala精简版Filebeat日志采集器

作者: 大猪大猪 | 来源:发表于2018-12-05 11:48 被阅读58次

    多功能的 Scala 精简版 Filebeat 日志采集器,可高度定制化。

    filebeat-for-scala

    依赖包

    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.7'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'com.google.guava', name: 'guava', version: '27.0.1-jre'
    

    入口 App.scala

    import java.io.File
    import java.util.concurrent.{Executors, LinkedBlockingDeque, ScheduledExecutorService, TimeUnit}
    
    import com.google.common.cache.LoadingCache
    import org.apache.commons.io.filefilter.IOFileFilter
    
    /**
      * Demo entry point: builds a single log-collection job, runs it on the shared
      * scheduler for ten seconds and then shuts the scheduler down.
      */
    object App {

      // Shared scheduler; every job registers its periodic tasks on this pool.
      val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(5)

      // Running filebeat instances, keyed by job name.
      var instalnces = Map[String, JobThread]()


      def main(args: Array[String]): Unit = {

        // Filter priority: includeSuffix => ignoreOlder => includePaths => excludePaths

        val workDir = "/Users/lake/dounine/git/sr-galaxy-serv-loghub/db"

        val testJob = JobUtil.createJob(
          workPath = workDir,
          logPath = "/Users/huanghuanlai/dounine/git/sr-galaxy-serv-loghub/logdir2",
          jobName = "test",
          includeSuffix = "log,txt", // accepted log file suffixes
          ignoreOlder = "24h", // skip files not modified within this period
          intervalFileStatus = "1s", // how often tracked files are checked for new content
          intervalScanFile = "30s", // how often the directory is rescanned for matching files
          includePaths = ".*", // paths to include (regular expressions)
          excludePaths = "", // paths to exclude (regular expressions)
          handlerFileClose = "24h" // drop handles of files inactive for this long
        )

        instalnces = instalnces.updated("test", testJob)

        // Start every registered job on the shared pool.
        for (worker <- instalnces.values) {
          scheduledThreadPool.execute(worker)
        }

        // Let the demo run for a while before stopping the scheduler.
        TimeUnit.SECONDS.sleep(10)

        scheduledThreadPool.shutdown()
      }
    }
    
    /**
      * Immutable description of one log-collection job, consumed by `JobThread`.
      *
      * @param jobName            job identifier; also the sub-directory of `workPath` holding `seek.txt`
      * @param workPath           root directory under which per-job offset files are stored
      * @param logPath            directory that is scanned for log files
      * @param intervalFileStatus duration alias (e.g. "1s") between checks of tracked files for new content
      * @param intervalScanFile   duration alias (e.g. "30s") between directory scans
      * @param dirFilter          filter deciding which sub-directories are descended into
      * @param fileFilter         filter deciding which files are tracked
      * @param linesBlockQueue    queue receiving every newly read line
      * @param handlerFiles       cache of tracked files, keyed by absolute path
      * @param seekDB             cache of the last read offset per absolute path
      */
    case class Job(
      jobName: String,
      workPath: String,
      logPath: String,
      intervalFileStatus: String,
      intervalScanFile: String,
      dirFilter: IOFileFilter,
      fileFilter: IOFileFilter,
      linesBlockQueue: LinkedBlockingDeque[String],
      handlerFiles: LoadingCache[String, File],
      seekDB: LoadingCache[String, java.lang.Long]
    )
    

    JobThread.scala

    import java.io.{File, RandomAccessFile}
    import java.util.concurrent.TimeUnit
    import java.util.function.Consumer
    
    import org.apache.commons.io.FileUtils
    
    import scala.collection.JavaConversions._
    import scala.collection.mutable.ListBuffer
    
    /**
      * Runs one log-collection [[Job]] on `App.scheduledThreadPool`.
      *
      * `run()` registers three tasks on the shared pool:
      *  1. a periodic directory scan that registers matching files in `job.handlerFiles`;
      *  2. a periodic "tail" pass that reads bytes appended to each tracked file, pushes the
      *     resulting lines onto `job.linesBlockQueue` and persists the new offsets;
      *  3. a long-running consumer that takes lines off the queue and processes them.
      *
      * Offsets are persisted as `path:offset` lines in `<workPath>/<jobName>/seek.txt`.
      *
      * NOTE(review): lines are split on "\n" per read, so a line that is only partially
      * written when a tail pass runs is emitted in two pieces.
      */
    class JobThread(job: Job) extends Runnable {

      // Explicit converters (`.asScala` / `.asJava`) instead of the deprecated implicit JavaConversions.
      import scala.collection.JavaConverters._
      import scala.util.control.NonFatal

      initSeekDb(job.jobName)

      /** Location of this job's persisted offset table. */
      private def seekDbFile: File = FileUtils.getFile(s"${job.workPath}/${job.jobName}/seek.txt")

      /** Registers the scan, tail and consumer tasks on the shared scheduler. */
      override def run(): Unit = {
        val pool = App.scheduledThreadPool

        pool.scheduleAtFixedRate(
          guarded("scan") { scanLogFiles() },
          0, JobUtil.getSecondsByAlias(job.intervalScanFile), TimeUnit.SECONDS)

        pool.scheduleAtFixedRate(
          guarded("tail") { collectNewLines() },
          1, JobUtil.getSecondsByAlias(job.intervalFileStatus), TimeUnit.SECONDS)

        pool.schedule(new Runnable {
          override def run(): Unit = drainQueue()
        }, 1, TimeUnit.MILLISECONDS)
      }

      /**
        * Wraps a periodic task so that a failure is reported instead of propagating.
        * An exception escaping a `scheduleAtFixedRate` task suppresses every later run,
        * so one transient I/O error would otherwise stop the collector for good.
        */
      private def guarded(taskName: String)(body: => Unit): Runnable = new Runnable {
        override def run(): Unit =
          try body
          catch {
            case NonFatal(e) =>
              System.err.println(s"[${job.jobName}] $taskName task failed: $e")
          }
      }

      /** Scans the log directory and registers every matching file for tailing. */
      private def scanLogFiles(): Unit = {
        val dirFile = FileUtils.getFile(job.logPath)
        FileUtils.listFiles(dirFile, job.fileFilter, job.dirFilter).asScala.foreach { logFile =>
          job.handlerFiles.put(logFile.getAbsolutePath, logFile)
        }
      }

      /** Reads new content from every tracked file, then persists the offsets. */
      private def collectNewLines(): Unit = {
        job.handlerFiles.asMap().values().asScala.foreach { file =>
          // A failure on one file (e.g. deleted meanwhile) must not skip the others.
          try tailFile(file)
          catch {
            case NonFatal(e) =>
              System.err.println(s"[${job.jobName}] cannot read ${file.getAbsolutePath}: $e")
          }
        }
        flushCacheSeekToDb()
      }

      /** Pushes the lines appended to `file` since the stored offset onto the queue. */
      private def tailFile(file: File): Unit = {
        val path = file.getAbsolutePath
        // Snapshot the length once: the same value bounds the read and becomes the new
        // offset, so bytes appended while reading are neither skipped nor read twice.
        val currentLength = file.length()
        val savedOffset: Long = job.seekDB.get(path)
        val startOffset =
          if (savedOffset < 0 || savedOffset > currentLength) 0L // new file, or truncated/rotated
          else savedOffset
        if (startOffset < currentLength) {
          readRange(file, startOffset, currentLength).foreach(line => job.linesBlockQueue.add(line))
        }
        job.seekDB.put(path, currentLength)
      }

      /** Consumes queued lines until the shared scheduler is shut down. */
      private def drainQueue(): Unit = {
        try {
          while (!App.scheduledThreadPool.isShutdown) {
            // Timed poll: waits for data instead of spinning a core, while still
            // re-checking the shutdown flag regularly.
            val line: String = job.linesBlockQueue.poll(200, TimeUnit.MILLISECONDS)
            if (null != line) {
              println(s"line = ${line}") //TODO apply business logic to every newly read line
            }
          }
        } catch {
          case _: InterruptedException => Thread.currentThread().interrupt()
        }
      }

      /**
        * Merges the in-memory offsets into `seek.txt`: offsets of known paths are refreshed,
        * paths not yet in the file are appended, and lines for paths that are not cached
        * (or that cannot be parsed) are kept as they are. The file is rewritten only when
        * its content would change.
        */
      def flushCacheSeekToDb(): Unit = {
        val dbFile = seekDbFile
        // Copy first so the merge works on a stable snapshot of the cache.
        val cached = new java.util.HashMap[String, java.lang.Long](job.seekDB.asMap()).asScala
        val existingLines = FileUtils.readLines(dbFile, "utf-8").asScala.toList

        // Split on the LAST ':' so paths that themselves contain ':' stay intact.
        val parsed = existingLines.map(line => (line, line.lastIndexOf(':')))
        val knownPaths = parsed.collect { case (line, sep) if sep > 0 => line.substring(0, sep) }.toSet

        val refreshed = parsed.map {
          case (line, sep) if sep > 0 =>
            val path = line.substring(0, sep)
            cached.get(path).fold(line)(offset => s"$path:$offset")
          case (line, _) => line
        }
        val appended = cached.collect {
          case (path, offset) if !knownPaths(path) => s"$path:$offset"
        }.toList

        val merged = refreshed ++ appended
        if (merged != existingLines) {
          // Write with the same charset that is used for reading.
          FileUtils.writeLines(dbFile, "utf-8", merged.asJava, false)
        }
      }

      /**
        * Makes sure the job's work directory and its `seek.txt` exist.
        *
        * @param dbName currently unused; the location is derived from `job.jobName`
        */
      def initSeekDb(dbName: String): Unit = {
        val dbFold = FileUtils.getFile(s"${job.workPath}/${job.jobName}")
        if (!dbFold.exists()) {
          dbFold.mkdirs()
        }
        val dbFile = seekDbFile
        if (!dbFile.exists()) {
          dbFile.createNewFile()
        }
      }

      /**
        * Reads `file` from byte offset `seek` to its current end and splits the content into lines.
        *
        * @param seek byte offset to start reading from
        * @param file file to read
        * @return the UTF-8 decoded content split on "\n"
        */
      def readLinesForSeek(seek: Long, file: File): Array[String] =
        readRange(file, seek, file.length())

      /** Reads the bytes in `[from, to)` of `file` and splits them into lines. */
      private def readRange(file: File, from: Long, to: Long): Array[String] = {
        require(to - from <= Int.MaxValue, s"cannot read ${to - from} bytes of ${file.getAbsolutePath} in one pass")
        val randomFile = new RandomAccessFile(file, "r")
        try {
          randomFile.seek(from)
          val bytes = new Array[Byte]((to - from).toInt)
          randomFile.readFully(bytes)
          new String(bytes, "utf-8").split("\n")
        } finally {
          // Always release the handle, even when seek/read fails.
          randomFile.close()
        }
      }

    }
    

    JobUtil.scala

    import java.io.File
    import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
    
    import com.google.common.cache.{CacheBuilder, CacheLoader}
    import org.apache.commons.io.FileUtils
    import org.apache.commons.io.filefilter.{FileFilterUtils, IOFileFilter}
    
    object JobUtil {
    
      /**
        * Converts a duration alias into seconds.
        *
        * Supported forms: `"<n>s"` (seconds), `"<n>m"` (minutes), `"<n>h"` (hours),
        * `"<n>d"` (days) and a bare number, which is taken as seconds.
        *
        * @param alias duration text such as "30s", "5m", "24h", "7d" or "30"
        * @return the duration in seconds
        * @throws IllegalArgumentException if the alias is empty, has an unknown unit,
        *                                  or its numeric part cannot be parsed
        */
      def getSecondsByAlias(alias: String): Long = {
        val text = alias.trim
        require(text.nonEmpty, "duration alias must not be empty")
        val unit = text.last
        if (unit.isDigit) {
          // No unit suffix: the whole string is already a number of seconds.
          text.toLong
        } else {
          val value = text.dropRight(1).trim.toLong
          unit match {
            case 's' => value
            case 'm' => value * 60
            case 'h' => value * 60 * 60
            case 'd' => value * 60 * 60 * 24
            case other =>
              throw new IllegalArgumentException(s"unknown duration unit '$other' in \"$alias\"")
          }
        }
      }
    
      /**
        * Builds a ready-to-run [[JobThread]] for one log directory.
        *
        * A file is collected when ALL of the following hold:
        *  - its name ends with one of the `includeSuffix` entries;
        *  - it was modified within the `ignoreOlder` period;
        *  - its absolute path fully matches at least one `includePaths` regex
        *    (an empty `includePaths` includes everything);
        *  - its absolute path matches none of the `excludePaths` regexes.
        *
        * @param workPath           root directory for the per-job offset file (`<workPath>/<jobName>/seek.txt`)
        * @param logPath            directory scanned for log files
        * @param jobName            job identifier
        * @param includeSuffix      comma-separated file name suffixes, e.g. "log,txt"
        * @param ignoreOlder        duration alias; files not modified within it are skipped
        * @param intervalFileStatus duration alias between checks of tracked files
        * @param intervalScanFile   duration alias between directory scans
        * @param includePaths       comma-separated regexes matched against the absolute path
        * @param excludePaths       comma-separated regexes matched against the absolute path
        * @param handlerFileClose   duration alias after which an entry not re-registered by a scan is dropped
        * @return the job runner; it does nothing until it is executed
        * @throws java.util.regex.PatternSyntaxException if an include/exclude regex is invalid
        */
      def createJob(
        workPath: String,
        logPath: String,
        jobName: String,
        includeSuffix: String,
        ignoreOlder: String,
        intervalFileStatus: String,
        intervalScanFile: String,
        includePaths: String,
        excludePaths: String,
        handlerFileClose: String
      ): JobThread = {
        import scala.collection.JavaConverters._

        val ignoreOlderSeconds = getSecondsByAlias(ignoreOlder)

        val suffixTypeFilters = includeSuffix.split(",").map {
          suff => FileFilterUtils.suffixFileFilter(suff.trim)
        }

        val ignoreOlderFilter = new IOFileFilter {
          override def accept(file: File): Boolean =
            (System.currentTimeMillis() - file.lastModified()) / 1000 <= ignoreOlderSeconds

          override def accept(file: File, s: String): Boolean = accept(new File(file, s))
        }

        // Compile each regex once up front instead of once per inspected file;
        // this also surfaces an invalid pattern when the job is created.
        def compilePatterns(csv: String): Array[java.util.regex.Pattern] =
          csv.split(",").filter(_.nonEmpty).map(regex => java.util.regex.Pattern.compile(regex))

        val includePatterns = compilePatterns(includePaths)
        val excludePatterns = compilePatterns(excludePaths)

        val includeExcludeFilter = new IOFileFilter {
          override def accept(file: File): Boolean = {
            val path = file.getAbsolutePath
            val included = includePatterns.isEmpty || includePatterns.exists(_.matcher(path).matches())
            val excluded = excludePatterns.exists(_.matcher(path).matches())
            // Both conditions must hold; with "||" an include of ".*" would disable
            // every exclude, and includes would never restrict anything.
            included && !excluded
          }

          override def accept(file: File, s: String): Boolean = accept(new File(file, s))
        }

        val fileFilter = FileFilterUtils.and(
          FileFilterUtils.or(suffixTypeFilters: _*), // file suffix must match
          ignoreOlderFilter, // skip files not modified recently enough
          includeExcludeFilter
        )

        // Tracked files by absolute path. Entries are only ever inserted with put();
        // the loader is a placeholder and is not expected to be invoked.
        val handlerFiles = CacheBuilder.newBuilder()
          .expireAfterWrite(getSecondsByAlias(handlerFileClose), TimeUnit.SECONDS)
          .build[String, File](new CacheLoader[String, File] {
          override def load(k: String): File = {
            null
          }
        })

        // Last read offset per absolute path; on a miss the value is looked up in
        // seek.txt, and -1 marks a path that has never been read.
        val seekDB = CacheBuilder.newBuilder()
          .expireAfterWrite(24, TimeUnit.HOURS)
          .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long] {
          override def load(path: String): java.lang.Long = {
            val dbFile = FileUtils.getFile(s"${workPath}/${jobName}/seek.txt")
            // Lines are "path:offset"; split on the LAST ':' so that paths which
            // themselves contain ':' are handled.
            val offset: Long = FileUtils.readLines(dbFile, "utf-8").asScala.iterator
              .map(line => (line, line.lastIndexOf(':')))
              .collectFirst {
                case (line, sep) if sep > 0 && line.substring(0, sep) == path =>
                  line.substring(sep + 1).trim.toLong
              }
              .getOrElse(-1L)
            java.lang.Long.valueOf(offset)
          }
        })

        new JobThread(Job(
          jobName,
          workPath,
          logPath,
          intervalFileStatus,
          intervalScanFile,
          FileFilterUtils.directoryFileFilter(),
          fileFilter,
          new LinkedBlockingDeque[String](),
          handlerFiles,
          seekDB
        ))
      }
    

    相关文章

      网友评论

        本文标题:Scala精简版Filebeat日志采集器

        本文链接:https://www.haomeiwen.com/subject/diuicqtx.html