yum安装
https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh
脚本内容
# td-agent 3 installer for Red Hat style systems: imports the package signing
# key, writes a yum repository definition, and installs the td-agent package.
# All privileged steps run inside a single `sudo sh` here-document.
echo "=============================="
echo " td-agent Installation Script "
echo "=============================="
echo "This script requires superuser access to install rpm packages."
echo "You will be prompted for your password by sudo."
# clear any previous sudo permission
sudo -k
# run inside sudo
# The SCRIPT delimiter is unquoted, so the calling shell expands the body before
# handing it to `sh`; that is why the yum variables are written as \$releasever
# and \$basearch, so they arrive as literal $releasever/$basearch. The inner
# 'EOF' delimiter is quoted, so the root shell writes them to td.repo unexpanded.
sudo sh <<SCRIPT
# add GPG key
rpm --import https://packages.treasuredata.com/GPG-KEY-td-agent
# add treasure data repository to yum
cat >/etc/yum.repos.d/td.repo <<'EOF';
[treasuredata]
name=TreasureData
baseurl=http://packages.treasuredata.com/3/redhat/\$releasever/\$basearch
gpgcheck=1
gpgkey=https://packages.treasuredata.com/GPG-KEY-td-agent
EOF
# update your sources
yum check-update
# install the toolbelt
yes | yum install -y td-agent
SCRIPT
# message
# NOTE(review): printed unconditionally; the exit status of the sudo block above
# is not checked, so this appears even if the installation failed.
echo ""
echo "Installation completed. Happy Logging!"
echo ""
命令
# 开始、停止、重启、查看状态
/etc/init.d/td-agent start
/etc/init.d/td-agent stop
/etc/init.d/td-agent restart
/etc/init.d/td-agent status
# 配置文件
/etc/td-agent/td-agent.conf
# 日志文件
/var/log/td-agent/td-agent.log
# 查询进程
ps -ef | grep td-agent
Ruby安装
卸载自带的ruby
yum remove ruby -y
安装ruby2.5
下载地址: http://www.ruby-lang.org/en/downloads/
# Install the build prerequisites BEFORE configuring Ruby: a compiler and make
# are needed to build at all, and Ruby only gets its openssl/zlib extensions
# (required by `gem` for https sources and for unpacking gems) when the
# matching -devel headers are present at build time.
yum install -y gcc gcc-c++ make openssl-devel zlib-devel
# Unpack the source tarball and build it in place under /usr/local
tar zxvf ruby-2.5.1.tar.gz
mv ./ruby-2.5.1 /usr/local/ruby-2.5.1
cd /usr/local/ruby-2.5.1
./configure
# Only install when the build itself succeeded
make && make install
gem换源
# 列出默认源
gem sources
# 移除默认源
gem sources --remove https://rubygems.org/
# 添加ruby-china源
gem sources -a https://gems.ruby-china.com/
安装编译环境和软件包依赖
yum install gcc gcc-c++ make automake autoconf libtool openssl-devel jemalloc-devel gmp-devel -y
安装fluentd
# --no-document replaces the deprecated --no-ri/--no-rdoc pair, which
# RubyGems 3 rejects as invalid options.
gem install fluentd --no-document
安装fluentd插件
# 查询插件
gem search fluent-plugin -rd # 列出github地址
# 安装mongo插件
# --no-document replaces the deprecated --no-ri/--no-rdoc pair, which
# RubyGems 3 rejects as invalid options.
gem install fluent-plugin-mongo --no-document
初始化fluentd
fluentd --setup /etc/fluentd
启动
# debug测试
fluentd -c /etc/fluentd/fluent.conf -vv
# 后台启动
# Make sure the log directory exists before fluentd opens its log file there.
mkdir -p /etc/fluentd/log
# --log-rotate-size takes a plain byte count; a unit suffix such as "MB" is not
# interpreted as megabytes, so 1 GiB is spelled out as 1073741824.
fluentd -c /etc/fluentd/fluent.conf --log /etc/fluentd/log/fluentd.log --log-rotate-size 1073741824 &
查询进程
ps -ef | grep fluentd
配置命令
source输入流
# Accept events over TCP on port 16337 (forward input)
<source>
  @type forward
  port 16337
</source>
# Accept events over HTTP on port 8888
# POST http://localhost:8888/<tag>?json=<json>
# POST http://localhost:8888/td.myapp.login?json={"user"%3A"me"}
<source>
  @type http
  port 8888
</source>
# Bound to the loopback address only; operate it from outside through fluent-debug
<source>
  @type debug_agent
  bind 127.0.0.1
  port 24230
