博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring 集成 RabbitMQ
阅读量:4626 次
发布时间:2019-06-09

本文共 10670 字,大约阅读时间需要 35 分钟。

1.添加 maven 项目依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

2.添加 spring-rabbitmq.xml 配置

3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml

4.Gson配置

package top.tarencez.ssmdemo.config;import java.io.IOException;import java.io.UnsupportedEncodingException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;import org.springframework.amqp.support.converter.ClassMapper;import org.springframework.amqp.support.converter.DefaultClassMapper;import org.springframework.amqp.support.converter.MessageConversionException;import com.google.gson.Gson;public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {    private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);    private static ClassMapper classMapper = new DefaultClassMapper();    private static Gson gson = new Gson();    public Gson2JsonMessageConverter() {        super();    }    @Override    protected Message createMessage(Object object, MessageProperties messageProperties) {        byte[] bytes = null;        try {            String jsonString = gson.toJson(object);            bytes = jsonString.getBytes(getDefaultCharset());        } catch (IOException e) {            throw new MessageConversionException(                    "Failed to convert Message content", e);        }        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);        messageProperties.setContentEncoding(getDefaultCharset());        if (bytes != null) {            messageProperties.setContentLength(bytes.length);        }        classMapper.fromClass(object.getClass(), messageProperties);        return new Message(bytes, messageProperties);    }    @Override    public Object fromMessage(Message message) throws MessageConversionException {        Object content = null;        MessageProperties properties = message.getMessageProperties();        if (properties != null) {            String contentType = properties.getContentType(); 
           if (contentType != null && contentType.contains("json")) {                String encoding = properties.getContentEncoding();                if (encoding == null) {                    encoding = getDefaultCharset();                }                try {                    Class
targetClass = getClassMapper().toClass( message.getMessageProperties()); content = convertBytesToObject(message.getBody(), encoding, targetClass); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } } else { log.warn("Could not convert incoming message with content-type [" + contentType + "]"); } } if (content == null) { content = message.getBody(); } return content; } private Object convertBytesToObject(byte[] body, String encoding, Class
clazz) throws UnsupportedEncodingException { String contentAsString = new String(body, encoding); return gson.fromJson(contentAsString, clazz); }}

5.生产者接口及接口调用

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Thin producer component that publishes messages through the injected
 * {@link AmqpTemplate}.
 */
