返回介绍

13.3.1 JmsTemplate

发布于 2025-04-22 22:09:18 字数 7586 浏览 0 评论 0 收藏 0

在代码与 Spring 整合的实例中,我们看到 Spring 采用了与 JDBC 等一贯的套路,为我们提供了 JmsTemplate 来封装常用操作。查看 JmsTemplate 的类型层级结构图,如图 13-1 所示。

figure_0382_0052

图 13-1 JmsTemplate 的类型层级结构图

首先还是按照一贯的分析套路,提取我们感兴趣的接口 InitializingBean,接口方法实现是在 JmsAccessor 类中,如下:

public void afterPropertiesSet() {

  if (getConnectionFactory() == null) {

   throw new IllegalArgumentException("Property 'connectionFactory' is required");

 }

}

发现函数中只是一个验证的功能,并没有逻辑实现。丢掉这个线索,我们转向实例代码的分析。首先以发送为例,在 Spring 中发送消息可以通过 JmsTemplate 中提供的方法来实现。

public void send(final Destination destination, final MessageCreator messageCreator)

throws JmsException

使用方式如下:

jmsTemplate.send(destination, new MessageCreator() {

  public Message createMessage(Session session) throws JMSException {

   return session.createTextMessage("大家好这个是测试!");

 }

});

我们就跟着程序流,进入函数 send 查看其源代码:

public void send(final Destination destination, final MessageCreator messageCreator)

throws JmsException {

 execute(new SessionCallback<Object>() {

   public Object doInJms(Session session) throws JMSException {

    doSend(session, destination, messageCreator);

    return null;

  }

  }, false);

}

现在的风格不得不让我们回想起 JdbcTemplate 的类实现风格,极为相似,都是提取一个公共的方法作为最底层、最通用的功能实现,然后又通过回调函数的不同来区分个性化的功能。我们首先查看通用代码的抽取实现。

1.通用代码抽取

根据之前分析 JdbcTemplate 的经验,我们推断,在 execute 中一定是封装了 Connection 以及 Session 的创建操作。

public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {

  Assert.notNull(action, "Callback object must not be null");

  Connection conToClose = null;

  Session sessionToClose = null;

  try {

   Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(

   getConnectionFactory(), this.transactionalResourceFactory,

  startConnection);

   if (sessionToUse == null) {

   //创建 connection

    conToClose = createConnection();

   //根据 connection 创建 session

    sessionToClose = createSession(conToClose);

   //是否开启向服务器推送连接信息,只有接收信息时需要,发送时不需要

    if (startConnection) {

    conToClose.start();

   }

    sessionToUse = sessionToClose;

  }

   if (logger.isDebugEnabled()) {

    logger.debug("Executing callback on JMS Session: " + sessionToUse);

  }

  //调用回调函数

   return action.doInJms(sessionToUse);

 }

  catch (JMSException ex) {

   throw convertJmsAccessException(ex);

 }

  finally {

  //关闭 session

  JmsUtils.closeSession(sessionToClose);

  //释放连接

   ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(),

  startConnection);

 }

}

在展示单独使用 activeMQ 时,我们知道为了发送一条消息需要做很多工作,需要很多的辅助代码,而这些代码又都是千篇一律的,没有任何的差异,所以 execute 方法的目的就是帮助我们抽离这些冗余代码使我们更加专注于业务逻辑的实现。从函数中看,这些冗余代码包括创建 Connection、创建 Session、当然也包括关闭 Session 和关闭 Connection。而在准备工作结束后,调用回调函数将程序引入用户自定义实现的个性化处理。至于如何创建 Session 与 Connection,有兴趣的读者可以进一步研究 Mybatis 的源码。

2.发送消息的实现

有了基类辅助实现,使 Spring 更加专注于个性的处理,也就是说 Spring 使用 execute 方法中封装了冗余代码,而将个性化的代码实现放在了回调函数 doInJms 函数中。在发送消息的功能中回调函数通过局部类实现。

new SessionCallback<Object>() {

  public Object doInJms(Session session) throws JMSException {

  doSend(session, destination, messageCreator);

   return null;

 }

}

此时的发送逻辑已经完全被转向了 doSend 方法,这样使整个功能实现变得更加清晰。

protected void doSend(Session session, Destination destination, MessageCreator

messageCreator)

throws JMSException {

  Assert.notNull(messageCreator, "MessageCreator must not be null");

  MessageProducer producer = createProducer(session, destination);

  try {

   Message message = messageCreator.createMessage(session);

   if (logger.isDebugEnabled()) {

    logger.debug("Sending created message: " + message);

  }

  doSend(producer, message);

   // Check commit - avoid commit call within a JTA transaction.

   if (session.getTransacted() && isSessionLocallyTransacted(session)) {

    // Transacted session created by this template -> commit.

   JmsUtils.commitIfNecessary(session);

  }

 }

  finally {

  JmsUtils.closeMessageProducer(producer);

 }

}

protected void doSend(MessageProducer producer, Message message) throws JMSException {

  if (isExplicitQosEnabled()) {

   producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());

 }

  else {

  producer.send(message);

 }

}

在演示独立使用消息功能的时候,我们大体了解了消息发送的基本套路,虽然这些步骤已经被 Spring 拆得支离破碎,但是我们还是能捕捉到一些影子。在发送消息还是遵循着消息发送的规则,比如根据 Destination 创建 MessageProducer、创建 Message,并使用 MessageProducer 实例来发送消息。

3.接收消息

我们通常使用 jmsTemplate.receive(destination) 来接收简单的消息,那么这个功能 Spring 是如何封装的呢?

public Message receive(Destination destination) throws JmsException {

  return receiveSelected(destination, null);

}

public Message receiveSelected(final Destination destination, final String messageSelector)

throws JmsException {

  return execute(new SessionCallback<Message>() {

   public Message doInJms(Session session) throws JMSException {

    return doReceive(session, destination, messageSelector);

  }

  }, true);

}

protected Message doReceive(Session session, Destination destination, String messageSelector)

throws JMSException {

  return doReceive(session, createConsumer(session, destination, messageSelector));

}

protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {

  try {

   // Use transaction timeout (if available).

   long timeout = getReceiveTimeout();

   JmsResourceHolder resourceHolder =

   (JmsResourceHolder) TransactionSynchronizationManager.getResource

  (getConnectionFactory());

   if (resourceHolder != null && resourceHolder.hasTimeout()) {

    timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());

  }

   Message message = doReceive(consumer, timeout);

   if (session.getTransacted()) {

    // Commit necessary - but avoid commit call within a JTA transaction.

    if (isSessionLocallyTransacted(session)) {

     // Transacted session created by this template -> commit.

    JmsUtils.commitIfNecessary(session);

   }

  }

   else if (isClientAcknowledge(session)) {

    // Manually acknowledge message, if any.

    if (message != null) {

    message.acknowledge();

   }

  }

   return message;

 }

  finally {

  JmsUtils.closeMessageConsumer(consumer);

 }

}

private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {

  if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {

   return consumer.receiveNoWait();

 }

  else if (timeout > 0) {

   return consumer.receive(timeout);

 }

  else {

   return consumer.receive();

 }

}

实现的套路与发送差不多,同样还是使用 execute 函数来封装冗余的公共操作,而最终的目标还是通过 consumer.receive() 来接收消息,其中的过程就是对于 MessageConsumer 的创建以及一些辅助操作。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。