北京 外贸网站,制作h5的基本流程,工程建设分为哪几个阶段,佛山新网站建设市场代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法#xff08;我这边消费队列是使用接口请求的方式#xff0c;每次只从中取出一条#xff09;
安装amqp扩展
PHP使用RabbitMQ前#xff0c;需要安装amqp扩展#xff0c;之前文章中介绍了Windows环…代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法我这边消费队列是使用接口请求的方式每次只从中取出一条
安装amqp扩展
PHP使用RabbitMQ前需要安装amqp扩展之前文章中介绍了Windows环境PHP安装amqp扩展的方法windows环境PHP使用RabbitMq安装amqp扩展_windows mq扩展安装-CSDN博客
Linux中安装amqp扩展
### 先进入/usr/local目录下下载两个文件到此目录我的PHP版本是7.2wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gzwget -c http://pecl.php.net/get/amqp-1.9.3.tgz### 若使用的docker将上面下载的两个包 拷贝到容器内【 docker cp ./文件 dockerID:/usr/local】然后执行下面命令即可### 解压rabbitmq-c-0.8.0.tar.gztar zxf rabbitmq-c-0.8.0.tar.gzcd /usr/local/rabbitmq-c-0.8.0./configure --prefix/usr/local/rabbitmq-c-0.8.0make make install### 然后解压 amqp-1.9.3.tgz 解压后amqp-1.9.3文件下内还有个amqp-1.9.3文件夹将内部的amqp-1.9.3目录拷贝到/usr/local/下执行下列命令cd /usr/local/amqp-1.9.3/usr/local/bin/phpize./configure --with-php-config/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir/usr/local/rabbitmq-c-0.8.0cp /usr/local/rabbitmq-c-0.8.0/librabbitmq/amqp_ssl_socket.h /usr/local/amqp-1.9.3/make make install### 最后修改php.ini 加上配置extension amqp.so
安装后执行php -m 显示amqp 即表明扩展安装成功
加载PHP代码的扩展包
然后需要加载代码的扩展包比较方便快捷的方法是使用composer 加载扩展包
composer require php-amqplib/php-amqplib若想指定版本composer require php-amqplib/php-amqplib:版本
具体使用哪个版本可以在此链接内查询https://packagist.org/packages/php-amqplib/php-amqplib
示例代码包含开启了SSL的连接方式
?phpnamespace common\helpers;use models\setting\Log;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;class AmqpHelper
{/*** rabbitMq 未开启ssl验证 消费者* return false|string|void* throws \AMQPChannelException* throws \AMQPConnectionException* throws \AMQPQueueException* time 2024/12/2 13:43* author zsh*/public static function consumerResult(){//队列配置信息$configParams array(host \Yii::$app-params[cotaTct][queueHost],port \Yii::$app-params[cotaTct][queuePort],login \Yii::$app-params[cotaTct][queueLogin],password \Yii::$app-params[cotaTct][queuePassword],vhost \Yii::$app-params[cotaTct][queueVhost]);$conn new \AMQPConnection($configParams);if (!$conn-connect()) {die(连接rabbitmq失败!\n);}//建立信道$channel new \AMQPChannel($conn);// 创建队列$q new \AMQPQueue($channel);$queueName \Yii::$app-params[cotaTct][queueName]; //队列名$q-setName($queueName);$q-setFlags(AMQP_DURABLE); // 持久化// 绑定交换机与队列并指定路由键$q-bind(\Yii::$app-params[cotaTct][exchange], \Yii::$app-params[cotaTct][routingKey]);// 消息获取$ret $q-get(AMQP_AUTOACK);if ($ret) {
// echo \nget data:\n;
// var_dump($ret-getBody());
// var_dump(json_decode($ret-getBody(), true));$conn-disconnect();return $ret-getBody();}else{$conn-disconnect();return false;}}/*** rabbitMq 开启ssl了验证 消费者* return mixed|string|void* throws \ErrorException* time 2024/12/2 13:44* author zsh*/public static function sslConsumerResult(){$configParams array(host \Yii::$app-params[cotaTct][prodQueueHost],port \Yii::$app-params[cotaTct][prodQueuePort],login \Yii::$app-params[cotaTct][prodQueueLogin],password \Yii::$app-params[cotaTct][prodQueuePassword],vhost \Yii::$app-params[cotaTct][queueVhost]);// 创建SSL连接时忽略证书验证$ssl_options array(verify_peer false,verify_peer_name false,);$connection new AMQPSSLConnection($configParams[host],$configParams[port],$configParams[login],$configParams[password],$configParams[vhost],$ssl_options);if (!$connection-isConnected()) {die(连接rabbitmq失败!\n);}
// echo 链接成功...;$queueName \Yii::$app-params[cotaTct][queueName]; //队列名$exchange \Yii::$app-params[cotaTct][exchange];$routingKey \Yii::$app-params[cotaTct][routingKey];$channel $connection-channel();// 声明交换器$channel-exchange_declare($exchange, topic, false, true, false);// 获取系统生成的消息队列名称这里也可以指定一个队列名称$channel-queue_declare($queueName, false, true, false, false);// 将队列名与交换器名进行绑定并指定routing_key(路由键值)$channel-queue_bind($queueName,$exchange,$routingKey);$message ;// 定义收到消息回调函数$callback function ($msg) use ($message) {
// echo Message:.$msg-body;$message $msg-body;// 手动确认消息是否正常消费$msg-delivery_info[channel]-basic_Ack($msg-delivery_info[delivery_tag]);};// 设置消费成功后才能继续进行下一个消费$channel-basic_qos(null, 1, null);// 开启消费no_ackfalse,设置为手动应答$channel-basic_consume($queueName, , false, false, false, false, $callback);// 循环进行消费
// while ($channel-is_consuming()) {
// try {
// $channel-wait(null, false, $timeout 10);
// }catch (AMQPTimeoutException $ex){
// // 没有消息可处理退出循环
// echo $ex-getMessage();
// break;
// }
// }if ($channel-is_consuming()) {try {$channel-wait(null, false, $timeout 5);}catch (AMQPTimeoutException $ex){// 没有消息可处理退出循环echo $ex-getMessage();}}//关闭连接$channel-close();$connection-close();$return $message;unset($message);$message null;return $return;}/*** rabbitMq 未开启ssl验证 生产者* return mixed|string|void* throws \ErrorException* time 2024/12/2 13:44* author zsh*/public static function producer($message){$configParams array(host \Yii::$app-params[cotaTct][queueHost],port \Yii::$app-params[cotaTct][queuePort],login \Yii::$app-params[cotaTct][queueLogin],password \Yii::$app-params[cotaTct][queuePassword],vhost \Yii::$app-params[cotaTct][queueVhost]);$exchangeName \Yii::$app-params[cotaTct][producerExchange];try {$conn new AMQPStreamConnection($configParams[host], $configParams[port], $configParams[login], $configParams[password]);//创建channel$channel $conn-channel();$channel-exchange_declare($exchangeName,fanout,false,true,false);$messageData new AMQPMessage($message);$channel-basic_publish($messageData, $exchangeName);$channel-close();$conn-close();return true;}catch (\Exception $e){Log::error(AMQP队列错误.$e,AMQP);return false;}}/*** rabbitMq 开启了ssl验证 生产者* return mixed|string|void* throws \ErrorException* time 2024/12/2 13:44* author zsh*/public static function sslProducer($message){$configParams array(host \Yii::$app-params[cotaTct][prodQueueHost],port \Yii::$app-params[cotaTct][prodQueuePort],login \Yii::$app-params[cotaTct][prodQueueLogin],password \Yii::$app-params[cotaTct][prodQueuePassword],vhost \Yii::$app-params[cotaTct][queueVhost]);$exchangeName \Yii::$app-params[cotaTct][producerExchange];// 创建SSL连接时忽略证书验证$ssl_options array(verify_peer false,verify_peer_name false,);try {$conn new AMQPSSLConnection($configParams[host],$configParams[port],$configParams[login],$configParams[password],$configParams[vhost],$ssl_options);if (!$conn-isConnected()) {die(连接rabbitmq失败!\n);}//创建channel$channel $conn-channel();$channel-exchange_declare($exchangeName,fanout,false,true,false);$messageData new AMQPMessage($message);$channel-basic_publish($messageData, $exchangeName);$channel-close();$conn-close();return true;}catch (\Exception $e){Log::error(AMQP队列错误.$e,AMQP);return false;}}
}