博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JMS 发布/订阅消息 -- 同步
阅读量:4050 次
发布时间:2019-05-25

本文共 3343 字,大约阅读时间需要 11 分钟。

– Start


JMS 支持两种消息传递方式,一种是点对点方式(point-to-point (PTP)),一种是发布/订阅方式(publish/subscribe (pub/sub))。本文关注发布/订阅方式,先看看它的结构图。

这里写图片描述

发布/订阅消息有以下特性。

  1. Client1 是消息的发布者(Publisher)。
  2. 消息被发送到一个主题(Topic)。
  3. Client2 是消息的订阅者(Subscriber)。
  4. Client2 接收消息前需要先订阅某个主题
  5. 每个消息可以有多个订阅者
  6. 订阅者想要接收消息,需要时刻保持运行状态,否则消息将丢失,有点像听广播。
  7. 为了避免消息丢失,我们可以创建一个持久化订阅(Durable Subscribe)
  8. 接收消息有两种方式,同步和异步
  9. 同步方式是,消息订阅者调用receive()方法,该方法会阻塞,直到有新消息。
    10.异步方式是,消息订阅者需注册一个消息监听器(MessageListener),当有新消息时,监听器的 onMessage() 方式会被自动调用

下面是一个使用 JMS 同步方法发送和接收消息的例子。

发送消息

package shangbo.activeMQ.example3;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQTopic;class Publisher {	// ActiveMQ 支持不同的协议,你可以在它的配置文件中 conf/activemq.xml 找到不同协议的连接方式	public static String BROKER_URL = "tcp://0.0.0.0:61616";	public static String USER = "admin";	public static String PASSWORD = "admin";	public static String DESTINATION = "systemA.systemB.Price.Topic";	public static void main(String[] args) throws JMSException {		// 创建连接工厂		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);		// 创建连接		Connection connection = factory.createConnection(USER, PASSWORD);		// 创建会话		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);		// 创建消息		TextMessage msg = session.createTextMessage("hello topic world");		// 创建消息目的地		Destination dest = new ActiveMQTopic(DESTINATION);		// 发送消息		MessageProducer producer = session.createProducer(dest);		producer.send(msg);		// 释放资源		session.close();		connection.close();		System.exit(0);	}}

接收消息

package shangbo.activeMQ.example3;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQTopic;class Subscriber {	// ActiveMQ 支持不同的协议,你可以在它的配置文件中 conf/activemq.xml 找到不同协议的连接方式	public static String BROKER_URL = "tcp://0.0.0.0:61616";	public static String USER = "admin";	public static String PASSWORD = "admin";	public static String DESTINATION = "systemA.systemB.Price.Topic";	public static void main(String[] args) throws Exception {		// 创建连接工厂		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);		// 创建连接		Connection connection = factory.createConnection(USER, PASSWORD);		connection.start();		// 创建会话		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);		// 创建消息源		Destination dest = new ActiveMQTopic(DESTINATION);		// 接收消息		MessageConsumer consumer = session.createConsumer(dest);		while (true) {			Message msg = consumer.receive(1000);			if (msg == null) {				System.out.println("No new message, sleeping 5 secs");				Thread.sleep(5 * 1000);				continue;			}			// 解析消息			if (msg instanceof TextMessage) {				String body = ((TextMessage) msg).getText();				System.out.println(body);			} else {				System.out.println("Unexpected message type: " + msg.getClass());			}		}		// 释放资源//		session.close();//		connection.close();//		System.exit(0);	}}

– 声 明:转载请注明出处
– Last Updated on 2018-05-30
– Written by ShangBo on 2017-06-24
– End

你可能感兴趣的文章
ios framework 通用库的制作
查看>>
出现( linker command failed with exit code 1)错误总结
查看>>
iOS开发中一些常见的并行处理
查看>>
iOS获取手机的Mac地址
查看>>
ios7.1发布企业证书测试包的问题
查看>>
如何自定义iOS中的控件
查看>>
iOS 开发百问
查看>>
Mac环境下svn的使用
查看>>
github简单使用教程
查看>>
如何高效利用GitHub
查看>>
GitHub详细教程
查看>>
Swift概览
查看>>
iOS系统方法进行AES对称加密
查看>>
程序内下载App,不用跳转到AppStore
查看>>
iOS应用崩溃日志分析
查看>>
获取手机系统大小、可用空间大小,设备可用内存及当前应用所占内存等
查看>>
IOS7 开发注意事项
查看>>
iOS开发~CocoaPods使用详细说明
查看>>
在xcode6中使用矢量图(iPhone6置配UI)
查看>>
Mac OS X中Apache开启ssl
查看>>