From b2dbe16b5a54a92eef740d43c9066fe0f9ac859f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 12 Oct 2023 12:49:24 +0200 Subject: [PATCH 1/3] refactor to remove the check integrations step on nodejs-worker - now everything is going on in job-generator --- .../src/bin/jobs/checkStuckIntegrationRuns.ts | 3 +- backend/src/bin/nodejs-worker.ts | 5 +- backend/src/bin/scripts/continue-run.ts | 2 +- .../src/bin/scripts/process-integration.ts | 2 +- backend/src/bin/worker/integrations.ts | 9 -- .../repositories/integrationRunRepository.ts | 7 +- .../services/integrationCheckProcessor.ts | 116 ---------------- .../services/integrationProcessor.ts | 14 -- .../services/integrationRunProcessor.ts | 4 +- .../services/integrationServiceBase.ts | 3 +- .../services/integrationTickProcessor.ts | 125 ++++++++++++++++-- .../integrations/githubIntegrationService.ts | 2 +- backend/src/services/integrationService.ts | 3 +- backend/src/types/integrationRunTypes.ts | 8 +- .../mq/nodeWorkerIntegrationCheckMessage.ts | 9 -- 15 files changed, 126 insertions(+), 186 deletions(-) delete mode 100644 backend/src/serverless/integrations/services/integrationCheckProcessor.ts delete mode 100644 backend/src/types/mq/nodeWorkerIntegrationCheckMessage.ts diff --git a/backend/src/bin/jobs/checkStuckIntegrationRuns.ts b/backend/src/bin/jobs/checkStuckIntegrationRuns.ts index 11bcf0b519..3c859c29ec 100644 --- a/backend/src/bin/jobs/checkStuckIntegrationRuns.ts +++ b/backend/src/bin/jobs/checkStuckIntegrationRuns.ts @@ -2,6 +2,7 @@ import { processPaginated } from '@crowd/common' import { Logger, getChildLogger, getServiceChildLogger } from '@crowd/logging' import cronGenerator from 'cron-time-generator' import moment from 'moment' +import { IntegrationRunState } from '@crowd/types' import { INTEGRATION_PROCESSING_CONFIG } from '../../conf' import IncomingWebhookRepository from '../../database/repositories/incomingWebhookRepository' import IntegrationRepository from '../../database/repositories/integrationRepository' @@ -9,7 +10,7 @@ import IntegrationRunRepository from '../../database/repositories/integrationRun import IntegrationStreamRepository from '../../database/repositories/integrationStreamRepository' import SequelizeRepository from '../../database/repositories/sequelizeRepository' import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS' -import { IntegrationRun, IntegrationRunState } from '../../types/integrationRunTypes' +import { IntegrationRun } from '../../types/integrationRunTypes' import { IntegrationStreamState } from '../../types/integrationStreamTypes' import { CrowdJob } from '../../types/jobTypes' import { NodeWorkerProcessWebhookMessage } from '../../types/mq/nodeWorkerProcessWebhookMessage' diff --git a/backend/src/bin/nodejs-worker.ts b/backend/src/bin/nodejs-worker.ts index baf839899e..0a1b7515f6 100644 --- a/backend/src/bin/nodejs-worker.ts +++ b/backend/src/bin/nodejs-worker.ts @@ -10,7 +10,7 @@ import { NodeWorkerMessageType } from '../serverless/types/workerTypes' import { sendNodeWorkerMessage } from '../serverless/utils/nodeWorkerSQS' import { NodeWorkerMessageBase } from '../types/mq/nodeWorkerMessageBase' import { deleteMessage, receiveMessage, sendMessage } from '../utils/sqs' -import { processIntegration, processIntegrationCheck, processWebhook } from './worker/integrations' +import { processIntegration, processWebhook } from './worker/integrations' /* eslint-disable no-constant-condition */ @@ -158,9 +158,6 @@ async function handleMessages() { let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise switch (msg.type) { - case NodeWorkerMessageType.INTEGRATION_CHECK: - processFunction = processIntegrationCheck - break case NodeWorkerMessageType.INTEGRATION_PROCESS: processFunction = processIntegration break diff --git a/backend/src/bin/scripts/continue-run.ts b/backend/src/bin/scripts/continue-run.ts index 41b583c0d8..af5dd5be49 100644 --- a/backend/src/bin/scripts/continue-run.ts +++ b/backend/src/bin/scripts/continue-run.ts @@ -3,11 +3,11 @@ import commandLineUsage from 'command-line-usage' import * as fs from 'fs' import path from 'path' import { getServiceLogger } from '@crowd/logging' +import { IntegrationRunState } from '@crowd/types' import SequelizeRepository from '../../database/repositories/sequelizeRepository' import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS' import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage' import IntegrationRunRepository from '../../database/repositories/integrationRunRepository' -import { IntegrationRunState } from '../../types/integrationRunTypes' /* eslint-disable no-console */ diff --git a/backend/src/bin/scripts/process-integration.ts b/backend/src/bin/scripts/process-integration.ts index ac8a69831d..499571d6bf 100644 --- a/backend/src/bin/scripts/process-integration.ts +++ b/backend/src/bin/scripts/process-integration.ts @@ -5,12 +5,12 @@ import commandLineArgs from 'command-line-args' import commandLineUsage from 'command-line-usage' import * as fs from 'fs' import path from 'path' +import { IntegrationRunState } from '@crowd/types' import IntegrationRepository from '../../database/repositories/integrationRepository' import IntegrationRunRepository from '../../database/repositories/integrationRunRepository' import SequelizeRepository from '../../database/repositories/sequelizeRepository' import { getIntegrationRunWorkerEmitter } from '../../serverless/utils/serviceSQS' import { sendNodeWorkerMessage } from '../../serverless/utils/nodeWorkerSQS' -import { IntegrationRunState } from '../../types/integrationRunTypes' import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage' /* eslint-disable no-console */ diff --git a/backend/src/bin/worker/integrations.ts b/backend/src/bin/worker/integrations.ts index 7e8c11b7de..387de96fa8 100644 --- a/backend/src/bin/worker/integrations.ts +++ b/backend/src/bin/worker/integrations.ts @@ -4,7 +4,6 @@ import { REDIS_CONFIG } from '../../conf' import SequelizeRepository from '../../database/repositories/sequelizeRepository' import { IntegrationProcessor } from '../../serverless/integrations/services/integrationProcessor' import { IServiceOptions } from '../../services/IServiceOptions' -import { NodeWorkerIntegrationCheckMessage } from '../../types/mq/nodeWorkerIntegrationCheckMessage' import { NodeWorkerIntegrationProcessMessage } from '../../types/mq/nodeWorkerIntegrationProcessMessage' import { NodeWorkerProcessWebhookMessage } from '../../types/mq/nodeWorkerProcessWebhookMessage' @@ -25,14 +24,6 @@ async function getIntegrationProcessor(logger: Logger): Promise => { - const processor = await getIntegrationProcessor(messageLogger) - await processor.processCheck(msg.integrationType) -} - export const processIntegration = async ( msg: NodeWorkerIntegrationProcessMessage, messageLogger: Logger, diff --git a/backend/src/database/repositories/integrationRunRepository.ts b/backend/src/database/repositories/integrationRunRepository.ts index 9c5ad3a8a0..a92c40076c 100644 --- a/backend/src/database/repositories/integrationRunRepository.ts +++ b/backend/src/database/repositories/integrationRunRepository.ts @@ -1,10 +1,7 @@ import { QueryTypes } from 'sequelize' import { generateUUIDv1 } from '@crowd/common' -import { - IntegrationRunState, - IntegrationRun, - DbIntegrationRunCreateData, -} from '../../types/integrationRunTypes' +import { IntegrationRunState } from '@crowd/types' +import { IntegrationRun, DbIntegrationRunCreateData } from '../../types/integrationRunTypes' import { IntegrationStreamState } from '../../types/integrationStreamTypes' import { IRepositoryOptions } from './IRepositoryOptions' import { RepositoryBase } from './repositoryBase' diff --git a/backend/src/serverless/integrations/services/integrationCheckProcessor.ts b/backend/src/serverless/integrations/services/integrationCheckProcessor.ts deleted file mode 100644 index 356cb518ea..0000000000 --- a/backend/src/serverless/integrations/services/integrationCheckProcessor.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { processPaginated, singleOrDefault } from '@crowd/common' -import { INTEGRATION_SERVICES } from '@crowd/integrations' -import { LoggerBase, getChildLogger } from '@crowd/logging' -import { IntegrationType } from '@crowd/types' -import { IRepositoryOptions } from '../../../database/repositories/IRepositoryOptions' -import IntegrationRepository from '../../../database/repositories/integrationRepository' -import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' -import MicroserviceRepository from '../../../database/repositories/microserviceRepository' -import SequelizeRepository from '../../../database/repositories/sequelizeRepository' -import { IServiceOptions } from '../../../services/IServiceOptions' -import { IntegrationRunState } from '../../../types/integrationRunTypes' -import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' -import { getIntegrationRunWorkerEmitter } from '../../utils/serviceSQS' -import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' -import { IntegrationServiceBase } from './integrationServiceBase' - -export class IntegrationCheckProcessor extends LoggerBase { - constructor( - options: IServiceOptions, - private readonly integrationServices: IntegrationServiceBase[], - private readonly integrationRunRepository: IntegrationRunRepository, - ) { - super(options.log) - } - - async processCheck(type: IntegrationType) { - const logger = getChildLogger('processCheck', this.log, { type }) - logger.trace('Processing integration check!') - - if (type === IntegrationType.TWITTER_REACH) { - await processPaginated( - async (page) => MicroserviceRepository.findAllByType('twitter_followers', page, 10), - async (microservices) => { - this.log.debug({ type, count: microservices.length }, 'Found microservices to check!') - for (const micro of microservices) { - const existingRun = await this.integrationRunRepository.findLastProcessingRun( - undefined, - micro.id, - ) - if (!existingRun) { - const microservice = micro as any - - const run = await this.integrationRunRepository.create({ - microserviceId: microservice.id, - tenantId: microservice.tenantId, - onboarding: false, - state: IntegrationRunState.PENDING, - }) - - this.log.debug({ type, runId: run.id }, 'Triggering microservice processing!') - - await sendNodeWorkerMessage( - microservice.tenantId, - new NodeWorkerIntegrationProcessMessage(run.id), - ) - } - } - }, - ) - } else { - const options = - (await SequelizeRepository.getDefaultIRepositoryOptions()) as IRepositoryOptions - - // get the relevant integration service that is supposed to be configured already - const intService = singleOrDefault(this.integrationServices, (s) => s.type === type) - - if (intService) { - await processPaginated( - async (page) => IntegrationRepository.findAllActive(type, page, 10), - async (integrations) => { - logger.debug({ count: integrations.length }, 'Found integrations to check!') - const inactiveIntegrations: any[] = [] - for (const integration of integrations as any[]) { - const existingRun = await this.integrationRunRepository.findLastProcessingRun( - integration.id, - ) - if (!existingRun) { - inactiveIntegrations.push(integration) - } - } - await intService.triggerIntegrationCheck(inactiveIntegrations, options) - }, - ) - } else { - const newIntService = singleOrDefault(INTEGRATION_SERVICES, (i) => i.type === type) - - if (!newIntService) { - throw new Error(`No integration service found for type ${type}!`) - } - - const emitter = await getIntegrationRunWorkerEmitter() - - await processPaginated( - async (page) => IntegrationRepository.findAllActive(type, page, 10), - async (integrations) => { - logger.debug({ count: integrations.length }, 'Found integrations to check!') - for (const integration of integrations as any[]) { - const existingRun = - await this.integrationRunRepository.findLastProcessingRunInNewFramework( - integration.id, - ) - if (!existingRun) { - await emitter.triggerIntegrationRun( - integration.tenantId, - integration.platform, - integration.id, - false, - ) - } - } - }, - ) - } - } - } -} diff --git a/backend/src/serverless/integrations/services/integrationProcessor.ts b/backend/src/serverless/integrations/services/integrationProcessor.ts index 985e2dfbd5..885d9124e2 100644 --- a/backend/src/serverless/integrations/services/integrationProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationProcessor.ts @@ -1,11 +1,9 @@ import { LoggerBase } from '@crowd/logging' import { ApiPubSubEmitter, RedisClient } from '@crowd/redis' -import { IntegrationType } from '@crowd/types' import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' import IntegrationStreamRepository from '../../../database/repositories/integrationStreamRepository' import { IServiceOptions } from '../../../services/IServiceOptions' import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' -import { IntegrationCheckProcessor } from './integrationCheckProcessor' import { IntegrationRunProcessor } from './integrationRunProcessor' import { IntegrationTickProcessor } from './integrationTickProcessor' import { DiscourseIntegrationService } from './integrations/discourseIntegrationService' @@ -16,8 +14,6 @@ import { WebhookProcessor } from './webhookProcessor' export class IntegrationProcessor extends LoggerBase { private readonly tickProcessor: IntegrationTickProcessor - private readonly checkProcessor: IntegrationCheckProcessor - private readonly webhookProcessor: WebhookProcessor private readonly runProcessor: IntegrationRunProcessor | undefined @@ -51,12 +47,6 @@ export class IntegrationProcessor extends LoggerBase { integrationRunRepository, ) - this.checkProcessor = new IntegrationCheckProcessor( - options, - integrationServices, - integrationRunRepository, - ) - this.webhookProcessor = new WebhookProcessor(options, integrationServices) if (apiPubSubEmitter) { @@ -76,10 +66,6 @@ export class IntegrationProcessor extends LoggerBase { await this.tickProcessor.processTick() } - async processCheck(type: IntegrationType) { - await this.checkProcessor.processCheck(type) - } - async processWebhook(webhookId: string, force?: boolean, fireCrowdWebhooks?: boolean) { await this.webhookProcessor.processWebhook(webhookId, force, fireCrowdWebhooks) } diff --git a/backend/src/serverless/integrations/services/integrationRunProcessor.ts b/backend/src/serverless/integrations/services/integrationRunProcessor.ts index 41f625e426..afb595f151 100644 --- a/backend/src/serverless/integrations/services/integrationRunProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationRunProcessor.ts @@ -2,7 +2,7 @@ import moment from 'moment' import { ApiPubSubEmitter } from '@crowd/redis' import { Logger, getChildLogger, LoggerBase } from '@crowd/logging' import { singleOrDefault } from '@crowd/common' -import { PlatformType } from '@crowd/types' +import { IntegrationRunState, PlatformType } from '@crowd/types' import { sendSlackAlert, SlackAlertTypes } from '@crowd/alerting' import IntegrationRepository from '../../../database/repositories/integrationRepository' import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' @@ -16,7 +16,7 @@ import { IProcessStreamResults, IStepContext, } from '../../../types/integration/stepResult' -import { IntegrationRun, IntegrationRunState } from '../../../types/integrationRunTypes' +import { IntegrationRun } from '../../../types/integrationRunTypes' import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' import { IntegrationServiceBase } from './integrationServiceBase' import SampleDataService from '../../../services/sampleDataService' diff --git a/backend/src/serverless/integrations/services/integrationServiceBase.ts b/backend/src/serverless/integrations/services/integrationServiceBase.ts index f17dd34da3..6e459dbe0a 100644 --- a/backend/src/serverless/integrations/services/integrationServiceBase.ts +++ b/backend/src/serverless/integrations/services/integrationServiceBase.ts @@ -2,7 +2,7 @@ import { SuperfaceClient } from '@superfaceai/one-sdk' import moment from 'moment' import crypto from 'crypto' import { getServiceChildLogger } from '@crowd/logging' -import { IntegrationType } from '@crowd/types' +import { IntegrationRunState, IntegrationType } from '@crowd/types' import { IRepositoryOptions } from '../../../database/repositories/IRepositoryOptions' import { IIntegrationStream, @@ -16,7 +16,6 @@ import { IS_TEST_ENV } from '../../../conf' import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' -import { IntegrationRunState } from '../../../types/integrationRunTypes' const logger = getServiceChildLogger('integrationService') diff --git a/backend/src/serverless/integrations/services/integrationTickProcessor.ts b/backend/src/serverless/integrations/services/integrationTickProcessor.ts index 504b133c37..b0d09c8ff5 100644 --- a/backend/src/serverless/integrations/services/integrationTickProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationTickProcessor.ts @@ -1,17 +1,20 @@ -import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs' -import { processPaginated } from '@crowd/common' +import { processPaginated, singleOrDefault } from '@crowd/common' import { INTEGRATION_SERVICES } from '@crowd/integrations' -import { LoggerBase } from '@crowd/logging' -import { IntegrationType } from '@crowd/types' -import { - getIntegrationRunWorkerEmitter, - getIntegrationStreamWorkerEmitter, -} from '../../utils/serviceSQS' +import { LoggerBase, getChildLogger } from '@crowd/logging' +import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs' +import { IntegrationRunState, IntegrationType } from '@crowd/types' +import SequelizeRepository from '@/database/repositories/sequelizeRepository' +import MicroserviceRepository from '@/database/repositories/microserviceRepository' +import IntegrationRepository from '@/database/repositories/integrationRepository' +import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions' import IntegrationRunRepository from '../../../database/repositories/integrationRunRepository' import { IServiceOptions } from '../../../services/IServiceOptions' -import { NodeWorkerIntegrationCheckMessage } from '../../../types/mq/nodeWorkerIntegrationCheckMessage' import { NodeWorkerIntegrationProcessMessage } from '../../../types/mq/nodeWorkerIntegrationProcessMessage' import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' +import { + getIntegrationRunWorkerEmitter, + getIntegrationStreamWorkerEmitter, +} from '../../utils/serviceSQS' import { IntegrationServiceBase } from './integrationServiceBase' export class IntegrationTickProcessor extends LoggerBase { @@ -68,6 +71,8 @@ export class IntegrationTickProcessor extends LoggerBase { }) } + const promises: Promise[] = [] + for (const intService of tickers) { let trigger = false @@ -91,9 +96,105 @@ export class IntegrationTickProcessor extends LoggerBase { if (trigger) { this.log.info({ type: intService.type }, 'Triggering integration check!') - await sendNodeWorkerMessage( - new Date().toISOString(), - new NodeWorkerIntegrationCheckMessage(intService.type as IntegrationType), + promises.push( + this.processCheck(intService.type as IntegrationType).catch((err) => { + this.log.error(err, 'Error while processing integration check!') + }), + ) + } + } + + if (promises.length > 0) { + await Promise.all(promises) + } + } + + async processCheck(type: IntegrationType) { + const logger = getChildLogger('processCheck', this.log, { type }) + logger.trace('Processing integration check!') + + if (type === IntegrationType.TWITTER_REACH) { + await processPaginated( + async (page) => MicroserviceRepository.findAllByType('twitter_followers', page, 10), + async (microservices) => { + this.log.debug({ type, count: microservices.length }, 'Found microservices to check!') + for (const micro of microservices) { + const existingRun = await this.integrationRunRepository.findLastProcessingRun( + undefined, + micro.id, + ) + if (!existingRun) { + const microservice = micro as any + + const run = await this.integrationRunRepository.create({ + microserviceId: microservice.id, + tenantId: microservice.tenantId, + onboarding: false, + state: IntegrationRunState.PENDING, + }) + + this.log.debug({ type, runId: run.id }, 'Triggering microservice processing!') + + await sendNodeWorkerMessage( + microservice.tenantId, + new NodeWorkerIntegrationProcessMessage(run.id), + ) + } + } + }, + ) + } else { + const options = + (await SequelizeRepository.getDefaultIRepositoryOptions()) as IRepositoryOptions + + // get the relevant integration service that is supposed to be configured already + const intService = singleOrDefault(this.integrationServices, (s) => s.type === type) + + if (intService) { + await processPaginated( + async (page) => IntegrationRepository.findAllActive(type, page, 10), + async (integrations) => { + logger.debug({ count: integrations.length }, 'Found integrations to check!') + const inactiveIntegrations: any[] = [] + for (const integration of integrations as any[]) { + const existingRun = await this.integrationRunRepository.findLastProcessingRun( + integration.id, + ) + if (!existingRun) { + inactiveIntegrations.push(integration) + } + } + await intService.triggerIntegrationCheck(inactiveIntegrations, options) + }, + ) + } else { + const newIntService = singleOrDefault(INTEGRATION_SERVICES, (i) => i.type === type) + + if (!newIntService) { + throw new Error(`No integration service found for type ${type}!`) + } + + const emitter = await getIntegrationRunWorkerEmitter() + + await processPaginated( + async (page) => IntegrationRepository.findAllActive(type, page, 10), + async (integrations) => { + logger.debug({ count: integrations.length }, 'Found integrations to check!') + for (const integration of integrations as any[]) { + const existingRun = + await this.integrationRunRepository.findLastProcessingRunInNewFramework( + integration.id, + ) + if (!existingRun) { + await emitter.triggerIntegrationRun( + integration.tenantId, + integration.platform, + integration.id, + false, + ) + } + } + }, ) } } diff --git a/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts index acd773c3f3..df1e1a6fb8 100644 --- a/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/githubIntegrationService.ts @@ -10,6 +10,7 @@ import { } from '@crowd/integrations' import { IActivityScoringGrid, + IntegrationRunState, IntegrationType, MemberAttributeName, PlatformType, @@ -49,7 +50,6 @@ import PullRequestCommitsQueryNoAdditions, { PullRequestCommitNoAdditions, } from '../../usecases/github/graphql/pullRequestCommitsNoAdditions' import IntegrationRunRepository from '../../../../database/repositories/integrationRunRepository' -import { IntegrationRunState } from '../../../../types/integrationRunTypes' import IntegrationStreamRepository from '../../../../database/repositories/integrationStreamRepository' import { DbIntegrationStreamCreateData } from '../../../../types/integrationStreamTypes' import { sendNodeWorkerMessage } from '../../../utils/nodeWorkerSQS' diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index 41d8d9d4e5..162b750d8d 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -3,7 +3,7 @@ import { request } from '@octokit/request' import moment from 'moment' import lodash from 'lodash' import axios, { AxiosRequestConfig, AxiosResponse } from 'axios' -import { PlatformType } from '@crowd/types' +import { IntegrationRunState, PlatformType } from '@crowd/types' import { HubspotFieldMapperFactory, getHubspotProperties, @@ -33,7 +33,6 @@ import getToken from '../serverless/integrations/usecases/nango/getToken' import { getOrganizations } from '../serverless/integrations/usecases/linkedin/getOrganizations' import Error404 from '../errors/Error404' import IntegrationRunRepository from '../database/repositories/integrationRunRepository' -import { IntegrationRunState } from '../types/integrationRunTypes' import { getIntegrationRunWorkerEmitter, getIntegrationSyncWorkerEmitter, diff --git a/backend/src/types/integrationRunTypes.ts b/backend/src/types/integrationRunTypes.ts index b87f5fefc5..03f18212c2 100644 --- a/backend/src/types/integrationRunTypes.ts +++ b/backend/src/types/integrationRunTypes.ts @@ -1,10 +1,4 @@ -export enum IntegrationRunState { - DELAYED = 'delayed', - PENDING = 'pending', - PROCESSING = 'processing', - PROCESSED = 'processed', - ERROR = 'error', -} +import { IntegrationRunState } from '@crowd/types' export interface IntegrationRun { id: string diff --git a/backend/src/types/mq/nodeWorkerIntegrationCheckMessage.ts b/backend/src/types/mq/nodeWorkerIntegrationCheckMessage.ts deleted file mode 100644 index f52782164e..0000000000 --- a/backend/src/types/mq/nodeWorkerIntegrationCheckMessage.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { IntegrationType } from '@crowd/types' -import { NodeWorkerMessageType } from '../../serverless/types/workerTypes' -import { NodeWorkerMessageBase } from './nodeWorkerMessageBase' - -export class NodeWorkerIntegrationCheckMessage extends NodeWorkerMessageBase { - constructor(public readonly integrationType: IntegrationType) { - super(NodeWorkerMessageType.INTEGRATION_CHECK) - } -} From 86911e5ffea7110cda527a82b897af199f975a7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 12 Oct 2023 12:58:21 +0200 Subject: [PATCH 2/3] added logs to see which integrations are being triggered for checks --- .../services/integrationTickProcessor.ts | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/backend/src/serverless/integrations/services/integrationTickProcessor.ts b/backend/src/serverless/integrations/services/integrationTickProcessor.ts index b0d09c8ff5..9ee09b083f 100644 --- a/backend/src/serverless/integrations/services/integrationTickProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationTickProcessor.ts @@ -110,7 +110,7 @@ export class IntegrationTickProcessor extends LoggerBase { } async processCheck(type: IntegrationType) { - const logger = getChildLogger('processCheck', this.log, { type }) + const logger = getChildLogger('processCheck', this.log, { IntegrationType: type }) logger.trace('Processing integration check!') if (type === IntegrationType.TWITTER_REACH) { @@ -154,7 +154,10 @@ export class IntegrationTickProcessor extends LoggerBase { await processPaginated( async (page) => IntegrationRepository.findAllActive(type, page, 10), async (integrations) => { - logger.debug({ count: integrations.length }, 'Found integrations to check!') + logger.debug( + { integrationIds: integrations.map((i) => i.id) }, + 'Found old integrations to check!', + ) const inactiveIntegrations: any[] = [] for (const integration of integrations as any[]) { const existingRun = await this.integrationRunRepository.findLastProcessingRun( @@ -164,7 +167,14 @@ export class IntegrationTickProcessor extends LoggerBase { inactiveIntegrations.push(integration) } } - await intService.triggerIntegrationCheck(inactiveIntegrations, options) + + if (inactiveIntegrations.length > 0) { + logger.info( + { integrationIds: inactiveIntegrations.map((i) => i.id) }, + 'Triggering old integration checks!', + ) + await intService.triggerIntegrationCheck(inactiveIntegrations, options) + } }, ) } else { @@ -179,13 +189,17 @@ export class IntegrationTickProcessor extends LoggerBase { await processPaginated( async (page) => IntegrationRepository.findAllActive(type, page, 10), async (integrations) => { - logger.debug({ count: integrations.length }, 'Found integrations to check!') + logger.debug( + { integrationIds: integrations.map((i) => i.id) }, + 'Found new integrations to check!', + ) for (const integration of integrations as any[]) { const existingRun = await this.integrationRunRepository.findLastProcessingRunInNewFramework( integration.id, ) if (!existingRun) { + logger.info({ integrationId: integration.id }, 'Triggering new integration check!') await emitter.triggerIntegrationRun( integration.tenantId, integration.platform, From aaf68ab7affab57a74558a828902db00d29e4091 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 12 Oct 2023 14:44:22 +0200 Subject: [PATCH 3/3] bugfix --- backend/src/database/repositories/integrationRepository.ts | 1 + .../integrations/services/integrationTickProcessor.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/backend/src/database/repositories/integrationRepository.ts b/backend/src/database/repositories/integrationRepository.ts index 1be1126b45..211cc79dbe 100644 --- a/backend/src/database/repositories/integrationRepository.ts +++ b/backend/src/database/repositories/integrationRepository.ts @@ -260,6 +260,7 @@ class IntegrationRepository { }, limit: perPage, offset: (page - 1) * perPage, + order: [['id', 'ASC']], }) if (!records) { diff --git a/backend/src/serverless/integrations/services/integrationTickProcessor.ts b/backend/src/serverless/integrations/services/integrationTickProcessor.ts index 9ee09b083f..cbf427ec34 100644 --- a/backend/src/serverless/integrations/services/integrationTickProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationTickProcessor.ts @@ -206,6 +206,8 @@ export class IntegrationTickProcessor extends LoggerBase { integration.id, false, ) + } else { + logger.info({ integrationId: integration.id }, 'Existing run found, skipping!') } } },