Skip to content

Commit c5e857b

Browse files
committed
add script
1 parent ceb9c6d commit c5e857b

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed

services/apps/integration_run_worker/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
"tsc-check": "./node_modules/.bin/tsc --noEmit",
1717
"script:onboard-integration": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/onboard-integration.ts",
1818
"script:process-repo": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-repo.ts",
19-
"script:trigger-stream-processed": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/trigger-stream-processed.ts"
19+
"script:trigger-stream-processed": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/trigger-stream-processed.ts",
20+
"script:continue-run": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/continue-run.ts"
2021
},
2122
"dependencies": {
2223
"@crowd/common": "file:../../libs/common",
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
2+
import { DbStore, getDbConnection } from '@crowd/database'
3+
import { getServiceLogger } from '@crowd/logging'
4+
import { IntegrationRunWorkerEmitter, getSqsClient } from '@crowd/sqs'
5+
import IntegrationRunRepository from '@/repo/integrationRun.repo'
6+
import { IntegrationRunState } from '@crowd/types'
7+
8+
const log = getServiceLogger()
9+
10+
const processArguments = process.argv.slice(2)
11+
12+
const runId = processArguments[0]
13+
14+
setImmediate(async () => {
15+
const sqsClient = getSqsClient(SQS_CONFIG())
16+
const emitter = new IntegrationRunWorkerEmitter(sqsClient, log)
17+
await emitter.init()
18+
19+
const dbConnection = await getDbConnection(DB_CONFIG(), 1)
20+
const store = new DbStore(log, dbConnection)
21+
22+
const repo = new IntegrationRunRepository(store, log)
23+
24+
const run = await repo.findIntegrationRunById(runId)
25+
26+
if (run) {
27+
if (run.state != IntegrationRunState.PENDING) {
28+
log.warn(`Integration run is not pending, setting to pending!`)
29+
30+
await repo.restart(run.id)
31+
}
32+
33+
log.info(`Triggering integration run for ${runId}!`)
34+
35+
await emitter.triggerIntegrationRun(run.tenantId, run.platform, run.integrationId, true)
36+
process.exit(0)
37+
} else {
38+
log.error({ run }, 'Run not found!')
39+
process.exit(1)
40+
}
41+
})

services/apps/integration_run_worker/src/repo/integrationRun.repo.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,4 +371,46 @@ export default class IntegrationRunRepository extends RepositoryBase<Integration
371371

372372
return result.settings
373373
}
374+
375+
public async findIntegrationRunById(runId: string): Promise<{
376+
id: string
377+
state: IntegrationRunState
378+
tenantId: string
379+
integrationId: string
380+
platform: string
381+
} | null> {
382+
const result = await this.db().oneOrNone(
383+
`
384+
select r.id, r.state, r."tenantId", r."integrationId", i.platform
385+
from integration.runs r
386+
inner join integrations i on r."integrationId" = i.id
387+
where r.id = $(runId)
388+
`,
389+
{
390+
runId,
391+
},
392+
)
393+
394+
return result
395+
}
396+
397+
public async restart(runId: string): Promise<void> {
398+
const result = await this.db().result(
399+
`
400+
update integration.runs
401+
set state = $(state),
402+
"processedAt" = null,
403+
error = null,
404+
"delayedUntil" = null,
405+
"updatedAt" = now()
406+
where id = $(runId)
407+
`,
408+
{
409+
runId,
410+
state: IntegrationRunState.PENDING,
411+
},
412+
)
413+
414+
this.checkUpdateRowCount(result.rowCount, 1)
415+
}
374416
}

0 commit comments

Comments
 (0)