From 49f27059713bde18a1c19a8b9af2dc4e8afc9de6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 15 Sep 2023 08:49:23 +0200 Subject: [PATCH 1/4] improvements --- .../src/bin/jobs/memberScoreCoordinator.ts | 2 +- ...egment-activity-channels-missing-index.sql | 1 + ...egment-activity-channels-missing-index.sql | 1 + services/apps/data_sink_worker/package.json | 3 +- .../src/bin/process-results.ts | 58 +++++++++++++++++ .../src/bin/onboard-integration.ts | 65 ++++++++++--------- .../src/service/organization.sync.service.ts | 14 ++-- 7 files changed, 105 insertions(+), 39 deletions(-) create mode 100644 backend/src/database/migrations/U1694760454__segment-activity-channels-missing-index.sql create mode 100644 backend/src/database/migrations/V1694760454__segment-activity-channels-missing-index.sql create mode 100644 services/apps/data_sink_worker/src/bin/process-results.ts diff --git a/backend/src/bin/jobs/memberScoreCoordinator.ts b/backend/src/bin/jobs/memberScoreCoordinator.ts index 5807a818c8..85f5c60497 100644 --- a/backend/src/bin/jobs/memberScoreCoordinator.ts +++ b/backend/src/bin/jobs/memberScoreCoordinator.ts @@ -5,7 +5,7 @@ import { PythonWorkerMessageType } from '../../serverless/types/workerTypes' const job: CrowdJob = { name: 'Member Score Coordinator', - cronTime: cronGenerator.every(20).minutes(), + cronTime: cronGenerator.every(90).minutes(), onTrigger: async () => { await sendPythonWorkerMessage('global', { type: PythonWorkerMessageType.MEMBERS_SCORE, diff --git a/backend/src/database/migrations/U1694760454__segment-activity-channels-missing-index.sql b/backend/src/database/migrations/U1694760454__segment-activity-channels-missing-index.sql new file mode 100644 index 0000000000..35ef6cfc58 --- /dev/null +++ b/backend/src/database/migrations/U1694760454__segment-activity-channels-missing-index.sql @@ -0,0 +1 @@ +drop index if exists "ix_segmentActivityChannels_segmentId_platform"; \ No newline at end of file diff --git a/backend/src/database/migrations/V1694760454__segment-activity-channels-missing-index.sql b/backend/src/database/migrations/V1694760454__segment-activity-channels-missing-index.sql new file mode 100644 index 0000000000..b7b5b869b4 --- /dev/null +++ b/backend/src/database/migrations/V1694760454__segment-activity-channels-missing-index.sql @@ -0,0 +1 @@ +create index if not exists "ix_segmentActivityChannels_segmentId_platform" on "segmentActivityChannels" ("segmentId", platform); \ No newline at end of file diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index ec03e182b6..64d235cb60 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -17,7 +17,8 @@ "script:restart-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-failed-results.ts", "script:restart-all-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-all-failed-results.ts", "script:restart-x-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-x-failed-results.ts", - "script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts" + "script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts", + "script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts" }, "dependencies": { "@crowd/common": "file:../../libs/common", diff --git a/services/apps/data_sink_worker/src/bin/process-results.ts b/services/apps/data_sink_worker/src/bin/process-results.ts new file mode 100644 index 0000000000..a507a0f7e8 --- /dev/null +++ b/services/apps/data_sink_worker/src/bin/process-results.ts @@ -0,0 +1,58 @@ +import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '@/conf' +import DataSinkRepository from '@/repo/dataSink.repo' +import DataSinkService from '@/service/dataSink.service' +import { DbStore, getDbConnection } from '@crowd/database' +import { getServiceLogger } from '@crowd/logging' +import { getRedisClient } from '@crowd/redis' +import { + DataSinkWorkerEmitter, + NodejsWorkerEmitter, + SearchSyncWorkerEmitter, + getSqsClient, +} from '@crowd/sqs' +import { ProcessIntegrationResultQueueMessage } from '@crowd/types' + +const log = getServiceLogger() + +const processArguments = process.argv.slice(2) + +if (processArguments.length !== 1) { + log.error('Expected 1 argument: resultIds') + process.exit(1) +} + +const resultIds = processArguments[0].split(',') + +setImmediate(async () => { + const sqsClient = getSqsClient(SQS_CONFIG()) + const redisClient = await getRedisClient(REDIS_CONFIG()) + + const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log) + const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, log) + + const dbConnection = await getDbConnection(DB_CONFIG()) + const store = new DbStore(log, dbConnection) + + const service = new DataSinkService( + store, + nodejsWorkerEmitter, + searchSyncWorkerEmitter, + redisClient, + log, + ) + + const repo = new DataSinkRepository(store, log) + for (const resultId of resultIds) { + const result = await repo.getResultInfo(resultId) + if (!result) { + log.error(`Result ${resultId} not found!`) + continue + } else { + await repo.resetResults([resultId]) + + await service.processResult(resultId) + } + } + + process.exit(0) +}) diff --git a/services/apps/integration_run_worker/src/bin/onboard-integration.ts b/services/apps/integration_run_worker/src/bin/onboard-integration.ts index 33d068f9ae..aca908edb6 100644 --- a/services/apps/integration_run_worker/src/bin/onboard-integration.ts +++ b/services/apps/integration_run_worker/src/bin/onboard-integration.ts @@ -9,7 +9,7 @@ const log = getServiceLogger() const processArguments = process.argv.slice(2) -const integrationId = processArguments[0] +const parameter = processArguments[0] setImmediate(async () => { const sqsClient = getSqsClient(SQS_CONFIG()) @@ -21,35 +21,40 @@ setImmediate(async () => { const repo = new IntegrationRunRepository(store, log) - const integration = await repo.getIntegrationData(integrationId) - - if (integration) { - if (integration.state == IntegrationState.IN_PROGRESS) { - log.warn(`Integration already running!`) - process.exit(1) - } - - if (integration.state == IntegrationState.INACTIVE) { - log.warn(`Integration is not active!`) - process.exit(1) + const integrationIds = parameter.split(',') + + for (const integrationId of integrationIds) { + const integration = await repo.getIntegrationData(integrationId) + + if (integration) { + if (integration.state == IntegrationState.IN_PROGRESS) { + log.warn(`Integration already running!`) + continue + } + + if (integration.state == IntegrationState.INACTIVE) { + log.warn(`Integration is not active!`) + continue + } + + if (integration.state == IntegrationState.WAITING_APPROVAL) { + log.warn(`Integration is waiting for approval!`) + continue + } + + log.info(`Triggering integration run for ${integrationId}!`) + + await emitter.triggerIntegrationRun( + integration.tenantId, + integration.type, + integration.id, + true, + ) + } else { + log.error({ integrationId }, 'Integration not found!') + continue } - - if (integration.state == IntegrationState.WAITING_APPROVAL) { - log.warn(`Integration is waiting for approval!`) - process.exit(1) - } - - log.info(`Triggering integration run for ${integrationId}!`) - - await emitter.triggerIntegrationRun( - integration.tenantId, - integration.type, - integration.id, - true, - ) - process.exit(0) - } else { - log.error({ integrationId }, 'Integration not found!') - process.exit(1) } + + process.exit(0) }) diff --git a/services/apps/search_sync_worker/src/service/organization.sync.service.ts b/services/apps/search_sync_worker/src/service/organization.sync.service.ts index f64139e720..558ba6bce1 100644 --- a/services/apps/search_sync_worker/src/service/organization.sync.service.ts +++ b/services/apps/search_sync_worker/src/service/organization.sync.service.ts @@ -88,10 +88,10 @@ export class OrganizationSyncService extends LoggerBase { }, } - const sort = [{ date_joinedAt: 'asc' }] - const include = ['date_joinedAt', 'uuid_organizationId'] + const sort = [{ date_createdAt: 'asc' }] + const include = ['date_createdAt', 'uuid_organizationId'] const pageSize = 500 - let lastJoinedAt: string + let lastCreatedAt: string let results = (await this.openSearchService.search( OpenSearchIndex.ORGANIZATIONS, @@ -101,7 +101,7 @@ export class OrganizationSyncService extends LoggerBase { sort, undefined, include, - )) as ISearchHit<{ date_joinedAt: string; uuid_organizationId: string }>[] + )) as ISearchHit<{ date_createdAt: string; uuid_organizationId: string }>[] let processed = 0 @@ -127,16 +127,16 @@ export class OrganizationSyncService extends LoggerBase { this.log.warn({ tenantId }, `Processed ${processed} organizations while cleaning up tenant!`) // use last joinedAt to get the next page - lastJoinedAt = results[results.length - 1]._source.date_joinedAt + lastCreatedAt = results[results.length - 1]._source.date_createdAt results = (await this.openSearchService.search( OpenSearchIndex.ORGANIZATIONS, query, undefined, pageSize, sort, - lastJoinedAt, + lastCreatedAt, include, - )) as ISearchHit<{ date_joinedAt: string; uuid_organizationId: string }>[] + )) as ISearchHit<{ date_createdAt: string; uuid_organizationId: string }>[] } this.log.warn( From 4aa70ff271d0c477e4a345692fd4c8171f82affc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 15 Sep 2023 08:56:48 +0200 Subject: [PATCH 2/4] temp ignore db_operations update_members queue message --- backend/src/bin/nodejs-worker.ts | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/backend/src/bin/nodejs-worker.ts b/backend/src/bin/nodejs-worker.ts index be56c99bcb..1729d49bee 100644 --- a/backend/src/bin/nodejs-worker.ts +++ b/backend/src/bin/nodejs-worker.ts @@ -1,4 +1,4 @@ -import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging' +import { Logger, getChildLogger, getServiceLogger, logExecutionTimeV2 } from '@crowd/logging' import { DeleteMessageRequest, Message, ReceiveMessageRequest } from 'aws-sdk/clients/sqs' import moment from 'moment' import { timeout } from '@crowd/common' @@ -119,7 +119,19 @@ async function handleMessages() { }) try { - messageLogger.debug('Received a new queue message!') + if ( + msg.type === NodeWorkerMessageType.DB_OPERATIONS && + (msg as any).operation === 'update_members' + ) { + messageLogger.warn('Skipping update_members message! TEMPORARY MEASURE!') + await removeFromQueue(message.ReceiptHandle) + return + } + + messageLogger.info( + { messageType: msg.type, messagePayload: JSON.stringify(msg) }, + 'Received a new queue message!', + ) let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise let keep = false @@ -152,7 +164,13 @@ async function handleMessages() { await removeFromQueue(message.ReceiptHandle) messagesInProgress.set(message.MessageId, msg) try { - await processFunction(msg, messageLogger) + await logExecutionTimeV2( + async () => { + await processFunction(msg, messageLogger) + }, + messageLogger, + 'queueMessageProcessingTime', + ) } catch (err) { messageLogger.error(err, 'Error while processing queue message!') } finally { From fecb631e54459d6d6fc256baa3c2e2feef90f656 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 15 Sep 2023 08:57:29 +0200 Subject: [PATCH 3/4] fixed linting --- services/apps/data_sink_worker/src/bin/process-results.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/process-results.ts b/services/apps/data_sink_worker/src/bin/process-results.ts index a507a0f7e8..e961a1e425 100644 --- a/services/apps/data_sink_worker/src/bin/process-results.ts +++ b/services/apps/data_sink_worker/src/bin/process-results.ts @@ -4,13 +4,7 @@ import DataSinkService from '@/service/dataSink.service' import { DbStore, getDbConnection } from '@crowd/database' import { getServiceLogger } from '@crowd/logging' import { getRedisClient } from '@crowd/redis' -import { - DataSinkWorkerEmitter, - NodejsWorkerEmitter, - SearchSyncWorkerEmitter, - getSqsClient, -} from '@crowd/sqs' -import { ProcessIntegrationResultQueueMessage } from '@crowd/types' +import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs' const log = getServiceLogger() From 6c87f24a3fda82c7a11cb2d0f9db2df91cdf79a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 15 Sep 2023 09:02:54 +0200 Subject: [PATCH 4/4] fixed comment --- .../search_sync_worker/src/service/organization.sync.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/search_sync_worker/src/service/organization.sync.service.ts b/services/apps/search_sync_worker/src/service/organization.sync.service.ts index 558ba6bce1..8fd0e0efe9 100644 --- a/services/apps/search_sync_worker/src/service/organization.sync.service.ts +++ b/services/apps/search_sync_worker/src/service/organization.sync.service.ts @@ -126,7 +126,7 @@ export class OrganizationSyncService extends LoggerBase { processed += results.length this.log.warn({ tenantId }, `Processed ${processed} organizations while cleaning up tenant!`) - // use last joinedAt to get the next page + // use last createdAt to get the next page lastCreatedAt = results[results.length - 1]._source.date_createdAt results = (await this.openSearchService.search( OpenSearchIndex.ORGANIZATIONS,