Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
"lint": "npx eslint .",
"format": "npx prettier --write .",
"script:process-integration": "SERVICE=script ts-node ./src/bin/scripts/process-integration.ts",
"script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts"
"script:change-tenant-plan": "SERVICE=script ts-node ./src/bin/scripts/change-tenant-plan.ts",
"script:process-webhook": "SERVICE=script ts-node ./src/bin/scripts/process-webhook.ts"
},
"dependencies": {
"@aws-sdk/client-comprehend": "^3.159.0",
Expand Down
22 changes: 21 additions & 1 deletion backend/src/bin/api.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
import { getServiceLogger } from '../utils/logging'
import api from '../api'
import { API_CONFIG } from '../config'
import { timeout } from '../utils/timing'

const PORT = API_CONFIG.port || 8080

const log = getServiceLogger()

api.listen(PORT, () => {
const server = api.listen(PORT, () => {
log.info(`Listening on port ${PORT}`)
})

process.on('SIGTERM', async () => {
log.warn('Detected SIGTERM signal, started exiting!')
await new Promise<void>((resolve) => {
server.close((err) => {
if (err) {
log.error(err, 'Error while closing server!')
resolve()
} else {
log.info('Server closed successfully!')
resolve()
}
})
})

log.info('Exiting in 5 seconds...')
await timeout(5000)
process.exit(0)
})
53 changes: 39 additions & 14 deletions backend/src/bin/nodejs-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,26 @@ import { NodeWorkerMessageType } from '../serverless/types/workerTypes'
import { processNodeMicroserviceMessage } from '../serverless/microservices/nodejs/workDispatcher'
import { processDbOperationsMessage } from '../serverless/dbOperations/workDispatcher'
import { sendNodeWorkerMessage } from '../serverless/utils/nodeWorkerSQS'
import { NodeWorkerIntegrationCheckMessage } from '../types/mq/nodeWorkerIntegrationCheckMessage'
import { NodeWorkerIntegrationProcessMessage } from '../types/mq/nodeWorkerIntegrationProcessMessage'
import { NodeWorkerMessageBase } from '../types/mq/nodeWorkerMessageBase'
import { createChildLogger, getServiceLogger } from '../utils/logging'
import { createChildLogger, getServiceLogger, Logger } from '../utils/logging'
import { deleteMessage, receiveMessage, sendMessage } from '../utils/sqs'
import { timeout } from '../utils/timing'
import { processIntegration, processIntegrationCheck } from './worker/integrations'
import { processWebhook } from '../serverless/integrations/workers/githubWebhookWorker'

/* eslint-disable no-constant-condition */

const serviceLogger = getServiceLogger()

let exiting = false

const messagesInProgress = new Map<string, NodeWorkerMessageBase>()

process.on('SIGTERM', async () => {
serviceLogger.warn('Detected SIGTERM signal, started exiting!')
exiting = true
})

const receive = (delayed?: boolean): Promise<Message | undefined> => {
const params: ReceiveMessageRequest = {
QueueUrl: delayed ? SQS_CONFIG.nodejsWorkerDelayableQueue : SQS_CONFIG.nodejsWorkerQueue,
Expand All @@ -35,16 +45,14 @@ const removeFromQueue = (receiptHandle: string, delayed?: boolean): Promise<void
return deleteMessage(params)
}

const serviceLogger = getServiceLogger()

async function handleDelayedMessages() {
const delayedHandlerLogger = createChildLogger('delayedMessages', serviceLogger, {
queue: SQS_CONFIG.nodejsWorkerDelayableQueue,
})
delayedHandlerLogger.info('Listing for delayed messages!')

// noinspection InfiniteLoopJS
while (true) {
while (!exiting) {
const message = await receive(true)

if (message) {
Expand Down Expand Up @@ -84,6 +92,8 @@ async function handleDelayedMessages() {
delayedHandlerLogger.trace('No message received!')
}
}

delayedHandlerLogger.warn('Exiting!')
}

let processingMessages = 0
Expand Down Expand Up @@ -112,24 +122,25 @@ async function handleMessages() {
try {
messageLogger.debug('Received a new queue message!')

let processFunction: (msg: NodeWorkerMessageBase) => Promise<void>
let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise<void>
let keep = false

switch (msg.type) {
case NodeWorkerMessageType.INTEGRATION_CHECK:
await removeFromQueue(message.ReceiptHandle)
await processIntegrationCheck(messageLogger, msg as NodeWorkerIntegrationCheckMessage)
processFunction = processIntegrationCheck
break
case NodeWorkerMessageType.INTEGRATION_PROCESS:
await removeFromQueue(message.ReceiptHandle)
await processIntegration(messageLogger, msg as NodeWorkerIntegrationProcessMessage)
processFunction = processIntegration
break
case NodeWorkerMessageType.NODE_MICROSERVICE:
processFunction = processNodeMicroserviceMessage
break
case NodeWorkerMessageType.DB_OPERATIONS:
processFunction = processDbOperationsMessage
break
case NodeWorkerMessageType.PROCESS_WEBHOOK:
processFunction = processWebhook
break

default:
keep = true
Expand All @@ -140,11 +151,13 @@ async function handleMessages() {
if (!keep) {
// remove the message from the queue as it's about to be processed
await removeFromQueue(message.ReceiptHandle)

messagesInProgress.set(message.MessageId, msg)
try {
await processFunction(msg)
await processFunction(msg, messageLogger)
} catch (err) {
messageLogger.error(err, 'Error while processing queue message!')
} finally {
messagesInProgress.delete(message.MessageId)
}
} else {
messageLogger.warn('Keeping the message in the queue!')
Expand All @@ -156,7 +169,7 @@ async function handleMessages() {
}

// noinspection InfiniteLoopJS
while (true) {
while (!exiting) {
if (isWorkerAvailable()) {
const message = await receive()

Expand All @@ -170,6 +183,18 @@ async function handleMessages() {
await timeout(200)
}
}

// mark in flight messages as exiting
for (const msg of messagesInProgress.values()) {
;(msg as any).exiting = true
}

while (messagesInProgress.size !== 0) {
handlerLogger.warn(`Waiting for ${messagesInProgress.size} messages to finish!`)
await timeout(500)
}

handlerLogger.warn('Exiting!')
}

setImmediate(async () => {
Expand Down
4 changes: 2 additions & 2 deletions backend/src/bin/scripts/process-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const sections = [
},
{
header: 'Process Integration',
content: 'Trigger processing of a single integration.',
content: 'Trigger processing of integrations.',
},
{
header: 'Options',
Expand All @@ -58,9 +58,9 @@ if (parameters.help || !parameters.integration) {
setImmediate(async () => {
const integrationIds = parameters.integration.split(',')
const onboarding = parameters.onboarding
const options = await SequelizeRepository.getDefaultIRepositoryOptions()

for (const integrationId of integrationIds) {
const options = await SequelizeRepository.getDefaultIRepositoryOptions()
const integration = await options.database.integration.findOne({
where: { id: integrationId },
})
Expand Down
77 changes: 77 additions & 0 deletions backend/src/bin/scripts/process-webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import commandLineArgs from 'command-line-args'
import commandLineUsage from 'command-line-usage'
import * as fs from 'fs'
import path from 'path'
import { createServiceLogger } from '../../utils/logging'
import SequelizeRepository from '../../database/repositories/sequelizeRepository'
import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS'
import IncomingWebhookRepository from '../../database/repositories/incomingWebhookRepository'
import { WebhookState } from '../../types/webhooks'
import { NodeWorkerProcessWebhookMessage } from '../../types/mq/nodeWorkerProcessWebhookMessage'

const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8')

const log = createServiceLogger()

const options = [
{
name: 'webhook',
alias: 'w',
typeLabel: '{underline webhookId}',
type: String,
description:
'The unique ID of webhook that you would like to process. Use comma delimiter when sending multiple webhooks.',
},
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Print this usage guide.',
},
]
const sections = [
{
content: banner,
raw: true,
},
{
header: 'Process Webhook',
content: 'Trigger processing of webhooks.',
},
{
header: 'Options',
optionList: options,
},
]

const usage = commandLineUsage(sections)
const parameters = commandLineArgs(options)

if (parameters.help || !parameters.webhook) {
console.log(usage)
} else {
setImmediate(async () => {
const webhookIds = parameters.webhook.split(',')
const options = await SequelizeRepository.getDefaultIRepositoryOptions()
const repo = new IncomingWebhookRepository(options)

for (const webhookId of webhookIds) {
const webhook = await repo.findById(webhookId)

if (!webhook) {
log.error({ webhookId }, 'Webhook not found!')
process.exit(1)
} else if (webhook.state !== WebhookState.PENDING) {
log.error({ webhookId }, 'Webhook is not in pending state!')
process.exit(1)
} else {
log.info({ webhookId }, 'Webhook found - triggering SQS message!')
await sendNodeWorkerMessage(
webhook.tenantId,
new NodeWorkerProcessWebhookMessage(webhook.tenantId, webhook.id),
)
}
}
process.exit(0)
})
}
4 changes: 2 additions & 2 deletions backend/src/bin/worker/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { NodeWorkerIntegrationCheckMessage } from '../../types/mq/nodeWorkerInte
import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage'

export const processIntegrationCheck = async (
messageLogger: Logger,
msg: NodeWorkerIntegrationCheckMessage,
messageLogger: Logger,
): Promise<void> => {
const options = (await SequelizeRepository.getDefaultIRepositoryOptions()) as IServiceOptions
options.log = messageLogger
Expand All @@ -18,8 +18,8 @@ export const processIntegrationCheck = async (
}

export const processIntegration = async (
messageLogger: Logger,
msg: NodeWorkerIntegrationProcessMessage,
messageLogger: Logger,
): Promise<void> => {
const options = (await SequelizeRepository.getDefaultIRepositoryOptions()) as IServiceOptions
options.log = messageLogger
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop table "incomingWebhooks";
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
create table "incomingWebhooks" (
id uuid not null,
"tenantId" uuid not null,
"integrationId" uuid not null,
state varchar(255) not null,

type varchar(255) not null,
payload json not null,

"processedAt" timestamptz null,
error json null,

"createdAt" timestamptz not null default now(),

primary key (id)
);
Loading