diff --git a/Classes/Queue/DoctrineQueue.php b/Classes/Queue/DoctrineQueue.php index f88bdc1..731f834 100644 --- a/Classes/Queue/DoctrineQueue.php +++ b/Classes/Queue/DoctrineQueue.php @@ -16,8 +16,12 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\DBALException; use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Exception; use Doctrine\DBAL\Exception\InvalidArgumentException; use Doctrine\DBAL\Exception\TableNotFoundException; +use Doctrine\DBAL\Query\Expression\CompositeExpression; +use Doctrine\DBAL\Query\QueryBuilder; +use Doctrine\DBAL\Schema\Schema; use Doctrine\ORM\EntityManagerInterface; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; @@ -27,67 +31,42 @@ */ class DoctrineQueue implements QueueInterface { - /** - * @var string - */ - protected $name; + protected string $name; - /** - * @var array - */ - protected $options; + protected array $options; - /** - * @var Connection - */ - protected $connection; + protected Connection $connection; /** * Default timeout for message reserves, in seconds - * - * @var int */ - protected $defaultTimeout = 60; + protected int $defaultTimeout = 60; /** * Interval messages are looked up in waitAnd*(), in microseconds - * - * @var int */ - protected $pollInterval = 1000000; + protected int $pollInterval = 1000000; /** * Interval messages are looked up in waitAnd*(), if any messages were processed within the last $boostTime microseconds; in microseconds - * - * @var int */ - protected $boostPollInterval = 500000; + protected int $boostPollInterval = 500000; /** * Number of microseconds of the "boost time": If any messages were processed within that time, the special $boostPollInterval is used instead of the default $pollInterval; in microseconds - * - * @var int */ - protected $boostTime = 10000000; + protected int $boostTime = 10000000; /** * Time when the last message was processed - * - * @var int|null */ - protected $lastMessageTime; + protected int $lastMessageTime = 0; /** * Name of the table to store queue messages. Defaults to "_messages" - * - * @var string */ - protected $tableName; + protected string $tableName; - /** - * @param string $name - * @param array $options - */ public function __construct(string $name, array $options) { $this->name = $name; @@ -95,13 +74,13 @@ public function __construct(string $name, array $options) $this->defaultTimeout = (int)$options['defaultTimeout']; } if (isset($options['pollInterval'])) { - $this->pollInterval = (int)($options['pollInterval'] * 1000000); + $this->pollInterval = (int)$options['pollInterval'] * 1000000; } if (isset($options['boostPollInterval'])) { - $this->boostPollInterval = (int)($options['boostPollInterval'] * 1000000); + $this->boostPollInterval = (int)$options['boostPollInterval'] * 1000000; } if (isset($options['boostTime'])) { - $this->boostTime = (int)($options['boostTime'] * 1000000); + $this->boostTime = (int)$options['boostTime'] * 1000000; } if (isset($options['tableName'])) { $this->tableName = $options['tableName']; @@ -112,8 +91,6 @@ public function __construct(string $name, array $options) } /** - * @param EntityManagerInterface $doctrineEntityManager - * @return void * @throws DBALException */ public function injectDoctrineEntityManager(EntityManagerInterface $doctrineEntityManager): void @@ -131,21 +108,29 @@ public function injectDoctrineEntityManager(EntityManagerInterface $doctrineEnti */ public function setUp(): void { - switch ($this->connection->getDatabasePlatform()->getName()) { - case 'sqlite': - $createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id INTEGER PRIMARY KEY AUTOINCREMENT, payload LONGTEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled TEXT DEFAULT NULL)"; - break; - case 'postgresql': - $createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id SERIAL PRIMARY KEY, payload TEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled TIMESTAMP(0) WITHOUT TIME ZONE DEFAULT NULL)"; - break; - default: - $createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id INTEGER PRIMARY KEY AUTO_INCREMENT, payload LONGTEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled DATETIME DEFAULT NULL) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ENGINE = InnoDB"; + $databasePlatform = $this->connection->getDatabasePlatform(); + if ($databasePlatform === null) { + throw new \RuntimeException('No database platform for current connection', 1703863019); } - $this->connection->exec($createDatabaseStatement); - try { - $this->connection->exec("CREATE INDEX state_scheduled ON {$this->connection->quoteIdentifier($this->tableName)} (state, scheduled)"); - } catch (DBALException $e) { - // See https://dba.stackexchange.com/questions/24531/mysql-create-index-if-not-exists + $schemaManager = $this->connection->getSchemaManager(); + if ($schemaManager === null) { + throw new \RuntimeException('No schema manager for current connection', 1703863021); + } + if (!$schemaManager->tablesExist($this->tableName)) { + $schema = new Schema(); + $table = $schema->createTable($this->connection->quoteIdentifier($this->tableName)); + $table->addColumn('id', 'integer', ['autoincrement' => true]); + $table->addColumn('payload', 'text'); + $table->addColumn('state', 'string', ['length' => 255]); + $table->addColumn('failures', 'integer', ['default' => 0]); + $table->addColumn('scheduled', 'datetime', ['notnull' => false]); + $table->setPrimaryKey(['id']); + $table->addIndex(['state', 'scheduled'], 'state_scheduled'); + + $createDatabaseStatements = $schema->toSql($databasePlatform); + foreach ($createDatabaseStatements as $createDatabaseStatement) { + $this->connection->exec($createDatabaseStatement); + } } } @@ -160,16 +145,29 @@ public function getName(): string /** * @inheritdoc * @throws DBALException + * @throws \JsonException + * @throws \Doctrine\DBAL\Driver\Exception */ public function submit($payload, array $options = []): string { if ($this->connection->getDatabasePlatform()->getName() === 'postgresql') { - $insertStatement = $this->connection->prepare("INSERT INTO {$this->connection->quoteIdentifier($this->tableName)} (payload, state, scheduled) VALUES (:payload, 'ready', {$this->resolveScheduledQueryPart($options)}) RETURNING id"); - $result = $insertStatement->executeQuery(['payload' => json_encode($payload)]); + $insertStatement = $this->connection->prepare(sprintf( + "INSERT INTO %s (payload, state, scheduled) VALUES (:payload, 'ready', %s) RETURNING id", + $this->connection->quoteIdentifier($this->tableName), + $this->resolveScheduledQueryPart($options) + )); + $result = $insertStatement->executeQuery(['payload' => json_encode($payload, JSON_THROW_ON_ERROR)]); return (string)$result->fetchOne(); } - $numberOfAffectedRows = (int)$this->connection->executeStatement("INSERT INTO {$this->connection->quoteIdentifier($this->tableName)} (payload, state, scheduled) VALUES (:payload, 'ready', {$this->resolveScheduledQueryPart($options)})", ['payload' => json_encode($payload)]); + $numberOfAffectedRows = (int)$this->connection->executeStatement( + sprintf( + "INSERT INTO %s (payload, state, scheduled) VALUES (:payload, 'ready', %s)", + $this->connection->quoteIdentifier($this->tableName), + $this->resolveScheduledQueryPart($options) + ), + ['payload' => json_encode($payload, JSON_THROW_ON_ERROR)] + ); if ($numberOfAffectedRows !== 1) { return ''; } @@ -179,6 +177,9 @@ public function submit($payload, array $options = []): string /** * @inheritdoc * @throws DBALException + * @throws Exception + * @throws \Doctrine\DBAL\Driver\Exception + * @throws \JsonException */ public function waitAndTake(?int $timeout = null): ?Message { @@ -187,7 +188,10 @@ public function waitAndTake(?int $timeout = null): ?Message return null; } - $numberOfDeletedRows = $this->connection->delete($this->connection->quoteIdentifier($this->tableName), ['id' => (int)$message->getIdentifier()]); + $numberOfDeletedRows = $this->connection->delete( + $this->connection->quoteIdentifier($this->tableName), + ['id' => (int)$message->getIdentifier()] + ); if ($numberOfDeletedRows !== 1) { // TODO error handling return null; @@ -199,6 +203,8 @@ public function waitAndTake(?int $timeout = null): ?Message /** * @inheritdoc * @throws DBALException + * @throws \Doctrine\DBAL\Driver\Exception + * @throws \JsonException */ public function waitAndReserve(?int $timeout = null): ?Message { @@ -206,9 +212,9 @@ public function waitAndReserve(?int $timeout = null): ?Message } /** - * @param int|null $timeout - * @return Message|null * @throws DBALException + * @throws \JsonException + * @throws \Doctrine\DBAL\Driver\Exception */ protected function reserveMessage(?int $timeout = null): ?Message { @@ -217,15 +223,44 @@ protected function reserveMessage(?int $timeout = null): ?Message } $this->reconnectDatabaseConnection(); + $selectMessageQuery = $this->createQueryBuilder(); + $selectMessageQuery + ->select('*') + ->where( + $selectMessageQuery->expr()->and( + 'state = :state', + $this->getScheduledQueryConstraint($selectMessageQuery) + ) + ) + ->setParameter('state', 'ready') + ->orderBy('id') + ->setMaxResults(1); + + $reserveMessageQuery = $this->connection->createQueryBuilder(); + $reserveMessageQuerySql = $reserveMessageQuery + ->update($this->connection->quoteIdentifier($this->tableName)) + ->set('state', ':newstate') + ->where('id = :id') + ->andWhere('state = :state') + ->andWhere($this->getScheduledQueryConstraint($reserveMessageQuery)) + ->getSQL(); + $startTime = time(); do { try { - $row = $this->connection->fetchAssociative("SELECT * FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready' AND {$this->getScheduledQueryConstraint()} ORDER BY id ASC LIMIT 1"); + $row = $selectMessageQuery->execute()->fetchAssociative(); } catch (TableNotFoundException $exception) { throw new \RuntimeException(sprintf('The queue table "%s" could not be found. Did you run ./flow queue:setup "%s"?', $this->tableName, $this->name), 1469117906, $exception); } if ($row !== false) { - $numberOfUpdatedRows = (int)$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'reserved' WHERE id = :id AND state = 'ready' AND {$this->getScheduledQueryConstraint()}", ['id' => (int)$row['id']]); + $numberOfUpdatedRows = (int)$this->connection->executeStatement( + $reserveMessageQuerySql, + [ + 'newstate' => 'reserved', + 'id' => (int)$row['id'], + 'state' => 'ready' + ] + ); if ($numberOfUpdatedRows === 1) { $this->lastMessageTime = time(); return $this->getMessageFromRow($row); @@ -235,7 +270,7 @@ protected function reserveMessage(?int $timeout = null): ?Message return null; } - $currentPollInterval = ((int)$this->lastMessageTime + (int)($this->boostTime / 1000000) > time()) ? $this->boostPollInterval : $this->pollInterval; + $currentPollInterval = ($this->lastMessageTime + (int)($this->boostTime / 1000000) > time()) ? $this->boostPollInterval : $this->pollInterval; usleep($currentPollInterval); } while (true); } @@ -246,7 +281,13 @@ protected function reserveMessage(?int $timeout = null): ?Message */ public function release(string $messageId, array $options = []): void { - $this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'ready', failures = failures + 1, scheduled = {$this->resolveScheduledQueryPart($options)} WHERE id = :id", ['id' => (int)$messageId]); + $this->connection->executeStatement( + sprintf( + "UPDATE %s SET state = 'ready', failures = failures + 1, scheduled = %s WHERE id = :id", + $this->connection->quoteIdentifier($this->tableName), + $this->resolveScheduledQueryPart($options) + ), + ['id' => (int)$messageId]); } /** @@ -254,7 +295,11 @@ public function release(string $messageId, array $options = []): void */ public function abort(string $messageId): void { - $this->connection->update($this->connection->quoteIdentifier($this->tableName), ['state' => 'failed'], ['id' => (int)$messageId]); + $this->connection->update( + $this->connection->quoteIdentifier($this->tableName), + ['state' => 'failed'], + ['id' => (int)$messageId] + ); } /** @@ -271,7 +316,15 @@ public function finish(string $messageId): bool */ public function peek(int $limit = 1): array { - $rows = $this->connection->fetchAllAssociative("SELECT * FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready' AND {$this->getScheduledQueryConstraint()} ORDER BY id ASC LIMIT $limit"); + $selectMessagesQuery = $this->createQueryBuilder(); + $selectMessagesQuery + ->select('*') + ->where('state = :state') + ->andWhere($this->getScheduledQueryConstraint($selectMessagesQuery)) + ->setParameter('state', 'ready') + ->orderBy('id') + ->setMaxResults($limit); + $rows = $selectMessagesQuery->execute()->fetchAllAssociative(); $messages = []; foreach ($rows as $row) { @@ -286,7 +339,12 @@ public function peek(int $limit = 1): array */ public function countReady(): int { - return (int)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready'"); + return (int)$this->createQueryBuilder() + ->select('COUNT(*)') + ->where('state = :state') + ->setParameter('state', 'ready') + ->execute() + ->fetchOne(); } /** @@ -294,7 +352,12 @@ public function countReady(): int */ public function countReserved(): int { - return (int)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'reserved'"); + return (int)$this->createQueryBuilder() + ->select('COUNT(*)') + ->where('state = :state') + ->setParameter('state', 'reserved') + ->execute() + ->fetchOne(); } /** @@ -302,65 +365,68 @@ public function countReserved(): int */ public function countFailed(): int { - return (int)$this->connection->fetchColumn("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'failed'"); + return (int)$this->createQueryBuilder() + ->select('COUNT(*)') + ->where('state = :state') + ->setParameter('state', 'failed') + ->execute() + ->fetchOne(); } /** - * @return void * @throws DBALException */ public function flush(): void { - $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->connection->quoteIdentifier($this->tableName)}"); + $schemaManager = $this->connection->getSchemaManager(); + if ($schemaManager === null) { + throw new \RuntimeException('No schema manager in current connection', 1703863433); + } + + if ($schemaManager->tablesExist($this->tableName)) { + $schemaManager->dropTable($this->connection->quoteIdentifier($this->tableName)); + } $this->setUp(); } /** - * @param array $row - * @return Message + * @throws \JsonException */ protected function getMessageFromRow(array $row): Message { - return new Message($row['id'], json_decode($row['payload'], true), (int)$row['failures']); + return new Message($row['id'], json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR), (int)$row['failures']); } /** - * @param array $options - * @return string + * @throws Exception */ protected function resolveScheduledQueryPart(array $options): string { if (!isset($options['delay'])) { return 'null'; } - switch ($this->connection->getDatabasePlatform()->getName()) { - case 'sqlite': - return 'datetime(\'now\', \'+' . (int)$options['delay'] . ' second\')'; - case 'postgresql': - return 'NOW() + INTERVAL \'' . (int)$options['delay'] . ' SECOND\''; - default: - return 'DATE_ADD(NOW(), INTERVAL ' . (int)$options['delay'] . ' SECOND)'; - } + + return $this->connection->getDatabasePlatform()->getDateAddSecondsExpression($this->connection->getDatabasePlatform()->getCurrentTimestampSQL(), (int)$options['delay']); } - /** - * @return string - */ - protected function getScheduledQueryConstraint(): string + protected function getScheduledQueryConstraint(QueryBuilder $qb): CompositeExpression { - switch ($this->connection->getDatabasePlatform()->getName()) { - case 'sqlite': - return '(scheduled IS NULL OR scheduled <= datetime("now"))'; - default: - return '(scheduled IS NULL OR scheduled <= NOW())'; - } + return $qb->expr()->or( + $qb->expr()->isNull('scheduled'), + $qb->expr()->lte('scheduled', $this->connection->getDatabasePlatform()->getCurrentTimestampSQL()) + ); + } + + private function createQueryBuilder(): QueryBuilder + { + return $this->connection->createQueryBuilder() + ->from($this->connection->quoteIdentifier($this->tableName)); } /** * Reconnects the database connection associated with this queue, if it doesn't respond to a ping * * @see \Neos\Flow\Persistence\Doctrine\PersistenceManager::persistAll() - * @return void */ private function reconnectDatabaseConnection(): void { diff --git a/README.md b/README.md index c3e9213..6d52335 100644 --- a/README.md +++ b/README.md @@ -63,15 +63,15 @@ The `DoctrineQueue` supports following options: | tableName | string | flowpack_jobqueue_messages_ | Name of the database table for this queue. By default this is the queue name prefixed with "flowpack_jobqueue_messages_" | | backendOptions | array | - | Doctrine-specific connection params (see [Doctrine reference](http://doctrine-orm.readthedocs.io/projects/doctrine-dbal/en/latest/reference/configuration.html)) | -*NOTE:* The `DoctrineQueue` currently supports `MySQL`, `PostgreSQL` and -`SQLite` backends. You can specify the backend via the `backendOptions`. If -you omit this setting, the *current connection* will be re-used (i.e. the +*NOTE:* The `DoctrineQueue` should work with any database supported by +Doctrine DBAL. It has been tested on MySQL, PostgreSQL, SQL Server and +SQLite. You can specify the backend via the `backendOptions`. If you +omit this setting, the *current connection* will be re-used (i.e. the currently active Flow database). ### Submit options -Additional options supported by `JobManager::queue()`, `DoctrineQueue::submit -()` and the `Job\Defer` annotation: +Additional options supported by `JobManager::queue()`, `DoctrineQueue::submit()` and the `Job\Defer` annotation: | Option | Type | Default | Description | |--------|---------|--------:|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| @@ -79,7 +79,7 @@ Additional options supported by `JobManager::queue()`, `DoctrineQueue::submit ### Release options -Additional options to be specified via `releaseOptions`: +Additional options to be specified via `releaseOptions` for the queue: | Option | Type | Default | Description | |--------|---------|--------:|----------------------------------------------------------------------------------| diff --git a/Tests/Functional/Queue/DoctrineQueueTest.php b/Tests/Functional/Queue/DoctrineQueueTest.php index 78842af..3b946f8 100644 --- a/Tests/Functional/Queue/DoctrineQueueTest.php +++ b/Tests/Functional/Queue/DoctrineQueueTest.php @@ -26,6 +26,6 @@ class DoctrineQueueTest extends AbstractQueueTest */ protected function getQueue() { - return new DoctrineQueue('Test-queue', $this->queueSettings); + return new DoctrineQueue('test-queue', $this->queueSettings); } }