美文网首页
Storm使用初步

Storm使用初步

作者: Wilson_0e83 | 来源:发表于2017-03-28 16:11 被阅读0次

Storm hello world project

  1. pom.xml
<!-- Dependencies for the Storm hello-world topology: storm-core plus the
     Kafka-spout and Redis-bolt integration modules, all pinned to 1.0.3. -->
<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- Core Storm runtime. NOTE(review): when submitting to a real cluster
         this is normally marked <scope>provided</scope> so it is not bundled
         into the fat jar - confirm before deploying. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.3</version>
    </dependency>
    <!-- KafkaSpout / SpoutConfig / ZkHosts used by UserlogTopo. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-redis</artifactId>
      <version>1.0.3</version>
    </dependency>
    <!-- Jedis client used directly by RedisBolt. -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0</version>
    </dependency>
    <!-- Kafka broker jar (Scala 2.10 build) required by storm-kafka.
         Exclude zookeeper/log4j/slf4j-log4j12 to avoid classpath conflicts
         with the logging and ZooKeeper versions shipped by storm-core. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- Builds the fat "jar-with-dependencies" at the package phase;
           that artifact is what gets submitted via `storm jar`. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  2. RedisBolt.java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * Created by zhouwenchun on 17/3/24.
 */
public class RedisBolt extends BaseRichBolt {
    private OutputCollector _collector;
    private JedisPool pool;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this._collector = outputCollector;
        this.pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
    }

    public void execute(Tuple tuple) {
        String log = tuple.getString(0);
        System.out.println(sdf.format(new Date()));
        System.out.println(log);
        Jedis jedis = this.pool.getResource();
        jedis.set("20151020", log);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
  3. UserlogTopo.java
/**
 * Demo topology: consumes messages from the Kafka topic "test2" and forwards
 * them to {@link RedisBolt}. Submits to a real cluster when any program
 * argument is supplied, otherwise runs inside an in-process LocalCluster.
 */
public class UserlogTopo {
    private static String topicName = "test2";
    private static String zkRoot = "/test/test2";

    public static void main(String[] args) throws Exception {
        // Kafka spout wired to the ZooKeeper ensemble that tracks the brokers;
        // a random consumer id keeps repeated local runs from sharing offsets.
        BrokerHosts hosts = new ZkHosts("117.169.77.211:2181");
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.metricsTimeBucketSizeInSecs = 5;

        // Wire spout -> bolt: 2 executors (threads) for the bolt with 4 tasks
        // total (so each executor runs 2 tasks), shuffle-grouped from the spout.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("UserBolt", new RedisBolt(), 2)
                .setNumTasks(4)
                .shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setDebug(false);

        boolean submitToCluster = args != null && args.length > 0;
        if (submitToCluster) {
            StormSubmitter.submitTopology("userTopo", conf, builder.createTopology());
        } else {
            conf.put(Config.NIMBUS_HOST, "10.0.12.36");
            conf.setNumWorkers(3); // number of worker (JVM) processes
            new LocalCluster().submitTopology("userTopo", conf, builder.createTopology());
        }
    }
}

Storm 的消息可靠性

可靠性API

  • 将原始Tuple和新Tuple一起发送;
  • 调用collector#ack()通知Storm处理完成;
    或者使用更简单的方式是继承BaseBasicBolt会自动完成以上两个操作;

禁用可靠性机制:

  • 将 Config.TOPOLOGY_ACKERS 设置为0
  • 可以通过在 SpoutOutputCollector.emit 方法中省略消息 id 来关闭 spout tuple 的跟踪功能;
  • 可以在发送 tuple 的时候选择发送“非锚定”的(unanchored)tuple。

Storm 拓扑的并行度(parallelism)理解

配置storm的Topo的并行度:

  • work数量(Topo在集群中运行所需的工作进程数), 配置方法: Config#setNumWorkers
  • Executors数量(每个组件需要执行的线程数), 配置方法: TopologyBuilder#setSpout() 或TopologyBuilder#setBolt()
  • Task数量(每个组件需要执行的任务数), 配置方法: ComponentConfigurationDeclarer#setNumTasks()

如何修改运行当中Topo的并行度

  • 使用Storm UI
  • 使用命令行 eg: storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
    说明:重新配置拓扑 "mytopology",使得该拓扑拥有 5 个 worker processes,
    另外,配置名为 "blue-spout" 的 spout 使用 3 个 executor,
    配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。

相关文章

网友评论

      本文标题:Storm使用初步

      本文链接:https://www.haomeiwen.com/subject/qxgjottx.html