Skip to content

Commit 1a8faab

Browse files
committed
FEATURE: Support SQL Server (and more?)
This switches the hard-coded SQL against Doctrine DBAL query builder use. This makes the queue work on SQL Server and potentially more databases supported by Doctrine DBAL.
1 parent e34818b commit 1a8faab

File tree

2 files changed

+106
-38
lines changed

2 files changed

+106
-38
lines changed

Classes/Queue/DoctrineQueue.php

Lines changed: 102 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Doctrine\DBAL\Exception\TableNotFoundException;
2222
use Doctrine\DBAL\Query\Expression\CompositeExpression;
2323
use Doctrine\DBAL\Query\QueryBuilder;
24+
use Doctrine\DBAL\Schema\Schema;
2425
use Doctrine\ORM\EntityManagerInterface;
2526
use Flowpack\JobQueue\Common\Queue\Message;
2627
use Flowpack\JobQueue\Common\Queue\QueueInterface;
@@ -107,21 +108,29 @@ public function injectDoctrineEntityManager(EntityManagerInterface $doctrineEnti
107108
*/
108109
public function setUp(): void
109110
{
110-
switch ($this->connection->getDatabasePlatform()->getName()) {
111-
case 'sqlite':
112-
$createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id INTEGER PRIMARY KEY AUTOINCREMENT, payload LONGTEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled TEXT DEFAULT NULL)";
113-
break;
114-
case 'postgresql':
115-
$createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id SERIAL PRIMARY KEY, payload TEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled TIMESTAMP(0) WITHOUT TIME ZONE DEFAULT NULL)";
116-
break;
117-
default:
118-
$createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id INTEGER PRIMARY KEY AUTO_INCREMENT, payload LONGTEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled DATETIME DEFAULT NULL) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ENGINE = InnoDB";
111+
$databasePlatform = $this->connection->getDatabasePlatform();
112+
if ($databasePlatform === null) {
113+
throw new \RuntimeException('No database platform for current connection', 1703863019);
119114
}
120-
$this->connection->exec($createDatabaseStatement);
121-
try {
122-
$this->connection->exec("CREATE INDEX state_scheduled ON {$this->connection->quoteIdentifier($this->tableName)} (state, scheduled)");
123-
} catch (DBALException $e) {
124-
// See https://dba.stackexchange.com/questions/24531/mysql-create-index-if-not-exists
115+
$schemaManager = $this->connection->getSchemaManager();
116+
if ($schemaManager === null) {
117+
throw new \RuntimeException('No schema manager for current connection', 1703863021);
118+
}
119+
if (!$schemaManager->tablesExist($this->tableName)) {
120+
$schema = new Schema();
121+
$table = $schema->createTable($this->connection->quoteIdentifier($this->tableName));
122+
$table->addColumn('id', 'integer', ['autoincrement' => true]);
123+
$table->addColumn('payload', 'text');
124+
$table->addColumn('state', 'string', ['length' => 255]);
125+
$table->addColumn('failures', 'integer', ['default' => 0]);
126+
$table->addColumn('scheduled', 'datetime', ['notnull' => false]);
127+
$table->setPrimaryKey(['id']);
128+
$table->addIndex(['state', 'scheduled'], 'state_scheduled');
129+
130+
$createDatabaseStatements = $schema->toSql($databasePlatform);
131+
foreach ($createDatabaseStatements as $createDatabaseStatement) {
132+
$this->connection->exec($createDatabaseStatement);
133+
}
125134
}
126135
}
127136

@@ -191,15 +200,38 @@ protected function reserveMessage(?int $timeout = null): ?Message
191200
}
192201
$this->reconnectDatabaseConnection();
193202

