filebeat使用redis作为缓存
1.前提条件
- filebeat不支持传输给redis哨兵或集群
- logstash也不支持从redis哨兵或集群里读取数据
2.安装配置redis
yum install redis -y
sed -i 's#^bind 127.0.0.1#bind 127.0.0.1 10.0.0.51#' /etc/redis.conf
systemctl start redis
3.安装配置nginx
配置官方源
yum install nginx -y
放在nginx.conf最后一行的}后面,不要放在conf.d里面
stream {
upstream redis {
server 10.0.0.51:6379 max_fails=2 fail_timeout=10s;
server 10.0.0.52:6379 max_fails=2 fail_timeout=10s backup;
}
server {
listen 6380;
proxy_connect_timeout 1s;
proxy_timeout 3s;
proxy_pass redis;
}
}
nginx -t
systemctl start nginx
4.安装配置keepalived
yum install keepalived -y
db01的配置
cat>/etc/keepalived/keepalived.conf<<EOF
global_defs {
router_id db01
}
vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 50
priority 150
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
virtual_ipaddress {
10.0.0.100
}
}
EOF
db02的配置:
cat>/etc/keepalived/keepalived.conf<<EOF
global_defs {
router_id db02
}
vrrp_instance VI_1 {
state BACKUP
interface eth0
virtual_router_id 50
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
virtual_ipaddress {
10.0.0.100
}
}
EOF
systemctl start keepalived
ip a
5.测试访问能否代理到redis
redis-cli -h 10.0.0.100 -p 6380
把db01的redis停掉,测试还能不能连接redis
6.配置filebeat
cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
json.keys_under_root: true
json.overwrite_keys: true
tags: ["access"]
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: ["error"]
output.redis:
hosts: ["10.0.0.100:6380"]
keys:
- key: "nginx_access"
when.contains:
tags: "access"
- key: "nginx_error"
when.contains:
tags: "error"
setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
EOF
7.测试访问filebeat能否传输到redis
curl 10.0.0.51/haha
redis-cli -h 10.0.0.51 #应该有数据
redis-cli -h 10.0.0.52 #应该没数据
redis-cli -h 10.0.0.100 -p 6380 #应该有数据
8.配置logstash
cat >/etc/logstash/conf.d/redis.conf<<EOF
input {
redis {
host => "10.0.0.100"
port => "6380"
db => "0"
key => "nginx_access"
data_type => "list"
}
redis {
host => "10.0.0.100"
port => "6380"
db => "0"
key => "nginx_error"
data_type => "list"
}
}
filter {
mutate {
convert => ["upstream_time", "float"]
convert => ["request_time", "float"]
}
}
output {
stdout {}
if "access" in [tags] {
elasticsearch {
hosts => "http://10.0.0.51:9200"
manage_template => false
index => "nginx_access-%{+yyyy.MM}"
}
}
if "error" in [tags] {
elasticsearch {
hosts => "http://10.0.0.51:9200"
manage_template => false
index => "nginx_error-%{+yyyy.MM}"
}
}
}
EOF
9.启动测试
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf
10.关闭前台运行,使用后台运行
ctr+c #结束前台运行,使用后台运行
systemctl start logstash
11.最终测试
ab -n 10000 -c 100 10.0.0.100/
检查es-head上索引条目是否为10000条
关闭db01的redis,在访问,测试logstash正不正常
恢复db01的redis,再测试
filbeat引入redis优化方案
1.新增加一个日志路径需要修改4个地方:
- filebat 2个位置
- logstash 2个位置
2.优化之后需要修改的地方2个地方
- filebat 1个位置
- logstash 1个位置
3.filebeat配置文件
vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
json.keys_under_root: true
json.overwrite_keys: true
tags: ["access"]
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: ["error"]
output.redis:
hosts: ["10.0.0.100:6380"]
key: "nginx_log"
setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
4.优化后的logstash
vim /etc/logstash/conf.d/redis.conf
input {
redis {
host => "10.0.0.100"
port => "6380"
db => "0"
key => "nginx_log"
data_type => "list"
}
}
filter {
mutate {
convert => ["upstream_time", "float"]
convert => ["request_time", "float"]
}
}
output {
stdout {}
if "access" in [tags] {
elasticsearch {
hosts => "http://10.0.0.51:9200"
manage_template => false
index => "nginx_access-%{+yyyy.MM}"
}
}
if "error" in [tags] {
elasticsearch {
hosts => "http://10.0.0.51:9200"
manage_template => false
index => "nginx_error-%{+yyyy.MM}"
}
}
}
使用kafka作为缓存
1.配置hosts
10.0.0.51 kafka51
10.0.0.52 kafka52
10.0.0.53 kafka53
2.安装配置zookeeper
cd /data/soft/
tar zxf zookeeper-3.4.11.tar.gz -C /opt/
ln -s /opt/zookeeper-3.4.11/ /opt/zookeeper
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
cat >/opt/zookeeper/conf/zoo.cfg<<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888
EOF
#每台机器不一样
echo "1" > /data/zookeeper/myid
cat /data/zookeeper/myid
3.启动zookeeper
所有节点都启动
/opt/zookeeper/bin/zkServer.sh start
4.每个节点都检查
/opt/zookeeper/bin/zkServer.sh status
5.测试zookeeper
在一个节点上执行,创建一个频道
/opt/zookeeper/bin/zkCli.sh -server 10.0.0.51:2181
create /test "hello"
在其他节点上看能否接收到
/opt/zookeeper/bin/zkCli.sh -server 10.0.0.52:2181
get /test
6.安装部署kafka
kafka51操作:
cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=1
listeners=PLAINTEXT://10.0.0.51:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF
kafka52操作:
cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=2
listeners=PLAINTEXT://10.0.0.52:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF
kafka53操作:
cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=3
listeners=PLAINTEXT://10.0.0.53:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF
7.前台启动测试
#每个节点测试
[root@kafka51 ~]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
8.验证进程
#每个节点查看
[root@kafka51 ~]# jps
17000 Kafka
15273 QuorumPeerMain
17084 Jps
9.测试创建topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic kafkatest
10.测试获取toppid
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest
11.测试删除topic
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest
12.kafka测试命令发送消息
#创建命令
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic messagetest
#测试发送消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 --topic messagetest
#其他节点测试接收
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic messagetest --from-beginning
#测试获取所有的频道
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
13.测试成功之后,可以放在后台启动
ctr+c #结束前台运行
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
14.修改filebeat配置文件
cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
json.keys_under_root: true
json.overwrite_keys: true
tags: ["access"]
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: ["error"]
output.kafka:
hosts: ["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
topic: 'filebeat'
setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
EOF
15.修改logstash配置文件
cat >/etc/logstash/conf.d/kafka.conf <<EOF
input {
kafka{
bootstrap_servers=>["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
topics=>["filebeat"]
group_id=>"logstash"
codec => "json"
}
}
filter {
mutate {
convert => ["upstream_time", "float"]
convert => ["request_time", "float"]
}
}
output {
stdout {}
if "access" in [tags] {
elasticsearch {
hosts => "http://10.0.0.51:9200"
manage_template => false
index => "nginx_access-%{+yyyy.MM}"
}
}
if "error" in [tags] {
elasticsearch {
hosts => "http://10.0.0.51:9200"
manage_template => false
index => "nginx_error-%{+yyyy.MM}"
}
}
}
EOF
16.启动logstash并测试
1.前台启动
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf
2.后台启动
#ctr +c 结束之前的前台启动
systemctl start logstash
17.集群考可用测试
结果:任意一台或两台服务器坏了zookpeer或者kafka或者全坏了或者随机坏了,只要剩一个zookper和kafka都不影正常收集日志。
18.总结kafka实验
1.前提条件
- kafka和zook都是基于java的,所以需要java环境
- 这俩比较吃资源,内存得够
2.安装zook注意
- 每台机器的myid要不一样,而且要和配置文件里的id对应上
- 启动测试,角色为leader和follower
- 测试发送和接受消息
3.安装kafka注意
- kafka依赖于zook,所以如果zook不正常,kafka不能工作
- kafka配置文件里要配上zook的所有IP的列表
- kafka配置文件里要注意,写自己的IP地址
- kafka配置文件里要注意,自己的ID是zook里配置的myid
- kafka启动要看日志出现started才算是成功
4.测试zook和kafka
- 一端发送消息
- 两端能实时接收消息
5.配置filebeat
- output要配上kafka的所有的IP列表
6.配置logstash
- input要写上所有的kafka的IP列表,别忘了[]
- 前台启动测试成功后再后台启动
网友评论