diff --git a/backend/src/database/migrations/U1699626027__add-retries-to-results.sql b/backend/src/database/migrations/U1699626027__add-retries-to-results.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1699626027__add-retries-to-results.sql b/backend/src/database/migrations/V1699626027__add-retries-to-results.sql new file mode 100644 index 0000000000..6d3e50c827 --- /dev/null +++ b/backend/src/database/migrations/V1699626027__add-retries-to-results.sql @@ -0,0 +1,3 @@ +ALTER TABLE integration.results +ADD COLUMN retries INT, +ADD COLUMN "delayedUntil" TIMESTAMP with time zone NULL; diff --git a/backend/src/serverless/integrations/services/integrationTickProcessor.ts b/backend/src/serverless/integrations/services/integrationTickProcessor.ts index cbf427ec34..012f4f4a91 100644 --- a/backend/src/serverless/integrations/services/integrationTickProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationTickProcessor.ts @@ -1,7 +1,11 @@ import { processPaginated, singleOrDefault } from '@crowd/common' import { INTEGRATION_SERVICES } from '@crowd/integrations' import { LoggerBase, getChildLogger } from '@crowd/logging' -import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs' +import { + IntegrationRunWorkerEmitter, + IntegrationStreamWorkerEmitter, + DataSinkWorkerEmitter, +} from '@crowd/sqs' import { IntegrationRunState, IntegrationType } from '@crowd/types' import SequelizeRepository from '@/database/repositories/sequelizeRepository' import MicroserviceRepository from '@/database/repositories/microserviceRepository' @@ -14,6 +18,7 @@ import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS' import { getIntegrationRunWorkerEmitter, getIntegrationStreamWorkerEmitter, + getDataSinkWorkerEmitter, } from '../../utils/serviceSQS' import { IntegrationServiceBase } from './integrationServiceBase' @@ -26,6 +31,8 @@ export class IntegrationTickProcessor extends LoggerBase { private intStreamWorkerEmitter: IntegrationStreamWorkerEmitter + private dataSinkWorkerEmitter: DataSinkWorkerEmitter + constructor( options: IServiceOptions, private readonly integrationServices: IntegrationServiceBase[], @@ -46,6 +53,7 @@ export class IntegrationTickProcessor extends LoggerBase { if (!this.emittersInitialized) { this.intRunWorkerEmitter = await getIntegrationRunWorkerEmitter() this.intStreamWorkerEmitter = await getIntegrationStreamWorkerEmitter() + this.dataSinkWorkerEmitter = await getDataSinkWorkerEmitter() this.emittersInitialized = true } @@ -220,6 +228,7 @@ export class IntegrationTickProcessor extends LoggerBase { await this.initEmitters() await this.intRunWorkerEmitter.checkRuns() await this.intStreamWorkerEmitter.checkStreams() + await this.dataSinkWorkerEmitter.checkResults() // TODO check streams as well this.log.trace('Checking for delayed integration runs!') diff --git a/backend/src/serverless/utils/serviceSQS.ts b/backend/src/serverless/utils/serviceSQS.ts index 4608978b9b..344cc0564a 100644 --- a/backend/src/serverless/utils/serviceSQS.ts +++ b/backend/src/serverless/utils/serviceSQS.ts @@ -3,6 +3,7 @@ import { IntegrationStreamWorkerEmitter, IntegrationSyncWorkerEmitter, SearchSyncWorkerEmitter, + DataSinkWorkerEmitter, SqsClient, getSqsClient, } from '@crowd/sqs' @@ -64,3 +65,12 @@ export const getIntegrationSyncWorkerEmitter = async (): Promise => { + if (dataSinkWorkerEmitter) return dataSinkWorkerEmitter + + dataSinkWorkerEmitter = new DataSinkWorkerEmitter(getClient(), tracer, log) + await dataSinkWorkerEmitter.init() + return dataSinkWorkerEmitter +} diff --git a/services/apps/data_sink_worker/config/default.json b/services/apps/data_sink_worker/config/default.json index 4c84e8e754..e9a6b31829 100644 --- a/services/apps/data_sink_worker/config/default.json +++ b/services/apps/data_sink_worker/config/default.json @@ -5,5 +5,8 @@ "unleash": {}, "temporal": { "automationsTaskQueue": "automations" + }, + "worker": { + "maxStreamRetries": 5 } } diff --git a/services/apps/data_sink_worker/src/bin/process-results.ts b/services/apps/data_sink_worker/src/bin/process-results.ts index c6b72de1a5..19f320f150 100644 --- a/services/apps/data_sink_worker/src/bin/process-results.ts +++ b/services/apps/data_sink_worker/src/bin/process-results.ts @@ -12,7 +12,12 @@ import { DbStore, getDbConnection } from '@crowd/database' import { getServiceTracer } from '@crowd/tracing' import { getServiceLogger } from '@crowd/logging' import { getRedisClient } from '@crowd/redis' -import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs' +import { + NodejsWorkerEmitter, + SearchSyncWorkerEmitter, + DataSinkWorkerEmitter, + getSqsClient, +} from '@crowd/sqs' import { initializeSentimentAnalysis } from '@crowd/sentiment' import { getUnleashClient } from '@crowd/feature-flags' import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' @@ -49,6 +54,9 @@ setImmediate(async () => { const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log) await searchSyncWorkerEmitter.init() + const dataSinkWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log) + await dataSinkWorkerEmitter.init() + const dbConnection = await getDbConnection(DB_CONFIG()) const store = new DbStore(log, dbConnection) @@ -56,6 +64,7 @@ setImmediate(async () => { store, nodejsWorkerEmitter, searchSyncWorkerEmitter, + dataSinkWorkerEmitter, redisClient, unleash, temporal, diff --git a/services/apps/data_sink_worker/src/conf/index.ts b/services/apps/data_sink_worker/src/conf/index.ts index 5dcd380ab9..c993f7559e 100644 --- a/services/apps/data_sink_worker/src/conf/index.ts +++ b/services/apps/data_sink_worker/src/conf/index.ts @@ -11,6 +11,18 @@ export interface ISlackAlertingConfig { url: string } +export interface IWorkerConfig { + maxStreamRetries: number +} + +let workerSettings: IWorkerConfig +export const WORKER_SETTINGS = (): IWorkerConfig => { + if (workerSettings) return workerSettings + + workerSettings = config.get('worker') + return workerSettings +} + let redisConfig: IRedisConfiguration export const REDIS_CONFIG = (): IRedisConfiguration => { if (redisConfig) return redisConfig diff --git a/services/apps/data_sink_worker/src/jobs/processOldResults.ts b/services/apps/data_sink_worker/src/jobs/processOldResults.ts index 747ca6a793..4934842a4b 100644 --- a/services/apps/data_sink_worker/src/jobs/processOldResults.ts +++ b/services/apps/data_sink_worker/src/jobs/processOldResults.ts @@ -3,7 +3,7 @@ import { DbConnection, DbStore } from '@crowd/database' import { Unleash } from '@crowd/feature-flags' import { Logger } from '@crowd/logging' import { RedisClient } from '@crowd/redis' -import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs' +import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs' import { Client as TemporalClient } from '@crowd/temporal' import DataSinkRepository from '../repo/dataSink.repo' import DataSinkService from '../service/dataSink.service' @@ -16,6 +16,7 @@ export const processOldResultsJob = async ( redis: RedisClient, nodejsWorkerEmitter: NodejsWorkerEmitter, searchSyncWorkerEmitter: SearchSyncWorkerEmitter, + dataSinkWorkerEmitter: DataSinkWorkerEmitter, unleash: Unleash | undefined, temporal: TemporalClient, log: Logger, @@ -26,6 +27,7 @@ export const processOldResultsJob = async ( store, nodejsWorkerEmitter, searchSyncWorkerEmitter, + dataSinkWorkerEmitter, redis, unleash, temporal, diff --git a/services/apps/data_sink_worker/src/main.ts b/services/apps/data_sink_worker/src/main.ts index b2d11249af..a8d3e347ec 100644 --- a/services/apps/data_sink_worker/src/main.ts +++ b/services/apps/data_sink_worker/src/main.ts @@ -1,7 +1,12 @@ import { getDbConnection } from '@crowd/database' import { getServiceTracer } from '@crowd/tracing' import { getServiceLogger } from '@crowd/logging' -import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs' +import { + NodejsWorkerEmitter, + SearchSyncWorkerEmitter, + DataSinkWorkerEmitter, + getSqsClient, +} from '@crowd/sqs' import { DB_CONFIG, SENTIMENT_CONFIG, @@ -48,11 +53,14 @@ setImmediate(async () => { const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log) + const dataWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log) + const queue = new WorkerQueueReceiver( sqsClient, dbConnection, nodejsWorkerEmitter, searchSyncWorkerEmitter, + dataWorkerEmitter, redisClient, unleash, temporal, @@ -64,6 +72,7 @@ setImmediate(async () => { try { await nodejsWorkerEmitter.init() await searchSyncWorkerEmitter.init() + await dataWorkerEmitter.init() let processing = false setInterval(async () => { @@ -77,6 +86,7 @@ setImmediate(async () => { redisClient, nodejsWorkerEmitter, searchSyncWorkerEmitter, + dataWorkerEmitter, unleash, temporal, log, diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 5d4d6a8c67..f7ccade232 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -5,6 +5,7 @@ import { DATA_SINK_WORKER_QUEUE_SETTINGS, NodejsWorkerEmitter, SearchSyncWorkerEmitter, + DataSinkWorkerEmitter, SqsClient, SqsQueueReceiver, } from '@crowd/sqs' @@ -25,6 +26,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { private readonly dbConn: DbConnection, private readonly nodejsWorkerEmitter: NodejsWorkerEmitter, private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter, + private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter, private readonly redisClient: RedisClient, private readonly unleash: Unleash | undefined, private readonly temporal: TemporalClient, @@ -44,6 +46,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { new DbStore(this.log, this.dbConn, undefined, false), this.nodejsWorkerEmitter, this.searchSyncWorkerEmitter, + this.dataSinkWorkerEmitter, this.redisClient, this.unleash, this.temporal, @@ -65,6 +68,10 @@ export class WorkerQueueReceiver extends SqsQueueReceiver { ) break } + case DataSinkWorkerQueueMessageType.CHECK_RESULTS: { + await service.checkResults() + break + } default: throw new Error(`Unknown message type: ${message.type}`) diff --git a/services/apps/data_sink_worker/src/repo/dataSink.data.ts b/services/apps/data_sink_worker/src/repo/dataSink.data.ts index 34dcb40258..fa5470eee4 100644 --- a/services/apps/data_sink_worker/src/repo/dataSink.data.ts +++ b/services/apps/data_sink_worker/src/repo/dataSink.data.ts @@ -17,6 +17,9 @@ export interface IResultData { plan: string isTrialPlan: boolean name: string + + retries: number + delayedUntil: string | null } export interface IFailedResultData { diff --git a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts index 566959e20d..6b4bd13c34 100644 --- a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts +++ b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts @@ -1,6 +1,6 @@ import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' -import { IIntegrationResult, IntegrationResultState, TenantPlans } from '@crowd/types' +import { IIntegrationResult, IntegrationResultState, PlatformType, TenantPlans } from '@crowd/types' import { IFailedResultData, IResultData } from './dataSink.data' export default class DataSinkRepository extends RepositoryBase { @@ -18,6 +18,8 @@ export default class DataSinkRepository extends RepositoryBase r.id) } + + public async delayResult(resultId: string, until: Date): Promise { + const result = await this.db().result( + `update integration.results + set state = $(state), + "delayedUntil" = $(until), + retries = coalesce(retries, 0) + 1, + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + until, + state: IntegrationResultState.DELAYED, + }, + ) + + this.checkUpdateRowCount(result.rowCount, 1) + } + + public async getDelayedResults( + limit: number, + ): Promise<{ id: string; tenantId: string; platform: PlatformType }[]> { + this.ensureTransactional() + + try { + const results = await this.db().any( + ` + select r.id, r."tenantId", i.platform + from integration.results r + join integrations i on r."integrationId" = i.id + where r.state = $(delayedState) + and r."delayedUntil" < now() + limit ${limit} + for update skip locked; + `, + { + delayedState: IntegrationResultState.DELAYED, + }, + ) + + return results.map((s) => ({ id: s.id, tenantId: s.tenantId, platform: s.platform })) + } catch (err) { + this.log.error(err, 'Failed to get delayed results!') + throw err + } + } } diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 2ae91b9a09..6b8ae3cda0 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -1,7 +1,7 @@ import { DbStore } from '@crowd/database' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' import { RedisClient } from '@crowd/redis' -import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs' +import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs' import { IActivityData, IMemberData, @@ -16,6 +16,9 @@ import MemberService from './member.service' import { OrganizationService } from './organization.service' import { Unleash } from '@crowd/feature-flags' import { Client as TemporalClient } from '@crowd/temporal' +import { IResultData } from '../repo/dataSink.data' +import { addSeconds } from '@crowd/common' +import { WORKER_SETTINGS } from '../conf' export default class DataSinkService extends LoggerBase { private readonly repo: DataSinkRepository @@ -24,6 +27,7 @@ export default class DataSinkService extends LoggerBase { private readonly store: DbStore, private readonly nodejsWorkerEmitter: NodejsWorkerEmitter, private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter, + private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter, private readonly redisClient: RedisClient, private readonly unleash: Unleash | undefined, private readonly temporal: TemporalClient, @@ -35,13 +39,13 @@ export default class DataSinkService extends LoggerBase { } private async triggerResultError( - resultId: string, + resultInfo: IResultData, location: string, message: string, metadata?: unknown, error?: Error, ): Promise { - await this.repo.markResultError(resultId, { + await this.repo.markResultError(resultInfo.id, { location, message, metadata, @@ -49,6 +53,41 @@ export default class DataSinkService extends LoggerBase { errorStack: error?.stack, errorString: error ? JSON.stringify(error) : undefined, }) + + if (resultInfo.retries + 1 <= WORKER_SETTINGS().maxStreamRetries) { + // delay for #retries * 2 minutes + const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60) + this.log.warn({ until: until.toISOString() }, 'Retrying result!') + await this.repo.delayResult(resultInfo.id, until) + } + } + + public async checkResults(): Promise { + this.log.info('Checking for delayed results!') + + let results = await this.repo.transactionally(async (txRepo) => { + return await txRepo.getDelayedResults(10) + }) + + while (results.length > 0) { + this.log.info({ count: results.length }, 'Found delayed results!') + + for (const result of results) { + this.log.info({ resultId: result.id }, 'Restarting delayed stream!') + await this.repo.resetResults([result.id]) + await this.dataSinkWorkerEmitter.triggerResultProcessing( + result.tenantId, + result.platform, + result.id, + result.id, + `${result.id}-delayed-${Date.now()}`, + ) + } + + results = await this.repo.transactionally(async (txRepo) => { + return await txRepo.getDelayedResults(10) + }) + } } public async createAndProcessActivityResult( @@ -194,7 +233,7 @@ export default class DataSinkService extends LoggerBase { this.log.error(err, 'Error processing result.') try { await this.triggerResultError( - resultId, + resultInfo, 'process-result', 'Error processing result.', undefined, diff --git a/services/libs/sqs/src/instances/dataSinkWorker.ts b/services/libs/sqs/src/instances/dataSinkWorker.ts index 8bba152aa4..ec84ba4c10 100644 --- a/services/libs/sqs/src/instances/dataSinkWorker.ts +++ b/services/libs/sqs/src/instances/dataSinkWorker.ts @@ -6,6 +6,7 @@ import { CreateAndProcessActivityResultQueueMessage, IActivityData, ProcessIntegrationResultQueueMessage, + CheckResultsQueueMessage, } from '@crowd/types' import { Tracer } from '@crowd/tracing' @@ -19,8 +20,13 @@ export class DataSinkWorkerEmitter extends SqsQueueEmitter { platform: string, resultId: string, sourceId: string, + deduplicationId?: string, ) { - await this.sendMessage(sourceId, new ProcessIntegrationResultQueueMessage(resultId), resultId) + await this.sendMessage( + sourceId, + new ProcessIntegrationResultQueueMessage(resultId), + deduplicationId || resultId, + ) } public async createAndProcessActivityResult( @@ -34,4 +40,8 @@ export class DataSinkWorkerEmitter extends SqsQueueEmitter { new CreateAndProcessActivityResultQueueMessage(tenantId, segmentId, integrationId, activity), ) } + + public async checkResults() { + await this.sendMessage('global', new CheckResultsQueueMessage()) + } } diff --git a/services/libs/types/src/enums/integrations.ts b/services/libs/types/src/enums/integrations.ts index 33bfd4aac5..074e873daa 100644 --- a/services/libs/types/src/enums/integrations.ts +++ b/services/libs/types/src/enums/integrations.ts @@ -41,6 +41,7 @@ export enum IntegrationResultState { PROCESSING = 'processing', PROCESSED = 'processed', ERROR = 'error', + DELAYED = 'delayed', } export enum IntegrationResultType { diff --git a/services/libs/types/src/queue/data_sink_worker/index.ts b/services/libs/types/src/queue/data_sink_worker/index.ts index 9704ddb3db..98ff582958 100644 --- a/services/libs/types/src/queue/data_sink_worker/index.ts +++ b/services/libs/types/src/queue/data_sink_worker/index.ts @@ -5,6 +5,7 @@ export enum DataSinkWorkerQueueMessageType { PROCESS_INTEGRATION_RESULT = 'process_integration_result', CALCULATE_SENTIMENT = 'calculate_sentiment', CREATE_AND_PROCESS_ACTIVITY_RESULT = 'create_and_process_activity_result', + CHECK_RESULTS = 'check_results', } export class ProcessIntegrationResultQueueMessage implements IQueueMessage { @@ -29,3 +30,7 @@ export class CreateAndProcessActivityResultQueueMessage implements IQueueMessage public readonly activityData: IActivityData, ) {} } + +export class CheckResultsQueueMessage implements IQueueMessage { + public readonly type: string = DataSinkWorkerQueueMessageType.CHECK_RESULTS +}