WizdomWeb/app/Utilities/QueueUtility.php

245 lines
8.0 KiB
PHP

<?php
namespace WizdomNetworks\WizeWeb\Utilities;
use WizdomNetworks\WizeWeb\Utilities\Logger;
class QueueUtility
{
private string $queueDir;
public function __construct(string $queueDir)
{
$this->queueDir = $queueDir;
// Ensure the queue directory exists
if (!is_dir($this->queueDir)) {
mkdir($this->queueDir, 0755, true);
Logger::logInfo("Queue directory created at: $this->queueDir");
}
}
/**
* Add a task to the queue with priority and expiration.
*
* @param string $queueName The name of the queue.
* @param array $task The task to enqueue.
* @param int $priority The priority of the task (lower value = higher priority).
* @param int $ttl Time-to-live in seconds (0 for no expiration).
* @return bool True if the task is added successfully, false otherwise.
*/
public function enqueue(string $queueName, array $task, int $priority = 0, int $ttl = 0): bool
{
$queueFile = $this->queueDir . "/$queueName.queue";
$expiry = $ttl > 0 ? time() + $ttl : 0;
try {
$taskData = serialize(['priority' => $priority, 'expiry' => $expiry, 'task' => $task]);
file_put_contents($queueFile, $taskData . PHP_EOL, FILE_APPEND | LOCK_EX);
Logger::logInfo("Task added to queue: $queueName with priority $priority and expiry $expiry");
return true;
} catch (\Throwable $e) {
Logger::logError("Failed to enqueue task: " . $e->getMessage());
return false;
}
}
/**
* Retrieve and remove the next task from the queue, considering priority and expiration.
*
* @param string $queueName The name of the queue.
* @return array|null The next task, or null if the queue is empty.
*/
public function dequeue(string $queueName): ?array
{
$queueFile = $this->queueDir . "/$queueName.queue";
if (!file_exists($queueFile)) {
Logger::logInfo("Queue file does not exist: $queueFile");
return null;
}
try {
$lines = file($queueFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
if (empty($lines)) {
unlink($queueFile);
Logger::logInfo("Queue is empty: $queueName");
return null;
}
// Sort tasks by priority and expiration
$tasks = array_map('unserialize', $lines);
usort($tasks, function ($a, $b) {
return $a['priority'] <=> $b['priority'] ?: $a['expiry'] <=> $b['expiry'];
});
// Find the next valid task
$updatedLines = [];
$nextTask = null;
foreach ($tasks as $taskData) {
if ($taskData['expiry'] > 0 && $taskData['expiry'] < time()) {
Logger::logInfo("Skipping expired task in queue: $queueName");
continue;
}
if ($nextTask === null) {
$nextTask = $taskData['task'];
} else {
$updatedLines[] = serialize($taskData);
}
}
file_put_contents($queueFile, implode(PHP_EOL, $updatedLines) . PHP_EOL, LOCK_EX);
return $nextTask;
} catch (\Throwable $e) {
Logger::logError("Failed to dequeue task: " . $e->getMessage());
return null;
}
}
/**
* Retry a failed task by re-adding it to the queue.
*
* @param string $queueName The name of the queue.
* @param array $task The task to retry.
* @param int $priority The priority of the task.
* @param int $retryLimit The maximum number of retries allowed.
* @param int $currentRetry The current retry count (default: 0).
* @return bool True if the task is retried successfully, false otherwise.
*/
public function retryTask(string $queueName, array $task, int $priority = 0, int $retryLimit = 3, int $currentRetry = 0): bool
{
if ($currentRetry >= $retryLimit) {
Logger::logWarning("Task moved to dead letter queue after exceeding retry limit: $queueName");
$this->enqueue("dead_letter_$queueName", $task, $priority);
return false;
}
Logger::logInfo("Retrying task in queue: $queueName, attempt: " . ($currentRetry + 1));
return $this->enqueue($queueName, $task, $priority);
}
/**
* Get the status of a queue.
*
* @param string $queueName The name of the queue.
* @return array|null Queue statistics or null if the queue does not exist.
*/
public function getQueueStats(string $queueName): ?array
{
$queueFile = $this->queueDir . "/$queueName.queue";
if (!file_exists($queueFile)) {
return null;
}
$lines = file($queueFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
$tasks = array_map('unserialize', $lines);
return [
'total_tasks' => count($tasks),
'last_modified' => date('Y-m-d H:i:s', filemtime($queueFile)),
'oldest_task' => $tasks[0]['task'] ?? null,
];
}
/**
* Clear all tasks in a queue.
*
* @param string $queueName The name of the queue.
* @return bool True if the queue is cleared successfully, false otherwise.
*/
public function clearQueue(string $queueName): bool
{
$queueFile = $this->queueDir . "/$queueName.queue";
if (!file_exists($queueFile)) {
Logger::logInfo("Queue file does not exist: $queueFile");
return false;
}
try {
unlink($queueFile);
Logger::logInfo("Queue cleared: $queueName");
return true;
} catch (\Throwable $e) {
Logger::logError("Failed to clear queue: " . $e->getMessage());
return false;
}
}
/**
* List all available queues.
*
* @return array List of queue names.
*/
public function listQueues(): array
{
$files = glob($this->queueDir . '/*.queue');
return array_map(function ($file) {
return basename($file, '.queue');
}, $files);
}
/**
* Clear all queues in the directory.
*
* @return bool True if all queues are cleared successfully, false otherwise.
*/
public function clearAllQueues(): bool
{
try {
$files = glob($this->queueDir . '/*.queue');
foreach ($files as $file) {
unlink($file);
}
Logger::logInfo("All queues cleared.");
return true;
} catch (\Throwable $e) {
Logger::logError("Failed to clear all queues: " . $e->getMessage());
return false;
}
}
/**
* Retrieve and remove a batch of tasks from the queue.
*
* @param string $queueName The name of the queue.
* @param int $batchSize The number of tasks to dequeue.
* @return array List of tasks.
*/
public function dequeueBatch(string $queueName, int $batchSize = 10): array
{
$queueFile = "$this->queueDir/$queueName.queue";
if (!file_exists($queueFile)) {
Logger::logInfo("Queue file does not exist: $queueFile");
return [];
}
try {
$lines = file($queueFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
if (empty($lines)) {
unlink($queueFile);
Logger::logInfo("Queue is empty: $queueName");
return [];
}
$tasks = array_map('unserialize', $lines);
usort($tasks, fn($a, $b) => $a['priority'] <=> $b['priority']);
$batch = array_splice($tasks, 0, $batchSize);
file_put_contents($queueFile, implode(PHP_EOL, array_map('serialize', $tasks)) . PHP_EOL);
return array_map(fn($task) => $task['task'], $batch);
} catch (Throwable $e) {
Logger::logError("Failed to dequeue batch: " . $e->getMessage());
return [];
}
}
}
?>