Thrift: Bidirectional Async RPC


Author: XBruce | Published: 2020-10-30 11:16

    Source on GitHub: http://github.com/JoelPM/BidiThrift

    A reader posted to the thrift-user mailing list wondering if it was possible for a Thrift RPC server to send messages to the client. The responses indicated that this could be accomplished by polling the server for updates or hosting another Thrift server in the client that could receive RPCs from the server (requires opening another port on the client and handling firewall issues). I responded with a technique I'd used for accomplishing something similar and the responses made me think maybe this would be worth writing up and posting some example code.

    Here's my email to the mailing list that describes what I'm doing:

    I think I've done something similar to what you're trying to do, and as long as you can commit to using only async messages it's possible to pull it off without having to start a server on the client to accept RPCs from the server.

    When your RPC is marked as async the server doesn't send a response and the client doesn't try to read one. So, if all your RPC calls from the client to the server are async you have effectively freed up the inbound half of the socket connection. That means that you can use it for receiving async messages from the server - the only catch is that you have to start a new thread to read and dispatch the incoming async RPC calls.

    In a typical Thrift RPC system you'd create a MyService.Processor on your server and a MyService.Client on your client. To do bidirectional async message sending you'll need to go a step further and create a MyService.Client on your server for each client that connects (this can be accomplished by providing your own TProcessorFactory) and then on each client you create a MyService.Processor. (This assumes that you've gone with a generic MyService definition like you described above that has a bunch of optional messages, another option would be to define separate service definitions for the client and server.) With two clients connected the objects in existence would look something like this:

    Server:
    MyService.Processor mainProcessor - handles incoming async RPCs
    MyService.Client clientA - used to send outgoing async RPCs to ClientA
    MyService.Client clientB - used to send outgoing async RPCs to ClientB

    ClientA:
    MyService.Client - used to send messages to Server
    MyService.Processor clientProcessor - used (by a separate thread) to process incoming async RPCs

    ClientB:
    MyService.Client - used to send messages to Server
    MyService.Processor clientProcessor - used (by a separate thread) to process incoming async RPCs

    Hopefully that explains the concept. If you need example code I can try and pull something together (it will be in Java). The nice thing about this method is that you don't have to establish two connections, so you can get around the firewall issues others have mentioned. I've been using this method on a service in production and haven't had any problems. When you have a separate thread in your client running a Processor you're basically blocking on a read, waiting for a message from the server. The benefit of this is that you're notified immediately when the server shuts down instead of having to wait until you send a message and then finding out that the TCP connection was reset.

    Cheers,
    Joel
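The idea in the email can be tried without Thrift at all. The following is a minimal sketch using plain sockets and newline-delimited strings in place of Thrift's protocol; the class name `OneSocketBidiSketch` and the "server saw:" message text are made up for illustration. It only shows the shape of the technique: the sender never waits for a reply, and a dedicated reader thread blocks on the inbound half of the same connection.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: raw sockets stand in for Thrift transports.
class OneSocketBidiSketch {

    /** Runs a loopback demo; returns what the client's reader thread received. */
    static List<String> runDemo() {
        List<String> got = new ArrayList<>();
        try (ServerSocket listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // Server: consumes one-way messages and writes its own one-way
            // messages back on the same connection.
            Thread server = new Thread(() -> {
                try (Socket s = listener.accept();
                     BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        out.println("server saw: " + line);
                    }
                } catch (IOException e) {
                    // Connection dropped; nothing to clean up in this sketch.
                }
            });
            server.start();

            BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
            try (Socket s = new Socket(InetAddress.getLoopbackAddress(), listener.getLocalPort())) {
                BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
                PrintWriter out = new PrintWriter(s.getOutputStream(), true);

                // Dedicated reader thread: blocks on the inbound half and dispatches.
                Thread reader = new Thread(() -> {
                    try {
                        String line;
                        while ((line = in.readLine()) != null) {
                            inbox.add(line);
                        }
                    } catch (IOException e) {
                        // A real client would flag the disconnect here.
                    }
                });
                reader.start();

                // The sender writes and moves on; it never reads a reply itself.
                out.println("hello");
                out.println("world");

                for (int i = 0; i < 2; i++) {
                    String msg = inbox.poll(5, TimeUnit.SECONDS);
                    if (msg != null) {
                        got.add(msg);
                    }
                }
                s.shutdownOutput();  // EOF to the server, which then closes its side
                reader.join(5000);
                server.join(5000);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return got;
    }

    public static void main(String[] args) {
        runDemo().forEach(m -> System.out.println("Got msg: " + m));
    }
}
```

Because the reader thread is always parked on a read, it sees end-of-stream as soon as the server closes the connection, which is the immediate disconnect notification the email mentions.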

    Here's an example app that sends messages between clients connected to a server. It's similar to a chat app.

    Thrift Definition

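    First, define your Thrift objects and the service. Our object and service are extremely simple. Note that sendMessage is declared oneway, which is the IDL keyword for the no-response ("async") calls described above: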

        #!/usr/local/bin/thrift --gen java:beans:hashcode -O ../

        namespace java com.joelpm.bidiMessages.generated

        struct Message {
          1: string clientName,
          2: string message
        }

        service MessageService {
          oneway void sendMessage(Message msg),
        }


    In this case the service is generic enough that both the client and server will use the same service definition. We could also create a ClientMessageService and a ServerMessageService if we needed different functionality.

    Server

    On the server side, when a client connection is accepted we want to create a MessageService.Client object that we'll use to send messages back to the client. We can accomplish this by creating our own TProcessorFactory and using the getProcessor method as an opportunity to get access to the transport being used between the client and server:

        // One distributor is shared by every connection; it runs on its own thread.
        final MessageDistributor messageDistributor = new MessageDistributor();

        new Thread(messageDistributor).start();

        TProcessorFactory processorFactory = new TProcessorFactory(null) {
          @Override
          public TProcessor getProcessor(TTransport trans) {
            // Wrap the connection's transport in a client so the server can
            // send messages back over the same socket.
            messageDistributor.addClient(new MessageServiceClient(trans));
            return new MessageService.Processor(messageDistributor);
          }
        };

        TServerTransport serverTransport = new TServerSocket(port);
        TServer server = new TThreadPoolServer(processorFactory, serverTransport);
        LOGGER.info("Server started");
        server.serve();


    As you can see above, we're using the same MessageDistributor for each new processor that we create. Before we create and return the processor we create a new client and add it to the list of clients that the MessageDistributor is aware of. The server is pretty simple and you can take a look at the code to see how the MessageDistributor uses the clients to send messages back.
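For readers who don't want to open the repository, here is a rough sketch of what a distributor of this kind could look like. It is not the project's MessageDistributor: the `Outbound` interface is a hypothetical stand-in for the per-connection client, messages are plain strings, and dropping a client on the first failed send is an assumption made for the example.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: a queue-backed fan-out to every connected client.
class DistributorSketch implements Runnable {

    /** Hypothetical stand-in for the per-connection client wrapper. */
    interface Outbound {
        void send(String msg) throws IOException;
    }

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Copy-on-write so clients can be added or removed while we iterate.
    private final List<Outbound> clients = new CopyOnWriteArrayList<>();

    void addClient(Outbound client) {
        clients.add(client);
    }

    int clientCount() {
        return clients.size();
    }

    /** Called by the processor threads; only enqueues, so it returns quickly. */
    void sendMessage(String msg) {
        queue.add(msg);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String msg = queue.take();
                for (Outbound client : clients) {
                    try {
                        client.send(msg);
                    } catch (IOException e) {
                        // Assumption: a failed send means the client is gone.
                        clients.remove(client);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // stop when interrupted
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DistributorSketch d = new DistributorSketch();
        d.addClient(msg -> System.out.println("client A got: " + msg));
        d.addClient(msg -> System.out.println("client B got: " + msg));
        Thread t = new Thread(d);
        t.start();
        d.sendMessage("Hello there!");
        Thread.sleep(200);
        t.interrupt();
    }
}
```

Keeping the fan-out on its own thread means a slow or dead client delays the distributor, not the pool threads that are reading incoming messages.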

    Client

    On the client side things are a little more complex because we have to create a separate thread to read incoming messages (this is handled by the TThreadPoolServer on the server side). Here's the class that reads incoming messages:

        public class MessageReceiver extends ConnectionRequiredRunnable {
          private final MessageService.Processor processor;
          private final TProtocol protocol;

          public MessageReceiver(
              TProtocol protocol,
              MessageService.Iface messageService,
              ConnectionStatusMonitor connectionMonitor) {
            super(connectionMonitor, "Message Receiver");
            this.protocol = protocol;
            this.processor = new MessageService.Processor(messageService);
          }

          @Override
          public void run() {
            connectWait();
            while (true) {
              try {
                // Read and dispatch incoming messages until the transport fails.
                while (processor.process(protocol, protocol)) { }
              } catch (TException e) {
                disconnected();
              }
            }
          }
        }


    It extends a utility class called ConnectionRequiredRunnable that provides utility methods for handling server disconnects and reconnects, but on the whole it's pretty simple because we pass in a separate class that actually handles the incoming messages. We also create a MessageService.Client, but we wrap it in a separate thread and use a blocking queue so that other components in the system wanting to send a message can do so very quickly - or at least, have the message handed off for delivery extremely quickly.

    Here's the class that handles our message sending:

        public class MessageSender extends ConnectionRequiredRunnable {
          private final MessageService.Client client;
          private final BlockingQueue<Message> msgSendQueue;

          public MessageSender(
              TProtocol protocol,
              ConnectionStatusMonitor connectionMonitor) {
            super(connectionMonitor, "Message Sender");
            this.client = new MessageService.Client(protocol);
            this.msgSendQueue = new LinkedBlockingQueue<Message>();
          }

          public void send(Message msg) {
            msgSendQueue.add(msg);
          }

          @Override
          public void run() {
            connectWait();
            while (true) {
              try {
                Message msg = msgSendQueue.take();
                try {
                  client.sendMessage(msg);
                } catch (TException e) {
                  // The message isn't lost, but it could end up being sent out of
                  // order - not ideal.
                  msgSendQueue.add(msg);
                  disconnected();
                }
              } catch (InterruptedException e) {
                // Thread will be interrupted if connection is lost, we should wait
                // for reconnection if that happens.
                connectWait();
              }
            }
          }
        }


    This class also extends ConnectionRequiredRunnable since it can't send messages without a connection. Here's the main method of the Client that establishes the connection to the server:

        this.transport = new TSocket(server, port);
        this.protocol = new TBinaryProtocol(transport);

        this.connectionMonitor = new ConnectionStatusMonitor(transport);

        // Sender and receiver share one protocol, and therefore one socket.
        this.sender = new MessageSender(protocol, connectionMonitor);
        this.receiver = new MessageReceiver(protocol, messageHandler, connectionMonitor);

        new Thread(sender).start();
        new Thread(receiver).start();

        this.connectionMonitor.tryOpen();


    It actually looks pretty simple since all the different pieces are organized in separate classes. The ConnectionStatusMonitor class is responsible for opening the actual connection and notifying the MessageSender and MessageReceiver when the connection has been established, at which point they'll start sending and receiving messages. If the server dies, both of those threads will stop and wait until a connection has been re-established (a task the ConnectionStatusMonitor is also responsible for). Here's sample output from the server:

        2009-04-03 16:28:44,029 INFO main com.joelpm.bidiMessages.server.Server:43 - Server started
        2009-04-03 16:28:45,814 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:36 - Added client at 127.0.0.1
        2009-04-03 16:28:45,822 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Hello there!)
        2009-04-03 16:28:45,823 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Message 0)
        2009-04-03 16:28:46,807 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Message 1)
        2009-04-03 16:28:46,864 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:36 - Added client at 127.0.0.1
        2009-04-03 16:28:46,895 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client2, message:Hello there!)
        2009-04-03 16:28:46,897 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client2, message:Message 0)
        2009-04-03 16:28:47,805 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Message 2)
        2009-04-03 16:28:47,885 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client2, message:Message 1)
        2009-04-03 16:28:48,806 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Message 3)
        2009-04-03 16:28:48,885 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client2, message:Message 2)
        2009-04-03 16:28:49,806 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Message 4)
        2009-04-03 16:28:49,885 INFO pool-1-thread-2 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client2, message:Message 3)
        ^C2009-04-03 16:28:50,807 INFO pool-1-thread-1 com.joelpm.bidiMessages.server.MessageDistributor:66 - Adding message to queue:
        Message(clientName:client1, message:Message 5)


    And here's output from client1:

        $ java -jar Client/target/BidiMessages.Client-0.9-jar-with-dependencies.jar client1 localhost 10101
        2009-04-03 16:28:45,792 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
        2009-04-03 16:28:45,792 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.
        2009-04-03 16:28:45,803 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Sender notified of connection, resuming execution
        2009-04-03 16:28:45,806 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Receiver notified of connection, resuming execution
        Got msg: Message(clientName:client1, message:Hello there!)
        Got msg: Message(clientName:client1, message:Message 0)
        Got msg: Message(clientName:client1, message:Message 1)
        Got msg: Message(clientName:client2, message:Hello there!)
        Got msg: Message(clientName:client2, message:Message 0)
        Got msg: Message(clientName:client1, message:Message 2)
        Got msg: Message(clientName:client2, message:Message 1)
        Got msg: Message(clientName:client1, message:Message 3)
        Got msg: Message(clientName:client2, message:Message 2)
        Got msg: Message(clientName:client1, message:Message 4)
        Got msg: Message(clientName:client2, message:Message 3)
        Got msg: Message(clientName:client1, message:Message 5)
        2009-04-03 16:28:51,146 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:30 - Message Receiver detected a disconnect from the server.
        2009-04-03 16:28:51,148 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
        2009-04-03 16:28:51,149 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.


    And here's from client2:

        $ java -jar Client/target/BidiMessages.Client-0.9-jar-with-dependencies.jar client2 localhost 10101
        2009-04-03 16:28:46,851 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.
        2009-04-03 16:28:46,854 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
        2009-04-03 16:28:46,867 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Sender notified of connection, resuming execution
        2009-04-03 16:28:46,879 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:48 - Message Receiver notified of connection, resuming execution
        Got msg: Message(clientName:client2, message:Hello there!)
        Got msg: Message(clientName:client2, message:Message 0)
        Got msg: Message(clientName:client1, message:Message 2)
        Got msg: Message(clientName:client2, message:Message 1)
        Got msg: Message(clientName:client1, message:Message 3)
        Got msg: Message(clientName:client2, message:Message 2)
        Got msg: Message(clientName:client1, message:Message 4)
        Got msg: Message(clientName:client2, message:Message 3)
        Got msg: Message(clientName:client1, message:Message 5)
        2009-04-03 16:28:51,146 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:30 - Message Receiver detected a disconnect from the server.
        2009-04-03 16:28:51,147 INFO Thread-3 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Receiver waiting for connection to be established.
        2009-04-03 16:28:51,147 INFO Thread-2 com.joelpm.bidiMessages.client.ConnectionRequiredRunnable:42 - Message Sender waiting for connection to be established.


    You can see that client1 and client2 paused when the server was terminated. Had the server restarted, the clients would have reconnected and begun sending messages again.
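The pause-and-resume behavior can be approximated with a small wait/notify gate. The sketch below is not the project's ConnectionRequiredRunnable or ConnectionStatusMonitor: only the method names connectWait and disconnected are borrowed from the post, and the class name, the `connectionEstablished` method, and the `demo` helper are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Illustration only: worker threads park here until a connection exists.
class ConnectionGateSketch {
    private boolean connected = false;

    /** Blocks the calling worker until the connection is marked as up. */
    synchronized void connectWait() throws InterruptedException {
        while (!connected) {
            wait();
        }
    }

    /** Called by whoever opens the connection; wakes every waiting worker. */
    synchronized void connectionEstablished() {
        connected = true;
        notifyAll();
    }

    /** Called by a worker that hit a transport error. */
    synchronized void disconnected() {
        connected = false;
    }

    /** Shows a worker pausing until the connection comes up. */
    static List<String> demo() {
        ConnectionGateSketch gate = new ConnectionGateSketch();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            events.add("worker waiting");
            started.countDown();
            try {
                gate.connectWait();
                events.add("worker resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            worker.start();
            started.await();
            events.add("connection up");
            gate.connectionEstablished();
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

A reconnect loop would call `disconnected()` on failure, retry opening the socket, and call `connectionEstablished()` once it succeeds, at which point the sender and receiver would pick up where they left off.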

    The source is built using Maven and requires that you've installed libthrift.jar in your local Maven repo (see the README for details). I'm also including a tgz with the compiled jar files for those who can't build the source.

    Source: BidiMessages.tgz Jars: BidiMessagesJars.tgz
