DataX Secondary Development

Author: 诺之林 | Published: 2020-10-22 09:29

    Code

    cd /Users/kevin/Workspace
    
    git clone git@git.nuozhilin.site:yuanlin/datax.git
    
    git clone git@git.nuozhilin.site:yuanlin/datax-source.git
    

    Configuration

    The configuration is based on IntelliJ IDEA.

    • IDE Open => datax-source

    • Edit Configurations => Application "datax"

    • Main class => com.alibaba.datax.core.Engine

    • VM options => -Ddatax.home=/Users/kevin/Workspace/datax

    • Program arguments => -mode standalone -jobid -1 -job /Users/kevin/Workspace/datax/job/local2ots.json

    • Use classpath of module => datax-core

    • JRE => 1.8

    Start debugging: Debug "datax"
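
    For reference, the run configuration above corresponds to a plain JVM launch. The sketch below only assembles the equivalent command-line pieces from the values listed; `LaunchArgsSketch` and its method names are hypothetical, and the classpath is deliberately left out because it depends on the local build.

```java
// Hypothetical helper: rebuilds the launch settings from the IDEA run configuration.
final class LaunchArgsSketch {
    // VM option that tells DataX where its home directory is.
    static String vmOption(String dataxHome) {
        return "-Ddatax.home=" + dataxHome;
    }

    // Program arguments in the same order as in the run configuration.
    static String[] programArgs(String jobFile) {
        return new String[] {"-mode", "standalone", "-jobid", "-1", "-job", jobFile};
    }

    // Command line without the classpath (an intentional omission in this sketch).
    static String commandLine(String dataxHome, String jobFile) {
        return "java " + vmOption(dataxHome)
                + " com.alibaba.datax.core.Engine "
                + String.join(" ", programArgs(jobFile));
    }

    public static void main(String[] args) {
        System.out.println(commandLine(
                "/Users/kevin/Workspace/datax",
                "/Users/kevin/Workspace/datax/job/local2ots.json"));
    }
}
```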

    Framework

    (*) Main Thread
    Main     Engine main()
    Entry    Engine entry()
    Start    JobContainer start()
    Split    JobContainer split() => number of channels = number of TaskGroups
    Schedule AbstractScheduler schedule()
    Execute  ProcessInnerScheduler startAllTaskGroup()
        (*) TaskGroup Thread => number of threads = number of TaskGroups
        Execute TaskGroupContainer start()
        Read    ReaderRunner readerRunner
            (*) Reader Thread
        Channel channel => held in the heap memory of the TaskGroup thread
            com.alibaba.datax.core.transport.channel.memory.MemoryChannel
            Flow control ArrayBlockingQueue
        Write   WriterRunner writerRunner
            (*) Writer Thread
        Report  TaskGroupContainer reportTaskGroupCommunication()
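
    The reader, channel, and writer relationship in the outline can be sketched in plain Java. This is a minimal, self-contained illustration and not DataX's actual code: the class `ChannelSketch`, the `END` sentinel, and the string records are hypothetical; only the idea of a bounded `ArrayBlockingQueue` acting as the in-memory channel comes from the outline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: one reader thread and one writer thread joined by a bounded queue.
final class ChannelSketch {
    // Sentinel marking the end of the stream (an assumption of this sketch).
    private static final String END = "__END__";

    static List<String> transfer(List<String> source, int capacity) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);
        List<String> sink = new ArrayList<>();

        Thread reader = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(record); // blocks while the queue is full
                }
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        Thread writer = new Thread(() -> {
            try {
                String record;
                while (!(record = channel.take()).equals(END)) { // blocks while empty
                    sink.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        reader.start();
        writer.start();
        try {
            reader.join();
            writer.join(); // join makes the writer's additions visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(transfer(Arrays.asList("a", "b", "c"), 2));
    }
}
```

    The bounded capacity is what provides back-pressure here: a fast reader is paused by `put` until the writer catches up.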
    

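    Two sources of high performance

    • A job is split into multiple sub-tasks that run in parallel (split, channel, TaskGroup)

    • Reading and writing run asynchronously on separate threads (Reader Thread, Writer Thread)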
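
    The first point can be illustrated with a small sketch: work is split into groups and each group runs on its own thread. Everything here is hypothetical (the class `TaskGroupSketch`, the round-robin assignment, and the integer "tasks" that are simply summed); it shows the general pattern, not how DataX assigns tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: split tasks into groups, then run one thread per group.
final class TaskGroupSketch {
    // Distribute tasks round-robin over groupCount groups (groupCount must be >= 1).
    static List<List<Integer>> split(List<Integer> tasks, int groupCount) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            groups.get(i % groupCount).add(tasks.get(i));
        }
        return groups;
    }

    // Run every group on its own pool thread and add up the per-group results.
    static int runAll(List<List<Integer>> groups) {
        ExecutorService pool = Executors.newFixedThreadPool(groups.size());
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (List<Integer> group : groups) {
                results.add(pool.submit(() -> {
                    int sum = 0;
                    for (int task : group) {
                        sum += task; // stand-in for real work
                    }
                    return sum;
                }));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```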

    Plugins

    • Plugin MongoDBReader
    public class MongoDBReader extends Reader {
        public static class Job extends Reader.Job {
            // Excerpt: field declarations and other lifecycle methods are omitted.
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                this.mongoClient = MongoUtil.initMongoClient(originalConfig);
            }
    
            // Split the job into task configurations; adviceNumber is the suggested count.
            @Override
            public List<Configuration> split(int adviceNumber) {
                return CollectionSplitUtil.doSplit(originalConfig, adviceNumber, mongoClient);
            }
        }

        public static class Task extends Reader.Task {
            private MongoClient mongoClient;
    
            @Override
            public void init() {
                mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
            }
    
            @Override
            public void startRead(RecordSender recordSender) {
                MongoDatabase db = mongoClient.getDatabase(database);
                MongoCollection<Document> col = db.getCollection(this.collection);
                MongoCursor<Document> dbCursor = col.find(filter).iterator();
                while (dbCursor.hasNext()) {
                    Document item = dbCursor.next();
                    Record record = recordSender.createRecord();
                    // Mapping the fields of item into record columns is omitted in this excerpt.
                    recordSender.sendToWriter(record);
                }
            }
        }
    }
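
    The `split(int adviceNumber)` contract above asks the plugin to cut one job into several independent slices. The internals of `CollectionSplitUtil.doSplit` are not shown in this article, so the following is only a sketch of the general idea under a simplifying assumption: the data is addressed by a numeric key range. `RangeSplitSketch` and its half-open `long[]{start, end}` slices are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: cut a numeric key range into contiguous slices.
final class RangeSplitSketch {
    // Split the half-open range [lower, upper) into at most adviceNumber slices
    // whose sizes differ by no more than one.
    static List<long[]> split(long lower, long upper, int adviceNumber) {
        List<long[]> slices = new ArrayList<>();
        long total = upper - lower;
        if (total <= 0 || adviceNumber <= 0) {
            return slices; // nothing to split
        }
        int count = (int) Math.min(adviceNumber, total);
        long base = total / count;
        long extra = total % count; // the first `extra` slices get one more key
        long start = lower;
        for (int i = 0; i < count; i++) {
            long size = base + (i < extra ? 1 : 0);
            slices.add(new long[] {start, start + size});
            start += size;
        }
        return slices;
    }
}
```

    Each slice would then become one task configuration, so that every Reader.Task queries only its own range.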
    
    • Plugin MongoDBWriter
    public class MongoDBWriter extends Writer {
        public static class Job extends Writer.Job {
            // Excerpt: field declarations and other lifecycle methods are omitted.
            // The writer is split into exactly mandatoryNumber identical configurations.
            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configList = new ArrayList<Configuration>();
                for (int i = 0; i < mandatoryNumber; i++) {
                    configList.add(this.originalConfig.clone());
                }
                return configList;
            }
    
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
            }
        }
    
        public static class Task extends Writer.Task {
            @Override
            public void init() {
                this.writerSliceConfig = this.getPluginJobConf();
                this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
                this.batchSize = BATCH_SIZE;
            }
    
            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                MongoDatabase db = mongoClient.getDatabase(database);
                MongoCollection<BasicDBObject> col = db.getCollection(this.collection, BasicDBObject.class);
                List<Record> writerBuffer = new ArrayList<Record>(this.batchSize);
                Record record = null;
                // Buffer records from the reader and insert them one batch at a time.
                while ((record = lineReceiver.getFromReader()) != null) {
                    writerBuffer.add(record);
                    if (writerBuffer.size() >= this.batchSize) {
                        doBatchInsert(col, writerBuffer, mongodbColumnMeta);
                        writerBuffer.clear();
                    }
                }
                // Flush the final, partially filled batch.
                if (!writerBuffer.isEmpty()) {
                    doBatchInsert(col, writerBuffer, mongodbColumnMeta);
                    writerBuffer.clear();
                }
            }
        }
    }
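
    The buffering loop in `startWrite` is a general pattern that can be tried without MongoDB. The sketch below reproduces only that pattern; `BatchBufferSketch`, the `Iterator` source, and the `Consumer` sink are hypothetical stand-ins for `RecordReceiver` and `doBatchInsert`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: collect items into fixed-size batches and hand each batch to a sink.
final class BatchBufferSketch {
    // Returns the number of batches handed to the sink (batchSize must be >= 1).
    static <T> int writeInBatches(Iterator<T> source, int batchSize, Consumer<List<T>> sink) {
        List<T> buffer = new ArrayList<>(batchSize);
        int flushes = 0;
        while (source.hasNext()) {
            buffer.add(source.next());
            if (buffer.size() >= batchSize) {
                sink.accept(new ArrayList<>(buffer)); // copy, because the buffer is reused
                buffer.clear();
                flushes++;
            }
        }
        if (!buffer.isEmpty()) { // final, partially filled batch
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
            flushes++;
        }
        return flushes;
    }
}
```

    Batching trades a little latency for far fewer round trips to the target store, which is why the last partial batch must be flushed explicitly after the loop.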
    
