我们使用 Spring Boot 构建示例代码:创建一个控制器接收用户请求,向 RabbitMQ 压入数据;程序启动时注册一个消费者,用于消费队列中的数据。
例子使用基本的rabbitmq函数。
rabbitmq基础操作类
./java/com/amqp/demo/util/rabbitmq.java
package com.amqp.demo.util;
import com.amqp.demo.service.queueCost;
import com.rabbitmq.client.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Small helper around the RabbitMQ Java client: publishes messages and
 * registers a consumer on a queue.
 *
 * <p>One TCP connection is created lazily and reused. Every operation opens
 * its own channel and keeps it in a local variable, so concurrent callers
 * never publish on, or close, a channel that belongs to another call.
 */
@Component
public class rabbitmq {
    @Value("${amqp.host}")
    String host;
    @Value("${amqp.port}")
    int port;
    @Value("${amqp.user}")
    String user;
    @Value("${amqp.passwd}")
    String passwd;
    @Value("${amqp.vhost}")
    private String vhost;

    /** Lazily created connection shared by all channels; guarded by {@code this}. */
    private Connection connection = null;

    /**
     * Returns the shared connection, creating it on first use and re-creating
     * it when the previous one is no longer open.
     *
     * @throws IOException      if the connection cannot be established
     * @throws TimeoutException if establishing the connection times out
     */
    private synchronized Connection getConnection() throws IOException, TimeoutException {
        if (connection == null || !connection.isOpen()) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(user);
            factory.setPassword(passwd);
            factory.setVirtualHost(vhost);
            connection = factory.newConnection();
        }
        return connection;
    }

    /**
     * Opens a fresh channel on the shared connection.
     *
     * @throws IOException if the client reports that no channel is available
     */
    private Channel openChannel() throws IOException, TimeoutException {
        Channel channel = getConnection().createChannel();
        if (channel == null) {
            // createChannel() is documented to return null when no channel is available.
            throw new IOException("no AMQP channel available on the current connection");
        }
        return channel;
    }

    /** Closes a channel on a failure path without masking the original error. */
    private static void closeQuietly(Channel channel) {
        try {
            if (channel.isOpen()) {
                channel.close();
            }
        } catch (IOException | TimeoutException | RuntimeException ignored) {
            // Best effort: the caller is already reporting the primary failure.
        }
    }

    /**
     * Publishes one message and closes the channel used for it.
     *
     * @param msgInfo      message text; encoded as UTF-8
     * @param exchangeName exchange to publish to
     * @param routingKey   routing key for the message
     * @return {@code true} once the publish call and channel close have completed
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if connecting or closing times out
     */
    public boolean pushInfo(String msgInfo, String exchangeName, String routingKey) throws IOException, TimeoutException {
        // Explicit charset: the platform default differs between machines.
        byte[] body = msgInfo.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Channel channel = openChannel();
        try {
            channel.basicPublish(exchangeName, routingKey, null, body);
        } catch (IOException | RuntimeException publishError) {
            // Do not leak the channel when the publish itself fails.
            closeQuietly(channel);
            throw publishError;
        }
        channel.close();
        return true;
    }

    /**
     * Subscribes {@code queue} as a consumer of {@code queueName}.
     *
     * <p>The channel is handed to the consumer via {@code setChannel} and is
     * left open after a successful subscription. A failed subscription is
     * reported on stderr, the channel is released, and the method still
     * returns normally so that callers are not aborted.
     *
     * @param autoAck   passed through to {@code basicConsume}
     * @param queueName queue to consume from
     * @param queue     consumer callback; receives the channel before subscribing
     * @return always the empty string
     * @throws IOException      if the connection or channel cannot be opened
     * @throws TimeoutException if opening the connection times out
     */
    public String popInfo(boolean autoAck, String queueName, queueCost queue) throws IOException, TimeoutException {
        Channel channel = openChannel();
        // Inject the channel into the consumer before it can receive deliveries.
        queue.setChannel(channel);
        try {
            channel.basicConsume(queueName, autoAck, "myConsumerTag", queue);
        } catch (IOException error) {
            System.err.println("failed to subscribe to queue " + queueName + ": " + error);
            closeQuietly(channel);
            return "";
        }
        return "";
    }
}
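处理队列消息(注意:下面贴出的代码与后文的 HelloController 完全相同,并不是 queueCost 的实现。根据 rabbitmq 工具类中的用法,queueCost 应位于 com.amqp.demo.service 包,提供 setChannel 方法,并可作为 basicConsume 的消费者回调传入。)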
./java/com/amqp/demo/service/queueCost.java
package com.amqp.demo.controller;
import com.alibaba.fastjson.JSONObject;
import com.amqp.demo.util.rabbitmq;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeoutException;
/**
 * REST endpoints of the demo: a static greeting and a producer endpoint that
 * pushes the {@code name} field of the posted JSON body to RabbitMQ.
 */
@RestController
public class HelloController {
    @Autowired
    private rabbitmq amqp;

    /** Returns a fixed greeting string. */
    @RequestMapping("/hello")
    public String hello() {
        return "this is first";
    }

    /**
     * Reads the request body as JSON and publishes its {@code name} field to
     * exchange {@code test-exchange} with routing key {@code sibowen}.
     *
     * @param req incoming request whose body is a JSON object with a string {@code name}
     * @return the published name
     * @throws IllegalArgumentException if the body is empty or {@code name} is not a string
     * @throws IOException              if reading the body or publishing fails
     * @throws TimeoutException         if the broker operation times out
     */
    @PostMapping(value = "/push", produces = "application/json;charset=UTF-8")
    public String mapClass(HttpServletRequest req) throws JSONException, IOException, TimeoutException {
        StringBuilder body = new StringBuilder();
        // Decode explicitly as UTF-8 instead of the platform default, and close the reader.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(req.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        System.out.println(body);

        JSONObject param = JSON.parseObject(body.toString());
        if (param == null) {
            // An empty body parses to null; fail with a clear message instead of an NPE.
            throw new IllegalArgumentException("request body must be a JSON object");
        }
        Object rawName = param.get("name");
        if (!(rawName instanceof String)) {
            throw new IllegalArgumentException("field \"name\" is required and must be a JSON string");
        }
        String name = (String) rawName;
        amqp.pushInfo(name, "test-exchange", "sibowen");
        return name;
    }
}
生产消息的接口
./java/com/amqp/demo/controller/HelloController.java
package com.amqp.demo.controller;
import com.alibaba.fastjson.JSONObject;
import com.amqp.demo.util.rabbitmq;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeoutException;
/**
 * REST endpoints of the demo: a static greeting and a producer endpoint that
 * pushes the {@code name} field of the posted JSON body to RabbitMQ.
 */
@RestController
public class HelloController {
    @Autowired
    private rabbitmq amqp;

    /** Returns a fixed greeting string. */
    @RequestMapping("/hello")
    public String hello() {
        return "this is first";
    }

    /**
     * Reads the request body as JSON and publishes its {@code name} field to
     * exchange {@code test-exchange} with routing key {@code sibowen}.
     *
     * @param req incoming request whose body is a JSON object with a string {@code name}
     * @return the published name
     * @throws IllegalArgumentException if the body is empty or {@code name} is not a string
     * @throws IOException              if reading the body or publishing fails
     * @throws TimeoutException         if the broker operation times out
     */
    @PostMapping(value = "/push", produces = "application/json;charset=UTF-8")
    public String mapClass(HttpServletRequest req) throws JSONException, IOException, TimeoutException {
        StringBuilder body = new StringBuilder();
        // Decode explicitly as UTF-8 instead of the platform default, and close the reader.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(req.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        System.out.println(body);

        JSONObject param = JSON.parseObject(body.toString());
        if (param == null) {
            // An empty body parses to null; fail with a clear message instead of an NPE.
            throw new IllegalArgumentException("request body must be a JSON object");
        }
        Object rawName = param.get("name");
        if (!(rawName instanceof String)) {
            throw new IllegalArgumentException("field \"name\" is required and must be a JSON string");
        }
        String name = (String) rawName;
        amqp.pushInfo(name, "test-exchange", "sibowen");
        return name;
    }
}
拉起消费线程的任务
./java/com/amqp/demo/command/queueCommand.java
package com.amqp.demo.command;
import com.amqp.demo.service.queueCost;
import com.amqp.demo.util.rabbitmq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Startup component that registers the queue consumer once its dependencies
 * have been injected.
 */
@Component
public class queueCommand {
    @Autowired
    private rabbitmq amqp;

    /**
     * Post-construction hook: prints a start marker, subscribes a new
     * {@code queueCost} to the queue {@code queue_name} with auto-ack turned
     * off, then prints an end marker.
     *
     * @throws IOException      propagated from the RabbitMQ helper
     * @throws TimeoutException propagated from the RabbitMQ helper
     */
    @PostConstruct
    public void cost() throws IOException, TimeoutException {
        System.out.println("start listen");

        // Name the arguments so the subscription call reads on its own.
        final boolean autoAck = false;
        final String queueName = "queue_name";
        final queueCost consumer = new queueCost();
        amqp.popInfo(autoAck, queueName, consumer);

        System.out.println("end ");
    }
}
网友评论