@Component
public class MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it via
     * {@code AmqpTemplate.convertAndSend(String, Object)}.
     *
     * <p>Best effort: any exception raised while sending is printed to
     * standard output and not rethrown, so callers are not told about failures.
     *
     * @param queueKey key passed as the first argument of {@code convertAndSend}
     * @param message  payload handed to the template's message converter
     */
    public void sendMessage(String queueKey, Object message) {
        System.out.println("===== " + amqpTemplate);
        try {
            amqpTemplate.convertAndSend(queueKey, message);
            System.out.println("===== 消息发送成功 =====");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
package top.tarencez.ssmdemo.rabbitmq.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import top.tarencez.ssmdemo.common.component.MQProducer;
import top.tarencez.ssmdemo.shiro.vo.TestVO;

/**
 * Demo controller mapped under {@code /mq} that publishes a sample
 * {@link TestVO} through {@link MQProducer}.
 */
@Controller
@RequestMapping("/mq")
public class MQController {

    @Autowired
    private MQProducer mqProducer;

    /** Handles {@code /mq/test}: logs a marker line, then sends the sample payload. */
    @RequestMapping("/test")
    public void test() {
        System.out.println("===== test mq send =====");
        mqProducer.sendMessage("rabbit.queue.test1.key", buildSamplePayload());
    }

    /** Builds the fixed sample object (id 1, names "aaa" / "bbb"). */
    private static TestVO buildSamplePayload() {
        TestVO sample = new TestVO();
        sample.setId(1);
        sample.setName1("aaa");
        sample.setName2("bbb");
        return sample;
    }
}

6.消费者接口

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * Consumer component that prints the body of every received message.
 */
@Component
public class MQListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 and prints it to standard output
     * (without a trailing newline).
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        try {
            System.out.print("===== 接受到消息:" + new String(msg.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

7.测试验证

 

1.添加 maven 项目依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

2.添加 spring-rabbitmq.xml 配置

3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml

4.Gson配置

package top.tarencez.ssmdemo.config;import java.io.IOException;import java.io.UnsupportedEncodingException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractJsonMessageConverter; import org.springframework.amqp.support.converter.ClassMapper; import org.springframework.amqp.support.converter.DefaultClassMapper; import org.springframework.amqp.support.converter.MessageConversionException; import com.google.gson.Gson; public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter { private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class); private static ClassMapper classMapper = new DefaultClassMapper(); private static Gson gson = new Gson(); public Gson2JsonMessageConverter() { super(); } @Override protected Message createMessage(Object object, MessageProperties messageProperties) { byte[] bytes = null; try { String jsonString = gson.toJson(object); bytes = jsonString.getBytes(getDefaultCharset()); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(getDefaultCharset()); if (bytes != null) { messageProperties.setContentLength(bytes.length); } classMapper.fromClass(object.getClass(), messageProperties); return new Message(bytes, messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { Object content = null; MessageProperties properties = message.getMessageProperties(); if (properties != null) { String contentType = properties.getContentType(); if (contentType != null && contentType.contains("json")) { String encoding = properties.getContentEncoding(); if (encoding == null) { encoding = getDefaultCharset(); } try { Class
targetClass = getClassMapper().toClass( message.getMessageProperties()); content = convertBytesToObject(message.getBody(), encoding, targetClass); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } } else { log.warn("Could not convert incoming message with content-type [" + contentType + "]"); } } if (content == null) { content = message.getBody(); } return content; } private Object convertBytesToObject(byte[] body, String encoding, Class
clazz) throws UnsupportedEncodingException { String contentAsString = new String(body, encoding); return gson.fromJson(contentAsString, clazz); } }

5.生产者接口及接口调用

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Thin producer component that publishes messages through the injected
 * {@link AmqpTemplate}.
 */
@Component
public class MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it via
     * {@code AmqpTemplate.convertAndSend(String, Object)}.
     *
     * <p>Best effort: any exception raised while sending is printed to
     * standard output and not rethrown, so callers are not told about failures.
     *
     * @param queueKey key passed as the first argument of {@code convertAndSend}
     * @param message  payload handed to the template's message converter
     */
    public void sendMessage(String queueKey, Object message) {
        System.out.println("===== " + amqpTemplate);
        try {
            amqpTemplate.convertAndSend(queueKey, message);
            System.out.println("===== 消息发送成功 =====");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
package top.tarencez.ssmdemo.rabbitmq.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import top.tarencez.ssmdemo.common.component.MQProducer;
import top.tarencez.ssmdemo.shiro.vo.TestVO;

/**
 * Demo controller mapped under {@code /mq} that publishes a sample
 * {@link TestVO} through {@link MQProducer}.
 */
@Controller
@RequestMapping("/mq")
public class MQController {

    @Autowired
    private MQProducer mqProducer;

    /** Handles {@code /mq/test}: logs a marker line, then sends the sample payload. */
    @RequestMapping("/test")
    public void test() {
        System.out.println("===== test mq send =====");
        mqProducer.sendMessage("rabbit.queue.test1.key", buildSamplePayload());
    }

    /** Builds the fixed sample object (id 1, names "aaa" / "bbb"). */
    private static TestVO buildSamplePayload() {
        TestVO sample = new TestVO();
        sample.setId(1);
        sample.setName1("aaa");
        sample.setName2("bbb");
        return sample;
    }
}

6.消费者接口

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * Consumer component that prints the body of every received message.
 */
@Component
public class MQListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 and prints it to standard output
     * (without a trailing newline).
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        try {
            System.out.print("===== 接受到消息:" + new String(msg.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

7.测试验证

 

 

 

 

参考文章:

  

  

  

 

转载于:https://www.cnblogs.com/tarencez/p/10886938.html

你可能感兴趣的文章
java后端判断用户是否关注公众号
查看>>
判断JS对象是否拥有某属性两种方式
查看>>
自定义异常
查看>>
黑马程序员___Java基础[02-Java基础语法](一)
查看>>
USACO09FEB Fair Shuttle
查看>>
一次完整请求的日志
查看>>
计算机知识的学习
查看>>
Linq 等式运算符:SequenceEqual
查看>>
[LeetCode] Count Different Palindromic Subsequences 计数不同的回文子序列的个数
查看>>
Javascript使用三大家族和事件来DIY动画效果相关笔记(一)
查看>>
投影纹理映射(Projective Texture Mapping)
查看>>
rwkj 1422搜索(素数环)
查看>>
Android开发常用属性
查看>>
Android线程之主线程向子线程发送消息
查看>>
CentOS 6.4下编译安装MySQL 5.6.14
查看>>
PHP拿到别人项目如何修改为自己
查看>>
Flink学习笔记:Operators之CoGroup及Join操作
查看>>
SQL Server DB Link相关
查看>>
2017 .NET 開發者須知
查看>>
判断ie版本
查看>>