美文网首页
rabbitmq简单使用示例

rabbitmq简单使用示例

作者: 一个小废材 | 来源:发表于2020-04-18 15:54 被阅读0次

    我们使用springboot 构建示例代码。创建一个控制器接收用户请求,向rabbitmq压入数据。程序启动的时候拉起一个消费线程,消费数据。
    例子使用基本的rabbitmq函数。

    rabbitmq基础操作类
    ./java/com/amqp/demo/util/rabbitmq.java
    package com.amqp.demo.util;
    
    import com.amqp.demo.service.queueCost;
    import com.rabbitmq.client.*;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class rabbitmq {
        @Value("${amqp.host}")
         String host;
        @Value("${amqp.port}")
         int port;
        @Value("${amqp.user}")
         String user;
        @Value("${amqp.passwd}")
         String passwd;
        @Value("${amqp.vhost}")
        private String vhost;
    
        private Connection connection = null;
        private Channel amqpChanenl = null;
    
        private void initConnect() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            //设置参数
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(user);
            factory.setPassword(passwd);
            factory.setVirtualHost(vhost);
    
            //创建连接,tcp连接可以在channel之间复用,所以不需要每次都创建
            if (null == connection) {
                connection = factory.newConnection();
            }
            //创建Channel
            amqpChanenl = connection.createChannel();
        }
    
    
        public boolean pushInfo(String msgInfo, String exchangeName, String routingKey) throws IOException, TimeoutException {
            initConnect();
            byte[] messageodyByte = msgInfo.getBytes();
            //消息压入
            amqpChanenl.basicPublish(exchangeName, routingKey, null, messageodyByte);
            //关闭channel
            amqpChanenl.close();
            return true;
        }
    
        /**
         * 弹出消息
         * @param autoAck
         * @param queueName
         * @param queue
         * @return
         * @throws IOException
         * @throws TimeoutException
         */
        public String popInfo(boolean autoAck, String queueName, queueCost queue) throws IOException, TimeoutException {
            //初始化连接和channel
            initConnect();
            //想queueCost 注入channel依赖
            queue.setChannel(amqpChanenl);
            try {
                //订阅消息
                amqpChanenl.basicConsume(queueName, autoAck, "myConsumerTag", queue);
            } catch(IOException error) {
                return "";
            }
            return "";
        }
    }
    
    处理队列消息
    ./java/com/amqp/demo/service/queueCost.java
    package com.amqp.demo.controller;
    
    import com.alibaba.fastjson.JSONObject;
    import com.amqp.demo.util.rabbitmq;
    import org.json.JSONException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import com.alibaba.fastjson.JSON;
    
    import javax.servlet.http.HttpServletRequest;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class HelloController {
        @Autowired
        private rabbitmq amqp;
    
        @RequestMapping("/hello")
        public String hello() {
            return "this is first";
        }
    
        @PostMapping(value = "/push", produces = "application/json;charset=UTF-8")
        public String mapClass(HttpServletRequest req) throws JSONException, IOException, TimeoutException {
            BufferedReader br = new BufferedReader(new InputStreamReader(req.getInputStream()));
            StringBuffer sb=new StringBuffer();
            String s=null;
            while((s=br.readLine())!=null){
                sb.append(s);
            }
            System.out.println(sb);
            JSONObject param = JSON.parseObject(String.valueOf(sb));
            String name = (String) param.get("name");
    
            amqp.pushInfo(name, "test-exchange", "sibowen");
            return name;
        }
    }
    
    生产消息的接口
    ./java/com/amqp/demo/controller/HelloController.java
      package com.amqp.demo.controller;
    
    import com.alibaba.fastjson.JSONObject;
    import com.amqp.demo.util.rabbitmq;
    import org.json.JSONException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import com.alibaba.fastjson.JSON;
    
    import javax.servlet.http.HttpServletRequest;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class HelloController {
        @Autowired
        private rabbitmq amqp;
    
        @RequestMapping("/hello")
        public String hello() {
            return "this is first";
        }
    
        @PostMapping(value = "/push", produces = "application/json;charset=UTF-8")
        public String mapClass(HttpServletRequest req) throws JSONException, IOException, TimeoutException {
            BufferedReader br = new BufferedReader(new InputStreamReader(req.getInputStream()));
            StringBuffer sb=new StringBuffer();
            String s=null;
            while((s=br.readLine())!=null){
                sb.append(s);
            }
            System.out.println(sb);
            JSONObject param = JSON.parseObject(String.valueOf(sb));
            String name = (String) param.get("name");
    
            amqp.pushInfo(name, "test-exchange", "sibowen");
            return name;
        }
    }
    
    拉起消费线程的任务
    ./java/com/amqp/demo/command/queueCommand.java
    package com.amqp.demo.command;
    
    import com.amqp.demo.service.queueCost;
    import com.amqp.demo.util.rabbitmq;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class queueCommand  {
        @Autowired
        private rabbitmq amqp;
    
        @PostConstruct
        public void cost() throws IOException, TimeoutException {
            System.out.println("start listen");
            //调用util,消费消息
            amqp.popInfo(false, "queue_name", new queueCost());
            System.out.println("end ");
        }
    }
    
    

    相关文章

      网友评论

          本文标题:rabbitmq简单使用示例

          本文链接:https://www.haomeiwen.com/subject/ijpbvhtx.html