</source>
# 文本文件输入
# 文件有新内容追加时, 从当前读取位置继续读取; td-agent重启后, 从pos_file中记录的最后位置继续读取
# path: 文本路径, 多个路径由','分割
# format: 日志格式, 支持apache2,apache_error,nginx,syslog, json等或由'/'包裹的正则表达式
# apache2: 主机、用户、时间、方法、路径、状态码、大小、来源页(referer)和用户代理(agent)(format /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/ time_format %d/%b/%Y:%H:%M:%S %z)
# syslog: 时间、主机、标识(ident)、进程号(pid, 可选)和消息(format /^(?<time>[^ ]* [^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?[^\:]*\: *(?<message>.*)$/ time_format %b %d %H:%M:%S)
# tsv或csv(format tsv keys key1, key2, key3 time_key key2)
# pos_file: Fluentd将记录它上次读到这个文件的位置
# Tail the httpd access log, parse each line with the "apache" format, and emit
# it under the tag td.apache.access; the read position is kept in pos_file.
<source>
  @type tail
  format apache
  path /var/log/httpd-access.log
  tag td.apache.access
  pos_file /var/log/td-agent/tmp/access.log.pos
</source>
match输出流
*用来匹配tag的一部分(比如:a.*可以匹配a.b,但是不能匹配a或者a.b.c)
**可以用来匹配tag的0个或多个部分(比如:a.**可以匹配a、a.b和a.b.c)
# File output for every tag under local.**
<match local.**>
  @type file
  path /var/log/td-agent/access
</match>
# Forward events to other td-agent instances.
# out_forward in Fluentd v1 (td-agent 3) takes its destinations from <server>
# sections; a bare top-level `host` / `<secondary> host` is the obsolete form.
<match system.**>
  @type forward
  <server>
    host 192.168.0.11
  </server>
  # The standby server is optional; it only receives traffic when the
  # primary server is unavailable.
  <server>
    host 192.168.0.12
    standby true
  </server>
</match>
# Multiple outputs: copy each td.*.* event to Treasure Data and to a file
<match td.*.*>
  @type copy
  # Store 1: Treasure Data, buffered on disk
  <store>
    @type tdlog
    apikey API_KEY
    auto_create_table
    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
  </store>
  # Store 2: local file, path built from the time placeholders
  <store>
    @type file
    path /var/log/td-agent/td-%Y-%m-%d/%H.log
  </store>
</match>
# MongoDB output: service.** events go to logs.service_logs on the local
# MongoDB, flushed every 10s into a capped collection of size 1024m
<match service.**>
  @type mongo
  host 127.0.0.1
  port 27017
  database logs
  collection service_logs
  time_key time
  flush_interval 10s
  capped
  capped_size 1024m
</match>
python日志+fluentd+mongo
# python日志
import logging
from fluent.asynchandler import FluentHandler
from fluent.handler import FluentRecordFormatter
def create_logger():
    """Return the 'da-service' logger wired to Fluentd and to the console.

    Records at DEBUG and above are handed to an asynchronous FluentHandler
    that sends them to 10.100.12.32:16337 under the tag 'service.mongo', and
    are also written through a logging.StreamHandler.

    Calling this more than once returns the already-configured logger rather
    than attaching a second pair of handlers, which would duplicate every
    record.

    Returns:
        logging.Logger: the configured 'da-service' logger.
    """
    logger = logging.getLogger('da-service')
    if logger.handlers:
        # Already configured by an earlier call; adding the handlers again
        # would emit each record twice.
        return logger

    level = logging.DEBUG
    logger.setLevel(level)

    # Asynchronous Fluentd stream; the dict maps output field names to
    # LogRecord format placeholders.
    handler = FluentHandler('service.mongo', '10.100.12.32', 16337)
    handler.setFormatter(FluentRecordFormatter(fmt={
        'level': '%(levelname)s',
        'sys_host': '%(hostname)s',
        'sys_name': '%(name)s',
        'sys_module': '%(module)s',
        'function': '[%(pathname)s:%(funcName)s:%(lineno)d]',
        'stack_trace': '%(exc_text)s'
    }))
    handler.setLevel(level)
    logger.addHandler(handler)

    # Console
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s [%(pathname)s:%(funcName)s:%(lineno)d] %(message)s'))
    stream_handler.setLevel(level)
    logger.addHandler(stream_handler)

    return logger
# fluentd配置
# Listening port: forward input on 16337 (the port the Python handler sends to)
<source>
  @type forward
  port 16337
</source>
# service.** events -> collection "service" in the local MongoDB "logs" database
<match service.**>
  @type mongo
  host 127.0.0.1
  port 27017
  database logs
  collection service
  time_key time
  flush_interval 10s
  capped
  capped_size 1024m
</match>
# etl.** events -> collection "etl" in the local MongoDB "logs" database
<match etl.**>
  @type mongo
  host 127.0.0.1
  port 27017
  database logs
  collection etl
  time_key time
  flush_interval 10s
  capped
  capped_size 1024m
</match>
# Executor-side logs: execute.** events -> collection "execute" in the local MongoDB "logs" database
<match execute.**>
  @type mongo
  host 127.0.0.1
  port 27017
  database logs
  collection execute
  time_key time
  flush_interval 10s
  capped
  capped_size 1024m
</match>
# Server-side logs: web.** events -> collection "web" in the local MongoDB "logs" database
<match web.**>
  @type mongo
  host 127.0.0.1
  port 27017
  database logs
  collection web
  time_key time
  flush_interval 10s
  capped
  capped_size 1024m
</match>
网友评论