美文网首页
storm学习第三天-storm与其他组件交互

storm学习第三天-storm与其他组件交互

作者: 小王同学加油 | 来源:发表于2017-08-11 16:13 被阅读593次

    storm如何把数据插入到elasticsearch

    1 storm提供的例子

    https://github.com/apache/storm/tree/master/external/storm-elasticsearch

    代码:

    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-elasticsearch</artifactId>
    <version>1.1.0</version>
    </dependency>

    EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
    EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
    EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
    

    问题:依赖低版本的Elasticsearch 这个问题没有解决
    查看最新代码已经修复了 没提供jar包
    需要重新编译storm1.10代码 直接放弃了 采用下面方法

    方法2 elasticsearch-hadoop

    image.png

    疑问?
    仅仅支持hadoop吗 storm支持吗我要的是storm?

    ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,我们既可以把HDFS的数据导入到ES里面做分析,也可以将es数据导出到HDFS上做备份,归档,其中值得一提的是ES-Hadoop全面的支持了Spark框架,
    其中包括Spark(五角星那个上面中间位置)

    • 支持Hive(像蜜蜂的那个下面最左位置)
    • 支持Cascading(有五个竖线那个 上面最右位置)
      Cascading is the proven application development platform for
      building data applications on Hadoop.
    • Storm(闪电的那个)
    • 当然还有标准的MapReduce,
      无论用那一个框架集成ES,都是非常简洁的。
    image.png

    疑问:
    为了使用这个jar 是否引用一系列相关的jar呀

    经过验证不需要引入hadoop 但是json和http引入
    折腾不起

         <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
           
       <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
      </dependency>
           
           
            <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>5.5.1</version>
        </dependency>
        
        <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.3</version>
        </dependency>
        
        <!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.8.8</version>
        </dependency>
        
       <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.8.8</version>
        </dependency>
        
        <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-jaxrs</artifactId>
        <version>1.8.8</version>
    </dependency>   
        <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-xc</artifactId>
        <version>1.8.8</version>
    </dependency>
    
        
         <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        
                      
          
       <!--
             <dependency>
              <groupId>org.apache.storm</groupId>
              <artifactId>storm-elasticsearch</artifactId>
              <version>1.1.0</version>
           </dependency>
           
           
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-hadoop</artifactId>
                <version>5.5.1</version>
        </dependency>
        
         <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-storm</artifactId>
                <version>5.5.1</version>
          </dependency>
          -->   
           <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>5.5.1</version>
            </dependency>
    

    代码实现

    配置文件:
    <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.5.1</version>
    </dependency>

    方法3 elasticsearch 官方提供的例子

    <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-storm</artifactId>
    <version>5.5.1</version>
    </dependency>

    阅读代码:
    关键类:TransportClient

    Elasticsearch uses standard RESTful APIs and JSON.
    TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
       .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
    SearchResponse sr = client.prepareSearch()
     .setQuery(QueryBuilders.matchQuery("message", "myProduct"))
     .addAggregation(AggregationBuilders.terms("top_10_states")
     .field("state").size(10))
     .execute().actionGet();
    client.close();
    

    es Elasticsearch from Storm

    image.png
    http://blog.csdn.net/sunnyyoona/article/details/52860861

    https://www.elastic.co/guide/en/elasticsearch/guide/current/dynamic-mapping.html
    https://www.elastic.co/guide/en/elasticsearch/reference/2.4/dynamic-field-mapping.html#date-detection
    参考

    storm连接kafka

    //重点
    Storm 如何来封装kafka接口
    class:DynamicPartitionConnections

    [storm数据怎样输出到elasticsearch]

    相关文章

      网友评论

          本文标题:storm学习第三天-storm与其他组件交互

          本文链接:https://www.haomeiwen.com/subject/serlrxtx.html