原创文章,转载请注明出处
这篇记录一个项目中遇到的坑。标题所说的复杂对象,其实就是非Java基本类型及其包装类,比如String、Integer这些肯定不是复杂对象,在前两篇文章中,Tuple都是String类型。下面举例来说明。
图解topology
测试用的topology如下图所示:
Tuple携带的数据类型为Data,MySpout每过一定的时间会emit一个Data类的对象(取名为data),BoltA和BoltB都以随机分组(shuffleGrouping)的方式接收data,BoltA会对data进行修改,BoltB会对data的内容进行打印,我们想看看BoltA对data所做的修改会不会影响BoltB的data。这里假定,我们的程序员为了节省新建对象的时间,在MySpout里边只new了一个Data类的对象。我们预期,BoltA对data所做的修改不会影响到BoltB。
Data的定义
很简单的类:
public class Data {
String str;
public String getStr() {
return str;
}
public void setStr(String str) {
this.str = str;
}
}
MySpout的定义
在open函数new了一个Data对象,在nextTuple函数每次随机发送"a"或"b"。
public class MySpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Data data;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
data = new Data();
}
public void close() {
}
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"a", "b"};
final Random rand = new Random();
data.setStr(words[rand.nextInt(words.length)]);
_collector.emit(new Values(data));
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("data"));
}
}
BoltA的定义
如果收到的是"a"就替换成"1","b"就替换成"2"。
public class BoltA extends BaseBasicBolt{
public void execute(Tuple input, BasicOutputCollector collector) {
Data data = (Data)input.getValue(0);
if(data.getStr().compareTo("a")==0){
data.setStr("1");
}else{
data.setStr("2");
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
BoltB的定义
除了打印数据,什么都不做。
public class BoltB extends BaseBasicBolt{
public static Logger logger = LoggerFactory.getLogger(BoltB.class);
public void execute(Tuple input, BasicOutputCollector collector) {
Data data = (Data) input.getValue(0);
logger.info(data.getStr());
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
App类的定义
这个类负责创建拓扑。
public class App {
public static void main(String args[]){
Config conf = new Config();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new MySpout(), 1);
builder.setBolt("bolt1", new BoltA(), 1).shuffleGrouping("word");
builder.setBolt("bolt2", new BoltB(), 1).shuffleGrouping("word");
conf.setDebug(true);
String topologyName = "MyTopology";
//集群运行
// try {
// StormSubmitter.submitTopology(topologyName, conf,
// builder.createTopology());
// } catch (Exception e) {
// e.printStackTrace();
// }
//本地运行
try {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf,builder.createTopology());
Thread.sleep(60 * 1000);
cluster.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
修改日志级别
如果就这样开始运行,日志会多到看不清结果。我们要将com.quiterr包的日志输出级别设为info,其他为error。在resources目录下新建一个log4j2.xml,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Logger name="com.quiterr" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
测试结果
BoltA对data所做的修改在BoltB中同步了,和我们的预期不符,实际上在项目中,这个问题实实在在的影响了我们的业务逻辑。特别指出,集群中运行的topology也存在同样的问题。
经验总结
作为一个好的Storm程序的编程习惯:
一、不要在spout中重复使用复杂对象,每次都应该new一个新的对象。
二、不要在bolt中修改接收到的复杂对象。
频繁的new对象是否会造成性能问题?至少在我们的项目中没有遇到。
本文源代码:https://github.com/quiterr/storm-test/tree/master/complex-object
网友评论