zoukankan      html  css  js  c++  java
  • 在laravel框架中使用mq

    本文写于2018-11-28

    1、部署laravel项目

    https://github.com/laravel/laravel  通过git克隆项目,或者下载zip包然后解压等方式都可以把laravel框架源码下载下来。

    然后composer install 安装各种依赖

    然后复制.env.example 为.env文件,执行php artisan key:generate 生成APP_KEY

    2、上传文件到github【备注:这一步可以略过】

    1)git bash 到项目目录,然后git init,初始化本地仓库

    2)git remote add origin 远程库地址

    3)git add . 把文件缓存到缓冲区

    4)git commit -m '初始提交'

    5)git push origin master  推送本地代码到远程库master分支

    3、安装mq依赖

    从  https://packagist.org/?query=rabbitmq 选择一个依赖包

    从上面截图看到,能用的就是第一个和最后一个依赖。暂时先使用第一个依赖,后续有时间补充下第二个依赖的使用。

    composer require php-amqplib/php-amqplib  即可安装依赖

     4、laravel框架中使用mq

    1)在config目录下新增mq.php,文件内容:

    <?php
    return [
    	'host' => env('MQ_HOST', '127.0.0.1'),
    	'port' => env('MQ_PORT', 5672),
    	'user' => env('MQ_USER', 'guest'),
    	'password' => env('MQ_PASSWORD', 'guest'),
    	'queue' => env('MQ_QUEUE', 'default'),
    	'exchange' => env('MQ_EXCHANGE', 'default'),
    	'key' => env('MQ_KEY', 'default'),
    ];

     具体的配置信息,可以在.env文件中配置,也可以修改这个文件。

    2)新增MqSend.php文件,路径:app/Console/Commands

    <?php
    
    namespace AppConsoleCommands;
    
    use IlluminateConsoleCommand;
    use PhpAmqpLibConnectionAMQPConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    class MqSend extends Command
    {
        /**
         * 控制台命令 signature 的名称。
         *
         * @var string
         */
        protected $signature = 'mq:send {msg}';
        /**
         * 控制台命令说明。
         *
         * @var string
         */
        protected $description = 'send messages to rabbitMQ';
    
        /**
         * 创建一个新的命令实例。
         *
         * @return void
         */
        public function __construct()
        {
            parent::__construct();
        }
    
        /**
         * 执行控制台命令。
         *
         * @return mixed
         */
        public function handle()
        {
            $host = config('mq.host');
            $port = config('mq.port');
            $user = config('mq.user');
            $password = config('mq.password');
            $queue = config('mq.queue');        // 队列名称
            $exchange = config('mq.exchange');  // 交换机名称
            $key = config('mq.key');            // 队列绑定交换机时配置的routingKey
    
            $connection = new AMQPConnection($host, $port, $user, $password);
            $channel = $connection->channel();
    
            /**
             * 如果管理后台上已经配置了交换机、队列,以及绑定了关系,则不需要下面的3条语句
             */
            $channel->exchange_declare($exchange, 'direct', false, true, false);    // 初始化交换机
            $channel->queue_declare($queue, false, true, false, false);             // 初始化队列
            $channel->queue_bind($queue, $exchange, $key);  // 将队列与某个交换机进行绑定,并使用路由关键字
    
    
            $msg = '[' . date('Y-m-d H:i:s') . '] ' . $this->argument('msg');
            $data = new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            $channel->basic_publish($data, $exchange, $queue);
            echo "[X] Sent: $msg 
    ";
    
            $channel->close();
            $connection->close();
        }
    }

    3)新增MqReceive.php文件, 路径:app/Console/Commands

    <?php
    
    namespace AppConsoleCommands;
    
    use IlluminateConsoleCommand;
    use PhpAmqpLibConnectionAMQPConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    class MqReceive extends Command
    {
        /**
         * 控制台命令 signature 的名称。
         *
         * @var string
         */
        protected $signature = 'mq:receive';
        /**
         * 控制台命令说明。
         *
         * @var string
         */
        protected $description = 'receive messages to rabbitMQ';
    
        /**
         * 创建一个新的命令实例。
         *
         * @return void
         */
        public function __construct()
        {
            parent::__construct();
        }
    
        /**
         * 执行控制台命令。
         *
         * @return mixed
         */
        public function handle()
        {
            $host = config('mq.host');
            $port = config('mq.port');
            $user = config('mq.user');
            $password = config('mq.password');
            $queue = config('mq.queue');        // 队列名称
            $exchange = config('mq.exchange');  // 交换机名称
            $key = config('mq.key');            // 队列绑定交换机时配置的routingKey
    
            $connection = new AMQPConnection($host, $port, $user, $password);
            $channel = $connection->channel();
    
            /**
             * 如果管理后台上已经配置了交换机、队列,以及绑定了关系,则不需要下面的3条语句
             */
            $channel->queue_declare($queue, false, true, false, false);
    
            echo ' [*] Waiting for messages. To exit press CTRL+C', "
    ";
    
            $callback = function($msg) {
                echo " [x] Received ", $msg->body, "
    ";
            };
    
            $channel->basic_consume($queue, '', false, true, false, false, $callback);
    
            while(count($channel->callbacks)) {
                $channel->wait();
            }
    
            $channel->close();
            $connection->close();
        }
    }
    

    4)修改app/console/Kernel.php文件,在$commands数组中增加:

        protected $commands = [
            CommandsMqSend::class,
            CommandsMqReceive::class,
        ];

    这样子就能在php artisan看到有mq的命令:

    5)生产者发布消息

    发布成功。代码中设置了交换机、队列名、理由关键词,这些默认值都是default,在rabbitMQ管理后台可以看到有新增了交换机、队列,队列里面也有消息。

    【备注】windows系统上可能执行生产者脚本会报错:

    这是因为windows不支持这个SOCKET_EAGAIN常量。

    参考:https://github.com/php-amqplib/php-amqplib/issues/619

    要改下vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php文件:

            // self::$ERRNO_EQUALS_EAGAIN = 'errno=' . SOCKET_EAGAIN;
            // windows不支持SOCKET_EAGAIN,所以会出现未定义的报错。在linux上SOCKET_EAGAIN是SOCKET_EWOULDBLOCK的别名
            // https://github.com/php-amqplib/php-amqplib/issues/619  使用SOCKET_EWOULDBLOCK替换
            self::$ERRNO_EQUALS_EAGAIN = 'errno=' . (defined('SOCKET_EAGAIN') ? SOCKET_EAGAIN : SOCKET_EWOULDBLOCK);
            self::$ERRNO_EQUALS_EWOULDBLOCK = 'errno=' . SOCKET_EWOULDBLOCK;
            self::$ERRNO_EQUALS_EINTR = 'errno=' . SOCKET_EINTR;

    修改完之后就没问题了。

    6)消费者接收消息

    消息有2条,都接收到了。按Ctrl+C可以退出消费者,因为消费是阻塞,一直在等待接收消息。

    5、参考文档:

    1)laravel中新增artisan命令:https://laravel-china.org/docs/laravel/5.7/artisan/2276

    2)laravel中使用 php-amqplib/php-amqplib 依赖包

    https://segmentfault.com/a/1190000012308675

    https://segmentfault.com/a/1190000011825148

  • 相关阅读:
    java如何手动创建一个线程池
    HashMap的面试总结(摘抄)
    JDK源码调试
    分布式和集群的区别
    开发中model,entity和pojo的区别
    java并发编程_CountDownLanch(倒计数锁存器)应用场景
    Map 怎么排序
    java中Thread的 interrupt异常处理
    zookeeper节点失效重连机制
    java并发库_并发库知识点整理
  • 原文地址:https://www.cnblogs.com/guangye/p/10033123.html
Copyright © 2011-2022 走看看