美文网首页
storm drpc

storm drpc

作者: 草丛螳螂 | 来源:发表于2017-07-03 17:52 被阅读174次

    客户端:用来发起DRPC的调用

    DRPC Server:实现与客户端的对接,传递参数给Storm,返回结果给客户端。

    DPRCSpout: 用于连接DRPC Server和Topology,传递参数给Topology。

    Topology:实现实际的函数功能。

    ReturnResults:用于连接DRPC Server和Topology,用于返回参数给DRPC Server。

    流程描述如下:

    客户端发送函数的参数给DRPC Server

    DRPC Server生成发送函数调用的相关信息给DRPC Spout,包括请求ID,请求参数,返回结果的信息。

    DRPC Spout发送["args", "return-info"]给Topology的第一个Bolt,其中args代表请求参数,return-info代表返回需要的信息。

    Topology最后一个Bolt发送["result", "return-info"]给ReturnResults,其中result代表返回结果,return-info代表返回需要的信息。

    ReturnResults将结果和返回需要的信息传递给DRPC Server

    DRPC Server将结果返回给DRPC客户端。

    Storm DPRC API介绍

    我们先看一下DRPC客户端的API:

    假设DRPC服务器的地址为172.16.32.105,函数名为“exclamation”,输入为“hello world”

    DRPCClient的java用法

    package opzoon;

    import java.util.Map;

    import org.apache.storm.utils.Utils;

    import org.apache.storm.utils.DRPCClient;

    public class MyDrpcClient{

        public static void main(String[] args) throws Exception{

            Map config = Utils.readDefaultConfig();

            DRPCClient client = new DRPCClient(config,"172.16.32.105", 3772);

            try{

                String result = client.execute("exclamation","hello world");

            } catch (Exception e) {

                System.out.println(e.getMessage());

            }

        }

    }

    这里说明一下,storm1.0.0之前的的DRPCClient构造函数是两个参数的,

    DRPCClient client=newDRPCClient("172.16.32.105", 3772);

    storm1.0.0之后的的DRPCClient构造函数是三个参数的,

    Map config = Utils.readDefaultConfig();

    DRPCClient client = new DRPCClient(config, "172.16.32.105", 3772);

    DRPCClient的curl用法

    curl -d "hello world" "http://172.16.32.105:3774/drpc/exclamation"


    然后再看一下DRPC服务器端的API:

    package org.apache.storm.starter;

    import org.apache.storm.Config;

    import org.apache.storm.StormSubmitter;

    import org.apache.storm.drpc.DRPCSpout;

    import org.apache.storm.drpc.ReturnResults;

    import org.apache.storm.topology.BasicOutputCollector;

    import org.apache.storm.topology.OutputFieldsDeclarer;

    import org.apache.storm.topology.TopologyBuilder;

    import org.apache.storm.topology.base.BaseBasicBolt;

    import org.apache.storm.tuple.Fields;

    import org.apache.storm.tuple.Tuple;

    import org.apache.storm.tuple.Values;

    public class ManualDRPC {

        public static class ExclamationBolt extends BaseBasicBolt {

            @Override

            public void declareOutputFields(OutputFieldsDeclarer declarer) {

                 declarer.declare(new Fields("result", "return-info"));

            }

            @Override

            public void execute(Tuple tuple, BasicOutputCollector collector) {

                String arg = tuple.getString(0);

                Object retInfo = tuple.getValue(1);

                collector.emit(new Values(arg + "!!!", retInfo));

            }

        }

        public static void main(String[] args) throws Exception {

            TopologyBuilder builder = new TopologyBuilder();

            DRPCSpout spout = new DRPCSpout("exclamation");

            builder.setSpout("drpc", spout);

            builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");

            builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

            Config conf = new Config();

            StormSubmitter.submitTopologyWithProgressBar("exclaim", conf, builder.createTopology());

        }

    }

    相关文章

      网友评论

          本文标题:storm drpc

          本文链接:https://www.haomeiwen.com/subject/dsogcxtx.html