Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/src/bin/jobs/memberScoreCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { PythonWorkerMessageType } from '../../serverless/types/workerTypes'

const job: CrowdJob = {
name: 'Member Score Coordinator',
cronTime: cronGenerator.every(20).minutes(),
cronTime: cronGenerator.every(90).minutes(),
onTrigger: async () => {
await sendPythonWorkerMessage('global', {
type: PythonWorkerMessageType.MEMBERS_SCORE,
Expand Down
24 changes: 21 additions & 3 deletions backend/src/bin/nodejs-worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging'
import { Logger, getChildLogger, getServiceLogger, logExecutionTimeV2 } from '@crowd/logging'
import { DeleteMessageRequest, Message, ReceiveMessageRequest } from 'aws-sdk/clients/sqs'
import moment from 'moment'
import { timeout } from '@crowd/common'
Expand Down Expand Up @@ -119,7 +119,19 @@ async function handleMessages() {
})

try {
messageLogger.debug('Received a new queue message!')
if (
msg.type === NodeWorkerMessageType.DB_OPERATIONS &&
(msg as any).operation === 'update_members'
) {
messageLogger.warn('Skipping update_members message! TEMPORARY MEASURE!')
await removeFromQueue(message.ReceiptHandle)
return
}

messageLogger.info(
{ messageType: msg.type, messagePayload: JSON.stringify(msg) },
'Received a new queue message!',
)

let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise<void>
let keep = false
Expand Down Expand Up @@ -152,7 +164,13 @@ async function handleMessages() {
await removeFromQueue(message.ReceiptHandle)
messagesInProgress.set(message.MessageId, msg)
try {
await processFunction(msg, messageLogger)
await logExecutionTimeV2(
async () => {
await processFunction(msg, messageLogger)
},
messageLogger,
'queueMessageProcessingTime',
)
} catch (err) {
messageLogger.error(err, 'Error while processing queue message!')
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop index if exists "ix_segmentActivityChannels_segmentId_platform";
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create index if not exists "ix_segmentActivityChannels_segmentId_platform" on "segmentActivityChannels" ("segmentId", platform);
3 changes: 2 additions & 1 deletion services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"script:restart-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-failed-results.ts",
"script:restart-all-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-all-failed-results.ts",
"script:restart-x-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-x-failed-results.ts",
"script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts"
"script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts",
"script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts"
},
"dependencies": {
"@crowd/common": "file:../../libs/common",
Expand Down
52 changes: 52 additions & 0 deletions services/apps/data_sink_worker/src/bin/process-results.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '@/conf'
import DataSinkRepository from '@/repo/dataSink.repo'
import DataSinkService from '@/service/dataSink.service'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: resultIds')
process.exit(1)
}

const resultIds = processArguments[0].split(',')

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const redisClient = await getRedisClient(REDIS_CONFIG())

const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log)
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, log)

const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)

const service = new DataSinkService(
store,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
redisClient,
log,
)

const repo = new DataSinkRepository(store, log)
for (const resultId of resultIds) {
const result = await repo.getResultInfo(resultId)
if (!result) {
log.error(`Result ${resultId} not found!`)
continue
} else {
await repo.resetResults([resultId])

await service.processResult(resultId)
}
}

process.exit(0)
})
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const log = getServiceLogger()

const processArguments = process.argv.slice(2)

const integrationId = processArguments[0]
const parameter = processArguments[0]

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
Expand All @@ -21,35 +21,40 @@ setImmediate(async () => {

const repo = new IntegrationRunRepository(store, log)

const integration = await repo.getIntegrationData(integrationId)

if (integration) {
if (integration.state == IntegrationState.IN_PROGRESS) {
log.warn(`Integration already running!`)
process.exit(1)
}

if (integration.state == IntegrationState.INACTIVE) {
log.warn(`Integration is not active!`)
process.exit(1)
const integrationIds = parameter.split(',')

for (const integrationId of integrationIds) {
const integration = await repo.getIntegrationData(integrationId)

if (integration) {
if (integration.state == IntegrationState.IN_PROGRESS) {
log.warn(`Integration already running!`)
continue
}

if (integration.state == IntegrationState.INACTIVE) {
log.warn(`Integration is not active!`)
continue
}

if (integration.state == IntegrationState.WAITING_APPROVAL) {
log.warn(`Integration is waiting for approval!`)
continue
}

log.info(`Triggering integration run for ${integrationId}!`)

await emitter.triggerIntegrationRun(
integration.tenantId,
integration.type,
integration.id,
true,
)
} else {
log.error({ integrationId }, 'Integration not found!')
continue
}

if (integration.state == IntegrationState.WAITING_APPROVAL) {
log.warn(`Integration is waiting for approval!`)
process.exit(1)
}

log.info(`Triggering integration run for ${integrationId}!`)

await emitter.triggerIntegrationRun(
integration.tenantId,
integration.type,
integration.id,
true,
)
process.exit(0)
} else {
log.error({ integrationId }, 'Integration not found!')
process.exit(1)
}

process.exit(0)
})
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ export class OrganizationSyncService extends LoggerBase {
},
}

const sort = [{ date_joinedAt: 'asc' }]
const include = ['date_joinedAt', 'uuid_organizationId']
const sort = [{ date_createdAt: 'asc' }]
const include = ['date_createdAt', 'uuid_organizationId']
const pageSize = 500
let lastJoinedAt: string
let lastCreatedAt: string

let results = (await this.openSearchService.search(
OpenSearchIndex.ORGANIZATIONS,
Expand All @@ -101,7 +101,7 @@ export class OrganizationSyncService extends LoggerBase {
sort,
undefined,
include,
)) as ISearchHit<{ date_joinedAt: string; uuid_organizationId: string }>[]
)) as ISearchHit<{ date_createdAt: string; uuid_organizationId: string }>[]

let processed = 0

Expand All @@ -126,17 +126,17 @@ export class OrganizationSyncService extends LoggerBase {
processed += results.length
this.log.warn({ tenantId }, `Processed ${processed} organizations while cleaning up tenant!`)

// use last joinedAt to get the next page
lastJoinedAt = results[results.length - 1]._source.date_joinedAt
// use last createdAt to get the next page
lastCreatedAt = results[results.length - 1]._source.date_createdAt
results = (await this.openSearchService.search(
OpenSearchIndex.ORGANIZATIONS,
query,
undefined,
pageSize,
sort,
lastJoinedAt,
lastCreatedAt,
include,
)) as ISearchHit<{ date_joinedAt: string; uuid_organizationId: string }>[]
)) as ISearchHit<{ date_createdAt: string; uuid_organizationId: string }>[]
}

this.log.warn(
Expand Down