203+
$qb = $this->connection->createQueryBuilder();
204+
$qb
205+
->select('*')
206+
->from($this->connection->quoteIdentifier($this->tableName))
207+
->where(
208+
$qb->expr()->and(
209+
$qb->expr()->eq('state', $qb->expr()->literal('ready')),
210+
$this->getScheduledQueryConstraint($qb)
211+
)
212+
)
213+
->orderBy('id')
214+
->setMaxResults(1);
194215
$startTime = time();
195216
do {
196217
try {
197-
$row = $this->connection->fetchAssociative("SELECT * FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready' AND {$this->getScheduledQueryConstraint()} ORDER BY id ASC LIMIT 1");
218+
$row = $qb->execute()->fetchAssociative();
198219
} catch (TableNotFoundException $exception) {
199220
throw new \RuntimeException(sprintf('The queue table "%s" could not be found. Did you run ./flow queue:setup "%s"?', $this->tableName, $this->name), 1469117906, $exception);
200221
}
201222
if ($row !== false) {
202-
$numberOfUpdatedRows = (int)$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'reserved' WHERE id = :id AND state = 'ready' AND {$this->getScheduledQueryConstraint()}", ['id' => (int)$row['id']]);
223+
$innerQueryBuilder = $this->connection->createQueryBuilder();
224+
$innerQueryBuilder
225+
->update($this->connection->quoteIdentifier($this->tableName))
226+
->set('state', $innerQueryBuilder->expr()->literal('reserved'))
227+
->where(
228+
$innerQueryBuilder->expr()->and(
229+
$innerQueryBuilder->expr()->eq('id', (int)$row['id']),
230+
$innerQueryBuilder->expr()->eq('state', $innerQueryBuilder->expr()->literal('ready')),
231+
$this->getScheduledQueryConstraint($innerQueryBuilder)
232+
)
233+
);
234+
$numberOfUpdatedRows = (int)$this->connection->executeStatement($innerQueryBuilder->getSQL());
203235
if ($numberOfUpdatedRows === 1) {
204236
$this->lastMessageTime = time();
205237
return $this->getMessageFromRow($row);
@@ -245,7 +277,19 @@ public function finish(string $messageId): bool
245277
*/
246278
public function peek(int $limit = 1): array
247279
{
248-
$rows = $this->connection->fetchAllAssociative("SELECT * FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready' AND {$this->getScheduledQueryConstraint()} ORDER BY id ASC LIMIT $limit");
280+
$qb = $this->connection->createQueryBuilder();
281+
$qb
282+
->select('*')
283+
->from($this->connection->quoteIdentifier($this->tableName))
284+
->where(
285+
$qb->expr()->and(
286+
$qb->expr()->eq('state', $qb->expr()->literal('ready')),
287+
$this->getScheduledQueryConstraint($qb)
288+
)
289+
)
290+
->orderBy('id')
291+
->setMaxResults($limit);
292+
$rows = $qb->execute()->fetchAllAssociative();
249293
$messages = [];
250294

251295
foreach ($rows as $row) {
@@ -260,31 +304,62 @@ public function peek(int $limit = 1): array
260304
*/
261305
public function countReady(): int
262306
{
263-
return (int)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready'");
307+
$qb = $this->connection->createQueryBuilder();
308+
return (int)$qb
309+
->select('COUNT(*)')
310+
->from($this->connection->quoteIdentifier($this->tableName))
311+
->where(
312+
$qb->expr()->eq('state', $qb->expr()->literal('ready')),
313+
)
314+
->execute()
315+
->fetchOne();
264316
}
265317

266318
/**
267319
* @inheritdoc
268320
*/
269321
public function countReserved(): int
270322
{
271-
return (int)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'reserved'");
323+
$qb = $this->connection->createQueryBuilder();
324+
return (int)$qb
325+
->select('COUNT(*)')
326+
->from($this->connection->quoteIdentifier($this->tableName))
327+
->where(
328+
$qb->expr()->eq('state', $qb->expr()->literal('reserved')),
329+
)
330+
->execute()
331+
->fetchOne();
272332
}
273333

274334
/**
275335
* @inheritdoc
276336
*/
277337
public function countFailed(): int
278338
{
279-
return (int)$this->connection->fetchColumn("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'failed'");
339+
$qb = $this->connection->createQueryBuilder();
340+
return (int)$qb
341+
->select('COUNT(*)')
342+
->from($this->connection->quoteIdentifier($this->tableName))
343+
->where(
344+
$qb->expr()->eq('state', $qb->expr()->literal('failed')),
345+
)
346+
->execute()
347+
->fetchOne();
280348
}
281349

282350
/**
283351
* @throws DBALException
284352
*/
285353
public function flush(): void
286354
{
287-
$this->connection->executeStatement("DROP TABLE IF EXISTS {$this->connection->quoteIdentifier($this->tableName)}");
355+
$schemaManager = $this->connection->getSchemaManager();
356+
if ($schemaManager === null) {
357+
throw new \RuntimeException('No schema manager in current connection', 1703863433);
358+
}
359+
360+
if ($schemaManager->tablesExist($this->tableName)) {
361+
$schemaManager->dropTable($this->connection->quoteIdentifier($this->tableName));
362+
}
288363
$this->setUp();
289364
}
290365

@@ -301,24 +376,16 @@ protected function resolveScheduledQueryPart(array $options): string
301376
if (!isset($options['delay'])) {
302377
return 'null';
303378
}
304-
switch ($this->connection->getDatabasePlatform()->getName()) {
305-
case 'sqlite':
306-
return 'datetime(\'now\', \'+' . (int)$options['delay'] . ' second\')';
307-
case 'postgresql':
308-
return 'NOW() + INTERVAL \'' . (int)$options['delay'] . ' SECOND\'';
309-
default:
310-
return 'DATE_ADD(NOW(), INTERVAL ' . (int)$options['delay'] . ' SECOND)';
311-
}
379+
380+
return $this->connection->getDatabasePlatform()->getDateAddSecondsExpression($this->connection->getDatabasePlatform()->getCurrentTimestampSQL(), (int)$options['delay']);
312381
}
313382

314383
protected function getScheduledQueryConstraint(QueryBuilder $qb): CompositeExpression
315384
{
316-
switch ($this->connection->getDatabasePlatform()->getName()) {
317-
case 'sqlite':
318-
return '(scheduled IS NULL OR scheduled <= datetime("now"))';
319-
default:
320-
return '(scheduled IS NULL OR scheduled <= NOW())';
321-
}
385+
return $qb->expr()->or(
386+
$qb->expr()->isNull('scheduled'),
387+
$qb->expr()->lte('scheduled', $this->connection->getDatabasePlatform()->getCurrentTimestampSQL())
388+
);
322389
}
323390

324391
/**

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ The `DoctrineQueue` supports following options:
6363
| tableName | string | flowpack_jobqueue_messages_<queue-name> | Name of the database table for this queue. By default this is the queue name prefixed with "flowpack_jobqueue_messages_" |
6464
| backendOptions | array | - | Doctrine-specific connection params (see [Doctrine reference](http://doctrine-orm.readthedocs.io/projects/doctrine-dbal/en/latest/reference/configuration.html)) |
6565

66-
*NOTE:* The `DoctrineQueue` currently supports `MySQL`, `PostgreSQL` and
67-
`SQLite` backends. You can specify the backend via the `backendOptions`. If
68-
you omit this setting, the *current connection* will be re-used (i.e. the
66+
*NOTE:* The `DoctrineQueue` should work with any database supported by
67+
Doctrine DBAL. It has been tested on MySQL, PostgreSQL, SQL Server and
68+
SQLite. You can specify the backend via the `backendOptions`. If you
69+
omit this setting, the *current connection* will be re-used (i.e. the
6970
currently active Flow database).
7071

7172
### Submit options

0 commit comments

Comments
 (0)