12. elk+kafka+filebeat

1. Elasticsearch
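
Elasticsearch itself runs at 10.62.1.130:9200, the address all the Logstash outputs in section 5 point at. A quick health check against that endpoint, as a sketch (host and port are taken from the configs below):

    # verify the Elasticsearch cluster is reachable and its health is green/yellow
    curl -s 'http://10.62.1.130:9200/_cluster/health?pretty'
    # list the log indices Logstash will create (index pattern from section 5)
    curl -s 'http://10.62.1.130:9200/_cat/indices/logstash-*?v'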

2. Kibana
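
A minimal Kibana Deployment sketch pointing at the same Elasticsearch endpoint. This is an assumption-laden placeholder: the image tag is chosen to match the 5.6.4-era Logstash image used in section 5 (where the env var is ELASTICSEARCH_URL; 7.x renames it to ELASTICSEARCH_HOSTS), and the name and namespace are illustrative:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kibana            # illustrative name
      namespace: kube-system  # assumed; match your logging namespace
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kibana
      template:
        metadata:
          labels:
            app: kibana
        spec:
          containers:
          - name: kibana
            image: docker.elastic.co/kibana/kibana:5.6.4   # assumed to match the 5.6.4 stack
            env:
            - name: ELASTICSEARCH_URL
              value: "http://10.62.1.130:9200"
            ports:
            - containerPort: 5601
              name: ui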

3. Kafka

persistent-volume.yaml

    # PersistentVolumes are cluster-scoped, so they carry no namespace.
    # All three share the same hostPath; each ZooKeeper replica is expected
    # to land on a different node (enforced by the pod anti-affinity below).
    kind: PersistentVolume
    apiVersion: v1
    metadata:
      name: k8s-pv-zk1
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
      labels:
        type: local
    spec:
      capacity:
        storage: 3Gi
      accessModes:
        - ReadWriteOnce
      hostPath:
        path: "/data/zookeeper"
      persistentVolumeReclaimPolicy: Recycle
    ---
    kind: PersistentVolume
    apiVersion: v1
    metadata:
      name: k8s-pv-zk2
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
      labels:
        type: local
    spec:
      capacity:
        storage: 3Gi
      accessModes:
        - ReadWriteOnce
      hostPath:
        path: "/data/zookeeper"
      persistentVolumeReclaimPolicy: Recycle
    ---
    kind: PersistentVolume
    apiVersion: v1
    metadata:
      name: k8s-pv-zk3
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
      labels:
        type: local
    spec:
      capacity:
        storage: 3Gi
      accessModes:
        - ReadWriteOnce
      hostPath:
        path: "/data/zookeeper"
      persistentVolumeReclaimPolicy: Recycle
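
Apply the volumes and confirm they are Available before creating the StatefulSet:

    kubectl apply -f persistent-volume.yaml
    kubectl get pv    # all three should show STATUS Available with 3Gi capacity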

zookeeper.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: zk-hs            # headless service for peer/leader-election traffic
      namespace: viomi-kafka
      labels:
        app: zk
    spec:
      ports:
      - port: 2888
        name: server
      - port: 3888
        name: leader-election
      clusterIP: None
      selector:
        app: zk
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-cs            # client-facing service
      namespace: viomi-kafka
      labels:
        app: zk
    spec:
      ports:
      - port: 2181
        name: client
      selector:
        app: zk
    ---
    apiVersion: policy/v1beta1
    kind: PodDisruptionBudget
    metadata:
      name: zk-pdb
      namespace: viomi-kafka
    spec:
      selector:
        matchLabels:
          app: zk
      maxUnavailable: 1
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: zk
      namespace: viomi-kafka
    spec:
      selector:
        matchLabels:
          app: zk
      serviceName: zk-hs
      replicas: 3
      updateStrategy:
        type: RollingUpdate
      podManagementPolicy: Parallel
      template:
        metadata:
          labels:
            app: zk
        spec:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values:
                        - zk
                  topologyKey: "kubernetes.io/hostname"
          containers:
          - name: kubernetes-zookeeper
            imagePullPolicy: IfNotPresent
            image: "hub.kce.ksyun.com/yunmi-infra/viomi/zookeeper:3.4.10"
            resources:
              requests:
                memory: "100Mi"
                cpu: "0.1"
            ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
            command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=512M \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
            readinessProbe:
              exec:
                command:
                - sh
                - -c
                - "zookeeper-ready 2181"
              initialDelaySeconds: 10
              timeoutSeconds: 5
            livenessProbe:
              exec:
                command:
                - sh
                - -c
                - "zookeeper-ready 2181"
              initialDelaySeconds: 10
              timeoutSeconds: 5
            volumeMounts:
            - name: datadir
              # mount where start-zookeeper actually writes (--data_dir above);
              # mounting /data/zookeeper here would leave the data dir unpersisted
              mountPath: /var/lib/zookeeper
          securityContext:
            runAsUser: 1000
            fsGroup: 1000
      volumeClaimTemplates:
      - metadata:
          name: datadir
          annotations:
            volume.beta.kubernetes.io/storage-class: "anything"
        spec:
          accessModes: [ "ReadWriteOnce" ]
          resources:
            requests:
              storage: 3Gi
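
Once all three pods are Running, the ensemble roles can be checked from inside the pods. zkServer.sh ships with the standard ZooKeeper 3.4 distribution; whether it is on PATH in this custom image is an assumption:

    for i in 0 1 2; do
      kubectl -n viomi-kafka exec zk-$i -- zkServer.sh status
    done
    # expect one "Mode: leader" and two "Mode: follower"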

kafka.yaml

    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
      namespace: viomi-kafka
    spec:
      selector:
        matchLabels:
          app: kafka
      serviceName: kafka-svc
      replicas: 3
      template:
        metadata:
          labels:
            app: kafka
        spec:
          nodeSelector:
            travis.io/schedule-only: "kafka"
          tolerations:
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "NoSchedule"
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "NoExecute"
            tolerationSeconds: 3600
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "PreferNoSchedule"
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values:
                        - kafka
                  topologyKey: "kubernetes.io/hostname"
            podAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 1
                  podAffinityTerm:
                    labelSelector:
                      matchExpressions:
                        - key: "app"
                          operator: In
                          values:
                          - zk
                    topologyKey: "kubernetes.io/hostname"
          terminationGracePeriodSeconds: 300
          containers:
          - name: k8s-kafka
            imagePullPolicy: Always
            # image: hub.kce.ksyun.com/yunmi-infra/viomi/kafka:latest
            image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi-kafka:2.11-1.1.1
            resources:
              requests:
                memory: "600Mi"
                cpu: 500m
            ports:
            - containerPort: 9092
              name: server
            command:
            - sh
            - -c
            # note: nothing is mounted at /var/lib/kafka, so log segments live in the
            # container's writable layer and are lost when a pod is rescheduled
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
              --override listeners=PLAINTEXT://:9092 \
              --override zookeeper.connect=zk-0.zk-hs.viomi-kafka.svc.cluster.local:2181,zk-1.zk-hs.viomi-kafka.svc.cluster.local:2181,zk-2.zk-hs.viomi-kafka.svc.cluster.local:2181 \
              --override log.dir=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=false \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=12 \
              --override log.roll.hours=12 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override inter.broker.protocol.version=0.11.0.3 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
              --override log.message.timestamp.type=CreateTime \
              --override log.preallocate=false \
              --override log.retention.check.interval.ms=300000 \
              --override max.connections.per.ip=2147483647 \
              --override num.partitions=4 \
              --override producer.purgatory.purge.interval.requests=1000 \
              --override replica.fetch.backoff.ms=1000 \
              --override replica.fetch.max.bytes=1048576 \
              --override replica.fetch.response.max.bytes=10485760 \
              --override reserved.broker.max.id=1000"
            env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
            readinessProbe:
              tcpSocket:
                port: 9092
              timeoutSeconds: 1
              initialDelaySeconds: 5
          securityContext:
            runAsUser: 0
            fsGroup: 1000
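
The StatefulSet names kafka-svc as its governing service, but no such Service appears above; a minimal headless Service sketch matching the names used here:

    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-svc
      namespace: viomi-kafka
      labels:
        app: kafka
    spec:
      ports:
      - port: 9092
        name: server
      clusterIP: None
      selector:
        app: kafka

A quick smoke test from inside one broker, using the stock Kafka 1.1 CLI tools (assumed to be on PATH in this image, as kafka-server-start.sh is):

    kubectl -n viomi-kafka exec -it kafka-0 -- sh -c '
      kafka-topics.sh --zookeeper zk-cs.viomi-kafka.svc.cluster.local:2181 \
        --create --topic smoke-test --partitions 1 --replication-factor 3 &&
      echo hello | kafka-console-producer.sh --broker-list localhost:9092 --topic smoke-test &&
      kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic smoke-test --from-beginning --max-messages 1'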

Per-instance ZooKeeper Services follow; note that their selectors expect pods labeled app: zookeeper-cluster-service-N, which does not match the app: zk pods above:

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: zookeeper-cluster-service-1
      name: zookeeper-cluster1
      namespace: viomi-kafka
    spec:
      ports:
      - name: client
        port: 2181
        protocol: TCP
      - name: follower
        port: 2888
        protocol: TCP
      - name: leader
        port: 3888
        protocol: TCP
      selector:
        app: zookeeper-cluster-service-1
    ---
    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: zookeeper-cluster-service-2
      name: zookeeper-cluster2
      namespace: viomi-kafka
    spec:
      ports:
      - name: client
        port: 2181
        protocol: TCP
      - name: follower
        port: 2888
        protocol: TCP
      - name: leader
        port: 3888
        protocol: TCP
      selector:
        app: zookeeper-cluster-service-2
    ---
    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: zookeeper-cluster-service-3
      name: zookeeper-cluster3
      namespace: viomi-kafka
    spec:
      ports:
      - name: client
        port: 2181
        protocol: TCP
      - name: follower
        port: 2888
        protocol: TCP
      - name: leader
        port: 3888
        protocol: TCP
      selector:
        app: zookeeper-cluster-service-3

kafka_manager.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-manager
      namespace: viomi-kafka
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka-manager
      template:
        metadata:
          labels:
            app: kafka-manager
        spec:
          containers:
            - image: zenko/kafka-manager
              # basicAuth:
              #   enabled: false
              name: kafka-manager
              ports:
              - name: kafka-manager
                containerPort: 9000
                protocol: TCP
              env:
              - name: ZK_HOSTS
                # these hostnames must resolve from the pod; the client service
                # defined above is zk-cs.viomi-kafka.svc.cluster.local:2181
                value: "zoo1:2181,zoo2:2181,zoo3:2181"
    ---
    apiVersion: v1
    kind: Service
    metadata:
      # annotations:
      #   service.beta.kubernetes.io/ksc-loadbalancer-id: 63405a34-2875-4b4b-b169-ae37b285100e
      labels:
        app: kafka-manager
      name: kafka-manager-server
      namespace: viomi-kafka
    spec:
      ports:
      - name: "9000"
        # nodePort: 32662
        port: 9000
        protocol: TCP
        targetPort: 9000
      selector:
        app: kafka-manager
      type: ClusterIP
    ---
    apiVersion: extensions/v1beta1
    kind: Ingress
    metadata:
      annotations:
        kubernetes.io/ingress.class: traefik
      name: kafka-manager-ingress
      namespace: viomi-kafka
    spec:
      rules:
      - host: kafka-manager.viomi.com.cn
        http:
          paths:
          - backend:
              serviceName: kafka-manager-server
              servicePort: 9000
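
With DNS (or a hosts entry) pointing kafka-manager.viomi.com.cn at the traefik ingress, the UI is served over HTTP. A quick check through the ingress, with the traefik address left as a placeholder:

    curl -H 'Host: kafka-manager.viomi.com.cn' http://<traefik-ingress-ip>/
    # expect the kafka-manager overview page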

4. Filebeat

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: filebeat-config
      namespace: kube-system
      labels:
        k8s-app: filebeat
    data:
      filebeat.yml: |-
        processors:
        - add_cloud_metadata: ~
        - add_docker_metadata: ~
        logging.level: error
        filebeat.inputs:
        - type: container
          scan_frequency: 1s
          backoff_factor: 2
          backoff: 1s
          tail_files: true
          max_backoff: 30s
          spool_size: 2048
          paths:
            - /data/docker/containers/*/*-json.log
            - /data/docker/containers/*/*-json.log-*
          # keep only lines containing one of these tokens
          include_lines: ['INFO','ERROR','WARN','DEBUG']
          #multiline.pattern: '^\[[0-9]{4}-[0-9]{2}|^\['
          multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}|^[0-9]{4}/|^[[:space:]]+|^Caused by:'
          multiline.negate: false
          multiline.match: after
          multiline.max_lines: 150
          processors:
            - add_kubernetes_metadata:
                host: ${NODE_NAME}
                matchers:
                - logs_path:
                    logs_path: "/data/docker/containers/"
        output.kafka:
          version: "1.1.1"
          enabled: true
          hosts: ["10.62.1.135:9092"]
          # default topic for events that match none of the rules below
          topic: "elk_kafka"
          topics:
            - topic: "yunmi-infra"
              when.contains:
                kubernetes.namespace: "yunmi-infra"
            - topic: "yunmi-trade"
              when.contains:
                kubernetes.namespace: "yunmi-trade"
            - topic: "yunmi-business"
              when.contains:
                kubernetes.namespace: "yunmi-business"
            - topic: "yunmi-front"
              when.contains:
                kubernetes.namespace: "yunmi-front"
            - topic: "yunmi-vwater"
              when.contains:
                kubernetes.namespace: "yunmi-vwater"
            - topic: "yunmi-bigdata"
              when.contains:
                kubernetes.namespace: "yunmi-bigdata"
            - topic: "kube-system"
              when.contains:
                kubernetes.namespace: "kube-system"
          partition.round_robin:
            reachable_only: true
          required_acks: 0
          compression: gzip
          compression_level: 1
          max_message_bytes: 10000000
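
To confirm events are flowing, consume a few messages from the default topic on one of the brokers deployed above:

    kubectl -n viomi-kafka exec -it kafka-0 -- \
      kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic elk_kafka --max-messages 5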

The DaemonSet that runs Filebeat on every node, mounting the Docker log directory and the ConfigMap above:

    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      labels:
        app: filebeat
        id: filebeat
      name: filebeat
      namespace: kube-system
    spec:
      revisionHistoryLimit: 10
      selector:
        matchLabels:
          app: filebeat
          id: filebeat
      template:
        metadata:
          labels:
            app: filebeat
            id: filebeat
          name: filebeat
        spec:
          # static mapping so the advertised broker hostnames resolve from every node
          hostAliases:
          - ip: "10.62.1.135"
            hostnames:
            - "kafka-0.kafka-svc.viomi-kafka.svc.cluster.local"
            - "kafka-1.kafka-svc.viomi-kafka.svc.cluster.local"
            - "kafka-2.kafka-svc.viomi-kafka.svc.cluster.local"
          serviceAccountName: filebeat
          containers:
          - image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi-filebeat:latest
            imagePullPolicy: Always
            name: filebeat
            args: [
              "-c", "/etc/filebeat.yml",
              "-e",
            ]
            env:
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            resources:
              limits:
                cpu: "3"
                memory: 3000Mi
              requests:
                cpu: 300m
                memory: 300Mi
            securityContext:
              privileged: true
              procMount: Default
              runAsUser: 0
            terminationMessagePath: /dev/termination-log
            terminationMessagePolicy: File
            volumeMounts:
            - mountPath: /data/docker/containers
              name: containers
            - name: config
              mountPath: /etc/filebeat.yml
              readOnly: true
              subPath: filebeat.yml
          dnsPolicy: ClusterFirst
          restartPolicy: Always
          schedulerName: default-scheduler
          securityContext: {}
          terminationGracePeriodSeconds: 30
          volumes:
          - hostPath:
              path: /data/docker/containers
              type: ""
            name: containers
          - name: config
            configMap:
              defaultMode: 0600
              name: filebeat-config
      updateStrategy:
        type: OnDelete
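
The DaemonSet runs as serviceAccountName: filebeat, and the add_kubernetes_metadata processor needs read access to pods, yet no such account is defined above. A minimal RBAC sketch, assuming the kube-system namespace used by the DaemonSet:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: filebeat
      namespace: kube-system
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: filebeat
    rules:
    - apiGroups: [""]
      resources: ["namespaces", "pods", "nodes"]
      verbs: ["get", "watch", "list"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: filebeat
    subjects:
    - kind: ServiceAccount
      name: filebeat
      namespace: kube-system
    roleRef:
      kind: ClusterRole
      name: filebeat
      apiGroup: rbac.authorization.k8s.io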

5. Logstash

logstash.yaml

    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      labels:
        app: viomi-logstash
      name: viomi-logstash
      namespace: kube-system
    spec:
      podManagementPolicy: OrderedReady
      replicas: 9
      selector:
        matchLabels:
          app: viomi-logstash
          id: viomi-logstash
      serviceName: elasticsearch-logging
      template:
        metadata:
          labels:
            app: viomi-logstash
            id: viomi-logstash
        spec:
          containers:
          - args:
            - /usr/share/logstash/bin/logstash -f /etc/logstash/conf/logstash.conf
            command:
            - /bin/sh
            - -c
            image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi_logstash_latest:5.6.4
            imagePullPolicy: IfNotPresent
            name: viomi-logstash
            resources:
              limits:
                memory: 2000Mi
              requests:
                memory: 512Mi
            volumeMounts:
            - mountPath: /etc/logstash/conf/
              name: conf
          dnsPolicy: ClusterFirst
          initContainers:
          # picks a pipeline config per pod ordinal:
          # 0/5 infra, 1/6 trade, 2/7 business, 3 other, 4/8 test
          - command:
            - bash
            - -c
            - |
              hostname=`echo $HOSTNAME | awk -F '-' '{print $NF}'`
              if [[ $hostname -eq 0 ]]; then
                cp /mnt/conf.d/logstash_infra.conf /etc/logstash/conf/logstash.conf && cat /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 1 ]]; then
                cp /mnt/conf.d/logstash_trade.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 2 ]]; then
                cp /mnt/conf.d/logstash_business.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 3 ]]; then
                cp /mnt/conf.d/logstash_other.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 4 ]]; then
                cp /mnt/conf.d/logstash_test.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 5 ]]; then
                cp /mnt/conf.d/logstash_infra.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 6 ]]; then
                cp /mnt/conf.d/logstash_trade.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 7 ]]; then
                cp /mnt/conf.d/logstash_business.conf /etc/logstash/conf/logstash.conf
              fi
              if [[ $hostname -eq 8 ]]; then
                cp /mnt/conf.d/logstash_test.conf /etc/logstash/conf/logstash.conf
              fi
            # the init step only needs bash and cp; this rocketmq image is reused for that
            image: hub.kce.ksyun.com/yunmi-infra/viomi/rocketmq:broker-4.5.0
            imagePullPolicy: IfNotPresent
            name: init-broker
            resources:
              limits:
                memory: 1000Mi
            terminationMessagePath: /dev/termination-log
            terminationMessagePolicy: File
            volumeMounts:
            - mountPath: /mnt/conf.d/
              name: viomi-logstash-config
            - mountPath: /etc/logstash/conf/
              name: conf
          restartPolicy: Always
          schedulerName: default-scheduler
          securityContext:
            fsGroup: 1000
            runAsUser: 0
          terminationGracePeriodSeconds: 30
          volumes:
          - configMap:
              defaultMode: 420
              name: viomi-logstash-config
            name: viomi-logstash-config
          - emptyDir: {}
            name: conf
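
Each replica can be checked for the pipeline it received, for example by grepping for the consumer group names set in the configs below:

    for i in 0 1 2 3 4; do
      kubectl -n kube-system exec viomi-logstash-$i -- \
        grep group_id /etc/logstash/conf/logstash.conf
    done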

logstash_conf.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: viomi-logstash-config
      namespace: kube-system
      labels:
        k8s-app: logstash-config
    data:
      logstash_infra.conf: |-
        input {
          kafka {
            bootstrap_servers => "10.62.1.135:9092"
            auto_offset_reset => "latest"
            #consumer_threads => 6
            decorate_events => true
            partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
            group_id => "logstash-infra"
            topics => ["yunmi-infra"]
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        filter {
          json {
            source => "message"
          }
          mutate {
            remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
          }
        }
        output {
          elasticsearch {
            hosts => ["10.62.1.130:9200"]
            index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
          }
        }
      logstash_business.conf: |-
        input {
          kafka {
            bootstrap_servers => "10.62.1.135:9092"
            auto_offset_reset => "latest"
            #consumer_threads => 6
            decorate_events => true
            partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
            group_id => "logstash-business"
            topics => ["yunmi-business"]
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        filter {
          json {
            source => "message"
          }
          mutate {
            remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
          }
        }
        output {
          elasticsearch {
            hosts => ["10.62.1.130:9200"]
            index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
          }
        }
      logstash_trade.conf: |-
        input {
          kafka {
            bootstrap_servers => "10.62.1.135:9092"
            auto_offset_reset => "latest"
            #consumer_threads => 6
            decorate_events => true
            partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
            group_id => "logstash-trade"
            topics => ["yunmi-trade"]
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        filter {
          json {
            source => "message"
          }
          mutate {
            remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
          }
        }
        output {
          elasticsearch {
            hosts => ["10.62.1.130:9200"]
            index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
          }
        }
      logstash_other.conf: |-
        input {
          kafka {
            bootstrap_servers => "10.62.1.135:9092"
            auto_offset_reset => "latest"
            #consumer_threads => 6
            decorate_events => true
            partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
            group_id => "logstash-other"
            topics => ["prod_elk_kafka","kafka_topic","kube-system","elk_kafka","yunmi-front","yunmi-vwater","yunmi-bigdata"]
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        filter {
          json {
            source => "message"
          }
          mutate {
            remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
          }
        }
        output {
          elasticsearch {
            hosts => ["10.62.1.130:9200"]
            index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
          }
        }
      logstash_test.conf: |-
        input {
          kafka {
            bootstrap_servers => "10.62.1.135:9092"
            auto_offset_reset => "latest"
            #consumer_threads => 6
            decorate_events => true
            group_id => "logstash-test-grp"
            topics => ["test_elk_kafka"]
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        filter {
          json {
            source => "message"
          }
          mutate {
            remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
          }
        }
        output {
          elasticsearch {
            hosts => ["10.62.1.130:9200"]
            # no index is set here, so events go to the default logstash-%{+YYYY.MM.dd} index
          }
        }
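
End to end, consumer lag per pipeline can be watched from a broker, using the group names from the configs above:

    kubectl -n viomi-kafka exec -it kafka-0 -- \
      kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --describe --group logstash-infra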
