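For business reasons, the data in a legacy Oracle database (150 million rows) needs to be imported into ES.
The import pipeline is Flume -> Kafka -> Logstash -> ES.
The Flume package is available here: https://pan.baidu.com/s/1IPvfUJNDzLTlG0HGhGDEaQ (extraction code: mtdm)
It is for Flume 1.9.0; simply copy the JAR files into Flume's lib directory.
The configuration file is as follows, adapted from https://blog.csdn.net/hua_yuan2015/article/details/103084500: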
# declare source channel sink
agentTest0.channels = channelTest0
agentTest0.sources = sourceTest0
agentTest0.sinks = sinkTest0
###########sql source#################
# declare source type
agentTest0.sources.sourceTest0.type = org.keedio.flume.source.SQLSource
# Hibernate Database connection properties
agentTest0.sources.sourceTest0.hibernate.connection.url = jdbc:oracle:thin:@12.1.93.4:1521/zhgsdb
agentTest0.sources.sourceTest0.hibernate.connection.user = ******
agentTest0.sources.sourceTest0.hibernate.connection.password = ******
agentTest0.sources.sourceTest0.hibernate.connection.autocommit = true
agentTest0.sources.sourceTest0.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest0.sources.sourceTest0.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
# Query interval in ms
agentTest0.sources.sourceTest0.run.query.delay = 1000
# Directory where Flume saves the status file
agentTest0.sources.sourceTest0.status.file.path = /opt/third/apache-flume-1.8.0-bin
# Name of the file in which Flume saves its status
agentTest0.sources.sourceTest0.status.file.name = agentTest0.sqlSource.status
agentTest0.sources.sourceTest0.table = wlslog
agentTest0.sources.sourceTest0.columns.to.select = *
agentTest0.sources.sourceTest0.incremental.column.name = id
agentTest0.sources.sourceTest0.incremental.value = 0
agentTest0.sources.sourceTest0.batch.size = 60000
agentTest0.sources.sourceTest0.max.rows = 100000
agentTest0.sources.sourceTest0.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest0.sources.sourceTest0.hibernate.c3p0.min_size=1
agentTest0.sources.sourceTest0.hibernate.c3p0.max_size=10
##############################
agentTest0.channels.channelTest0.type = memory
agentTest0.channels.channelTest0.capacity = 60000
agentTest0.channels.channelTest0.transactionCapacity = 60000
agentTest0.channels.channelTest0.byteCapacityBufferPercentage = 20
agentTest0.channels.channelTest0.byteCapacity = 16000000
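# Note: topic, brokerList, requiredAcks and batchSize are the legacy KafkaSink property names.
# The Flume user guide (1.7 and later) lists kafka.topic, kafka.bootstrap.servers,
# kafka.producer.acks and flumeBatchSize as their replacements.
agentTest0.sinks.sinkTest0.type = org.apache.flume.sink.kafka.KafkaSink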
agentTest0.sinks.sinkTest0.topic = TestTopic
agentTest0.sinks.sinkTest0.brokerList = 12.1.150.111:9092,12.1.150.112:9092,12.1.150.113:9092
agentTest0.sinks.sinkTest0.requiredAcks = 1
agentTest0.sinks.sinkTest0.batchSize = 20
agentTest0.sinks.sinkTest0.channel = channelTest0
agentTest0.sources.sourceTest0.channels=channelTest0
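Before starting the agent, the wiring and sizing in a configuration like the one above can be sanity-checked. The following is a minimal Python sketch, not part of Flume: `parse_properties` and `check_agent` are hypothetical helper names, `SAMPLE` is an excerpt of the configuration above, and the check assumes that the source's batch.size is the number of events put into the channel per transaction. The fallback value 100 for capacity and transactionCapacity is the memory channel default documented in the Flume user guide.

```python
SAMPLE = """
agentTest0.channels = channelTest0
agentTest0.sources = sourceTest0
agentTest0.sinks = sinkTest0
agentTest0.sources.sourceTest0.batch.size = 60000
agentTest0.channels.channelTest0.capacity = 60000
agentTest0.channels.channelTest0.transactionCapacity = 60000
agentTest0.sinks.sinkTest0.channel = channelTest0
agentTest0.sources.sourceTest0.channels=channelTest0
"""


def parse_properties(text):
    """Parse key = value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_agent(props, agent):
    """Return a list of human-readable problems found for one agent."""
    problems = []
    declared = set(props.get(f"{agent}.channels", "").split())

    # Per-channel transaction capacity, and capacity >= transactionCapacity.
    tx_capacity = {}
    for ch in sorted(declared):
        prefix = f"{agent}.channels.{ch}"
        capacity = int(props.get(f"{prefix}.capacity", 100))
        tx_capacity[ch] = int(props.get(f"{prefix}.transactionCapacity", 100))
        if tx_capacity[ch] > capacity:
            problems.append(f"channel {ch}: transactionCapacity exceeds capacity")

    # Every source must write to declared channels, in batches that fit.
    for src in props.get(f"{agent}.sources", "").split():
        prefix = f"{agent}.sources.{src}"
        wired = props.get(f"{prefix}.channels", "").split()
        if not wired:
            problems.append(f"source {src} has no channels")
        batch = int(props.get(f"{prefix}.batch.size", 100))
        for ch in wired:
            if ch not in declared:
                problems.append(f"source {src} uses undeclared channel {ch}")
            elif batch > tx_capacity[ch]:
                problems.append(f"source {src}: batch.size exceeds transactionCapacity of {ch}")

    # Every sink must read from a declared channel.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch not in declared:
            problems.append(f"sink {sink} uses undeclared channel {ch}")
    return problems


if __name__ == "__main__":
    for problem in check_agent(parse_properties(SAMPLE), "agentTest0"):
        print(problem)
```

An empty result only means these particular checks passed; it does not validate the JDBC or Kafka settings.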
Then run the command: bin/flume-ng agent -n agentTest0 -c conf -f conf/flume.conf
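The incremental.column.name = id, incremental.value = 0 and max.rows settings suggest that the source walks the table in increasing id order, reading a bounded number of rows per query and remembering where it stopped. The following is a simplified Python model of that idea, not the plugin's actual code; `poll_once` and `drain` are hypothetical names, and the in-memory `rows` list stands in for the Oracle table.

```python
def poll_once(rows, last_value, max_rows):
    """Return up to max_rows rows whose id is greater than last_value, in id order."""
    pending = sorted((r for r in rows if r["id"] > last_value), key=lambda r: r["id"])
    return pending[:max_rows]


def drain(rows, start_value=0, max_rows=3):
    """Poll repeatedly until no new rows remain; return the id batches and the last id seen."""
    last_value = start_value
    batches = []
    while True:
        batch = poll_once(rows, last_value, max_rows)
        if not batch:
            break
        batches.append([r["id"] for r in batch])
        # In this model, this is the value a status file would persist
        # so that a restarted agent resumes instead of starting over.
        last_value = batch[-1]["id"]
    return batches, last_value


if __name__ == "__main__":
    rows = [{"id": i} for i in range(1, 8)]
    print(drain(rows, start_value=0, max_rows=3))
```

In this model, 150 million rows read with max.rows = 100000 would take at least 1,500 queries, which is one reason to keep the status file on durable storage during a long backfill.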
Appendix: parameter reference for org.keedio.flume.source.SQLSource
Configuration of SQL Source:
Property Name | Default | Description |
---|---|---|
channels | - | Connected channel names |
type | - | The component type name, needs to be org.keedio.flume.source.SQLSource |
hibernate.connection.url | - | Url to connect with the remote Database |
hibernate.connection.user | - | Username to connect with the database |
hibernate.connection.password | - | Password to connect with the database |
table | - | Table to export data |
status.file.name | - | Local file name to save last row number read |
status.file.path | /var/lib/flume | Path to save the status file |
start.from | 0 | Start value to import data |
delimiter.entry | , | delimiter of incoming entry |
enclose.by.quotes | true | Whether quotes are applied to all values in the output |
columns.to.select | * | Which columns of the table will be selected |
run.query.delay | 10000 | ms to wait between run queries |
batch.size | 100 | Batch size to send events to flume channel |
max.rows | 10000 | Max rows to import per query |
read.only | false | Sets a read-only session with the database |
custom.query | - | Custom query to force a special request to the DB; be careful. See the flume-ng-sql-source README for an explanation of this property. |
hibernate.connection.driver_class | - | Driver class to use by hibernate, if not specified the framework will auto assign one |
hibernate.dialect | - | Dialect to use by hibernate, if not specified the framework will auto assign one. Check https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects for a complete list of available dialects |
hibernate.connection.provider_class | - | Set to org.hibernate.connection.C3P0ConnectionProvider to use C3P0 connection pool (recommended for production) |
hibernate.c3p0.min_size | - | Min connection pool size |
hibernate.c3p0.max_size | - | Max connection pool size |
default.charset.resultset | UTF-8 | Result set from DB converted to charset character encoding |
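The delimiter.entry and enclose.by.quotes defaults in the table imply that each row reaches the channel as a delimited line with every value quoted. Assuming that format, a downstream consumer could split such a line as sketched below in Python; `parse_event` is a hypothetical helper, and the sample line and column names are invented for illustration rather than taken from the wlslog table.

```python
import csv
import io


def parse_event(body, delimiter=",", quoted=True):
    """Split one delimited event body into a list of string values."""
    # With quoted=False, quote characters are treated as ordinary text.
    quoting = csv.QUOTE_MINIMAL if quoted else csv.QUOTE_NONE
    reader = csv.reader(io.StringIO(body), delimiter=delimiter, quoting=quoting)
    return next(reader)


if __name__ == "__main__":
    # Hypothetical event body and column names.
    line = '"1","2020-01-01 00:00:00","login ok, user=a"'
    columns = ["id", "log_time", "message"]
    print(dict(zip(columns, parse_event(line))))
```

Quoting matters here because a value may itself contain the delimiter, as the third field of the sample line does.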