返回介绍

13.3.2 监听器容器

发布于 2025-04-22 22:09:19 字数 17562 浏览 0 评论 0 收藏

消息监听器容器是一个用于查看 JMS 目标等待消息到达的特殊 bean,一旦消息到达它就可以获取到消息,并通过调用 onMessage() 方法将消息传递给一个 MessageListener 实现。Spring 中消息监听器容器的类型如下。

SimpleMessageListenerContainer:最简单的消息监听器容器,只能处理固定数量的 JMS 会话,且不支持事务。

DefaultMessageListenerContainer:这个消息监听器容器建立在 SimpleMessageListener Container 容器之上,添加了对事务的支持。

serversession.ServerSessionMessage.ListenerContainer:这是功能最强大的消息监听器,与 DefaultMessageListenerContainer 相同,它支持事务,但是它还允许动态地管理 JMS 会话。

下面以 DefaultMessageListenerContainer 为例进行分析,看看消息监听器容器的实现。在之前消息监听器的使用示例中,我们了解到在使用消息监听器容器时一定要将自定义的消息监听器置入到容器中,这样才可以在收到信息时,容器把消息转向监听器处理。查看 DefaultMessageListenerContainer 层次结构图,如图 13-2 所示。

figure_0386_0053

图 13-2 DefaultMessageListenerContainer 层次结构图

同样,我们看到此类实现了 InitializingBean 接口,按照以往的风格我们还是首先查看接口方法 afterPropertiesSet() 中的逻辑,其方法实现在其父类 AbstractJmsListeningContainer 中。

public void afterPropertiesSet() {

 //验证 connectionFactory

 super.afterPropertiesSet();

 //验证配置文件

 validateConfiguration();

 //初始化

 initialize();

}

监听器容器的初始化只包含了三句代码,其中前两句只用于属性的验证,比如 connectionFacory 或者 destination 等属性是否为空等,而真正用于初始化的操作委托在 initialize() 中执行。

public void initialize() throws JmsException {

  try {

  //lifecycleMonitor 用于控制生命周期的同步处理

   synchronized (this.lifecycleMonitor) {

    this.active = true;

   this.lifecycleMonitor.notifyAll();

  }

  doInitialize();

 }

  catch (JMSException ex) {

   synchronized (this.sharedConnectionMonitor) {

   ConnectionFactoryUtils.releaseConnection(this.sharedConnection,

    getConnectionFactory(), this.autoStartup);

    this.sharedConnection = null;

  }

   throw convertJmsAccessException(ex);

 }

}

protected void doInitialize() throws JMSException {

  synchronized (this.lifecycleMonitor) {

   for (int i = 0; i < this.concurrentConsumers; i++) {

   scheduleNewInvoker();

  }

 }

}

这里用到了 concurrentConsumers 属性,网络中对此属性用法的说明如下。

消息监听器允许创建多个 Session 和 MessageConsumer 来接收消息。具体的个数由 concurrentConsumers 属性指定。需要注意的是,应该只是在 Destination 为 Queue 的时候才使用多个 MessageConsumer(Queue 中的一个消息只能被一个 Consumer 接收),虽然使用多个 MessageConsumer 会提高消息处理的性能,但是消息处理的顺序却得不到保证。消息被接收的顺序仍然是消息发送时的顺序,但是由于消息可能会被并发处理,因此消息处理的顺序可能和消息发送的顺序不同。此外,不应该在 Destination 为 Topic 的时候使用多个 MessageConsumer,因为多个 MessageConsumer 会接收到同样的消息。

对于具体的实现逻辑我们只能继续查看源码:

private void scheduleNewInvoker() {

  AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();

  if (rescheduleTaskIfNecessary(invoker)) {

// This should always be true, since we're only calling this when active.

this.scheduledInvokers.add(invoker);

 }

}

protected final boolean rescheduleTaskIfNecessary(Object task) {

  if (this.running) {

   try {

   doRescheduleTask(task);

  }

   catch (RuntimeException ex) {

    logRejectedTask(task, ex);

   this.pausedTasks.add(task);

  }

   return true;

 }

  else if (this.active) {

  this.pausedTasks.add(task);

   return true;

 }

  else {

   return false;

 }

}

protected void doRescheduleTask(Object task) {

  this.taskExecutor.execute((Runnable) task);

}

分析源码得知,根据 concurrentConsumers 数量建立了对应数量的线程,即使读者不了解线程池的使用,至少根据以上代码可以推断出 doRescheduleTask 函数其实是在开启一个线程执行 Runnable。我们反追踪这个传入的参数,可以看到这个参数其实是 AsyncMessageListenerInvoker 类型实例。因此我们可以推断,Spring 根据 concurrentConsumers 数量建立了对应数量的线程,而每个线程都作为一个独立的接收者在循环接收消息。

