美文网首页
记录一次logstash消费kafka导入elasticsearch

记录一次logstash消费kafka导入elasticsearch

作者: 玩玩风行啦 | 来源:发表于2017-12-13 20:16 被阅读168次

    源json数据:

    {"vaid": 4, "layer_type": 0, "clid": 4, "chid": "1073266302", "sid_type": 2, "create_time": 1512724518.1473277, "pid": 1777607729, "content_type": 3, "sid": 1142957589, "seq": 1771, "seq_duration": 1.0026666666666666, "sub_seq_count": 0, "stage": 2, "state": 5, "ctime": 1512726502.305, "mtime": 1512726502.3050795}
    

    需求:
    将ctime、mtime和create_time由秒(带小数)转换为毫秒,并以date类型存入es
    seq_duration转换成float
    sub_seq_count转换成integer

    新建elasticsearch template

    # Register the index template applied to every index matching "live_seq_monitor-*".
    # The explicit Content-Type header is required by Elasticsearch 6.0+, which rejects
    # request bodies sent without it (HTTP 406); older versions simply accept it.
    # The three time fields are dates that accept epoch_millis; seq_duration is a float and
    # sub_seq_count an integer; every other field is text with a keyword sub-field.
    curl -X PUT http://10.86.0.107:9200/_template/live_seq_monitor -H 'Content-Type: application/json' -d '{
        "template": "live_seq_monitor-*",
        "settings": {
            "number_of_shards": 1,
            "index.refresh_interval": "5s"
        },
        "mappings": {
            "seq": {
                "properties": {
                    "@timestamp": {
                        "type": "date"
                    },
                    "@version": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "chid": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "clid": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "content_type": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "create_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "ctime": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "host": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "layer_type": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "mtime": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "pid": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "seq": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "seq_duration": {
                        "type": "float"
                    },
                    "sid": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "sid_type": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "stage": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "state": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    },
                    "sub_seq_count": {
                        "type": "integer"
                    },
                    "vaid": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            }
                        }
                    }
                }
            }
        }
    }'
    

    设置logstash配置文件

    input {
        # Read events from the "live_seq_monitor" Kafka topic as consumer group "logstash".
        kafka {
            group_id          => "logstash"
            topics            => ["live_seq_monitor"]
            bootstrap_servers => "10.86.0.106:9092"
            consumer_threads  => 6
            # Offset policy handed to the Kafka consumer.
            auto_offset_reset => "earliest"
            # Ask the plugin to attach Kafka metadata to each event.
            decorate_events   => true
        }
    }
    filter {
        # Parse the JSON text held in "message" into top-level event fields; the raw
        # "message" field is removed once parsing succeeds.
        json {
            source => "message"
            remove_field => [ "message"]
        }
        # Convert the three timestamps from (fractional) epoch seconds to integer epoch
        # milliseconds. Only numeric values are touched, so an event that lacks a field
        # (e.g. after a JSON parse failure) no longer raises on nil, and a string value is
        # not "multiplied" into a repeated string. round avoids losing a millisecond when
        # the floating-point product lands just below the exact value.
        ruby {
            code => "
                ['create_time', 'ctime', 'mtime'].each do |field|
                    seconds = event.get(field)
                    event.set(field, (seconds * 1000).round) if seconds.is_a?(Numeric)
                end
            "
        }
        # Coerce the remaining fields to the types declared in the index template.
        # The time fields are already integers after the ruby step above.
        mutate {
            convert => {
                "sub_seq_count" => "integer"
                "seq_duration" => "float"
                "sid_type" => "string"
                "clid" => "string"
                "pid" => "string"
                "chid" => "string"
                "layer_type" => "string"
                "sid" => "string"
                "vaid" => "string"
                "content_type" => "string"
                "stage" => "string"
                "state" => "string"
                "seq" => "string"
            }
        }
    }
    output {
        # Print every event to the console in rubydebug form (useful while debugging).
        stdout {
            codec => rubydebug
        }
        # Index into a daily index whose name matches the "live_seq_monitor-*" template.
        elasticsearch {
            index => "live_seq_monitor-%{+YYYY.MM.dd}"
            hosts => ["10.86.0.107:9200"]
            # Write documents under the "seq" mapping type that the template defines;
            # with the plugin's default type, a single-type (6.x) index would reject them.
            # NOTE(review): document_type applies to the 5.x/6.x stack this setup targets - confirm versions.
            document_type => "seq"
        }
    }
    

    之前用命令行方式测试时踩了很多坑,把配置写入配置文件后就不再报错了
    ruby code中只需简单地调用event的get和set方法来读取和设置字段

    # Run an inline pipeline for quick testing. The whole pipeline is one single-quoted shell
    # argument, so the quotes inside the Ruby snippet are written as '\'' - a bare ' would end
    # the shell string and Ruby would receive event.get(message) instead of event.get('message').
    bin/logstash -e 'input { stdin { } } filter { json {source => "message" } ruby{code => "event.get('\''message'\'')" }  mutate{convert => ["sub_seq_count","integer","seq_duration","float","sid_type","string","clid","string","pid","string","chid","string","layer_type","string","sid","string","vaid","string","content_type","string","stage","string","state","string","seq","string"]}} output { stdout {codec => rubydebug} elasticsearch{index => "live_seq_monitor-%{+YYYY.MM.dd}" hosts => ["127.0.0.1:9200"] } }'
    

    相关文章

      网友评论

          本文标题:记录一次logstash消费kafka导入elasticsearch

          本文链接:https://www.haomeiwen.com/subject/dpizixtx.html