在Storm集群上开启DRPC功能,
基于Storm的1.0.1版本,
并且执行简单的例子测试。
1.DRPC概念
DRPC就是分布式远程过程调用。
Storm里面引入DRPC主要是利用storm的实时计算能力,
来并行化CPU intensive的计算。
DRPC的Storm topology以函数的参数流作为输入,
而把这些函数调用的返回值作为topology的输出流。
DRPC其实不能算是Storm本身的一个特性,
它是通过组合Storm的原语spout,bolt,topology而成的一种模式(pattern)。
2.DRPC工作机制
Distributed RPC是由一个DPRC Server协调的,
Storm自带了一个称作LinearDRPCTopologyBuilder的topology builder,
它把实现DRPC的几乎所有步骤都自动化了。
DRPC服务器协调机制:
- 接收一个RPC请求;
- 发送请求到Storm topology;
- 从Storm topology接收结果;
- 把结果发回给等待的客户端。
从客户端的角度来看一个DRPC调用,
跟一个普通的RPC调用没有任何区别。
下面是客户端代码展示了如何调用RPC的
exclaimation方法,方法的参数是hello:
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("exclaimation", "hello");
DRPC的工作流大致是这样的:
客户端给DRPC服务器发送要执行的方法的名字,
以及这个方法的参数。
实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。
每个函数调用被DRPC服务器标记了一个唯一的id。
这个topology然后计算结果,
在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,
并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。
DRPC服务器用那个唯一id来跟等待的客户端匹配上,
唤醒这个客户端并且把结果发送给它。
3.配置DPRC Server
修改storm.yaml文件,增加drpc的配置:
drpc.servers:
- "zdh-237"
drpc.childopts: "-Xmx1024m"
其他参数drpc.port, drpc.http.port等使用默认值即可,
参考默认值如下:
drpc.port:
3772
drpc.worker.threads:
64
drpc.max_buffer_size:
1048576
drpc.queue.size:
128
drpc.invocations.port:
3773
drpc.invocations.threads:
64
drpc.request.timeout.secs:
600
drpc.childopts:
"-Xmx768m"
drpc.http.port:
3774
drpc.https.port:
-1
drpc.https.keystore.password:
""
drpc.https.keystore.type:
"JKS"
drpc.http.creds.plugin:
backtype.storm.security.auth.DefaultHttpCredentialsPlugin
drpc.authorizer.acl.filename:
"drpc-auth-acl.yaml"
drpc.authorizer.acl.strict:
false
4.启动DPRC Server
使用如下命令启动DRPC进程:
storm drpc > drpc.log 2>&1 &
5.使用本地集群测试
由于命令无入参即没有topo名字,
无对外端口无法提供客户端调用,
在BasicDRPCTopology中启动后调用本地集群,
仅作为测试场景使用。
进入Storm目录,提交处理drpc的topo:
cd /home/stormna/apache-storm-1.0.1/examples/storm-starter/
storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology
在输出的日志中可以看到如下结果,
结果是单词后面被添加了感叹号!
8043 [Thread-26-bolt2-executor[6 6]] INFO o.a.s.l.ThriftAccessLogger - Request ID: 3 access from: principal: operation: result
Result for "hello": hello!
8054 [Thread-26-bolt2-executor[6 6]] INFO o.a.s.l.ThriftAccessLogger - Request ID: 3 access from: principal: operation: result
Result for "goodbye": goodbye!
6.使用真实集群测试
基于真实集群的DRPC可以提供给外部客户端调用,
先提交处理drpc的topo,指定topo名称为exclamationDrpc:
cd /home/stormna/apache-storm-1.0.1/examples/storm-starter
storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology exclamationDrpc
7.客户端调用
在BasicDRPCTopology中提供的drpc方法名为exclamation,
效果返回结果是在单词后面被添加的感叹号。
使用下面写客户端代码进行调用测试。
7.1.适配storm-core-0.9.6.jar的客户端代码
注意exclamation名称不要拼错,
否则客户端会一直没有响应:
public class DRPCClientTest096 {
public static void main(String[] args) throws Exception {
String drpcHost = "10.43.159.237";
int drpcPort = 3772;
DRPCClient client = new DRPCClient(drpcHost, drpcPort);
String input="hello1";
String result = client.execute("exclamation",input );
System.out.println("input:"+input+", result:"+result);
}
}
执行DRPCClientTest096类中的main方法,
调用drpc的exclamation函数,传入参数hello1:
控制台输出结果:
input:hello1, result:hello1!
7.2.适配storm-core-1.0.1.jar的客户端代码
注意调用需要配置Storm参数,
和上面的有点区别的。
public class DRPCClientTest101 {
public static void main(String[] args) throws Exception {
String drpcHost = "10.43.159.237";
int drpcPort = 3772;
Properties pro = new Properties();
// InputStream inStream = new FileInputStream("stormclient.conf");
// 读取storm-core-1.0.1.jar里面 的defaults.yaml配置文件
InputStream inStream = ClassLoader.getSystemResourceAsStream("defaults.yaml");
pro.load(inStream);
inStream.close();
//由于Properties加载的值带了引号,需要重新设置一下,或者手动去掉前后的引号
pro.setProperty("storm.thrift.transport", "org.apache.storm.security.auth.SimpleTransportPlugin");
DRPCClient client = new DRPCClient(pro, drpcHost, drpcPort);
String input = "hello2";
String result = client.execute("exclamation", input);
System.out.println("input:" + input + ", result:" + result);
}
}
执行DRPCClientTest101类中的main方法,
调用drpc的exclamation函数,传入参数hello2:
控制台输出结果:
input:hello2, result:hello2!
8.参考文章
StormDRPC 概念以及简单例子测试
storm DRPC问题
Storm高级原语(二) — DRPC
storm自带例子详解 (二)——BasicDRPCTopology
网友评论