Overview

Description

Caldera Queue is a job queue layer, to dispatch and execute offline jobs.

As with the other Caldera components it has been built to be swappable and modular.

Installation

The easisest way to install it is to use Composer:

composer require vecode/caldera-queue

Requires

  • php: >=8.1
  • ext-mbstring: *

Basic usage

Getting started

A very common requirement of modern web applications is to execute offline processes, that is in an asynchronous way that doesn't interfere with the UI.

For example, if you request a password-reset email, it would be wiser to schedule it rather than to send it directly, because it may take a couple of seconds to relay the message to the mail delivery system. So you show a message to the user saying that the mail is going to be sent in a couple of seconds and dispatch a job to do so.

That job is processed by a worker, and in an ideal setting, you will have several workers waiting for jobs to be done; one worker takes the job, prepares the message and sends it.

Awesome? That is the power of job queues and they can process anything from simple transactional emails to audio/video processing.

So let's get started by creating a job:

<?php

declare(strict_types = 1);

namespace App\Queue\Jobs;

use RuntimeException;
use Caldera\Queue\AbstractJob;

class SendPasswordRecoveryJob extends AbstractJob {

    /**
     * The handler function, must be implemented
     * @return mixed
     */
    public function handle() {
        $user = $this->getData('user');
        if ($user->id) {
            # Send email message to $user->email
        } else {
            throw new RuntimeException('Invalid user specified');
        }
    }
}

A job extends the AbstractJob which in turn implements JobInterface.

You just need to implement the handle method with your logic.

Dispatching jobs

First you will need an instance of the Queue class.

use Caldera\Queue\Adapter\DatabaseAdapter;
use Caldera\Queue\Queue;

$adapter = new DatabaseAdapter($database);
$queue = new Queue($adapter);

For the above example we are using the DatabaseAdapter, which uses the database to store the job information, so you'll need to pass a Database instance from Caldera Database.

A RedisAdapter is included too:

use Predis\Client;
use Caldera\Queue\Adapter\RedisAdapter;
use Caldera\Queue\Queue;

$params = [
    'scheme' => 'tcp',
    'host'   => '127.0.0.1',
    'port'   => 6379,
];
$client = new Client($params);
$adapter = new RedisAdapter($client);
$queue = new Queue($adapter);

Whatever adapter you choose, the procedure to dispatch jobs is the same:

use App\Entities\User;
use App\Queue\Jobs\SendPasswordRecoveryJob;

$queue->add(SendPasswordRecoveryJob::class, ['user' => new User(1, '[email protected]')]);

As simple as that; the add method receives a job class and an array of data to be passed to that job, you may pass scalar values but also complete objects (they must be serializable).

Once the job is queued, it's just a matter of having one or more workers to process the jobs.

Running jobs

This functionality is commonly accesed through framework means, for example, via a CLI command. If you are not using a framework you can still call the methods directly.

To run the jobs just call the work method:

$queue->work(function(JobInterface $job) {
    # Do nothing
}, function(Queue $queue) {
    return true;
});

It receives two optional Closure parameters, the first one is a callback for each executed job and the later is a callback for when all the jobs have been executed.

You can check the pending jobs with the pending method:

$pending = $queue->pending();

Ideally all the jobs should run without problems, but sometimes that is not case and the job will fail; the reset method allows you to reset the failed jobs:

$queue->reset();

And the purge method will delete all the failed jobs:

$queue->purge();