Jump to content
  • Hello visitors, welcome to the Hacker World Forum!

    Red Team 1949  (formerly CHT Attack and Defense Team) In this rapidly changing Internet era, we maintain our original intention and create the best community to jointly exchange network technologies. You can obtain hacker attack and defense skills and knowledge in the forum, or you can join our Telegram communication group to discuss and communicate in real time. All kinds of advertisements are prohibited in the forum. Please register as a registered user to check our usage and privacy policy. Thank you for your cooperation.

    TheHackerWorld Official

Recommended Posts

RabbitMq简介
1.1消息队列中间件简介
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ
以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景

1.2什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

1.可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。2.灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

2.消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker

3.高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

4.多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

5.多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

6.管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

7.跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

8.插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

安装准备工具
1.下载Eralng,下面链接已提供otp_win64_20.2.exe
链接: https://pan.baidu.com/s/1lmvCMPVAV1Ba9UogCdQpZg
提取码:x9m7
2.下载rabbitmq,下面链接已提供rabbitmq-server-3.7.4.exe
链接: https://pan.baidu.com/s/1CPfhg5X1e7UitpgMWIcAEg
提取码:h4r3

安装步骤(图文)
1、安装erlang并配置环境变量
1.1 双击otp_win64_20.2.exe,点击next

2658512-20220716190951201-45982607.png

 

 

1.2 选择安装目录

2658512-20220716191015763-903435096.png

 

 

1.3 配置环境变量
新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址

ERLANG_HOME 路径:E:\Program Files\erl9.2
2658512-20220716191111049-1732803273.png

 

 

双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。

%ERLANG_HOME%\bin
2658512-20220716191137459-1373736878.png

1.4 验证erlang是否安装成功
win+R键,输入cmd,再输入erl,看到erlang版本号就说明erlang安装成功了。

2658512-20220716191214148-1442170477.png


2、安装RabbitMQ
2.1 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
2.2 安装RabbitMQ-Plugins
打开命令行cd,输入RabbitMQ的sbin目录。
我的目录是:

E:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin
2658512-20220716191245118-1348568100.png


然后输入以下命令进行安装

rabbitmq-plugins enable rabbitmq_management
2658512-20220716191335422-2085988964.png

2.3 验证rabbitmq是否安装成功
输入 以下命令

rabbitmqctl status
2658512-20220716191400049-171619202.png


如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常


2.4 打开浏览器,地址栏输入mq访问地址,即可看到管理界面的登陆页

http://127.0.0.1:15672     【注意页面端是:15672,服务端是:5672】
2658512-20220716191424912-388476087.png

2.5 输入用户名和密码,都为guest 进入主界面:
最上侧的导航依次是:概览、连接、信道、交换器、队列、用户管理


安装过程中遇到的问题
2658512-20220716191649527-209665597.png

 

 代码示例在框架中测试:

 

安装php-amqplib

php-amqplib是一个纯PHP库,使用它,基于PHP的脚本客户端就可以轻松的连接和操作RabbitMQ。我们使用composer来安装。

composer require php-amqplib/php-amqplib

示例说明

生产者(Producer)和消费者(Consumer)是消息队列的基本概念,生产者是指生产消息的一方,也是消息发送方,消费者就是消费消息的一方,也是消息接收方,队列就是存储消息的一个缓存区。

本实例将由生产者发送很多消息给消息队列,由多个消费者来消费队列中的消息。我们可以想象这样的场景:皮鞋生产打包打包车间,不断有成品鞋进入传送带(消息队列)等待操作工人(消费者)将皮鞋打包。

因为等待打包的鞋子特别多,我们需要安排多个打包工人在传送带两边,及时从传送带取出成品鞋,然后装箱打包。我们要求是要确保工人最后打包好的皮鞋数量一双不少,不能因为打包工人操作慢或者个人原因暂时离开生产线,导致最终打包数不一致。

消息发送

生产者将消息发送给队列,至于谁来消费(处理)这些消息,生产者不管。

消息队列(MQ),用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

消息到达队列中后,如果没有一个消费者来处理消息的话,我们希望队列中的消息不要丢弃,也就是消息持久化。在生产者和消费者中都要将queue_declare第3个参数设置为true,表示让消息队列持久化。

$channel->queue_declare($queue, false, true, false, false); 

此外,我们可以确保即使RabbitMQ重启了,消息队列不会丢失,在生产者端设置:'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

 

现在我们建立生产者文件sender.php,我们假设服务端已经安装好RabbitMQ,并且开放好对应端口。

<?php
/**
 * @Author: Helloweba
 * @sender.php
 * @消息生产者-分发任务
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
    '192.168.0.100', 
    56720, 
    'helloweba',  //user
    'helloweba',  //password
    'test'  //vhost
);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false); //第3个参数设置为true,表示让消息队列持久化

for ($i = 0; $i < 100; $i++) { 
    $arr = [
        'id' => 'message_' . $i,
        'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
        'content' => 'helloweba-' . time()
    ];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////设置rabbitmq重启后也不会丢失队列,或者设置为'delivery_mode' => 2
    $channel->basic_publish($msg, '', $queue);

    echo 'Send message: ' . $data . PHP_EOL;
}

$channel->close();
$connection->close();

?>

上述代码中,我们模拟了生产者向队列中发送了100条订单消息。

消息接收

消费者是指完成消息的接收和处理的客户端程序,消费者就如同生产线上的操作工人,他们按照操作规程从传送带上取出产品后有序的完成后续工作任务。

实际项目中,如果消费者处理消息能力不够时,就要开启多个消费者来消费队列中的消息。默认情况下,RabbitMQ将会把队列中的消息平均分配给每个消费者。如果消费者要对分配到的消息任务处理时间很长(耗时任务),那么处理消息任务的时候就有可能会遇到意外。

比如某个消费者断电了,或者出故障了,那它正在处理的消息会怎么办?这里就是RabbitMQ的消息确认机制,为了保证数据不丢失,RabbitMQ会将未处理完的消息分配给下一个消费者处理。

此外RabbitMQ还可以设置公平分配消息任务,不会给某个消费者同时分配多个消息处理任务,因为消费者无法同时处理多个消息任务。

换句话说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,而是将消息分发给下一个不忙的消费者。

$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息

 

我们现在建立消费者文件receiver.php,代码如下:

<?php
/**
 * @Author: Helloweba
 * @receiver.php
 * @消息消费者-接收端
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;

$callback = function($msg){
    echo " Received message:", $msg->body, PHP_EOL;
    sleep(1);  //模拟耗时执行
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
$channel->basic_consume($queue, '', false, false, false, false, $callback); //第4个参数值为false表示启用消息确认

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

 ?>

模拟测试

现在我们运行多个消费者终端,可以打开多个ssh客户端,client1和client2运行:

php receive.php

 

然后再开个终端,运行生产者:

php sender.php

由于消费者是阻塞运行的,他们会一直等待队列中的消息,当有消息就会去取出来处理。我们可以模拟将其中某个客户端中断,即断开某个消费者。

 

然后再看消息是不是被其他消费者接收处理了。同样我们可以模拟将客户端全部重启,看看队列中的消息是否没有丢失。

当client1中断连接RabbitMQ后,再次运行连接RabbitMQ,在client2中看到的消息处理情况,注意看图中的消息id。

client1

2658512-20220716192005634-1571629953.png

 

 

client2:

2658512-20220716192028255-382752804.png

 

Link to post
Link to comment
Share on other sites

 Share

discussion group

discussion group

    You don't have permission to chat.
    • Recently Browsing   0 members

      • No registered users viewing this page.
    ×
    ×
    • Create New...