代码
cd /Users/kevin/Workspace
# Clone both repositories side by side: "datax" (used as datax.home) and "datax-source" (opened in the IDE).
for repo in datax datax-source; do
  git clone "git@git.nuozhilin.site:yuanlin/${repo}.git"
done
配置
配置基于IntelliJ IDEA
- IDE Open => datax-source
- Edit Configurations => Application "datax"
- Main class => com.alibaba.datax.core.Engine
- VM options => -Ddatax.home=/Users/kevin/Workspace/datax
- Program arguments => -mode standalone -jobid -1 -job /Users/kevin/Workspace/datax/job/local2ots.json
- Use classpath of module => datax-core
- JRE => 1.8
启动调试：以 Debug 方式运行 "datax"
框架
(*) Main Thread
全局 Engine main()
入口 Engine engine()
启动 JobContainer start()
切分 JobContainer split() => 按channel个数切分Reader/Writer子任务；TaskGroup个数 = ceil(channel个数 / 每个TaskGroup的channel数)
调度 AbstractScheduler schedule()
执行 ProcessInnerScheduler startAllTaskGroup()
(*) TaskGroup Thread => 线程个数 = TaskGroup个数
执行 TaskGroupContainer start()
读取 ReaderRunner readerRunner
(*) Reader Thread
通道 channel => TaskGroup线程内的内存（堆）空间，用于Reader线程与Writer线程之间交换数据
com.alibaba.datax.core.transport.channel.memory.MemoryChannel
流控 ArrayBlockingQueue
写入 WriterRunner writerRunner
(*) Writer Thread
报告 TaskGroupContainer reportTaskGroupCommunication()
高性能的两个方面
- 任务切分为多个并行子任务 (split / channel / TaskGroup)
- 读与写分别在独立线程中异步执行 (ReaderThread / WriterThread)
插件
- Plugin MongoDBReader
/**
 * MongoDB reader plugin (excerpt): fields such as originalConfig, mongoClient, readerSliceConfig,
 * database, collection, filter and dbCursor are declared elsewhere in the real class.
 */
public class MongoDBReader extends Reader {

    /** Job-level half: builds a client from the job config and splits the read into slices. */
    public static class Job extends Reader.Job {

        /** Loads the plugin's job configuration and creates a MongoClient from it. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.mongoClient = MongoUtil.initMongoClient(originalConfig);
        }

        /**
         * Delegates slicing to CollectionSplitUtil.doSplit.
         *
         * @param adviceNumber slice count suggested by the framework
         *     (presumably the channel count — verify)
         * @return one Configuration per slice, as produced by doSplit
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            return CollectionSplitUtil.doSplit(originalConfig, adviceNumber, mongoClient);
        }
    }

    /** Task-level half: reads one slice and forwards every document to the writer. */
    public static class Task extends Reader.Task {
        private MongoClient mongoClient;

        /** Creates this task's own MongoClient from its slice configuration. */
        @Override
        public void init() {
            mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
        }

        /**
         * Iterates the documents matching {@code filter} and sends one record per document.
         *
         * @param recordSender channel-side sender used to create records and hand them to the writer
         */
        @Override
        public void startRead(RecordSender recordSender) {
            MongoDatabase db = mongoClient.getDatabase(database);
            MongoCollection<Document> col = db.getCollection(this.collection);
            dbCursor = col.find(filter).iterator();
            try {
                while (dbCursor.hasNext()) {
                    Document item = dbCursor.next();
                    // One record per document; sending must happen inside the loop, otherwise
                    // only a single record would ever reach the writer.
                    Record record = recordSender.createRecord();
                    // NOTE(review): mapping of item's fields onto record columns is omitted
                    // in this excerpt — add record.addColumn(...) calls per configured column.
                    recordSender.sendToWriter(record);
                }
            } finally {
                // Release the server-side cursor even if reading or sending throws.
                // Assumes dbCursor is a MongoCursor (Closeable) — confirm against the field type.
                dbCursor.close();
            }
        }
    }
}
- Plugin MongoDBWriter
/**
 * MongoDB writer plugin (excerpt): fields such as originalConfig, writerSliceConfig, mongoClient,
 * batchSize, database, collection and mongodbColumnMeta are declared elsewhere in the real class.
 */
public class MongoDBWriter extends Writer {

    /** Job-level half: every task receives its own copy of the job configuration. */
    public static class Job extends Writer.Job {

        /** Loads the plugin's job configuration. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        /**
         * Produces {@code mandatoryNumber} independent clones of the job configuration.
         *
         * @param mandatoryNumber number of writer tasks the framework requires
         * @return a list with one cloned Configuration per task
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> slices = new ArrayList<Configuration>();
            for (int slice = 0; slice < mandatoryNumber; slice++) {
                // Clone per slice so tasks never share a mutable Configuration instance.
                slices.add(this.originalConfig.clone());
            }
            return slices;
        }
    }

    /** Task-level half: buffers incoming records and inserts them in batches. */
    public static class Task extends Writer.Task {

        /** Reads the slice config, opens a MongoClient and fixes the batch size. */
        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
            this.batchSize = BATCH_SIZE;
        }

        /**
         * Drains the receiver, inserting every {@code batchSize} records and flushing the remainder.
         *
         * @param lineReceiver source of records; returns null once the reader side is exhausted
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            MongoCollection<BasicDBObject> col = mongoClient
                    .getDatabase(database)
                    .getCollection(this.collection, BasicDBObject.class);
            List<Record> pending = new ArrayList<Record>(this.batchSize);

            for (Record rec = lineReceiver.getFromReader(); rec != null; rec = lineReceiver.getFromReader()) {
                pending.add(rec);
                if (pending.size() >= this.batchSize) {
                    doBatchInsert(col, pending, mongodbColumnMeta);
                    pending.clear();
                }
            }

            // Final partial batch left over after the receiver is exhausted.
            if (!pending.isEmpty()) {
                doBatchInsert(col, pending, mongodbColumnMeta);
                pending.clear();
            }
        }
    }
}
网友评论