客户端:用来发起DRPC的调用
DRPC Server:实现与客户端的对接,传递参数给Storm,返回结果给客户端。
DPRCSpout: 用于连接DRPC Server和Topology,传递参数给Topology。
Topology:实现实际的函数功能。
ReturnResults:用于连接DRPC Server和Topology,用于返回参数给DRPC Server。
流程描述如下:
客户端发送函数的参数给DRPC Server
DRPC Server生成发送函数调用的相关信息给DRPC Spout,包括请求ID,请求参数,返回结果的信息。
DRPC Spout发送["args", "return-info"]给Topology的第一个Bolt,其中args代表请求参数,return-info代表返回需要的信息。
Topology最后一个Bolt发送["result", "return-info"]给ReturnResults,其中result代表返回结果,return-info代表返回需要的信息。
ReturnResults将结果和返回需要的信息传递给DRPC Server
DRPC Server将结果返回给DRPC客户端。
Storm DPRC API介绍
我们先看一下DRPC客户端的API:
假设DRPC服务器的地址为172.16.32.105,函数名为“exclamation”,输入为“hello world”
DRPCClient的java用法
package opzoon;
import java.util.Map;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.DRPCClient;
public class MyDrpcClient{
public static void main(String[] args) throws Exception{
Map config = Utils.readDefaultConfig();
DRPCClient client = new DRPCClient(config,"172.16.32.105", 3772);
try{
String result = client.execute("exclamation","hello world");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
这里说明一下,storm1.0.0之前的的DRPCClient构造函数是两个参数的,
DRPCClient client=newDRPCClient("172.16.32.105", 3772);
storm1.0.0之后的的DRPCClient构造函数是三个参数的,
Map config = Utils.readDefaultConfig();
DRPCClient client = new DRPCClient(config, "172.16.32.105", 3772);
DRPCClient的curl用法
curl -d "hello world" "http://172.16.32.105:3774/drpc/exclamation"
然后再看一下DRPC服务器端的API:
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.drpc.ReturnResults;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class ManualDRPC {
public static class ExclamationBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("result", "return-info"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String arg = tuple.getString(0);
Object retInfo = tuple.getValue(1);
collector.emit(new Values(arg + "!!!", retInfo));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
DRPCSpout spout = new DRPCSpout("exclamation");
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar("exclaim", conf, builder.createTopology());
}
}
网友评论