Skip to content

Commit edd6b75

Browse files
author
Uros Marolt
authored
Handle api SIGTERM gracefully (#368)
1 parent 4a9ff6c commit edd6b75

File tree

18 files changed

+502
-43
lines changed

18 files changed

+502
-43
lines changed

backend/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"lint": "npx eslint .",
2828
"format": "npx prettier --write .",
2929
"script:process-integration": "SERVICE=script ts-node ./src/bin/scripts/process-integration.ts",
30-
"script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts"
30+
"script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts",
31+
"script:process-webhook": "SERVICE=script ts-node ./src/bin/scripts/process-webhook.ts"
3132
},
3233
"dependencies": {
3334
"@aws-sdk/client-comprehend": "^3.159.0",

backend/src/bin/api.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
import { getServiceLogger } from '../utils/logging'
22
import api from '../api'
33
import { API_CONFIG } from '../config'
4+
import { timeout } from '../utils/timing'
45

56
const PORT = API_CONFIG.port || 8080
67

78
const log = getServiceLogger()
89

9-
api.listen(PORT, () => {
10+
const server = api.listen(PORT, () => {
1011
log.info(`Listening on port ${PORT}`)
1112
})
13+
14+
process.on('SIGTERM', async () => {
15+
log.warn('Detected SIGTERM signal, started exiting!')
16+
await new Promise<void>((resolve) => {
17+
server.close((err) => {
18+
if (err) {
19+
log.error(err, 'Error while closing server!')
20+
resolve()
21+
} else {
22+
log.info('Server closed successfully!')
23+
resolve()
24+
}
25+
})
26+
})
27+
28+
log.info('Exiting in 5 seconds...')
29+
await timeout(5000)
30+
process.exit(0)
31+
})

backend/src/bin/nodejs-worker.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { createChildLogger, getServiceLogger, Logger } from '../utils/logging'
1010
import { deleteMessage, receiveMessage, sendMessage } from '../utils/sqs'
1111
import { timeout } from '../utils/timing'
1212
import { processIntegration, processIntegrationCheck } from './worker/integrations'
13+
import { processWebhook } from '../serverless/integrations/workers/githubWebhookWorker'
1314

1415
/* eslint-disable no-constant-condition */
1516

@@ -137,6 +138,9 @@ async function handleMessages() {
137138
case NodeWorkerMessageType.DB_OPERATIONS:
138139
processFunction = processDbOperationsMessage
139140
break
141+
case NodeWorkerMessageType.PROCESS_WEBHOOK:
142+
processFunction = processWebhook
143+
break
140144

141145
default:
142146
keep = true

backend/src/bin/scripts/process-integration.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ const sections = [
4141
},
4242
{
4343
header: 'Process Integration',
44-
content: 'Trigger processing of a single integration.',
44+
content: 'Trigger processing of integrations.',
4545
},
4646
{
4747
header: 'Options',
@@ -58,9 +58,9 @@ if (parameters.help || !parameters.integration) {
5858
setImmediate(async () => {
5959
const integrationIds = parameters.integration.split(',')
6060
const onboarding = parameters.onboarding
61+
const options = await SequelizeRepository.getDefaultIRepositoryOptions()
6162

6263
for (const integrationId of integrationIds) {
63-
const options = await SequelizeRepository.getDefaultIRepositoryOptions()
6464
const integration = await options.database.integration.findOne({
6565
where: { id: integrationId },
6666
})
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import commandLineArgs from 'command-line-args'
2+
import commandLineUsage from 'command-line-usage'
3+
import * as fs from 'fs'
4+
import path from 'path'
5+
import { createServiceLogger } from '../../utils/logging'
6+
import SequelizeRepository from '../../database/repositories/sequelizeRepository'
7+
import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS'
8+
import IncomingWebhookRepository from '../../database/repositories/incomingWebhookRepository'
9+
import { WebhookState } from '../../types/webhooks'
10+
import { NodeWorkerProcessWebhookMessage } from '../../types/mq/nodeWorkerProcessWebhookMessage'
11+
12+
const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8')
13+
14+
const log = createServiceLogger()
15+
16+
const options = [
17+
{
18+
name: 'webhook',
19+
alias: 'w',
20+
typeLabel: '{underline webhookId}',
21+
type: String,
22+
description:
23+
'The unique ID of webhook that you would like to process. Use comma delimiter when sending multiple webhooks.',
24+
},
25+
{
26+
name: 'help',
27+
alias: 'h',
28+
type: Boolean,
29+
description: 'Print this usage guide.',
30+
},
31+
]
32+
const sections = [
33+
{
34+
content: banner,
35+
raw: true,
36+
},
37+
{
38+
header: 'Process Webhook',
39+
content: 'Trigger processing of webhooks.',
40+
},
41+
{
42+
header: 'Options',
43+
optionList: options,
44+
},
45+
]
46+
47+
const usage = commandLineUsage(sections)
48+
const parameters = commandLineArgs(options)
49+
50+
if (parameters.help || !parameters.webhook) {
51+
console.log(usage)
52+
} else {
53+
setImmediate(async () => {
54+
const webhookIds = parameters.webhook.split(',')
55+
const options = await SequelizeRepository.getDefaultIRepositoryOptions()
56+
const repo = new IncomingWebhookRepository(options)
57+
58+
for (const webhookId of webhookIds) {
59+
const webhook = await repo.findById(webhookId)
60+
61+
if (!webhook) {
62+
log.error({ webhookId }, 'Webhook not found!')
63+
process.exit(1)
64+
} else if (webhook.state !== WebhookState.PENDING) {
65+
log.error({ webhookId }, 'Webhook is not in pending state!')
66+
process.exit(1)
67+
} else {
68+
log.info({ webhookId }, 'Webhook found - triggering SQS message!')
69+
await sendNodeWorkerMessage(
70+
webhook.tenantId,
71+
new NodeWorkerProcessWebhookMessage(webhook.tenantId, webhook.id),
72+
)
73+
}
74+
}
75+
process.exit(0)
76+
})
77+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
drop table "incomingWebhooks";
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
create table "incomingWebhooks" (
2+
id uuid not null,
3+
"tenantId" uuid not null,
4+
"integrationId" uuid not null,
5+
state varchar(255) not null,
6+
7+
type varchar(255) not null,
8+
payload json not null,
9+
10+
"processedAt" timestamptz null,
11+
error json null,
12+
13+
"createdAt" timestamptz not null default now(),
14+
15+
primary key (id)
16+
);
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import { v4 as uuid } from 'uuid'
2+
import { QueryTypes } from 'sequelize'
3+
import {
4+
DbIncomingWebhookInsertData,
5+
IncomingWebhookData,
6+
WebhookState,
7+
} from '../../types/webhooks'
8+
import { IRepositoryOptions } from './IRepositoryOptions'
9+
import { RepositoryBase } from './repositoryBase'
10+
11+
/* eslint-disable class-methods-use-this */
12+
13+
export default class IncomingWebhookRepository extends RepositoryBase<
14+
IncomingWebhookData,
15+
string,
16+
DbIncomingWebhookInsertData,
17+
unknown,
18+
unknown
19+
> {
20+
public constructor(options: IRepositoryOptions) {
21+
super(options, true)
22+
}
23+
24+
async create(data: DbIncomingWebhookInsertData): Promise<IncomingWebhookData> {
25+
const transaction = this.transaction
26+
27+
const id = uuid()
28+
29+
const results = await this.seq.query(
30+
`
31+
insert into "incomingWebhooks"(id, "tenantId", "integrationId", state, type, payload)
32+
values(:id, :tenantId, :integrationId, :state, :type, :payload)
33+
returning "createdAt"
34+
`,
35+
{
36+
replacements: {
37+
id,
38+
tenantId: data.tenantId,
39+
integrationId: data.integrationId,
40+
type: data.type,
41+
state: WebhookState.PENDING,
42+
payload: JSON.stringify(data.payload),
43+
},
44+
type: QueryTypes.INSERT,
45+
transaction,
46+
},
47+
)
48+
49+
return {
50+
id,
51+
state: WebhookState.PENDING,
52+
...data,
53+
processedAt: null,
54+
error: null,
55+
createdAt: results[0][0].createdAt.toISOString(),
56+
}
57+
}
58+
59+
override async findById(id: string): Promise<IncomingWebhookData> {
60+
const transaction = this.transaction
61+
62+
const seq = this.seq
63+
64+
const results = await seq.query(
65+
`
66+
select id,
67+
"tenantId",
68+
"integrationId",
69+
state,
70+
type,
71+
payload,
72+
"processedAt",
73+
error,
74+
"createdAt"
75+
from "incomingWebhooks"
76+
where id = :id
77+
`,
78+
{
79+
replacements: {
80+
id,
81+
},
82+
type: QueryTypes.SELECT,
83+
transaction,
84+
},
85+
)
86+
87+
if (results.length === 0) {
88+
return null
89+
}
90+
91+
const data = results[0] as any
92+
93+
return {
94+
id: data.id,
95+
tenantId: data.tenantId,
96+
integrationId: data.integrationId,
97+
state: data.state,
98+
type: data.type,
99+
payload: data.payload,
100+
processedAt: data.processedAt ? data.processedAt.toISOString() : null,
101+
error: data.error,
102+
createdAt: data.createdAt.toISOString(),
103+
}
104+
}
105+
106+
async markCompleted(id: string): Promise<void> {
107+
const transaction = this.transaction
108+
109+
const [, rowCount] = await this.seq.query(
110+
`
111+
update "incomingWebhooks"
112+
set state = :state,
113+
"processedAt" = now()
114+
where id = :id
115+
`,
116+
{
117+
replacements: {
118+
id,
119+
state: WebhookState.PROCESSED,
120+
},
121+
type: QueryTypes.UPDATE,
122+
transaction,
123+
},
124+
)
125+
126+
if (rowCount !== 1) {
127+
throw new Error(`Failed to mark webhook '${id}' as completed!`)
128+
}
129+
}
130+
131+
async markError(id: string, error: any): Promise<void> {
132+
const transaction = this.transaction
133+
134+
const [, rowCount] = await this.seq.query(
135+
`
136+
update "incomingWebhooks"
137+
set state = :state,
138+
error = :error
139+
where id = :id
140+
`,
141+
{
142+
replacements: {
143+
id,
144+
state: WebhookState.ERROR,
145+
error: JSON.stringify(error),
146+
},
147+
type: QueryTypes.UPDATE,
148+
transaction,
149+
},
150+
)
151+
152+
if (rowCount !== 1) {
153+
throw new Error(`Failed to mark webhook '${id}' as error!`)
154+
}
155+
}
156+
}

backend/src/database/repositories/repositoryBase.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,29 @@ export abstract class RepositoryBase<
3737
return this.options.database
3838
}
3939

40-
abstract create(data: TCreate): Promise<TData>
40+
async create(data: TCreate): Promise<TData> {
41+
throw new Error('Method not implemented.')
42+
}
4143

42-
abstract update(id: TId, data: TUpdate): Promise<TData>
44+
async update(id: TId, data: TUpdate): Promise<TData> {
45+
throw new Error('Method not implemented.')
46+
}
4347

4448
async destroy(id: TId): Promise<void> {
4549
return this.destroyAll([id])
4650
}
4751

48-
abstract destroyAll(ids: TId[]): Promise<void>
52+
async destroyAll(ids: TId[]): Promise<void> {
53+
throw new Error('Method not implemented.')
54+
}
4955

50-
abstract findById(id: TId): Promise<TData>
56+
async findById(id: TId): Promise<TData> {
57+
throw new Error('Method not implemented.')
58+
}
5159

52-
abstract findAndCountAll(criteria: TCriteria): Promise<PageData<TData>>
60+
async findAndCountAll(criteria: TCriteria): Promise<PageData<TData>> {
61+
throw new Error('Method not implemented.')
62+
}
5363

5464
async findAll(criteria: TCriteria): Promise<TData[]> {
5565
const copy = { ...criteria }

0 commit comments

Comments
 (0)