diff --git a/backend/package.json b/backend/package.json index e7462fab75..2afeb2a6bc 100644 --- a/backend/package.json +++ b/backend/package.json @@ -27,7 +27,8 @@ "lint": "npx eslint .", "format": "npx prettier --write .", "script:process-integration": "SERVICE=script ts-node ./src/bin/scripts/process-integration.ts", - "script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts" + "script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts", + "script:process-webhook": "SERVICE=script ts-node ./src/bin/scripts/process-webhook.ts" }, "dependencies": { "@aws-sdk/client-comprehend": "^3.159.0", diff --git a/backend/src/bin/api.ts b/backend/src/bin/api.ts index c47833160e..a32bdb42d1 100644 --- a/backend/src/bin/api.ts +++ b/backend/src/bin/api.ts @@ -1,11 +1,31 @@ import { getServiceLogger } from '../utils/logging' import api from '../api' import { API_CONFIG } from '../config' +import { timeout } from '../utils/timing' const PORT = API_CONFIG.port || 8080 const log = getServiceLogger() -api.listen(PORT, () => { +const server = api.listen(PORT, () => { log.info(`Listening on port ${PORT}`) }) + +process.on('SIGTERM', async () => { + log.warn('Detected SIGTERM signal, started exiting!') + await new Promise((resolve) => { + server.close((err) => { + if (err) { + log.error(err, 'Error while closing server!') + resolve() + } else { + log.info('Server closed successfully!') + resolve() + } + }) + }) + + log.info('Exiting in 5 seconds...') + await timeout(5000) + process.exit(0) +}) diff --git a/backend/src/bin/nodejs-worker.ts b/backend/src/bin/nodejs-worker.ts index 307473edf2..cc193c4528 100644 --- a/backend/src/bin/nodejs-worker.ts +++ b/backend/src/bin/nodejs-worker.ts @@ -5,16 +5,26 @@ import { NodeWorkerMessageType } from '../serverless/types/workerTypes' import { processNodeMicroserviceMessage } from '../serverless/microservices/nodejs/workDispatcher' import { processDbOperationsMessage } from '../serverless/dbOperations/workDispatcher' import { sendNodeWorkerMessage } from '../serverless/utils/nodeWorkerSQS' -import { NodeWorkerIntegrationCheckMessage } from '../types/mq/nodeWorkerIntegrationCheckMessage' -import { NodeWorkerIntegrationProcessMessage } from '../types/mq/nodeWorkerIntegrationProcessMessage' import { NodeWorkerMessageBase } from '../types/mq/nodeWorkerMessageBase' -import { createChildLogger, getServiceLogger } from '../utils/logging' +import { createChildLogger, getServiceLogger, Logger } from '../utils/logging' import { deleteMessage, receiveMessage, sendMessage } from '../utils/sqs' import { timeout } from '../utils/timing' import { processIntegration, processIntegrationCheck } from './worker/integrations' +import { processWebhook } from '../serverless/integrations/workers/githubWebhookWorker' /* eslint-disable no-constant-condition */ +const serviceLogger = getServiceLogger() + +let exiting = false + +const messagesInProgress = new Map() + +process.on('SIGTERM', async () => { + serviceLogger.warn('Detected SIGTERM signal, started exiting!') + exiting = true +}) + const receive = (delayed?: boolean): Promise => { const params: ReceiveMessageRequest = { QueueUrl: delayed ? SQS_CONFIG.nodejsWorkerDelayableQueue : SQS_CONFIG.nodejsWorkerQueue, @@ -35,8 +45,6 @@ const removeFromQueue = (receiptHandle: string, delayed?: boolean): Promise Promise + let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise let keep = false switch (msg.type) { case NodeWorkerMessageType.INTEGRATION_CHECK: - await removeFromQueue(message.ReceiptHandle) - await processIntegrationCheck(messageLogger, msg as NodeWorkerIntegrationCheckMessage) + processFunction = processIntegrationCheck break case NodeWorkerMessageType.INTEGRATION_PROCESS: - await removeFromQueue(message.ReceiptHandle) - await processIntegration(messageLogger, msg as NodeWorkerIntegrationProcessMessage) + processFunction = processIntegration break case NodeWorkerMessageType.NODE_MICROSERVICE: processFunction = processNodeMicroserviceMessage @@ -130,6 +138,9 @@ async function handleMessages() { case NodeWorkerMessageType.DB_OPERATIONS: processFunction = processDbOperationsMessage break + case NodeWorkerMessageType.PROCESS_WEBHOOK: + processFunction = processWebhook + break default: keep = true @@ -140,11 +151,13 @@ async function handleMessages() { if (!keep) { // remove the message from the queue as it's about to be processed await removeFromQueue(message.ReceiptHandle) - + messagesInProgress.set(message.MessageId, msg) try { - await processFunction(msg) + await processFunction(msg, messageLogger) } catch (err) { messageLogger.error(err, 'Error while processing queue message!') + } finally { + messagesInProgress.delete(message.MessageId) } } else { messageLogger.warn('Keeping the message in the queue!') @@ -156,7 +169,7 @@ async function handleMessages() { } // noinspection InfiniteLoopJS - while (true) { + while (!exiting) { if (isWorkerAvailable()) { const message = await receive() @@ -170,6 +183,18 @@ async function handleMessages() { await timeout(200) } } + + // mark in flight messages as exiting + for (const msg of messagesInProgress.values()) { + ;(msg as any).exiting = true + } + + while (messagesInProgress.size !== 0) { + handlerLogger.warn(`Waiting for ${messagesInProgress.size} messages to finish!`) + await timeout(500) + } + + handlerLogger.warn('Exiting!') } setImmediate(async () => { diff --git a/backend/src/bin/scripts/process-integration.ts b/backend/src/bin/scripts/process-integration.ts index 45043cc5fa..caa85b043c 100644 --- a/backend/src/bin/scripts/process-integration.ts +++ b/backend/src/bin/scripts/process-integration.ts @@ -41,7 +41,7 @@ const sections = [ }, { header: 'Process Integration', - content: 'Trigger processing of a single integration.', + content: 'Trigger processing of integrations.', }, { header: 'Options', @@ -58,9 +58,9 @@ if (parameters.help || !parameters.integration) { setImmediate(async () => { const integrationIds = parameters.integration.split(',') const onboarding = parameters.onboarding + const options = await SequelizeRepository.getDefaultIRepositoryOptions() for (const integrationId of integrationIds) { - const options = await SequelizeRepository.getDefaultIRepositoryOptions() const integration = await options.database.integration.findOne({ where: { id: integrationId }, }) diff --git a/backend/src/bin/scripts/process-webhook.ts b/backend/src/bin/scripts/process-webhook.ts new file mode 100644 index 0000000000..bd70afd0b9 --- /dev/null +++ b/backend/src/bin/scripts/process-webhook.ts @@ -0,0 +1,77 @@ +import commandLineArgs from 'command-line-args' +import commandLineUsage from 'command-line-usage' +import * as fs from 'fs' +import path from 'path' +import { createServiceLogger } from '../../utils/logging' +import SequelizeRepository from '../../database/repositories/sequelizeRepository' +import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS' +import IncomingWebhookRepository from '../../database/repositories/incomingWebhookRepository' +import { WebhookState } from '../../types/webhooks' +import { NodeWorkerProcessWebhookMessage } from '../../types/mq/nodeWorkerProcessWebhookMessage' + +const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8') + +const log = createServiceLogger() + +const options = [ + { + name: 'webhook', + alias: 'w', + typeLabel: '{underline webhookId}', + type: String, + description: + 'The unique ID of webhook that you would like to process. Use comma delimiter when sending multiple webhooks.', + }, + { + name: 'help', + alias: 'h', + type: Boolean, + description: 'Print this usage guide.', + }, +] +const sections = [ + { + content: banner, + raw: true, + }, + { + header: 'Process Webhook', + content: 'Trigger processing of webhooks.', + }, + { + header: 'Options', + optionList: options, + }, +] + +const usage = commandLineUsage(sections) +const parameters = commandLineArgs(options) + +if (parameters.help || !parameters.webhook) { + console.log(usage) +} else { + setImmediate(async () => { + const webhookIds = parameters.webhook.split(',') + const options = await SequelizeRepository.getDefaultIRepositoryOptions() + const repo = new IncomingWebhookRepository(options) + + for (const webhookId of webhookIds) { + const webhook = await repo.findById(webhookId) + + if (!webhook) { + log.error({ webhookId }, 'Webhook not found!') + process.exit(1) + } else if (webhook.state !== WebhookState.PENDING) { + log.error({ webhookId }, 'Webhook is not in pending state!') + process.exit(1) + } else { + log.info({ webhookId }, 'Webhook found - triggering SQS message!') + await sendNodeWorkerMessage( + webhook.tenantId, + new NodeWorkerProcessWebhookMessage(webhook.tenantId, webhook.id), + ) + } + } + process.exit(0) + }) +} diff --git a/backend/src/bin/worker/integrations.ts b/backend/src/bin/worker/integrations.ts index 42d1281440..ea344e5e84 100644 --- a/backend/src/bin/worker/integrations.ts +++ b/backend/src/bin/worker/integrations.ts @@ -6,8 +6,8 @@ import { NodeWorkerIntegrationCheckMessage } from '../../types/mq/nodeWorkerInte import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage' export const processIntegrationCheck = async ( - messageLogger: Logger, msg: NodeWorkerIntegrationCheckMessage, + messageLogger: Logger, ): Promise => { const options = (await SequelizeRepository.getDefaultIRepositoryOptions()) as IServiceOptions options.log = messageLogger @@ -18,8 +18,8 @@ export const processIntegrationCheck = async ( } export const processIntegration = async ( - messageLogger: Logger, msg: NodeWorkerIntegrationProcessMessage, + messageLogger: Logger, ): Promise => { const options = (await SequelizeRepository.getDefaultIRepositoryOptions()) as IServiceOptions options.log = messageLogger diff --git a/backend/src/database/migrations/U1671451319__incoming-webhooks.sql b/backend/src/database/migrations/U1671451319__incoming-webhooks.sql new file mode 100644 index 0000000000..e72a978aed --- /dev/null +++ b/backend/src/database/migrations/U1671451319__incoming-webhooks.sql @@ -0,0 +1 @@ +drop table "incomingWebhooks"; \ No newline at end of file diff --git a/backend/src/database/migrations/V1671451319__incoming-webhooks.sql b/backend/src/database/migrations/V1671451319__incoming-webhooks.sql new file mode 100644 index 0000000000..06653283a5 --- /dev/null +++ b/backend/src/database/migrations/V1671451319__incoming-webhooks.sql @@ -0,0 +1,16 @@ +create table "incomingWebhooks" ( + id uuid not null, + "tenantId" uuid not null, + "integrationId" uuid not null, + state varchar(255) not null, + + type varchar(255) not null, + payload json not null, + + "processedAt" timestamptz null, + error json null, + + "createdAt" timestamptz not null default now(), + + primary key (id) +); \ No newline at end of file diff --git a/backend/src/database/repositories/incomingWebhookRepository.ts b/backend/src/database/repositories/incomingWebhookRepository.ts new file mode 100644 index 0000000000..9dc2cd98a2 --- /dev/null +++ b/backend/src/database/repositories/incomingWebhookRepository.ts @@ -0,0 +1,156 @@ +import { v4 as uuid } from 'uuid' +import { QueryTypes } from 'sequelize' +import { + DbIncomingWebhookInsertData, + IncomingWebhookData, + WebhookState, +} from '../../types/webhooks' +import { IRepositoryOptions } from './IRepositoryOptions' +import { RepositoryBase } from './repositoryBase' + +/* eslint-disable class-methods-use-this */ + +export default class IncomingWebhookRepository extends RepositoryBase< + IncomingWebhookData, + string, + DbIncomingWebhookInsertData, + unknown, + unknown +> { + public constructor(options: IRepositoryOptions) { + super(options, true) + } + + async create(data: DbIncomingWebhookInsertData): Promise { + const transaction = this.transaction + + const id = uuid() + + const results = await this.seq.query( + ` + insert into "incomingWebhooks"(id, "tenantId", "integrationId", state, type, payload) + values(:id, :tenantId, :integrationId, :state, :type, :payload) + returning "createdAt" + `, + { + replacements: { + id, + tenantId: data.tenantId, + integrationId: data.integrationId, + type: data.type, + state: WebhookState.PENDING, + payload: JSON.stringify(data.payload), + }, + type: QueryTypes.INSERT, + transaction, + }, + ) + + return { + id, + state: WebhookState.PENDING, + ...data, + processedAt: null, + error: null, + createdAt: results[0][0].createdAt.toISOString(), + } + } + + override async findById(id: string): Promise { + const transaction = this.transaction + + const seq = this.seq + + const results = await seq.query( + ` + select id, + "tenantId", + "integrationId", + state, + type, + payload, + "processedAt", + error, + "createdAt" + from "incomingWebhooks" + where id = :id + `, + { + replacements: { + id, + }, + type: QueryTypes.SELECT, + transaction, + }, + ) + + if (results.length === 0) { + return null + } + + const data = results[0] as any + + return { + id: data.id, + tenantId: data.tenantId, + integrationId: data.integrationId, + state: data.state, + type: data.type, + payload: data.payload, + processedAt: data.processedAt ? data.processedAt.toISOString() : null, + error: data.error, + createdAt: data.createdAt.toISOString(), + } + } + + async markCompleted(id: string): Promise { + const transaction = this.transaction + + const [, rowCount] = await this.seq.query( + ` + update "incomingWebhooks" + set state = :state, + "processedAt" = now() + where id = :id + `, + { + replacements: { + id, + state: WebhookState.PROCESSED, + }, + type: QueryTypes.UPDATE, + transaction, + }, + ) + + if (rowCount !== 1) { + throw new Error(`Failed to mark webhook '${id}' as completed!`) + } + } + + async markError(id: string, error: any): Promise { + const transaction = this.transaction + + const [, rowCount] = await this.seq.query( + ` + update "incomingWebhooks" + set state = :state, + error = :error + where id = :id + `, + { + replacements: { + id, + state: WebhookState.ERROR, + error: JSON.stringify(error), + }, + type: QueryTypes.UPDATE, + transaction, + }, + ) + + if (rowCount !== 1) { + throw new Error(`Failed to mark webhook '${id}' as error!`) + } + } +} diff --git a/backend/src/database/repositories/repositoryBase.ts b/backend/src/database/repositories/repositoryBase.ts index ffc2c9f499..5503cf4270 100644 --- a/backend/src/database/repositories/repositoryBase.ts +++ b/backend/src/database/repositories/repositoryBase.ts @@ -37,19 +37,29 @@ export abstract class RepositoryBase< return this.options.database } - abstract create(data: TCreate): Promise + async create(data: TCreate): Promise { + throw new Error('Method not implemented.') + } - abstract update(id: TId, data: TUpdate): Promise + async update(id: TId, data: TUpdate): Promise { + throw new Error('Method not implemented.') + } async destroy(id: TId): Promise { return this.destroyAll([id]) } - abstract destroyAll(ids: TId[]): Promise + async destroyAll(ids: TId[]): Promise { + throw new Error('Method not implemented.') + } - abstract findById(id: TId): Promise + async findById(id: TId): Promise { + throw new Error('Method not implemented.') + } - abstract findAndCountAll(criteria: TCriteria): Promise> + async findAndCountAll(criteria: TCriteria): Promise> { + throw new Error('Method not implemented.') + } async findAll(criteria: TCriteria): Promise { const copy = { ...criteria } diff --git a/backend/src/serverless/integrations/services/integrationProcessor.ts b/backend/src/serverless/integrations/services/integrationProcessor.ts index 6537de7e9e..a728bfd356 100644 --- a/backend/src/serverless/integrations/services/integrationProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationProcessor.ts @@ -264,6 +264,8 @@ export class IntegrationProcessor extends LoggingBase { let delay: number = 5 let stopProcessing = false + let exit = false + if (streams.length > 0) { logger.info({ streamCount: streams.length }, 'Detected streams to process!') @@ -271,6 +273,18 @@ export class IntegrationProcessor extends LoggingBase { let processedCount = 0 let notifyCount = 0 while (streams.length > 0) { + if ((req as any).exiting) { + if (!req.onboarding) { + logger.warn('Stopped processing integration (not onboarding)!') + exit = true + break + } else { + logger.warn('Stopped processing integration (onboarding)!') + delay = 3 * 60 + break + } + } + const stream = streams.pop() processedCount++ @@ -388,9 +402,10 @@ export class IntegrationProcessor extends LoggingBase { // postprocess integration settings await intService.postprocess(stepContext, failedStreams, streams) - if (streams.length > 0 || failedStreams.length > 0) { + if (!exit && (streams.length > 0 || failedStreams.length > 0)) { logger.warn( - `${failedStreams.length} streams have not been successfully processed - retrying them with delay! We also have ${streams.length} remaining streams left to process!`, + { failed: failedStreams.length, remaining: streams.length }, + 'Integration processing finished - some streams were not processed!', ) const existingRetryStreams = req.retryStreams || [] diff --git a/backend/src/serverless/integrations/webhooks/__tests__/github.test.ts b/backend/src/serverless/integrations/webhooks/__tests__/github.test.ts index a7e77202fb..a9b1cbdd0d 100644 --- a/backend/src/serverless/integrations/webhooks/__tests__/github.test.ts +++ b/backend/src/serverless/integrations/webhooks/__tests__/github.test.ts @@ -351,7 +351,7 @@ describe('Github webhooks tests', () => { expect(fromDb).toBeDefined() }) - it('getActivityWithMember should return null for all other actions', async () => { + it('getActivityWithMember should throw an error for all other actions', async () => { const { gh } = await init(TestEvents.issues.event, TestEvents.issues.closed, true) const actions = [ @@ -373,9 +373,14 @@ describe('Github webhooks tests', () => { const fromMain = await gh.getActivityWithMember() expect(fromMain).toBeNull() - const fromDb = await gh.main() - expect(fromDb.message).toBe('Event not supported') - expect(fromDb.status).toBe(204) + try { + await gh.main() + fail('Should have thrown an error') + } catch (err) { + expect(err.message).toBe( + `Activity not supported! Event was issues of type string. Action was ${action}, with a payload type of object.`, + ) + } } }) }) @@ -678,7 +683,7 @@ describe('Github webhooks tests', () => { expect(fromDb).toBeDefined() }) - it('getActivityWithMember should return null for all other actions', async () => { + it('getActivityWithMember should throw an error for all other actions', async () => { const { gh } = await init(TestEvents.pullRequests.event, TestEvents.pullRequests.closed, true) const actions = [ @@ -701,9 +706,14 @@ describe('Github webhooks tests', () => { const fromMain = await gh.getActivityWithMember() expect(fromMain).toBeNull() - const fromDb = await gh.main() - expect(fromDb.message).toBe('Event not supported') - expect(fromDb.status).toBe(204) + try { + await gh.main() + fail('Should have thrown an error') + } catch (err) { + expect(err.message).toBe( + `Activity not supported! Event was pull_request of type string. Action was ${action}, with a payload type of object.`, + ) + } } }) }) @@ -1018,7 +1028,7 @@ describe('Github webhooks tests', () => { expect(fromDb.parentId).toBe(pull.id) }) - it('getActivityWithMember should return null for all other actions', async () => { + it('getActivityWithMember should throw an error for all other actions', async () => { const { gh } = await init(TestEvents.comment.event, TestEvents.comment.issue, true) const actions = ['deleted'] @@ -1027,9 +1037,14 @@ describe('Github webhooks tests', () => { const fromMain = await gh.getActivityWithMember() expect(fromMain).toBeNull() - const fromDb = await gh.main() - expect(fromDb.message).toBe('Event not supported') - expect(fromDb.status).toBe(204) + try { + await gh.main() + fail('Should have thrown error') + } catch (err) { + expect(err.message).toBe( + 'Activity not supported! Event was issue_comment of type string. Action was deleted, with a payload type of object.', + ) + } } }) it('It should parse a discussion comment created event coming from the GitHub API', async () => { diff --git a/backend/src/serverless/integrations/webhooks/github.ts b/backend/src/serverless/integrations/webhooks/github.ts index 337f63d874..795b2fffff 100644 --- a/backend/src/serverless/integrations/webhooks/github.ts +++ b/backend/src/serverless/integrations/webhooks/github.ts @@ -510,39 +510,39 @@ export default class GitHubWebhook { /** * Verifies a request coming from GitHub webhooks - * @param req The whole request + * @param signature header signature + * @param data payload * @returns The SQS message sent || Verification Error */ - static verify(req): void { - try { - const signature = req.headers['x-hub-signature'] - const secret = GITHUB_CONFIG.webhookSecret - - log.info('Verifying webhook...') - const isVerified = verifyGithubWebhook(signature, JSON.stringify(req.body), secret) // Returns true if verification succeeds; otherwise, false. + static verify(signature: string, data: any): void { + const secret = GITHUB_CONFIG.webhookSecret - log.info('Verification', isVerified) - if (!isVerified) { - throw new Error('Webhook not verified') - } + let isVerified: boolean + try { + isVerified = verifyGithubWebhook(signature, JSON.stringify(data), secret) // Returns true if verification succeeds; otherwise, false. } catch (err) { throw new Error(`Webhook not verified\n${err}`) } + + if (!isVerified) { + throw new Error('Webhook not verified') + } + + log.info('Webhook verified') } async main(): Promise { const activity = await this.getActivityWithMember() + if (activity) { const userContext = await getUserContext(activity.tenant) return new ActivityService(userContext).createWithMember(activity) } - return { - message: 'Event not supported', - info: `Event was ${this.event} of type ${typeof this.event}. Action was ${ + throw new Error( + `Activity not supported! Event was ${this.event} of type ${typeof this.event}. Action was ${ this.payload.action }, with a payload type of ${typeof this.payload}.`, - status: 204, - } + ) } } diff --git a/backend/src/serverless/integrations/workers/githubWebhookWorker.ts b/backend/src/serverless/integrations/workers/githubWebhookWorker.ts index 1ff3555f88..501788bed4 100644 --- a/backend/src/serverless/integrations/workers/githubWebhookWorker.ts +++ b/backend/src/serverless/integrations/workers/githubWebhookWorker.ts @@ -1,7 +1,96 @@ +import { createChildLogger, createServiceChildLogger, Logger } from '../../../utils/logging' +import { PlatformType } from '../../../types/integrationEnums' +import IntegrationRepository from '../../../database/repositories/integrationRepository' +import IncomingWebhookRepository from '../../../database/repositories/incomingWebhookRepository' +import { WebhookState, WebhookType } from '../../../types/webhooks' +import SequelizeRepository from '../../../database/repositories/sequelizeRepository' +import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' +import { NodeWorkerProcessWebhookMessage } from '../../../types/mq/nodeWorkerProcessWebhookMessage' import GitHubWebhook from '../webhooks/github' +import { IRepositoryOptions } from '../../../database/repositories/IRepositoryOptions' + +const log = createServiceChildLogger('githubWebhookWorker') export default async function githubWebhookWorker(req) { - GitHubWebhook.verify(req) - const result = await new GitHubWebhook(req.headers['x-github-event'], req.body).main() - return result + const signature = req.headers['x-hub-signature'] + const event = req.headers['x-github-event'] + const data = req.body + + const identifier = data.installation.id.toString() + const integration = (await IntegrationRepository.findByIdentifier( + identifier, + PlatformType.GITHUB, + )) as any + + if (integration) { + log.info({ integrationId: integration.id }, 'Incoming GitHub Webhook!') + const options = await SequelizeRepository.getDefaultIRepositoryOptions() + const repo = new IncomingWebhookRepository(options) + + const result = await repo.create({ + tenantId: integration.tenantId, + integrationId: integration.id, + type: WebhookType.GITHUB, + payload: { + signature, + event, + data, + }, + }) + + await sendNodeWorkerMessage( + integration.tenantId, + new NodeWorkerProcessWebhookMessage(integration.tenantId, result.id), + ) + + return { + status: 204, + } + } + + log.error({ identifier }, 'No integration found for incoming GitHub Webhook!') + return { + status: 404, + } +} + +export const processWebhook = async ( + msg: NodeWorkerProcessWebhookMessage, + messageLogger: Logger, +) => { + const options = (await SequelizeRepository.getDefaultIRepositoryOptions()) as IRepositoryOptions + const logger = createChildLogger('githubWebhookProcessor', messageLogger, { + webhookId: msg.webhookId, + }) + options.log = logger + + logger.info('Processing GitHub Webhook!') + + // load webhook to process from database + options.transaction = await SequelizeRepository.createTransaction(options) + const repo = new IncomingWebhookRepository(options) + const webhook = await repo.findById(msg.webhookId) + + if (webhook) { + if (webhook.state !== WebhookState.PENDING) { + logger.error({ state: webhook.state }, 'Webhook is not in pending state!') + return + } + + try { + await GitHubWebhook.verify(webhook.payload.signature, webhook.payload.data) + + const processor = new GitHubWebhook(webhook.payload.event, webhook.payload.data) + await processor.main() + await repo.markCompleted(webhook.id) + logger.info('Webhook processed successfully!') + } catch (err) { + logger.error(err, 'Error processing webhook!') + await repo.markError(webhook.id, err) + } finally { + await SequelizeRepository.commitTransaction(options.transaction) + } + } else { + logger.error({ webhookId: msg.webhookId }, 'Webhook not found!') + } } diff --git a/backend/src/serverless/types/workerTypes.ts b/backend/src/serverless/types/workerTypes.ts index b560240b51..20113379b1 100644 --- a/backend/src/serverless/types/workerTypes.ts +++ b/backend/src/serverless/types/workerTypes.ts @@ -3,6 +3,7 @@ export enum NodeWorkerMessageType { INTEGRATION_PROCESS = 'integration_process', NODE_MICROSERVICE = 'node_microservice', DB_OPERATIONS = 'db_operations', + PROCESS_WEBHOOK = 'process_webhook', } export enum PythonWorkerMessageType { diff --git a/backend/src/serverless/utils/nodeWorkerSQS.ts b/backend/src/serverless/utils/nodeWorkerSQS.ts index 956d5cb8fd..3799c12814 100644 --- a/backend/src/serverless/utils/nodeWorkerSQS.ts +++ b/backend/src/serverless/utils/nodeWorkerSQS.ts @@ -63,14 +63,16 @@ export const sendNodeWorkerMessage = async ( delayed = true } - await sendMessage({ + const params = { QueueUrl: delayed ? SQS_CONFIG.nodejsWorkerDelayableQueue : SQS_CONFIG.nodejsWorkerQueue, MessageGroupId: delayed ? undefined : tenantId, MessageDeduplicationId: delayed ? undefined : `${tenantId}-${moment().valueOf()}`, MessageBody: JSON.stringify(body), MessageAttributes: attributes, DelaySeconds: delay, - }) + } + + await sendMessage(params) } export const sendNewActivityNodeSQSMessage = async ( diff --git a/backend/src/types/mq/nodeWorkerProcessWebhookMessage.ts b/backend/src/types/mq/nodeWorkerProcessWebhookMessage.ts new file mode 100644 index 0000000000..2b6d265fb1 --- /dev/null +++ b/backend/src/types/mq/nodeWorkerProcessWebhookMessage.ts @@ -0,0 +1,8 @@ +import { NodeWorkerMessageType } from '../../serverless/types/workerTypes' +import { NodeWorkerMessageBase } from './nodeWorkerMessageBase' + +export class NodeWorkerProcessWebhookMessage extends NodeWorkerMessageBase { + constructor(public readonly tenantId: string, public readonly webhookId: string) { + super(NodeWorkerMessageType.PROCESS_WEBHOOK) + } +} diff --git a/backend/src/types/webhooks.ts b/backend/src/types/webhooks.ts new file mode 100644 index 0000000000..e7996d7d03 --- /dev/null +++ b/backend/src/types/webhooks.ts @@ -0,0 +1,36 @@ +export enum WebhookState { + PENDING = 'PENDING', + PROCESSED = 'PROCESSED', + ERROR = 'ERROR', +} + +export enum WebhookType { + GITHUB = 'GITHUB', +} + +export interface GithubWebhookPayload { + signature: string + event: string + data: any +} + +export type IncomingWebhookPayload = GithubWebhookPayload + +export interface IncomingWebhookData { + id: string + tenantId: string + integrationId: string + state: WebhookState + type: WebhookType + payload: IncomingWebhookPayload + processedAt: string | null + error: any | null + createdAt: string +} + +export interface DbIncomingWebhookInsertData { + tenantId: string + integrationId: string + type: WebhookType + payload: any +} diff --git a/backend/src/utils/slack.ts b/backend/src/utils/slack.ts index 5ad5fcf7bf..f960e3a802 100644 --- a/backend/src/utils/slack.ts +++ b/backend/src/utils/slack.ts @@ -22,3 +22,23 @@ export const sendSlackAlert = async (text: string): Promise => { log.warn('No Slack client defined! Can not send a slack message!') } } + +export const sendSlackError = async (source: string, error: any): Promise => { + if (slackClientInstance) { + await slackClientInstance.chat.postMessage({ + channel: SLACK_CONFIG.reporterChannel, + text: `Error from ${source}:`, + username: 'Error Reporter', + icon_emoji: ':warning:', + blocks: [ + { + type: 'section', + text: { + type: 'mrkdwn', + text: `\`\`\`${JSON.stringify(error, null, 2)}\`\`\``, + }, + }, + ], + }) + } +} diff --git a/scripts/cli b/scripts/cli index 07d0ea3a66..8abc58368a 100755 --- a/scripts/cli +++ b/scripts/cli @@ -161,6 +161,9 @@ function scaffold() { create-migration) create_migration $2 exit; ;; + migrate-up) migrate_local + exit; + ;; up-test) up_test_scaffold exit;