Importing Oracle Data into ElasticSearch with Flume

Author: 旻璿 | Published: 2022-01-03 20:26

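    For business reasons, we needed to import the data of an old Oracle database (150 million rows) into ES.
    The import pipeline is flume -> kafka -> logstash -> es.
    The Flume package is available here: link: https://pan.baidu.com/s/1IPvfUJNDzLTlG0HGhGDEaQ extraction code: mtdm
    It targets Flume 1.9.0; just put the jar files into Flume's lib directory.
    The configuration file is shown below, based on https://blog.csdn.net/hua_yuan2015/article/details/103084500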

    # declare source channel sink
    agentTest0.channels = channelTest0
    agentTest0.sources = sourceTest0
    agentTest0.sinks = sinkTest0
    ###########sql source#################
    # declare source type
    agentTest0.sources.sourceTest0.type = org.keedio.flume.source.SQLSource
     
    # Hibernate Database connection properties
    agentTest0.sources.sourceTest0.hibernate.connection.url = jdbc:oracle:thin:@12.1.93.4:1521/zhgsdb
    agentTest0.sources.sourceTest0.hibernate.connection.user = ******
    agentTest0.sources.sourceTest0.hibernate.connection.password = ******
    agentTest0.sources.sourceTest0.hibernate.connection.autocommit = true
    agentTest0.sources.sourceTest0.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
    agentTest0.sources.sourceTest0.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
    # Query interval in ms
    agentTest0.sources.sourceTest0.run.query.delay= 1000
    # Directory where Flume saves the source's status file
    agentTest0.sources.sourceTest0.status.file.path = /opt/third/apache-flume-1.8.0-bin
    # Name of the file in which Flume saves the source's status
    agentTest0.sources.sourceTest0.status.file.name = agentTest0.sqlSource.status
    agentTest0.sources.sourceTest0.table = wlslog
    agentTest0.sources.sourceTest0.columns.to.select = *
    agentTest0.sources.sourceTest0.incremental.column.name = id
    agentTest0.sources.sourceTest0.incremental.value = 0
    agentTest0.sources.sourceTest0.batch.size = 60000
    agentTest0.sources.sourceTest0.max.rows = 100000
    agentTest0.sources.sourceTest0.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agentTest0.sources.sourceTest0.hibernate.c3p0.min_size=1
    agentTest0.sources.sourceTest0.hibernate.c3p0.max_size=10
    ##############################
    agentTest0.channels.channelTest0.type = memory
    agentTest0.channels.channelTest0.capacity = 60000
    agentTest0.channels.channelTest0.transactionCapacity = 60000
    agentTest0.channels.channelTest0.byteCapacityBufferPercentage = 20
    agentTest0.channels.channelTest0.byteCapacity = 16000000
     
    agentTest0.sinks.sinkTest0.type = org.apache.flume.sink.kafka.KafkaSink
    agentTest0.sinks.sinkTest0.topic = TestTopic
    agentTest0.sinks.sinkTest0.brokerList = 12.1.150.111:9092,12.1.150.112:9092,12.1.150.113:9092
    agentTest0.sinks.sinkTest0.requiredAcks = 1
    agentTest0.sinks.sinkTest0.batchSize = 20
     
    agentTest0.sinks.sinkTest0.channel = channelTest0
    agentTest0.sources.sourceTest0.channels=channelTest0
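
The batch sizes in this configuration have to fit together: a memory channel's transactionCapacity should not exceed its capacity, and a source or sink that moves more events per transaction than transactionCapacity allows will fail at runtime. The following is a minimal sketch of a sanity check for those three relations. The helper names `parse_properties` and `check_batch_sizes` are made up for this example, and only a few lines of the configuration above are repeated in `SAMPLE`.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_batch_sizes(props, agent, source, channel, sink):
    """Return a list of human-readable problems (empty if none were found)."""
    ch = f"{agent}.channels.{channel}."
    capacity = int(props[ch + "capacity"])
    tx_capacity = int(props[ch + "transactionCapacity"])
    source_batch = int(props[f"{agent}.sources.{source}.batch.size"])
    sink_batch = int(props[f"{agent}.sinks.{sink}.batchSize"])

    problems = []
    if tx_capacity > capacity:
        problems.append("transactionCapacity exceeds capacity")
    if source_batch > tx_capacity:
        problems.append("source batch.size exceeds transactionCapacity")
    if sink_batch > tx_capacity:
        problems.append("sink batchSize exceeds transactionCapacity")
    return problems


# The four relevant lines copied from the configuration above.
SAMPLE = """
agentTest0.sources.sourceTest0.batch.size = 60000
agentTest0.channels.channelTest0.capacity = 60000
agentTest0.channels.channelTest0.transactionCapacity = 60000
agentTest0.sinks.sinkTest0.batchSize = 20
"""

if __name__ == "__main__":
    props = parse_properties(SAMPLE)
    print(check_batch_sizes(props, "agentTest0", "sourceTest0",
                            "channelTest0", "sinkTest0"))
```

With the values used in this post the check reports no problems. Raising batch.size without also raising transactionCapacity would be flagged.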
    

    Then run: bin/flume-ng agent -n agentTest0 -c conf -f conf/flume.conf
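
To make the roles of incremental.column.name, incremental.value and max.rows easier to picture, here is a rough sketch of incremental reading by a monotonically increasing column. It illustrates the general idea only and is not the plugin's actual code: it assumes the source asks for rows whose id is greater than the last saved value, uses an in-memory SQLite database in place of Oracle, and the functions `read_increment` and `drain` are invented for this example.

```python
import sqlite3


def read_increment(conn, table, column, last_value, max_rows):
    """Fetch up to max_rows rows whose `column` is greater than last_value."""
    # Identifiers cannot be bound as parameters, so they are interpolated here;
    # in this sketch they come from trusted configuration, not user input.
    sql = f"SELECT * FROM {table} WHERE {column} > ? ORDER BY {column} LIMIT ?"
    return conn.execute(sql, (last_value, max_rows)).fetchall()


def drain(conn, table, column, start_value, max_rows):
    """Call read_increment until a query returns no rows; yield each page."""
    last_value = start_value
    while True:
        rows = read_increment(conn, table, column, last_value, max_rows)
        if not rows:
            break
        # Assumes the incremental column is the first column of the table.
        last_value = rows[-1][0]
        # A real source would persist last_value (the status file) here.
        yield rows, last_value


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE wlslog (id INTEGER PRIMARY KEY, msg TEXT)")
    conn.executemany("INSERT INTO wlslog VALUES (?, ?)",
                     [(i, f"line {i}") for i in range(1, 8)])
    for rows, last in drain(conn, "wlslog", "id", 0, 3):
        print(len(rows), "rows, last id =", last)
```

With seven rows and a page size of 3, the loop reads pages of 3, 3 and 1 rows. Saving the last id after each page is what would let a restarted agent continue instead of starting over.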

    Appendix: parameter reference for org.keedio.flume.source.SQLSource

    Configuration of SQL Source:

    Property Name                        Default         Description
    channels                             -               Connected channel names
    type                                 -               The component type name, needs to be org.keedio.flume.source.SQLSource
    hibernate.connection.url             -               URL to connect to the remote database
    hibernate.connection.user            -               Username to connect to the database
    hibernate.connection.password        -               Password to connect to the database
    table                                -               Table to export data from
    status.file.name                     -               Local file name in which the last row number read is saved
    status.file.path                     /var/lib/flume  Path in which the status file is saved
    start.from                           0               Start value to import data from
    delimiter.entry                      ,               Delimiter between the values of an entry
    enclose.by.quotes                    true            Whether quotes are applied to all values in the output
    columns.to.select                    *               Which columns of the table will be selected
    run.query.delay                      10000           Milliseconds to wait between queries
    batch.size                           100             Batch size used when sending events to the Flume channel
    max.rows                             10000           Maximum number of rows to import per query
    read.only                            false           Whether to use a read-only session with the database
    custom.query                         -               Custom query to force a special request to the database; use with care (the upstream README explains this property in detail)
    hibernate.connection.driver_class    -               Driver class used by Hibernate; if not specified, the framework assigns one automatically
    hibernate.dialect                    -               Dialect used by Hibernate; if not specified, the framework assigns one automatically. See https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects for a complete list of available dialects
    hibernate.connection.provider_class  -               Set to org.hibernate.connection.C3P0ConnectionProvider to use the C3P0 connection pool (recommended for production)
    hibernate.c3p0.min_size              -               Minimum connection pool size
    hibernate.c3p0.max_size              -               Maximum connection pool size
    default.charset.resultset            UTF-8           Character encoding to which the result set from the database is converted
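
Going by delimiter.entry and enclose.by.quotes in the table above, each event body should be one comma-separated line with every value quoted. Since columns.to.select = * carries no column names, whatever consumes the Kafka topic has to know the column order itself. Below is a small sketch of turning such a line into a named record before indexing. The column names are hypothetical and not taken from the real wlslog table, and `parse_event` is a helper written only for this example.

```python
import csv
import io


def parse_event(body, columns, delimiter=","):
    """Turn one CSV-formatted event body into a dict keyed by column name."""
    reader = csv.reader(io.StringIO(body), delimiter=delimiter, quotechar='"')
    values = next(reader)
    if len(values) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(values)}")
    return dict(zip(columns, values))


if __name__ == "__main__":
    # Hypothetical column order; the real order depends on the table definition.
    columns = ["id", "category", "msg"]
    body = '"42","Notice","Server started, listening on 7001"'
    print(parse_event(body, columns))
```

Using a CSV parser rather than a plain split on the delimiter matters here, because a quoted value such as a log message may itself contain commas.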