于是我们把所有的焦点转向 AsyncMessageListenerInvoker 这个类的实现,由于它是作为一个 Runnable 角色去执行,所以对以这个类的分析从 run 方法开始。

public void run() {

 //并发控制

  synchronized (lifecycleMonitor) {

  activeInvokerCount++;

  lifecycleMonitor.notifyAll();

 }

  boolean messageReceived = false;

  try {

  //根据每个任务设置的最大处理消息数量而作不同处理

  //小于 0 默认为无限制,一直接收消息

   if (maxMessagesPerTask < 0) {

    messageReceived = executeOngoingLoop();

  }

   else {

    int messageCount = 0;

   //消息数量控制,一旦超出数量则停止循环

    while (isRunning() && messageCount < maxMessagesPerTask) {

     messageReceived = (invokeListener() || messageReceived);

    messageCount++;

   }

  }

 }

  catch (Throwable ex) {

  //清理操作,包括关闭 session 等

  clearResources();

   if (!this.lastMessageSucceeded) {

    // We failed more than once in a row - sleep for recovery interval

    // even before first recovery attempt.

   sleepInbetweenRecoveryAttempts();

  }

   this.lastMessageSucceeded = false;

   boolean alreadyRecovered = false;

   synchronized (recoveryMonitor) {

    if (this.lastRecoveryMarker == currentRecoveryMarker) {

     handleListenerSetupFailure(ex, false);

    recoverAfterListenerSetupFailure();

     currentRecoveryMarker = new Object();

   }

    else {

     alreadyRecovered = true;

   }

  }

   if (alreadyRecovered) {

    handleListenerSetupFailure(ex, true);

  }

 }

  finally {

   synchronized (lifecycleMonitor) {

   decreaseActiveInvokerCount();

   lifecycleMonitor.notifyAll();

  }

   if (!messageReceived) {

   this.idleTaskExecutionCount++;

  }

   else {

    this.idleTaskExecutionCount = 0;

  }

   synchronized (lifecycleMonitor) {

    if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !reschedule

    TaskIfNecessary(this)) {

     // We're shutting down completely.

    scheduledInvokers.remove(this);

     if (logger.isDebugEnabled()) {

      logger.debug("Lowered scheduled invoker count: " +

     scheduledInvokers.size());

    }

    lifecycleMonitor.notifyAll();

    clearResources();

   }

    else if (isRunning()) {

     int nonPausedConsumers = getScheduledConsumerCount() -

    getPausedTaskCount();

     if (nonPausedConsumers < 1) {

      logger.error("All scheduled consumers have been paused,

      probably due to tasks having been rejected. " +

      "Check your thread pool configuration! Manual

      recovery necessary through a start() call.");

    }

     else if (nonPausedConsumers < getConcurrentConsumers()) {

      logger.warn("Number of scheduled consumers has dropped

      below concurrentConsumers limit, probably " +

      "due to tasks having been rejected. Check your

      thread pool configuration! Automatic recovery " +

      "to be triggered by remaining consumers.");

    }

   }

  }

 }

}

以上函数中主要根据变量 maxMessagesPerTask 的值来分为不同的情况处理,当然,函数中还使用了大量的代码处理异常机制的数据维护,但是我相信大家跟我一样更加关注程序的正常流程是如何处理的。

其实核心的处理就是调用 invokeListener 来接收消息并激活消息监听器,但是之所以两种情况分开处理,正是考虑到在无限制循环接收消息的情况下,用户可以通过设置标志位 running 来控制消息接收的暂停与恢复,并维护当前消息监听器的数量。

