返回介绍

12.4 分布式消息实现原理演示

发布于 2025-04-26 13:26:37 字数 3139 浏览 0 评论 0 收藏

使用 RabbitMQ 实现分布式消息分发,在分布式系统中具有很大的用途,为各个应用之间传递消息和数据提供了很大的方便,并且其松散耦合的结构和异步处理的机制不会影响系统的性能。

使用 spring-cloud-stream 可以非常简单地使用 RabbitMQ 的异步消息,Spring Cloud 的配置管理中的分布式消息分发也是通过调用 spring-cloud-stream 组件来实现的。下面将创建一个消息生产者和一个消息消费者来演示消息分发的实现原理。

12.4.1 消息生产者

消息生产者的实现如代码清单 12-12 所示,这里主要创建了一个 POST 接口“/send”,以接收传入的 Map 对象作为参数,使用 MessageChannel 的 send 方法,将消息发布到 RabbitMQ 的消息队列上。使用 Map 对象的目的,是保证消息生产者和消息消费者之间,可以使用相同的对象来存取消息的内容,这就要求双方必须事先约定 Map 对象的字段。

代码清单 12-12 消息生产者主程序

@EnableBinding(Source.class)
@RestController
@SpringBootApplication
public class SenderApplication {
    @Autowired
    @Output(Source.OUTPUT)
    private MessageChannel channel;
    @RequestMapping(method = RequestMethod.POST, path = "/send")
    public void write (@RequestBody Map<String, Object> msg){
        channel.send(MessageBuilder.withPayload(msg).build());
    }
    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }
}

代码清单 12-13 是消息生产者这个工程的配置文件,其中 stream 配置的目的地为 cloud-stream,客户端订阅时必须使用相同的目的地,注意这里绑定的方式是 output,rabbitmq 是连接消息服务器的一些参数配置。

代码清单 12-13 消息生产者工程配置文件

server:
    port: 80
spring:
    cloud:
        stream:
            bindings:
            output:
                destination: cloud-stream
    rabbitmq:
        addresses: amqp:// 192.168.1.216:5672
        username: alan
        password: alan

12.4.2 消息消费者

消息消费者的实现如代码清单 12-14 所示,这里从 SubscribableChannel(即 Sink.INPUT)订阅了通道的消息,它相当于一个监听器,当订阅的通道上有消息发布时,就将消息取回来,然后简单地在控制台上打印出来。这里使用的 Map 对象,跟消息生产者约定了两个使用字段,即 msg 和 name。

代码清单 12-14 消息消费者主程序

@EnableBinding(Sink.class)
@IntegrationComponentScan
@MessageEndpoint
@SpringBootApplication
public class ReceiverApplication {
    @ServiceActivator(inputChannel=Sink.INPUT)
    public void accept(Map<String, Object> msg){
        System.out.println(msg.get("msg").toString() + ":" + msg.get("name"));
    }
    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }
}

代码清单 12-15 是消息消费者工程的配置文件,它与消息生产者有相同的目的地,不同的是它的绑定方式是 input,其中 rabbitmq 的配置是相同的,只要连上 RabbitMQ 消息服务器即可。

代码清单 12-15 消息消费者工程配置文件

server:
    port: 81
spring:
    cloud:
        stream:
            bindings:
            input:
                destination: cloud-stream
                group: group1
                consumer:
                  durableSubscription: true
    rabbitmq:
        addresses: amqp:// 192.168.1.216:5672
        username: alan
        password: alan

这样,当两个工程都启动之后,输入下列 POST 指令,就可以将消息从消息生产者中发布出去。

curl -l -H "Content-type:application/json" -X POST -d'{"msg":"Hello","name":"RabbitMQ"}' http://localhost/send

其中消息结构中的 msg 和 name 就是约定好的字段。这时,在消息消费者的控制台上可以看到接收到了如下信息:

Hello:RabbitMQ

从上面的演示可以看出,使用 spring-cloud-stream 来实现分布式消息的分发和接收都是非常简单的。上面实例工程的完整代码可以从 GitHub 中检出: https://github.com/chenfromsz/spring-cloud-stream-demo.git .

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。