美文网首页
Storm框架:Storm整合springboot

Storm框架:Storm整合springboot

作者: 哦00 | 来源:发表于2018-11-25 20:45 被阅读0次

    我们知道Storm本身是一个独立运行的分布式流式数据处理框架,Springboot也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot使得我们能够在Storm开发中运用Spring的Ioc容器及其他如Spring Jpa等功能呢?我们先来了解以下概念:

    Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。

    SpringApplication:是配置Spring应用上下文的起点。通过调用SpringApplication.run()方法它将创建ApplicationContext实例,这是我们能够使用Ioc容器的主要BeanFactory。之后Spring将会加载所有单例模式的beans,并启动后台运行的CommandLineRunner beans等。

    ApplicationContextAware:这是我们能够在普通Java类中调用Spring容器里的beans的关键接口。

    实现原理

    Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化Spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的所有功能就都能用上了。

    Spout.open方法实现

    @Overridepublicvoidopen(Mapmap, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector){//启动Springboot应用SpringStormApplication.run();this.map=map;this.topologyContext = topologyContext;this.spoutOutputCollector = spoutOutputCollector;}

    Bolt.prepare方法实现

    @Overridepublicvoidprepare(Mapmap, TopologyContext topologyContext, OutputCollector outputCollector){//启动Springboot应用SpringStormApplication.run();this.map=map;this.topologyContext = topologyContext;this.outputCollector = outputCollector;}

    SpringStormApplication启动类

    @SpringBootApplication@ComponentScan(value ="com.xxx.storm")publicclassSpringStormApplication{/**    * 非工程启动入口,所以不用main方法    * 加上synchronized的作用是由于storm在启动多个bolt线程实例时,如果Springboot用到Apollo分布式配置,会报ConcurrentModificationException错误    * 详见:https://github.com/ctripcorp/apollo/issues/1658    *@paramargs    */publicsynchronizedstaticvoidrun(String ...args) {        SpringApplication app =newSpringApplication(SpringStormApplication.class);//我们并不需要web servlet功能,所以设置为WebApplicationType.NONEapp.setWebApplicationType(WebApplicationType.NONE);//忽略掉banner输出app.setBannerMode(Banner.Mode.OFF);//忽略Spring启动信息日志app.setLogStartupInfo(false);        app.run(args);    }}

    与我们传统的Springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:

    @SpringBootApplication@ComponentScan(value ="com.xxx.web")public class PlatformApplication {publicstaticvoidmain(String[] args) {SpringApplication.run(PlatformApplication.class, args);    }}

    在spout/bolt中调用了SpringStormApplication.run方法后,我们还需要能够拿到ApplicationContext容器对象,这时候我们还需要实现ApplicationContextAware接口,写个工具类BeanUtils:

    @ComponentpublicclassBeanUtilsimplementsApplicationContextAware{privatestaticApplicationContext applicationContext =null;@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{if(BeanUtils.applicationContext ==null) {            BeanUtils.applicationContext = applicationContext;        }    }publicstaticApplicationContextgetApplicationContext(){returnapplicationContext;    }publicstaticObjectgetBean(String name){returngetApplicationContext().getBean(name);    }publicstaticTgetBean(Class<T> clazz){returngetApplicationContext().getBean(clazz);    }publicstaticTgetBean(String name, Class<T> clazz){returngetApplicationContext().getBean(name, clazz);    }}

    通过@Component注解使得Spring在启动时能够扫描到该bean,因为BeanUtils实现了ApplicationContextAware接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext方法,将ApplicationContext对象保存到工具类的静态变量中,之后我们就可以使用BeanUtils.getBean()去获取Spring容器中的bean了。

    写个简单例子

    在FilterBolt的execute方法中获取Spring bean

    @Overridepublicvoidexecute(Tuple tuple){    FilterService filterService = (FilterService) BeanUtils.getBean("filterService");    filterService.deleteAll();}

    定义FilterService类,这时候我们就可以使用Spring的相关注解,自动注入,Spring Jpa等功能了。

    @Service("filterService")publicclassFilterService{@AutowiredUserRepository userRepository;publicvoiddeleteAll(){        userRepository.deleteAll();    }}

    将storm应用作为Springboot工程的一个子模块

    工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。

    SLF4J:Class path contains multiple SLF4J bindings.SLF4J:Found bindingin[jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:Found bindingin[jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:Seehttp://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J:Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

    我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,如下:

    org.springframework.bootspring-boot-starterorg.apache.logging.log4jlog4j-to-slf4j2ch.qos.logbacklogback-classic2                                                                                                                                                                                                              欢迎工作一到五年的Java工程师朋友们加入Java群: 891219277

    群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

    相关文章

      网友评论

          本文标题:Storm框架:Storm整合springboot

          本文链接:https://www.haomeiwen.com/subject/tqjmqqtx.html