Skip to content

Commit 0e57f23

Browse files
garrrikkotua and Uroš Marolt
authored and committed
Optimize data workers (#1820)
Co-authored-by: Uroš Marolt <[email protected]>
1 parent 969de63 commit 0e57f23

File tree

14 files changed

+106
-95
lines changed

14 files changed

+106
-95
lines changed
Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
import DataSinkRepository from '../repo/dataSink.repo'
2-
import DataSinkService from '../service/dataSink.service'
1+
import { timeout } from '@crowd/common'
32
import { DbConnection, DbStore } from '@crowd/database'
43
import { Unleash } from '@crowd/feature-flags'
54
import { Logger } from '@crowd/logging'
6-
import { RedisClient, processWithLock } from '@crowd/redis'
5+
import { RedisClient } from '@crowd/redis'
76
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs'
87
import { Client as TemporalClient } from '@crowd/temporal'
8+
import DataSinkRepository from '../repo/dataSink.repo'
9+
import DataSinkService from '../service/dataSink.service'
10+
11+
const MAX_CONCURRENT_PROMISES = 50
12+
const MAX_RESULTS_TO_LOAD = 200
913

1014
export const processOldResultsJob = async (
1115
dbConn: DbConnection,
@@ -16,7 +20,7 @@ export const processOldResultsJob = async (
1620
temporal: TemporalClient,
1721
log: Logger,
1822
): Promise<void> => {
19-
const store = new DbStore(log, dbConn)
23+
const store = new DbStore(log, dbConn, undefined, false)
2024
const repo = new DataSinkRepository(store, log)
2125
const service = new DataSinkService(
2226
store,
@@ -28,39 +32,60 @@ export const processOldResultsJob = async (
2832
log,
2933
)
3034

35+
let current = 0
3136
const loadNextBatch = async (): Promise<string[]> => {
32-
return await processWithLock(redis, 'process-old-results', 5 * 60, 3 * 60, async () => {
33-
const resultIds = await repo.getOldResultsToProcess(5)
34-
await repo.touchUpdatedAt(resultIds)
37+
return await repo.transactionally(async (txRepo) => {
38+
const resultIds = await txRepo.getOldResultsToProcess(MAX_RESULTS_TO_LOAD)
39+
await txRepo.touchUpdatedAt(resultIds)
3540
return resultIds
3641
})
3742
}
3843

39-
// load 5 oldest results and try process them
4044
let resultsToProcess = await loadNextBatch()
4145

46+
log.info(`Processing ${resultsToProcess.length} old results...`)
47+
4248
let successCount = 0
4349
let errorCount = 0
50+
let i = 0
51+
let batchLength = resultsToProcess.length
4452

4553
while (resultsToProcess.length > 0) {
46-
log.info(`Detected ${resultsToProcess.length} old results rows to process!`)
54+
const resultId = resultsToProcess.pop()
4755

48-
for (const resultId of resultsToProcess) {
49-
try {
50-
const result = await service.processResult(resultId)
51-
if (result) {
52-
successCount++
53-
} else {
54-
errorCount++
55-
}
56-
} catch (err) {
57-
log.error(err, 'Failed to process result!')
58-
errorCount++
59-
}
56+
while (current == MAX_CONCURRENT_PROMISES) {
57+
await timeout(1000)
6058
}
6159

62-
log.info(`Processed ${successCount} old results successfully and ${errorCount} with errors.`)
60+
const currentIndex = i
61+
i += 1
62+
log.info(`Processing result ${currentIndex + 1}/${batchLength}`)
63+
current += 1
64+
service
65+
.processResult(resultId)
66+
.then(() => {
67+
current--
68+
successCount++
69+
log.info(`Processed result ${currentIndex + 1}/${batchLength}`)
70+
})
71+
.catch((err) => {
72+
current--
73+
errorCount++
74+
log.error(err, `Error processing result ${currentIndex + 1}/${batchLength}!`)
75+
})
76+
77+
if (resultsToProcess.length === 0) {
78+
while (current > 0) {
79+
await timeout(1000)
80+
}
6381

64-
resultsToProcess = await loadNextBatch()
82+
log.info(`Processed ${successCount} old results successfully and ${errorCount} with errors.`)
83+
resultsToProcess = await loadNextBatch()
84+
log.info(`Processing ${resultsToProcess.length} old results...`)
85+
successCount = 0
86+
errorCount = 0
87+
i = 0
88+
batchLength = resultsToProcess.length
89+
}
6590
}
6691
}

services/apps/data_sink_worker/src/main.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal'
2020
const tracer = getServiceTracer()
2121
const log = getServiceLogger()
2222

23-
const MAX_CONCURRENT_PROCESSING = 3
24-
const PROCESSING_INTERVAL_MINUTES = 5
23+
const MAX_CONCURRENT_PROCESSING = 5
24+
const PROCESSING_INTERVAL_MINUTES = 4
2525

2626
setImmediate(async () => {
2727
log.info('Starting data sink worker...')
@@ -68,8 +68,10 @@ setImmediate(async () => {
6868
let processing = false
6969
setInterval(async () => {
7070
try {
71+
log.info('Checking for old results to process...')
7172
if (!processing) {
7273
processing = true
74+
log.info('Processing old results...')
7375
await processOldResultsJob(
7476
dbConnection,
7577
redisClient,

services/apps/data_sink_worker/src/queue/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
5252

5353
switch (message.type) {
5454
case DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT:
55+
// this type of message will be processed by the processOldResultsJob
5556
await service.processResult((message as ProcessIntegrationResultQueueMessage).resultId)
5657
break
5758
case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: {

services/apps/data_sink_worker/src/repo/dataSink.repo.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DbStore, RepositoryBase } from '@crowd/database'
22
import { Logger } from '@crowd/logging'
3-
import { IIntegrationResult, IntegrationResultState } from '@crowd/types'
3+
import { IIntegrationResult, IntegrationResultState, TenantPlans } from '@crowd/types'
44
import { IFailedResultData, IResultData } from './dataSink.data'
55

66
export default class DataSinkRepository extends RepositoryBase<DataSinkRepository> {
@@ -56,22 +56,21 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
5656
}
5757

5858
public async getOldResultsToProcess(limit: number): Promise<string[]> {
59+
this.ensureTransactional()
60+
5961
try {
6062
const results = await this.db().any(
6163
`
62-
select id
63-
from integration.results
64-
where state in ($(pendingState), $(processingState))
65-
and "updatedAt" < now() - interval '1 hour'
66-
order by case when "webhookId" is not null then 0 else 1 end,
67-
"webhookId" asc,
68-
"updatedAt" desc
69-
limit ${limit};
64+
select r.id
65+
from integration.results r
66+
where r.state = $(pendingState)
67+
and r."updatedAt" < now() - interval '1 hour'
68+
limit ${limit}
69+
for update skip locked;
7070
`,
7171
{
7272
pendingState: IntegrationResultState.PENDING,
73-
processingState: IntegrationResultState.PROCESSING,
74-
maxRetries: 5,
73+
plans: [TenantPlans.Growth, TenantPlans.Scale],
7574
},
7675
)
7776

services/apps/data_sink_worker/src/repo/organization.repo.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
459459
},
460460
})
461461
const updatedAt = new Date()
462+
const oneMinuteAgo = new Date(updatedAt.getTime() - 60 * 1000)
462463
const prepared = RepositoryBase.prepare(
463464
{
464465
...data,
@@ -472,9 +473,9 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
472473
)
473474

474475
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
475-
const condition = this.format('where id = $(id) and "updatedAt" < $(updatedAt)', {
476+
const condition = this.format('where id = $(id) and "updatedAt" <= $(oneMinuteAgo)', {
476477
id,
477-
updatedAt,
478+
oneMinuteAgo,
478479
})
479480

480481
await this.db().result(`${query} ${condition}`)

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,10 @@ import IntegrationRepository from '../repo/integration.repo'
1717
import GithubReposRepository from '../repo/githubRepos.repo'
1818
import MemberAffiliationService from './memberAffiliation.service'
1919
import { RedisClient } from '@crowd/redis'
20-
import { acquireLock, releaseLock } from '@crowd/redis'
2120
import { Unleash, isFeatureEnabled } from '@crowd/feature-flags'
2221
import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal'
2322
import { TEMPORAL_CONFIG } from '../conf'
2423

25-
const MEMBER_LOCK_EXPIRE_AFTER = 10 * 60 // 10 minutes
26-
const MEMBER_LOCK_TIMEOUT_AFTER = 5 * 60 // 5 minutes
27-
28-
const acquireLock = async () => {}
29-
const releaseLock = async () => {}
30-
3124
export default class ActivityService extends LoggerBase {
3225
private readonly conversationService: ConversationService
3326

@@ -465,15 +458,6 @@ export default class ActivityService extends LoggerBase {
465458
this.log.trace({ activityId: dbActivity.id }, 'Found existing activity. Updating it.')
466459
// process member data
467460

468-
// acquiring lock for member inside activity exists
469-
await acquireLock(
470-
this.redisClient,
471-
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
472-
'check-member-inside-activity-exists',
473-
MEMBER_LOCK_EXPIRE_AFTER,
474-
MEMBER_LOCK_TIMEOUT_AFTER,
475-
)
476-
477461
let dbMember = await txMemberRepo.findMember(tenantId, segmentId, platform, username)
478462
if (dbMember) {
479463
// we found a member for the identity from the activity
@@ -518,12 +502,6 @@ export default class ActivityService extends LoggerBase {
518502
},
519503
dbMember,
520504
false,
521-
async () =>
522-
await releaseLock(
523-
this.redisClient,
524-
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
525-
'check-member-inside-activity-exists',
526-
),
527505
)
528506

529507
if (!createActivity) {
@@ -562,12 +540,6 @@ export default class ActivityService extends LoggerBase {
562540
},
563541
dbMember,
564542
false,
565-
async () =>
566-
await releaseLock(
567-
this.redisClient,
568-
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
569-
'check-member-inside-activity-exists',
570-
),
571543
)
572544

573545
memberId = dbActivity.memberId
@@ -870,11 +842,6 @@ export default class ActivityService extends LoggerBase {
870842
}
871843
} finally {
872844
// release locks matter what
873-
await releaseLock(
874-
this.redisClient,
875-
`member:processing:${tenantId}:${platform}:${username}`,
876-
'check-member-inside-activity-exists',
877-
)
878845
}
879846
})
880847

services/apps/integration_data_worker/src/jobs/processOldData.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import IntegrationDataRepository from '../repo/integrationData.repo'
2-
import IntegrationDataService from '../service/integrationDataService'
31
import { DbConnection, DbStore } from '@crowd/database'
42
import { Logger } from '@crowd/logging'
5-
import { RedisClient, processWithLock } from '@crowd/redis'
6-
import { IntegrationStreamWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs'
3+
import { RedisClient } from '@crowd/redis'
4+
import { DataSinkWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs'
5+
import IntegrationDataRepository from '../repo/integrationData.repo'
6+
import IntegrationDataService from '../service/integrationDataService'
77

88
export const processOldDataJob = async (
99
dbConn: DbConnection,
@@ -23,9 +23,9 @@ export const processOldDataJob = async (
2323
)
2424

2525
const loadNextBatch = async (): Promise<string[]> => {
26-
return await processWithLock(redis, 'process-old-data', 5 * 60, 3 * 60, async () => {
27-
const dataIds = await repo.getOldDataToProcess(5)
28-
await repo.touchUpdatedAt(dataIds)
26+
return await repo.transactionally(async (txRepo) => {
27+
const dataIds = await txRepo.getOldDataToProcess(5)
28+
await txRepo.touchUpdatedAt(dataIds)
2929
return dataIds
3030
})
3131
}

services/apps/integration_data_worker/src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { processOldDataJob } from './jobs/processOldData'
1010
const tracer = getServiceTracer()
1111
const log = getServiceLogger()
1212

13-
const MAX_CONCURRENT_PROCESSING = 2
13+
const MAX_CONCURRENT_PROCESSING = 3
1414
const PROCESSING_INTERVAL_MINUTES = 5
1515

1616
setImmediate(async () => {

services/apps/integration_data_worker/src/repo/integrationData.repo.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio
6161
}
6262

6363
public async getOldDataToProcess(limit: number): Promise<string[]> {
64+
this.ensureTransactional()
65+
6466
try {
6567
const results = await this.db().any(
6668
`
@@ -77,7 +79,8 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio
7779
order by case when "webhookId" is not null then 0 else 1 end,
7880
"webhookId" asc,
7981
"updatedAt" desc
80-
limit ${limit};
82+
limit ${limit}
83+
for update skip locked;
8184
`,
8285
{
8386
errorState: IntegrationStreamDataState.ERROR,

services/apps/integration_stream_worker/src/jobs/processOldStreams.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { IInsertableWebhookStream } from '../repo/integrationStream.data'
2-
import IntegrationStreamRepository from '../repo/integrationStream.repo'
3-
import IntegrationStreamService from '../service/integrationStreamService'
41
import { DbConnection, DbStore } from '@crowd/database'
52
import { Logger } from '@crowd/logging'
6-
import { RedisClient, processWithLock } from '@crowd/redis'
3+
import { RedisClient } from '@crowd/redis'
74
import {
85
IntegrationDataWorkerEmitter,
96
IntegrationRunWorkerEmitter,
107
IntegrationStreamWorkerEmitter,
118
} from '@crowd/sqs'
9+
import { IInsertableWebhookStream } from '../repo/integrationStream.data'
10+
import IntegrationStreamRepository from '../repo/integrationStream.repo'
11+
import IntegrationStreamService from '../service/integrationStreamService'
1212

1313
export const processOldStreamsJob = async (
1414
dbConn: DbConnection,
@@ -30,8 +30,8 @@ export const processOldStreamsJob = async (
3030
const repo = new IntegrationStreamRepository(store, log)
3131

3232
const prepareWebhooks = async (): Promise<void> => {
33-
await processWithLock(redis, 'prepare-webhooks', 5 * 60, 3 * 60, async () => {
34-
const webhooks = await repo.getOldWebhooksToProcess(5)
33+
await repo.transactionally(async (txRepo) => {
34+
const webhooks = await txRepo.getOldWebhooksToProcess(5)
3535
const prepared: IInsertableWebhookStream[] = webhooks.map((w) => {
3636
return {
3737
identifier: w.id,
@@ -43,15 +43,15 @@ export const processOldStreamsJob = async (
4343
})
4444

4545
if (prepared.length > 0) {
46-
await repo.publishWebhookStreams(prepared)
46+
await txRepo.publishWebhookStreams(prepared)
4747
}
4848
})
4949
}
5050

5151
const loadNextBatch = async (): Promise<string[]> => {
52-
return await processWithLock(redis, 'process-old-streams', 5 * 60, 3 * 60, async () => {
53-
const streams = await repo.getOldStreamsToProcess(5)
54-
await repo.touchUpdatedAt(streams)
52+
return await repo.transactionally(async (txRepo) => {
53+
const streams = await txRepo.getOldStreamsToProcess(5)
54+
await txRepo.touchUpdatedAt(streams)
5555
return streams
5656
})
5757
}

0 commit comments

Comments (0)