SpringBoot整合ActiveMQ
前言
Github:https://github.com/HealerJean
写这篇文件的时候不得不说,网上真的都是闲人啊,基本上是复制粘贴,然后自己也亲自试一试,简直是糟糕透了。所以博主自己开始写了
1、默认的JmsTemplate发送消息
默认的 jmsTemplate 发送queue消息(默认是持久化的)
默认的 jmsTemplate 发送topic消息(默认是非持久化的)
1.1、Maven
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
1.2、SpringBoot配置
server.port=8888
# ActiveMQ通讯地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
#是否启用内存模式,开始消息持久化就必须关闭in-memory选项。
spring.activemq.in-memory=false
##信任所有的包
spring.activemq.packages.trust-al=true
#最大连接数
spring.activemq.pool.maxConnections=50
#空闲时间
spring.activemq.pool.idleTimeout=30000
# 是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
spring.activemq.pool.enabled=false
1.3、常亮设置
public class JmsConstant {
/** 默认队列 */
public static final String QUEUE_NAME_DEFAULT = "default_queueName";
/** 默认名称 */
public static final String TOPIC_NAME_DEFAULT = "default_topicName";
public static final String TOPIC_LISTENER_FACTORY = "topicListenerFactory";
}
1.4、ActiveMQConfig
@Configuration
@EnableJms
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String jmsUser;
@Value("${spring.activemq.password}")
private String jsmPass;
@Bean(name = "connectionFactory")
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(jmsUser, jsmPass, brokerUrl);
activeMQConnectionFactory.setTrustAllPackages(true);
return activeMQConnectionFactory;
}
@Primary
@Bean("jmsTemplate")
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
// springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置bean
/**
* 发布/订阅 非持久化Topic
*/
@Bean(JmsConstant.TOPIC_LISTENER_FACTORY)
public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//这里必须设置为true,false则表示是queue类型
factory.setPubSubDomain(true);
// 开启非持久化
factory.setSubscriptionDurable(false);
return factory;
}
1.5、监听消息
@Component
public class JmsMsgListener {
//springboot默认只配置queue类型消息,默认就是点对点的,这样在监听的时候,可以不写这个工厂
@JmsListener(destination = JmsConstant.QUEUE_NAME_DEFAULT)
public void receivedefaultJmsTeamplateQueue(String msg) {
System.out.println("[" + JmsConstant.QUEUE_NAME_DEFAULT + "]消息:" + msg);
}
@JmsListener(destination = JmsConstant.TOPIC_NAME_DEFAULT,
containerFactory = JmsConstant.TOPIC_LISTENER_FACTORY)
public void receivedefaultJmsTeamplateTopic(String msg) {
System.out.println("[" + JmsConstant.TOPIC_NAME_DEFAULT + "]消息:" + msg);
}
}
1.6、发送消息接口和服务
1.6.1、发送消息接口
public interface ProducerService {
/**
* 默认的 jmsTemplate 发送queue消息(默认是持久化的)
*/
void jmsTemplateSendMsgToQueue(String destination, final String msg);
/**
* 默认的 jmsTemplate 发送topic消息(默认是非持久化的)
*/
void jmsTemplateSendMsgTopTopic(String destination, String msg);
}
5.2、 实现接口
@Service
public class ProducerServiceImpl implements ProducerService {
/**
* 系统默认的 jmsTemplate
*/
@Autowired
private JmsTemplate jmsTemplate;
/**
* 默认的 jmsTemplate 发送queue消息(默认是持久化的)
*/
@Override
public void jmsTemplateSendMsgToQueue(String destination, final String msg) {
jmsTemplate.convertAndSend(new ActiveMQQueue(destination), msg);
}
/**
* 默认的 jmsTemplate 发送topic消息(默认是非持久化的)
*/
@Override
public void jmsTemplateSendMsgTopTopic(String destination, String msg) {
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), msg);
}
}
6、测试controller
@RestController
@RequestMapping("hlj/activemq")
public class ActiveMQController {
@Autowired
private ProducerService producerService;
@RequestMapping("/defaultQueue")
public String defaultQueue(String msg) {
producerService.jmsTemplateSendMsgToQueue(JmsConstant.QUEUE_NAME_DEFAULT, msg);
return "defaultQueue";
}
@RequestMapping("/defaultTopic")
public String defaultTopic(String msg) {
producerService.jmsTemplateSendMsgTopTopic(JmsConstant.TOPIC_NAME_DEFAULT, msg);
return "defaultTopic";
}
}
1.6..1、http请求
GET http://localhost:8888/hlj/activemq/defaultQueue?msg=hello
GET http://localhost:8888/hlj/activemq/defaultTopic?msg=hello
2、自定义的JmsTemplate
发送queue
和topic
(非持久化和持久化)
2.1、ActiveMQConfig
/**
* @author HealerJean
* @version 1.0v
* @ClassName ActiveMQConfig
* @Date 2019/9/10 15:51.
* @Description
*/
@Configuration
@EnableJms
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String jmsUser;
@Value("${spring.activemq.password}")
private String jsmPass;
@Bean(name = "connectionFactory")
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(jmsUser, jsmPass, brokerUrl);
activeMQConnectionFactory.setTrustAllPackages(true);
return activeMQConnectionFactory;
}
@Primary
@Bean("jmsTemplate")
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
/**
* 持久化的 prsistentJmsTemplate
*/
@Bean("persistentJmsTemplate")
public JmsTemplate persistentJmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate persistentJmsTemplate = new JmsTemplate();
persistentJmsTemplate.setConnectionFactory(connectionFactory);
//设置为true,deliveryMode, priority, timeToLive等设置才会起作用,否则使用默认的值
persistentJmsTemplate.setExplicitQosEnabled(true);
persistentJmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
return persistentJmsTemplate;
}
/**
* 非持久化的 noPrsistentJmsTemplate
*/
@Bean("noPersistentJmsTemplate")
public JmsTemplate noPersistentJmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate persistentJmsTemplate = new JmsTemplate();
persistentJmsTemplate.setConnectionFactory(connectionFactory);
//设置为true,deliveryMode, priority, timeToLive等设置才会起作用,否则使用默认的值
persistentJmsTemplate.setExplicitQosEnabled(true);
persistentJmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return persistentJmsTemplate;
}
// springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置bean
/**
* 发布/订阅 非持久化Topic
*/
@Bean(JmsConstant.TOPIC_LISTENER_FACTORY)
public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//这里必须设置为true,false则表示是queue类型
factory.setPubSubDomain(true);
// 开启非持久化
factory.setSubscriptionDurable(false);
return factory;
}
/**
* 发布/订阅 持久化Topic
*/
@Bean(JmsConstant.PRSISTENT_TOPIC_LISTENER_FACTORY)
public DefaultJmsListenerContainerFactory prsistentTopicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//这里必须设置为true,false则表示是queue类型
factory.setPubSubDomain(true);
// 开启持久化
factory.setSubscriptionDurable(true);
// 设置clientId
factory.setClientId(JmsConstant.PRSISTENT_TOPIC_NAME_CLIENT);
return factory;
}
}
2.2、变量设置
public class JmsConstant {
/** 默认队列 */
public static final String QUEUE_NAME_DEFAULT = "default_queueName";
/** 默认名称 */
public static final String TOPIC_NAME_DEFAULT = "default_topicName";
/** 队列 */
public static final String QUEUE_NAME = "queueName";
/** 订阅名称 */
public static final String TOPIC_NAME = "topicName";
/** 持久化topic */
public static final String PRSISTENT_TOPIC_NAME = "prsistentTopicName";
public static final String PRSISTENT_TOPIC_NAME_CLIENT = "Cliend_HealerJean";
public static final String TOPIC_LISTENER_FACTORY = "topicListenerFactory";
public static final String PRSISTENT_TOPIC_LISTENER_FACTORY = "prsistentTopicListenerFactory";
}
2.3、监听消息
/**
* 消息监听器
*/
@Component
public class JmsMsgListener {
//默认的的JmsTeamplate发送的消息监听
@JmsListener(destination = JmsConstant.QUEUE_NAME_DEFAULT)
public void receivedefaultJmsTeamplateQueue(String msg) {
System.out.println("[" + JmsConstant.QUEUE_NAME_DEFAULT + "]消息:" + msg);
}
@JmsListener(destination = JmsConstant.TOPIC_NAME_DEFAULT,
containerFactory = JmsConstant.TOPIC_LISTENER_FACTORY)
public void receivedefaultJmsTeamplateTopic(String msg) {
System.out.println("[" + JmsConstant.TOPIC_NAME_DEFAULT + "]消息:" + msg);
}
//自定的JmsTeamplate发送的消息监听
@JmsListener(destination = JmsConstant.QUEUE_NAME)
public void receiveQueue(String msg) {
System.out.println("[" + JmsConstant.QUEUE_NAME + "]消息:" + msg);
}
@JmsListener(destination = JmsConstant.TOPIC_NAME,
containerFactory = JmsConstant.TOPIC_LISTENER_FACTORY)
public void receiveTopicName(String msg) {
System.out.println("[" + JmsConstant.TOPIC_NAME + "]消费者:receive=" + msg);
}
@JmsListener(destination = JmsConstant.PRSISTENT_TOPIC_NAME,
containerFactory = JmsConstant.PRSISTENT_TOPIC_LISTENER_FACTORY)
public void receivePrsistentTopicName(String msg) {
System.out.println("[" + JmsConstant.PRSISTENT_TOPIC_NAME + "]消费者:receive=" + msg);
}
}
2.4、发送消息接口和实现
2.4.1、发送消息接口
public interface ProducerService {
/**
* 默认的 jmsTemplate 发送queue消息(默认是持久化的)
*/
void jmsTemplateSendMsgToQueue(String destination, final String msg);
/**
* 默认的 jmsTemplate 发送topic消息(默认是非持久化的)
*/
void jmsTemplateSendMsgTopTopic(String destination, String msg);
/**
* 发送消息(点对点)
*/
void sendMsgToQueue(String destination, String msg);
/**
* 发送消息(非持久化 发布-订阅模式)
*/
void sendMsgToTopic(String destination, String msg);
/**
* 发送消息(持久化 发布-订阅模式)
*/
void sendMsgTopPrsistentTopic(String destination, String msg);
}
2.4.2、实现
/**
* 消息生产者服务实现类
*/
@Service
public class ProducerServiceImpl implements ProducerService {
/**
* 系统默认的 jmsTemplate *************************************
*/
@Autowired
private JmsTemplate jmsTemplate;
/**
* 默认的 jmsTemplate 发送queue消息(默认是持久化的)
*/
@Override
public void jmsTemplateSendMsgToQueue(String destination, final String msg) {
jmsTemplate.convertAndSend(new ActiveMQQueue(destination), msg);
}
/**
* 默认的 jmsTemplate 发送topic消息(默认是非持久化的)
*/
@Override
public void jmsTemplateSendMsgTopTopic(String destination, String msg) {
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), msg);
}
/**
* 自定义的jmsTemplate *************************************
*/
@Autowired
@Qualifier("persistentJmsTemplate")
private JmsTemplate persistentJmsTemplate;
@Autowired
@Qualifier("persistentJmsTemplate")
private JmsTemplate noPersistentJmsTemplate;
@Override
public void sendMsgToQueue(String destination, final String msg) {
persistentJmsTemplate.convertAndSend(new ActiveMQQueue(destination), msg);
}
@Override
public void sendMsgToTopic(String destination, final String msg) {
noPersistentJmsTemplate.convertAndSend(new ActiveMQTopic(destination), msg);
}
@Override
public void sendMsgTopPrsistentTopic(String destination, String msg) {
persistentJmsTemplate.convertAndSend(new ActiveMQTopic(destination), msg);
}
}
2.5、测试Controller
@RestController
@RequestMapping("hlj/activemq")
public class ActiveMQController {
@Autowired
private ProducerService producerService;
@RequestMapping("/defaultQueue")
public String defaultQueue(String msg) {
producerService.jmsTemplateSendMsgToQueue(JmsConstant.QUEUE_NAME_DEFAULT, msg);
return "defaultQueue";
}
@RequestMapping("/defaultTopic")
public String defaultTopic(String msg) {
producerService.jmsTemplateSendMsgTopTopic(JmsConstant.TOPIC_NAME_DEFAULT, msg);
return "defaultTopic";
}
/**
* 点对点
*/
@RequestMapping("/queue")
public String queue(String msg) {
producerService.sendMsgToQueue(JmsConstant.QUEUE_NAME, msg);
return "queue";
}
/**
* 非持久化 发布/订阅
*/
@RequestMapping("/topic")
public String topic(String msg) {
producerService.sendMsgToTopic(JmsConstant.TOPIC_NAME, msg);
return "topic";
}
/**
* 持久化 发布/订阅
*/
@RequestMapping("/persistentTopic")
public String persistentTopic(String msg) {
producerService.sendMsgTopPrsistentTopic(JmsConstant.PRSISTENT_TOPIC_NAME, msg);
return "persistentTopic";
}
}
2.5.1、http
GET http://localhost:8888/hlj/activemq/queue?msg=hello
GET http://localhost:8888/hlj/activemq/topic?msg=hello
GET http://localhost:8888/hlj/activemq/persistentTopic?msg=persistentTopic