private boolean executeOngoingLoop() throws JMSException {

  boolean messageReceived = false;

  boolean active = true;

  while (active) {

   synchronized (lifecycleMonitor) {

    boolean interrupted = false;

    boolean wasWaiting = false;

   //如果当前任务已经处于激活状态但是却给了暂时终止的命令

    while ((active = isActive()) && !isRunning()) {

     if (interrupted) {

      throw new IllegalStateException("Thread was interrupted

      while waiting for " +

      "a restart of the listener container, but

      container is still stopped");

    }

     if (!wasWaiting) {

     //如果并非处于等待状态则说明是第一次执行,需要将激活任务数量减少

     decreaseActiveInvokerCount();

    }

    //开始进入等待状态,等待任务的恢复命令

     wasWaiting = true;

     try {

     //通过 wait 等待,也就是等待 notify 或者 notifyAll

     lifecycleMonitor.wait();

    }

     catch (InterruptedException ex) {

      // Re-interrupt current thread, to allow other threads

      to react.

     Thread.currentThread().interrupt();

      interrupted = true;

    }

   }

    if (wasWaiting) {

    activeInvokerCount++;

   }

    if (scheduledInvokers.size() > maxConcurrentConsumers) {

     active = false;

   }

  }

  //正常处理流程

   if (active) {

    messageReceived = (invokeListener() || messageReceived);

  }

 }

  return messageReceived;

}

如果按照正常的流程其实是不会进入 while 循环中的,而是直接进入函数 invokeListener() 来接收消息并激活监听器,但是,我们不可能让循环一直持续下去,我们要考虑到暂停线程或者恢复线程的情况,这时,isRunning() 函数就派上用场了。

isRunning() 用来检测标志位 this.running 状态进而判断是否需要进入 while 循环。由于要维护当前线程激活数量,所以引入了 wasWaiting 变量,用来判断线程是否处于等待状态。如果线程首次进入等待状态,则需要减少线程激活数量计数器。

当然,还有一个地方需要提一下,就是线程等待不是一味地采用 while 循环来控制,因为如果单纯地采用 while 循环会浪费 CPU 的始终周期,给资源造成巨大的浪费。这里,Spring 采用的是使用全局控制变量 lifecycleMonitor 的 wait() 方法来暂停线程,所以,如果终止线程需要再次恢复的话,除了更改 this.running 标志位外,还需要调用 lifecycleMonitor.notify 或者 lifecycle Monitor.notifyAll 来使线程恢复。

接下来就是消息接收的处理了。

private boolean invokeListener() throws JMSException {

 //初始化资源包括首次创建的时候创建 session 与 consumer

 initResourcesIfNecessary();

  boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);

 //改变标志位,信息成功处理

  this.lastMessageSucceeded = true;

  return messageReceived;

}

protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer

consumer)

throws JMSException {

  if (this.transactionManager != null) {

   // Execute receive within transaction.

   TransactionStatus status = this.transactionManager.getTransaction(this.

  transactionDefinition);

   boolean messageReceived;

   try {

    messageReceived = doReceiveAndExecute(invoker, session, consumer, status);

  }

   catch (JMSException ex) {

    rollbackOnException(status, ex);

    throw ex;

  }

   catch (RuntimeException ex) {

    rollbackOnException(status, ex);

    throw ex;

  }

   catch (Error err) {

    rollbackOnException(status, err);

    throw err;

  }

  this.transactionManager.commit(status);

   return messageReceived;

 }

  else {

   // Execute receive outside of transaction.

   return doReceiveAndExecute(invoker, session, consumer, null);

 }

}

在介绍消息监听器容器的分类时,已介绍了 DefaultMessageListenerContainer 消息监听器容器建立在 SimpleMessageListenerContainer 容器之上,添加了对事务的支持,那么此时,事务特性的实现已经开始了。如果用户配置了 this.transactionManager ,也就是配置了事务,那么,消息的接收会被控制在事务之内,一旦出现任何异常都会被回滚,而回滚操作也会交由事务管理器统一处理,比如 this.transactionManager.rollback(status)。

doReceiveAndExecute 包含了整个消息的接收处理过程,由于参杂着事务,所以并没有复用模板中的方法。

protected boolean doReceiveAndExecute(

Object invoker, Session session, MessageConsumer consumer, TransactionStatus

status)

