离线计算
离线计算就是批量的处理数据,周期性的批量计算数据。
代表技术:
Sqoop –批量导入数据
HDFS –批量存储数据
MapReduce –批量计算数据
Hive –批量计算数据
流式/在线计算
流式/在线计算就是实时的数据传输,数据计算,实时的展示,就跟水龙头一样,不断的流出。
代表技术:
Flume –实时获取数据
Kafka –实时存储数据
Redis –实时结果缓存
Storm/JStorm –实时计算数据
SparkStream –实时计算数据
实时计算和离线计算区别:实时计算是要求在秒级或更小的时间单位内出来结果,离线一般是以小时、天为单位出来结果。
常见企业应用流程:数据源-> Flume(获取数据) -> Kafka(缓存数据) -> Storm(消费数据并计算数据) - > Redis(保存数据,缓存) -> MySQL(持久化) - > 业务部门
什么是Storm?
Storm最初是Backtype公司的团队创建的,这个公司后面给Twitter收购了,这个项目就开源捐给了Apache,Apache Storm是在Eclipse public License下进行孵化开发的。
ApacheStorm是一个免费开源的流式分布式实时计算框架,主要是Clojure和Java语言编写的,Storm能轻松、可靠的处理无界数据流,就像Hadoop对数据进行批处理。
特点
应用场景广泛:可以实时处理消息,更新数据库,持续计算等场景
高伸缩性:Storm的伸缩性可以让Stom每秒处理的消息量达到很高,扩展一个实时计算任务,如果你想要加机器并提高任务的并行度,Storm使用Zookeeper来作为协调机器内的各种配置,使得Storm的集群可以很容易得到扩展。
保证无数据丢失:Storm保证所有数据都被处理,数据传到了肯定能被运算的保证。
异常健壮性:Storm集群非常容易管理,轮流重启节点不影响应用。
容错性好:在消息处理的过程中出现异常,Storm会重试
应用场景
Storm用来实时计算分析源源不断的数据,如同流水线生产,所以Storm可以应用到很多场景,在线机器学习,连续计算等
推荐系统:实时推荐,根据下单或加入购物车,马上在页面下方推荐相关商品。
金融系统:实时分析股票信息数据。
预警系统:根据实时采集的数据,判断是否到了预警的阈值。
网站统计:双十一的实时销量等
Storm核心组件与运行过程
Nimbus:是整个集群的管控核心,是一个主控节点,负责Topology的提交,运行状态的监控,任务重新分配、监控主机故障等工作。
Supervisor:就是领取Nimbus分配的任务与作业、管理Nimbus已经分配的工作进程(Worker)等。
Worker:是Spout和Bolt中具体处理逻辑的进程,一个Worker就是一个进程,一个进程要包含有一个或多个线程。
Execute:任务的执行者之一,一个线程就一个executor,一个线程会处理一个或多个Task。
Task:就是具体要干的任务,一个任务一个Task。
Zookeeper:就是一个监控、监督者,告知Supervisor要领取什么任务,任务执行完成没有等。
元组(Tuple):是消息传递的基本单元,是一个命名值列表,元组中的字段可以是任何的类型对象,Storm使用元组作为其数据模型,元组支持所有基本类型、字符串和字节数组作为字段值,只要实现了类型的序列化接口就可以使用该类型对象进行传递,元组本来应该是Key – Value的Map,但是由于各组件间传递的元组的字段名称已经事先定义好,所以只要按照顺序把元组填入各个Value即可,所以元组是一个Value的List。
流(Stream):流是Storm的核心抽象,是一个无界的元组系列,源源不断的传递元组就组成了流,在分布式环境中并行的进行创建和处理。
拓扑(Topology):是一个逻辑上的网络拓扑结构,是Storm中运行的一个实时应用程序,因为是由各个组装件间的消息流动而形成的逻辑上拓扑,所以也可以理解为是一个jar包。
类似MapReduce作业(job),主要区别在于MapReduce作业最终会完成,而一个拓扑永远运行知道他被kill掉,就是相当于网络节点,会不断地运行、交互数据,除非把骨干网给干掉。
就是数据流来流去,记录下这些数据在各个组件之间的流向,形成的一张逻辑图。
水龙头(Spout):是拓扑的流的来源是,是一个拓扑中产生元数据的组件,通常情况下Spout会从外部数据源中读取数据,然后转换为拓扑内部的元数据。
Spout是可靠的,也可以是不可靠的,如果Storm处理元组失败,可靠的Spout能够重新发送,而不可靠的Spout就尽快忘记发出的元组,另外Spout可以发出超过一个流。
Spout的主要方法是nextTyple(),这个方法会发出一个Tuple到拓扑,如果没有新的元组发出,则简单返回。
Spout的其他方法是ack()和fail(),当Storm检测到一个元组从Spout发出时,ack和fail()会被调用,要么成功完成,通过拓扑,要么未能完成,ack()和fail()只被可靠的Spout调用,也就是数据是发出去了。
ISpout的Java Api是Spout的顶级接口。
转接头(Bolt):在拓扑中所有处理都在Bolt中完成,Bolt是流的处理节点,从一个拓扑接收数据,然后进行处理的组件,Bolt可以完成过滤、业务处理、连接运算、连接和访问数据库等任何操作。
Bolt是一个被动的角色,其接口中有一个execute方法,在接收到消息后会调用这个方法,用户可以在其中执行自己希望的操作
Bolt可以完成简单流的转换,而完成复杂的流转换通常需要多个步骤,因此需要多个Bolt进行完成。
Bolt可以发出超过一个流。
Bolt的Java Api顶级接口是IBolt;
Storm和Hadoop区别
StormHadoop描述
应用名称TopologyJob任务名称
角色NimbusJobTracker任务管理者
SupervisorTaskTracker任务执行小组
WorkerChild小组成员
编程接口Spout/BoltMapper/Reducer小组成员干活角色,数据获取、数据处理、数据收集等
应用场景实时计算离线计算接收的任务类型
处理数据存放内存文件系统执行任务的办公区域
数据处理流式处理(来一些处理一些)批量处理任务处理方式
Storm常用命令
启动Nimbus:bin/stormnimbus &
启动Supervisor:bin/stormsupervisor &
启动UI界面:bin/stormui &
查看运行拓扑:bin/storm list
帮助:bin/storm help
启动Web页面日志查看功能:bin/storm logviewer &
运行jar包:stormjar storm- word-count.jar com.levi.storm.WordCountMain wc
jar包路径 包名+类名 拓扑名
杀死运行的拓扑:bin/storm kill wc
激活指定拓扑:bin/storm activate wc
禁用指定的拓扑Spout:bin/stormdeactivate wc
分组策略
分组是用来定义Stream应该分配给Bolts上的哪个execute来去执行,因为多线程、并发的情况下则会有多个人去干execute的事,Storm内部提供了7种分组策略。
分组的前期是线程数要大于1。
Shuffle Grouping:随机分组、轮询、平均分配、随机分配Stream里面的Tuple,保证每个Bolt接收到的Tuple数量大致相同,这个是比较常用的。
Fileds Grouping:按照字段分组,比如按照user id进行分组,相同的Id Tuple则分到同一个Bolts里面的task,不同的id则分配到不同的Task,这个也是比较常用的。
All Grouping:广播发送,对于每一个Tuple,所有Bolt都会接收到。
Global Grouping:全局分组,这个Tuple被分配id值最低的那个Taks(线程Id最小的)。
Non Grouping:不分组,Stream不关心到底谁会收到他的Tuple,目前这种分组和Shuffle Grouping是一样的效果,在多线程的情况不平均分配。
Direct Grouping:直接分组,这是一种特殊的分组,发送者指定消息接收者的哪个Task来处理,只有声明为direct stream的消息流可以声明这种方法,发射Tuple也要用emitDirect
Local Or Shuffle Grouping:如果目标Bolt有一个或多个Task在同一个工作进程中,Tuple会随机的发送给这些Tasks,否则和普通的Shuffle Grouping行为一致
并发度
并发度是用户指定一个任务,可以被多个线程执行,并发度的数量等于线程execute数量。
Task就是具体的处理业务逻辑对象,一个executor线程可以执行一个或多个tasks,一般默认每个executor只执行一个task,所以我们往往认为task就是执行线程,其实不是。
Task代表最大并发度,一个Component的task数量不改变的,但是一个Component的execute数量会发生改变。
Task数>= execute数量,executor数代表实际并发数。
一个executor一个线程。
Worker(进程) -> executor(线程)
Java API – WordCount
WordSpout
public classWordSpoutextends BaseRichSpout {
/**
*水龙头对象,打开nextTuple()则输水给下一个接收者
*/
private SpoutOutputCollector spoutOutputCollector;
@Override
public void nextTuple() {
spoutOutputCollector.emit(new Values("I am go to shanghai beijing
shanghai"));
try {
Thread.sleep(100000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public voidopen(Maparg0,
TopologyContext arg1, SpoutOutputCollector arg2) {
spoutOutputCollector = arg2;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
WordSplitBolt
public classWordSplitBoltextendsBaseRichBolt {
//继续输出
private OutputCollector outputCollector;
@Override
public void execute(Tuple tuple) {
String[]strings= tuple.getString(0).split(" ");
//发送给下一级
for (String string : strings) {
outputCollector.emit(new Values(string,1));
}
}
@Override
public voidprepare(Maparg0,
TopologyContext arg1, OutputCollector arg2) {
outputCollector = arg2;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//定义发送给下一级的Key
outputFieldsDeclarer.declare(new Fields("word","num"));
System.err.println("切割好数据!");
}
}
WordCountBolt
public classWordCountBoltextends BaseRichBolt
{
private Map<String, Integer> map = new HashMap<>();
@Override
public void execute(Tuple tuple) {
Stringword = tuple.getString(0);
int num = tuple.getInteger(1);
//去重,统计总数量
if(map.containsKey(word)) {
map.put(word, map.get(word) + num);
}else {
map.put(word,num);
}
System.out.println(Thread.currentThread().getId()+"-----" + word + "---" + map.get(word));
}
@Override
public voidprepare(Maparg0,
TopologyContext arg1, OutputCollector arg2) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
}
WordCountMain
public class WordMain {
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, AuthorizationException {
//创建拓扑
TopologyBuildertopologyBuilder = new TopologyBuilder();
//水龙头
topologyBuilder.setSpout("WordSpout", new WordSpout(),1);
//转接头1,切分数据,开启4个线程,分组策略用fieldsGrouping
topologyBuilder.setBolt("WordSplitBolt", new WordSplitBolt(),4)
//按照字段进行分组处理
.fieldsGrouping("WordSpout", new Fields("sentence"));
//转接头2,统计切分后的数据,数据来源于WordSplitBolt
topologyBuilder.setBolt("WordCountBolt", new WordCountBolt(),1)
.fieldsGrouping("WordSplitBolt", new Fields("word"));
//配置对象
Configconfig = new Config();
//提交
if(args.length > 0) {//集群执行
StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
}else {//本地
new LocalCluster().submitTopology("WordSplitCount", config, topologyBuilder.createTopology());
}
}
}
网友评论