一、什么是JMS,为什么需要它
(1)、消息中间件的定义:
指利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。消息中间件可以即支持同步方式,又支持异步方式。异步中间件比同步中间件具有更强的容错性,在系统故障时可以保证消息的正常传输。异步中间件技术又分为两类:广播方式和发布/订阅方式。由于发布/订阅方式可以指定哪种类型的用户可以接受哪种类型的消息,更加有针对性,事实上已成为异步中间件的非正式标准。
(2)、JMS定义:
从上个世纪90年代初,随着不同厂商消息中间件大量上市,消息中间件技术得到了长足的发展。目前,IBM和BEA的中间件产品在银行、证券、电信等高端行业,以及IT等行业中得到广泛应用。由于没有统一的规范和标准,基于消息中间件的应用不可移植,不同的消息中间件也不能互操作,这大大阻碍了消息中间件的发展。 Java Message Service(JMS, Java消息服务)是SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范。它定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,它最主要的目的是允许Java应用程序访问现有的消息中间件。JMS规范没有指定在消息节点间所使用的通讯底层协议,来保证应用开发人员不用与其细节打交道,一个特定的JMS实现可能提供基于TCP/IP、HTTP、UDP或者其它的协议。目前许多厂商采用并实现了JMS API,现在,JMS产品能够为企业提供一套完整的消息传递功能,下面是一些比较流行的JMS商业软件和开源产品。
(3)、解决了什么问题:
采用异步通信模式:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理;
客户和服务对象生命周期的松耦合关系:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。
二、体系结构
整体结构如下:
(1)、ConnectionFactory连接工厂,JMS 用它创建连接Connection,一般设为单例模式,一旦创建,就一直运行在应用容器内
package javax.jms;public interface ConnectionFactory { Connection createConnection() throws JMSException;//创建一个连接 Connection createConnection(String userName, String password)//创建一个有密码的连接 throws JMSException;}
(2)、Connection
package javax.jms;public interface Connection { Session createSession(boolean transacted, int acknowledgeMode) throws JMSException; String getClientID() throws JMSException; //唯一客户端ID void setClientID(String clientID) throws JMSException; ConnectionMetaData getMetaData() throws JMSException; ExceptionListener getExceptionListener() throws JMSException; void setExceptionListener(ExceptionListener listener) throws JMSException; void start() throws JMSException; //开启连接 void stop() throws JMSException; //停止连接 void close() throws JMSException; //关闭连接 ConnectionConsumer createConnectionConsumer( Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException; ConnectionConsumer createDurableConnectionConsumer( Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException;}
JMS 客户端到JMS Provider 的连接,可以理解为数据库里的Connection
(3)、Session:一个发送或接收消息的线程
package javax.jms;import java.io.Serializable;public interface Session extends Runnable { static final int AUTO_ACKNOWLEDGE = 1; //四种模式 static final int CLIENT_ACKNOWLEDGE = 2; static final int DUPS_OK_ACKNOWLEDGE = 3; static final int SESSION_TRANSACTED = 0; BytesMessage createBytesMessage() throws JMSException; //创建消息 MapMessage createMapMessage() throws JMSException; Message createMessage() throws JMSException; ObjectMessage createObjectMessage() throws JMSException; ObjectMessage createObjectMessage(Serializable object) throws JMSException; StreamMessage createStreamMessage() throws JMSException; TextMessage createTextMessage() throws JMSException; TextMessage createTextMessage(String text) throws JMSException; boolean getTransacted() throws JMSException; int getAcknowledgeMode() throws JMSException; void commit() throws JMSException; void rollback() throws JMSException; void close() throws JMSException; void recover() throws JMSException; MessageListener getMessageListener() throws JMSException; void setMessageListener(MessageListener listener) throws JMSException; public void run(); MessageProducer createProducer(Destination destination) //创建消息生产者 throws JMSException; MessageConsumer createConsumer(Destination destination) throws JMSException; MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector) throws JMSException; MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector, boolean NoLocal) throws JMSException; Queue createQueue(String queueName) throws JMSException; //创建队列 Topic createTopic(String topicName) throws JMSException; //创建主题 TopicSubscriber createDurableSubscriber(Topic topic, String name)//创建主题订阅者 throws JMSException; TopicSubscriber createDurableSubscriber( Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException; QueueBrowser createBrowser(Queue queue) throws JMSException; QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException; TemporaryQueue createTemporaryQueue() throws JMSException; TemporaryTopic createTemporaryTopic() throws JMSException; void unsubscribe(String name) throws JMSException;}
所以,session具有创建消息,主题,队列,生产者,消费者功能。
(4)、MessageProducer: 由Session 对象创建的用来发送消息的对象
(5)、Destination:消息的目的地,包括队列(PTP),主题(Pub/Sub)
(6)、MessageConsumer: 由Session 对象创建的用来接收消息的对象
(7)、Message:
JMS 消息由以下几部分组成:消息头,属性,消息体。
消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。
属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。
消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
三、两种消息传递模型
JMS Parent | PTP Domain | Pub/Sub Domain |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
(1)、点对点模型(PTP)
点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发动到由某个名字标识的特定消费者。这个名字实际上对应于消息服务中的一个队列(Queue),在消息传动给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。
(2)、发布-订阅模型(Pub/Sub)
发布-订阅模型用称为主题(topic)的内容分层结构代替了PTP模型中的惟一目的地,发送应用程序发布自己的消息,指出消息描述的是有关分层结构中的一个主题的信息。希望接收这些消息的应用程序订阅了这个主题。订阅包含子主题的分层结构中的主题的订阅者可以接收该主题和其子主题发表的所有消息
三、原生态JMS编程实践
广义上说,一个JMS应用是几个JMS 客户端交换消息,开发JMS客户端应用由以下几步构成
(1)、用JNDI 得到ConnectionFactory对象;
(2)、用JNDI 得到目标队列或主题对象,即Destination对象;
(3)、用ConnectionFactory创建Connection 对象;
(4)、用Connection对象创建一个或多个JMS Session;
(5)、用Session 和Destination 创建MessageProducer和MessageConsumer;
(6)、通知Connection 开始传递消息。
消费生产者
import java.io.*;import javax.jms.*;import javax.naming.*; public class Sender { public static void main(String[] args) { new Sender().send(); } public void send() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { //Prompt for JNDI names System.out.println("Enter ConnectionFactory name:"); String factoryName = reader.readLine(); System.out.println("Enter Destination name:"); String destinationName = reader.readLine(); //Look up administered objects InitialContext initContext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) initContext.lookup(factoryName); Destination destination = (Destination) initContext.lookup(destinationName); initContext.close(); //Create JMS objects Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer sender = session.createProducer(queue); //Send messages String messageText = null; while (true) { System.out.println("Enter message to send or 'quit':"); messageText = reader.readLine(); if ("quit".equals(messageText)) break; TextMessage message = session.createTextMessage(messageText); sender.send(message); } //Exit System.out.println("Exiting..."); reader.close(); connection.close(); System.out.println("Goodbye!"); } catch (Exception e) { e.printStackTrace(); System.exit(1); } }}
消息消费者
import java.io.*;import javax.jms.*;import javax.naming.*; public class Receiver implements MessageListener { private boolean stop = false; public static void main(String[] args) { new Receiver().receive(); } public void receive() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { //Prompt for JNDI names System.out.println("Enter ConnectionFactory name:"); String factoryName = reader.readLine(); System.out.println("Enter Destination name:"); String destinationName = reader.readLine(); reader.close(); //Look up administered objects InitialContext initContext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) initContext.lookup(factoryName); Destination destination = (Destination) initContext.lookup(destinationName); initContext.close(); //Create JMS objects Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer receiver = session.createConsumer(queue); receiver.setMessageListener(this); connection.start(); //Wait for stop while (!stop) { Thread.sleep(1000); } //Exit System.out.println("Exiting..."); connection.close(); System.out.println("Goodbye!"); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } public void onMessage(Message message) { try { String msgText = ((TextMessage) message).getText(); System.out.println(msgText); if ("stop".equals(msgText)) stop = true; } catch (JMSException e) { e.printStackTrace(); stop = true; } }}
四、总结
目前许多厂商采用并实现了JMS API,比如Active MQ
Active MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:
(1)、提供点到点消息模式和发布/订阅消息模式;
(2)、支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;
(3)、新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;
(4)、拥有消息持久化、事务、集群支持等JMS基础设施服务。
下篇学习Active MQ