throws JMSException {

  Connection conToClose = null;

  Session sessionToClose = null;

  MessageConsumer consumerToClose = null;

  try {

   Session sessionToUse = session;

   boolean transactional = false;

    if (sessionToUse == null) {

     sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(

     getConnectionFactory(), this.transactionalResourceFactory,

    true);

     transactional = (sessionToUse != null);

   }

    if (sessionToUse == null) {

     Connection conToUse;

     if (sharedConnectionEnabled()) {

      conToUse = getSharedConnection();

    }

     else {

      conToUse = createConnection();

      conToClose = conToUse;

     conToUse.start();

    }

     sessionToUse = createSession(conToUse);

     sessionToClose = sessionToUse;

   }

    MessageConsumer consumerToUse = consumer;

    if (consumerToUse == null) {

     consumerToUse = createListenerConsumer(sessionToUse);

     consumerToClose = consumerToUse;

   }

   //接收消息

    Message message = receiveMessage(consumerToUse);

    if (message != null) {

     if (logger.isDebugEnabled()) {

      logger.debug("Received message of type [" + message.getClass() +

      "] from consumer [" +

      consumerToUse + "] of " + (transactional ? "transactional

      " : "") + "session [" +

      sessionToUse + "]");

    }

    //模板方法,当消息接收且在未处理前给子类机会做相应处理,当期空实现

     messageReceived(invoker, sessionToUse);

     boolean exposeResource = (!transactional && isExposeListenerSession() &&

     !TransactionSynchronizationManager.hasResource (getConnection

    Factory()));

     if (exposeResource) {

     TransactionSynchronizationManager.bindResource(

      getConnectionFactory(), new LocallyExposedJms Resource

     Holder(sessionToUse));

    }

     try {

     //激活监听器

     doExecuteListener(sessionToUse, message);

    }

     catch (Throwable ex) {

      if (status != null) {

       if (logger.isDebugEnabled()) {

logger.debug("Rolling back transaction because of listener exception thrown: " + ex);

   }

   status.setRollbackOnly();

  }

  handleListenerException(ex);

   // Rethrow JMSException to indicate an infrastructure problem

   // that may have to trigger recovery...

   if (ex instanceof JMSException) {

    throw (JMSException) ex;

  }

 }

  finally {

   if (exposeResource) {

TransactionSynchronizationManager.unbindResource&nbsp;(getConnectionFactory());

  }

 }

  // Indicate that a message has been received.

  return true;

}

else {

  if (logger.isTraceEnabled()) {

logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + sessionToUse + "] did not receive a message");

 }

 //接收到空消息的处理

  noMessageReceived(invoker, sessionToUse);

// Nevertheless call commit, in order to reset the transaction timeout (if&nbsp;any).

// However, don't do this on Tibco since this may lead to a deadlock&nbsp;there.

  if (shouldCommitAfterNoMessageReceived(sessionToUse)) {

   commitIfNecessary(sessionToUse, message);

 }

  // Indicate that no message has been received.

  return false;

 }

}

finally {

 JmsUtils.closeMessageConsumer(consumerToClose);

 JmsUtils.closeSession(sessionToClose);

ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(),&nbsp;true);

 }

}

上面函数代码看似繁杂,但是真正的逻辑并不多,大多是固定的套路,而我们最关心的就是监听器的激活处理。

protected void doExecuteListener(Session session, Message message) throws JMSException {

  if (!isAcceptMessagesWhileStopping() && !isRunning()) {

   if (logger.isWarnEnabled()) {

logger.warn("Rejecting received message because of the listener container " + "having been stopped in the meantime: " + message);

  }

  rollbackIfNecessary(session);

   throw new MessageRejectedWhileStoppingException();

 }

  try {

  invokeListener(session, message);

 }

  catch (JMSException ex) {

   rollbackOnExceptionIfNecessary(session, ex);

   throw ex;

 }

  catch (RuntimeException ex) {

   rollbackOnExceptionIfNecessary(session, ex);

   throw ex;

 }

  catch (Error err) {

   rollbackOnExceptionIfNecessary(session, err);

   throw err;

 }

 commitIfNecessary(session, message);

}

protected void invokeListener(Session session, Message message) throws JMSException {

  Object listener = getMessageListener();

  if (listener instanceof SessionAwareMessageListener) {

  doInvokeListener((SessionAwareMessageListener) listener, session, message);

 }

  else if (listener instanceof MessageListener) {

   doInvokeListener((MessageListener) listener, message);

 }

  else if (listener != null) {

throw new IllegalArgumentException( "Only MessageListener and SessionAwareMessageListener supported: " + listener);

 }

  else {

throw new IllegalStateException("No message listener specified - see property 'messageListener'");

 }

}

protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {

 listener.onMessage(message);

}

通过层层调用,最终提取监听器并使用 listener.onMessage(message) 激活了监听器,也就是激活了用户自定义的监听器逻辑。这里还有一句重要的代码很容易被忽略掉,commitIfNecessary(session, message),完成的功能是 session.commit()。完成消息服务的事务提交,涉及两个事务,我们常说的 DefaultMessageListenerContainer 增加了事务的支持,是通用的事务,也就是说我们在消息接收过程中如果产生其他操作,比如向数据库中插入数据,一旦出现异常时就需要全部回滚,包括回滚插入数据库中的数据。但是,除了我们常说的事务之外,对于消息本身还有一个事务,当接收一个消息的时候,必须使用事务提交的方式,这是在告诉消息服务器本地已经正常接收消息,消息服务器接收到本地的事务提交后便可以将此消息删除,否则,当前消息会被其他接收者重新接收。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。