源json数据:
{"vaid": 4, "layer_type": 0, "clid": 4, "chid": "1073266302", "sid_type": 2, "create_time": 1512724518.1473277, "pid": 1777607729, "content_type": 3, "sid": 1142957589, "seq": 1771, "seq_duration": 1.0026666666666666, "sub_seq_count": 0, "stage": 2, "state": 5, "ctime": 1512726502.305, "mtime": 1512726502.3050795}
需求:
将 ctime、mtime 和 create_time 由秒(浮点数)转换为毫秒(整数),并以 date 格式存入 ES
seq_duration转换成float
sub_seq_count转换成integer
新建elasticsearch template
# Create (or overwrite) the index template applied to every index whose name
# matches "live_seq_monitor-*".
# The explicit Content-Type header is required by Elasticsearch 6.0+ (strict
# content-type checking) and is accepted by 5.x as well.
# The three time fields accept either formatted date strings or epoch
# milliseconds; the Logstash pipeline sends epoch milliseconds.
curl -X PUT http://10.86.0.107:9200/_template/live_seq_monitor \
  -H 'Content-Type: application/json' \
  -d '{
  "template": "live_seq_monitor-*",
  "settings": {
    "number_of_shards": 1,
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "seq": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "chid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "clid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "content_type": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "create_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "ctime": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "host": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "layer_type": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "mtime": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "pid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "seq": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "seq_duration": {
          "type": "float"
        },
        "sid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "sid_type": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "stage": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "state": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "sub_seq_count": {
          "type": "integer"
        },
        "vaid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}'
设置logstash配置文件
# Pipeline: read JSON events from the Kafka topic "live_seq_monitor",
# normalise field types, then print each event and index it into Elasticsearch.
input {
  kafka {
    bootstrap_servers => "10.86.0.106:9092"
    topics => ["live_seq_monitor"]
    group_id => "logstash"
    auto_offset_reset => "earliest"
    consumer_threads => 6
    decorate_events => true
  }
}
filter {
  # Parse the raw Kafka payload held in "message" into top-level event fields.
  json {
    source => "message"
    remove_field => [ "message"]
  }
  # Scale the three epoch timestamps from seconds to milliseconds.
  ruby {
    code => "
      ['create_time', 'ctime', 'mtime'].each do |field|
        value = event.get(field)
        # Only scale real numbers: a missing field (nil) would raise, and a
        # String multiplied by 1000 would be repeated instead of scaled.
        event.set(field, value * 1000) if value.is_a?(Numeric)
      end
    "
  }
  # Coerce every field to the type declared for it in the index template.
  mutate {
    convert => [
      "sub_seq_count","integer",
      "ctime","integer",
      "mtime","integer",
      "create_time","integer",
      "seq_duration","float",
      "sid_type","string",
      "clid","string",
      "pid","string",
      "chid","string",
      "layer_type","string",
      "sid","string",
      "vaid","string",
      "content_type","string",
      "stage","string",
      "state","string",
      "seq","string"
    ]
  }
}
output {
  # Echo each processed event to the console for debugging.
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    index => "live_seq_monitor-%{+YYYY.MM.dd}"
    hosts => ["10.86.0.107:9200"]
    # Index under the mapping type defined by the live_seq_monitor template;
    # the plugin default type differs from "seq".
    document_type => "seq"
  }
}
之前用命令行(-e 参数)测试时踩了很多坑:整段配置包在 shell 的单引号里,配置内部再出现单引号就会被 shell 提前截断;写入配置文件后就不报错了
ruby code简单的调用event的set和get设置字段
# Quick command-line test: read JSON lines from stdin, convert field types,
# then print each event and index it into a local Elasticsearch.
# The pipeline is wrapped in shell single quotes, so every single quote inside
# the Ruby code must be written as '\'' or the shell ends the string early.
bin/logstash -e 'input { stdin { } } filter { json {source => "message" } ruby{code => "event.get('\''message'\'')" } mutate{convert => ["sub_seq_count","integer","seq_duration","float","sid_type","string","clid","string","pid","string","chid","string","layer_type","string","sid","string","vaid","string","content_type","string","stage","string","state","string","seq","string"]}} output { stdout {codec => rubydebug} elasticsearch{index => "live_seq_monitor-%{+YYYY.MM.dd}" hosts => ["127.0.0.1:9200"] } }'
网友评论