美文网首页
nanomsg使用记录--java版

nanomsg使用记录--java版

作者: yanshaowen | 来源:发表于2018-03-26 23:02 被阅读0次

    1 PAIR 模式

    Pair.java

    package pair;
    
    import nanomsg.exceptions.IOException;
    import nanomsg.pair.PairSocket;
    
    /**
     * PAIR demo: starts two {@link PairSocket} endpoints inside one JVM. Each endpoint
     * sends "hello" about once per second and prints every message it receives,
     * prefixed with the name of the receiving node.
     *
     * <p>Created by wenshao on 2018/3/27.
     */
    public class Pair {
        /** Address shared by both endpoints: node1 binds to it, node0 connects to it. */
        private static final String ENDPOINT = "tcp://127.0.0.1:7789";

        /** Starts node0 and then node1. */
        public static void main(String[] args) {
            node0();
            node1();
        }

        /** Creates the connecting endpoint and starts its sender and receiver threads. */
        private static void node0() {
            PairSocket socket = new PairSocket();
            socket.connect(ENDPOINT);
            send(socket);
            recv(socket, "node0");
        }

        /** Creates the binding endpoint and starts its sender and receiver threads. */
        private static void node1() {
            PairSocket socket = new PairSocket();
            socket.bind(ENDPOINT);
            send(socket);
            recv(socket, "node1");
        }

        /**
         * Starts a thread that prints every message received on {@code socket}.
         *
         * @param socket   socket to read from
         * @param nodeName label printed in front of each received message
         */
        private static void recv(final PairSocket socket, final String nodeName) {
            socket.setRecvTimeout(2000);   // timeout applied to recv calls (presumably milliseconds)
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            // Blocks until a message arrives or the receive timeout expires.
                            System.out.println(nodeName + ":" + socket.recvString());
                            Thread.sleep(1000);
                        } catch (IOException e) {
                            // Reached on receive failures such as a timeout; report and keep polling.
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }

        /**
         * Starts a thread that sends "hello" on {@code socket} about once per second.
         *
         * @param socket socket to write to
         */
        private static void send(final PairSocket socket) {
            socket.setSendTimeout(1100);        // timeout applied to send calls (presumably milliseconds)
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            socket.send("hello");
                            Thread.sleep(1000);
                        } catch (IOException e) {
                            // A failed send is reported and retried on the next iteration.
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }
    }
    
    
    

    2 PIPELINE 模式

    Pipeline.java

    package pipeline;
    
    import nanomsg.exceptions.IOException;
    import nanomsg.pipeline.PushSocket;
    import nanomsg.pipeline.PullSocket;
    
    /**
     * Pipeline demo: a {@link PullSocket} bound to a local TCP address prints every
     * message it receives, and a {@link PushSocket} connects to the same address and
     * sends a single "hello".
     *
     * <p>Created by wenshao on 2018/3/27.
     */
    public class Pipeline {
        /** Address the pull side binds to and the push side connects to. */
        private static final String ENDPOINT = "tcp://127.0.0.1:7789";

        /** Starts the receiving node first, then the sending node. */
        public static void main(String[] args) {
            node1();
            node0();
        }

        /** Binds a pull socket and starts a thread that prints each received message. */
        private static void node1() {
            final PullSocket socket = new PullSocket();
            socket.bind(ENDPOINT);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            // Blocks until a message arrives (or a receive timeout, if one is set).
                            System.out.println(socket.recvString());
                            Thread.sleep(1000);
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed receive (e.g. a timeout) is simply retried.
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }

        /** Connects a push socket and sends one "hello" message. */
        private static void node0() {
            final PushSocket socket = new PushSocket();
            socket.connect(ENDPOINT);
            socket.send("hello");
        }

    }
    
    

    3 REQREP模式

    package reqrep;
    
    import nanomsg.exceptions.IOException;
    import nanomsg.reqrep.RepSocket;
    import nanomsg.reqrep.ReqSocket;
    
    /**
     * Request/reply demo: node0 repeatedly sends "hello" through a {@link ReqSocket}
     * and prints the answer; node1 prints each request received on a
     * {@link RepSocket} and answers it with "world".
     *
     * <p>Created by wenshao on 2018/3/29.
     */
    public class ReqRep {
        /** Address the reply side binds to and the request side connects to. */
        private static final String ENDPOINT = "tcp://127.0.0.1:7789";

        /** Starts the replying node first, then the requesting node. */
        public static void main(String[] args) {
            node1();
            node0();
        }

        /** Binds the reply socket and starts the thread that answers requests. */
        private static void node1() {
            final RepSocket socket = new RepSocket();
            socket.bind(ENDPOINT);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            // Blocks until a request arrives (or a receive timeout, if one is set).
                            System.out.println("node1:" + socket.recvString());
                            Thread.sleep(1000);
                            socket.send("world");
                        } catch (IOException e) {
                            // A failed receive or send is reported; the loop then waits for the next request.
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }

        /** Connects the request socket and starts the thread that sends requests. */
        private static void node0() {
            final ReqSocket socket = new ReqSocket();
            socket.connect(ENDPOINT);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            socket.send("hello");
                            Thread.sleep(1000);
                            // Blocks until the reply arrives (or a receive timeout, if one is set).
                            System.out.println("node0:" + socket.recvString());
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed round trip is retried with a new request.
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }

    }
    
    

    4 PUBSUB 模式

    package pubsub;
    
    import nanomsg.exceptions.IOException;
    import nanomsg.pubsub.PubSocket;
    import nanomsg.pubsub.SubSocket;
    
    /**
     * Publish/subscribe demo: one {@link PubSocket} publishes "test1 msg" every two
     * seconds, and three {@link SubSocket} clients subscribed to "test" print what
     * they receive.
     *
     * <p>Created by wenshao on 2018/3/29.
     */
    public class PubSub {
        /** Address the publisher binds to and the subscribers connect to. */
        private static final String ENDPOINT = "tcp://127.0.0.1:7789";

        /** Starts the publisher and then the three subscribers. */
        public static void main(String[] args) {
            service();
            client0();
            client1();
            client2();
        }

        /** Starts the subscriber labelled "client0". */
        private static void client0() {
            client("client0");
        }

        /** Starts the subscriber labelled "client1". */
        private static void client1() {
            client("client1");
        }

        /** Starts the subscriber labelled "client2". */
        private static void client2() {
            client("client2");
        }

        /**
         * Connects a subscriber and starts a thread that prints each received message.
         *
         * @param name label printed in front of each received message
         */
        private static void client(final String name) {
            final SubSocket socket = new SubSocket();
            socket.connect(ENDPOINT);
            // In jnanomsg the topic acts as a prefix filter: messages matching ^test are delivered.
            socket.subscribe("test");
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            System.out.println(name + ":" + socket.recvString());
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed receive (e.g. a timeout) is simply retried.
                        }
                    }
                }
            }).start();
        }

        /** Binds the publisher and starts a thread that publishes every two seconds. */
        private static void service() {
            final PubSocket socket = new PubSocket();
            socket.bind(ENDPOINT);

            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            socket.send("test1 msg");
                            Thread.sleep(2000);
                        } catch (IOException e) {
                            // A failed publish is reported and retried on the next iteration.
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }
    }
    

    5 SURVEY模式

    jnanomsg不支持,但是可以自己扩展。
    Surver.java

    package survey;
    import nanomsg.Nanomsg;
    import nanomsg.Socket;
    
    /**
     * {@link Socket} subclass constructed with {@code AF_SP} and the value that
     * jnanomsg registers under the symbol {@code NN_SURVEYOR}.
     *
     * <p>Created by wenshao on 2018/3/29.
     */
    public class Surver extends Socket {
        public Surver() {
            super(Nanomsg.constants.AF_SP, surveyorSymbol());
        }

        /** Looks up the numeric value stored for the NN_SURVEYOR symbol. */
        private static int surveyorSymbol() {
            Integer value = (Integer) Nanomsg.symbols.get("NN_SURVEYOR");
            return value.intValue();
        }
    }
    

    Respondent.java

    package survey;
    import nanomsg.Nanomsg;
    import nanomsg.Socket;
    
    /**
     * {@link Socket} subclass constructed with {@code AF_SP} and the value that
     * jnanomsg registers under the symbol {@code NN_RESPONDENT}.
     *
     * <p>Created by wenshao on 2018/3/29.
     */
    public class Respondent extends Socket {
        public Respondent() {
            super(Nanomsg.constants.AF_SP, respondentSymbol());
        }

        /** Looks up the numeric value stored for the NN_RESPONDENT symbol. */
        private static int respondentSymbol() {
            Integer value = (Integer) Nanomsg.symbols.get("NN_RESPONDENT");
            return value.intValue();
        }
    }
    
    

    SurveyTest.java

    package survey;
    
    import nanomsg.async.AsyncSocket;
    import nanomsg.async.IAsyncCallback;
    import nanomsg.exceptions.IOException;
    
    /**
     * Survey demo built on the hand-written {@code Surver} and {@code Respondent}
     * sockets: the service sends "sup" every two seconds and prints each answer it
     * receives; the client answers every received message with "y".
     *
     * <p>Created by wenshao on 2018/3/29.
     */
    public class SurveyTest {
        /** Address the service binds to and the client connects to. */
        private static final String ENDPOINT = "tcp://127.0.0.1:3000";

        /** Starts the service and then the client. */
        public static void main(String[] args) {
            service();
            client();
        }

        /**
         * Binds the surveying socket and starts two threads on it: one that prints
         * received answers and one that sends "sup" every two seconds.
         */
        public static void service() {
            final Surver service = new Surver();
            service.bind(ENDPOINT);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            System.out.println("service:" + service.recvString());
                            Thread.sleep(1);
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed receive (e.g. a timeout) is simply retried.
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            service.send("sup");
                            Thread.sleep(2000);
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed send is retried on the next iteration.
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }

        /**
         * Connects the responding socket and starts a thread that replies "y" to
         * every message it receives.
         */
        public static void client() {
            final Respondent client = new Respondent();
            client.connect(ENDPOINT);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            // The content of the received message is not used; receiving it triggers the reply.
                            client.recvString();
                            Thread.sleep(1);
                            client.send("y");
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed receive or send is simply retried.
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
        }

    }
    
    

    6 BUS模式

    package bus;
    import nanomsg.async.AsyncSocket;
    import nanomsg.async.IAsyncCallback;
    import nanomsg.bus.BusSocket;
    import nanomsg.exceptions.IOException;
    
    /**
     * Bus demo: four {@link BusSocket} nodes in one JVM. Every node binds to its own
     * address and connects to the addresses of the higher-numbered nodes; each node
     * then sends one message and prints whatever it receives.
     *
     * <p>Created by wenshao on 2018/3/29.
     */
    public class Bus {
        private static final String URL_0 = "tcp://127.0.0.1:7780";
        private static final String URL_1 = "tcp://127.0.0.1:7781";
        private static final String URL_2 = "tcp://127.0.0.1:7782";
        private static final String URL_3 = "tcp://127.0.0.1:7783";

        /** Time given to the nodes to finish connecting before the first send, in milliseconds. */
        private static final long CONNECT_GRACE_MILLIS = 1000;

        /** Creates the four nodes, waits for them to connect and sends one message per node. */
        public static void main(String[] args) {
            BusSocket s0 = node("node0", URL_0, new String[]{URL_1, URL_2, URL_3});
            BusSocket s1 = node("node1", URL_1, new String[]{URL_2, URL_3});
            BusSocket s2 = node("node2", URL_2, new String[]{URL_3});
            BusSocket s3 = node("node3", URL_3, new String[]{});

            // Connections are set up in the background; a message sent before the peers
            // are attached may not reach them, so give the connections a moment to settle.
            try {
                Thread.sleep(CONNECT_GRACE_MILLIS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and carry on with the sends.
                Thread.currentThread().interrupt();
            }

            s0.send("client0 send a");
            s1.send("client1 send a");
            s2.send("client2 send a");
            s3.send("client3 send a");
        }

        /**
         * Creates one bus node and starts a thread that prints every message it receives.
         *
         * @param name  label printed in front of each received message
         * @param self  address this node binds to
         * @param other addresses of the nodes this node connects to
         * @return the socket of the new node, ready for sending
         */
        private static BusSocket node(final String name, String self, String[] other) {
            final BusSocket socket = new BusSocket();
            socket.bind(self);
            for (String peer : other) {
                socket.connect(peer);
            }
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Stop once the thread has been interrupted instead of looping forever.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            // Blocks until a message arrives (or a receive timeout, if one is set).
                            System.out.println(name + ":" + socket.recvString());
                            Thread.sleep(1);
                        } catch (IOException ignored) {
                            // Deliberately ignored: a failed receive (e.g. a timeout) is simply retried.
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition ends the thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }).start();
            return socket;
        }

    }
    
    

    相关文章

      网友评论

          本文标题:nanomsg使用记录--java版

          本文链接:https://www.haomeiwen.com/subject/xtarcftx.html