Merged
29 commits
f3866c3  increase max concurrency in data-worker (garrrikkotua, Nov 6, 2023)
b5d4613  temp remove locks in data-sink worker (garrrikkotua, Nov 6, 2023)
94a89e8  disable transactions in data sink worker (garrrikkotua, Nov 6, 2023)
a14b868  prevent organization update if previous update happened less than 1 m… (garrrikkotua, Nov 6, 2023)
06d0578  increase max concurrent processing for data-sink from 3 to 4 (garrrikkotua, Nov 7, 2023)
2917c85  increase max con processing to 7 (garrrikkotua, Nov 7, 2023)
ef45a5f  decrease to 5 (garrrikkotua, Nov 7, 2023)
5f17f37  back to 4 (garrrikkotua, Nov 7, 2023)
9caff93  process old jobs async (garrrikkotua, Nov 7, 2023)
1020201  add enum for tenants plans (garrrikkotua, Nov 7, 2023)
cf361c9  adjust settings (garrrikkotua, Nov 7, 2023)
5f97865  change old results logic (garrrikkotua, Nov 7, 2023)
9010862  more concurrent version of processOldResults (garrrikkotua, Nov 7, 2023)
69fd30d  fix logic in processOldResults (garrrikkotua, Nov 7, 2023)
ff48d27  small fixes (garrrikkotua, Nov 7, 2023)
db687bd  Merge branch 'main' into improve/optimize-data-workers (garrrikkotua, Nov 7, 2023)
7a7e530  remove lock and load more results (garrrikkotua, Nov 7, 2023)
f96c87e  bring lock back (garrrikkotua, Nov 7, 2023)
c309104  increase results to load (garrrikkotua, Nov 7, 2023)
97b16ff  some optimizations (garrrikkotua, Nov 7, 2023)
c72dd38  bring messages back (garrrikkotua, Nov 7, 2023)
c856e15  disable transactions for the process results job (garrrikkotua, Nov 7, 2023)
6467638  use for update skip locked instead of redis lock (Nov 8, 2023)
5f6dc52  decrease results to load and check more often (garrrikkotua, Nov 8, 2023)
2c0ba3c  fix and simplify results query (garrrikkotua, Nov 8, 2023)
efd6e63  increase checks frequency (garrrikkotua, Nov 8, 2023)
3f7f121  remove locks completely (garrrikkotua, Nov 8, 2023)
ab38390  trying to speed up (garrrikkotua, Nov 8, 2023)
ff64142  fix issue when github org name = @ (garrrikkotua, Nov 9, 2023)
71 changes: 48 additions & 23 deletions services/apps/data_sink_worker/src/jobs/processOldResults.ts
@@ -1,11 +1,15 @@
-import DataSinkRepository from '../repo/dataSink.repo'
-import DataSinkService from '../service/dataSink.service'
+import { timeout } from '@crowd/common'
 import { DbConnection, DbStore } from '@crowd/database'
 import { Unleash } from '@crowd/feature-flags'
 import { Logger } from '@crowd/logging'
-import { RedisClient, processWithLock } from '@crowd/redis'
+import { RedisClient } from '@crowd/redis'
 import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs'
 import { Client as TemporalClient } from '@crowd/temporal'
+import DataSinkRepository from '../repo/dataSink.repo'
+import DataSinkService from '../service/dataSink.service'
 
+const MAX_CONCURRENT_PROMISES = 50
+const MAX_RESULTS_TO_LOAD = 200
+
 export const processOldResultsJob = async (
   dbConn: DbConnection,
@@ -16,7 +20,7 @@ export const processOldResultsJob = async (
   temporal: TemporalClient,
   log: Logger,
 ): Promise<void> => {
-  const store = new DbStore(log, dbConn)
+  const store = new DbStore(log, dbConn, undefined, false)
   const repo = new DataSinkRepository(store, log)
   const service = new DataSinkService(
     store,
@@ -28,39 +32,60 @@ export const processOldResultsJob = async (
     log,
   )
 
+  let current = 0
   const loadNextBatch = async (): Promise<string[]> => {
-    return await processWithLock(redis, 'process-old-results', 5 * 60, 3 * 60, async () => {
-      const resultIds = await repo.getOldResultsToProcess(5)
-      await repo.touchUpdatedAt(resultIds)
+    return await repo.transactionally(async (txRepo) => {
+      const resultIds = await txRepo.getOldResultsToProcess(MAX_RESULTS_TO_LOAD)
+      await txRepo.touchUpdatedAt(resultIds)
       return resultIds
     })
   }
 
-  // load 5 oldest results and try process them
   let resultsToProcess = await loadNextBatch()
 
+  log.info(`Processing ${resultsToProcess.length} old results...`)
+
   let successCount = 0
   let errorCount = 0
+  let i = 0
+  let batchLength = resultsToProcess.length
 
   while (resultsToProcess.length > 0) {
-    log.info(`Detected ${resultsToProcess.length} old results rows to process!`)
+    const resultId = resultsToProcess.pop()
 
-    for (const resultId of resultsToProcess) {
-      try {
-        const result = await service.processResult(resultId)
-        if (result) {
-          successCount++
-        } else {
-          errorCount++
-        }
-      } catch (err) {
-        log.error(err, 'Failed to process result!')
-        errorCount++
-      }
+    while (current == MAX_CONCURRENT_PROMISES) {
+      await timeout(1000)
     }
 
-    log.info(`Processed ${successCount} old results successfully and ${errorCount} with errors.`)
+    const currentIndex = i
+    i += 1
+    log.info(`Processing result ${currentIndex + 1}/${batchLength}`)
+    current += 1
+    service
+      .processResult(resultId)
+      .then(() => {
+        current--
+        successCount++
+        log.info(`Processed result ${currentIndex + 1}/${batchLength}`)
+      })
+      .catch((err) => {
+        current--
+        errorCount++
+        log.error(err, `Error processing result ${currentIndex + 1}/${batchLength}!`)
+      })
 
+    if (resultsToProcess.length === 0) {
+      while (current > 0) {
+        await timeout(1000)
+      }
+
-    resultsToProcess = await loadNextBatch()
+      log.info(`Processed ${successCount} old results successfully and ${errorCount} with errors.`)
+      resultsToProcess = await loadNextBatch()
+      log.info(`Processing ${resultsToProcess.length} old results...`)
+      successCount = 0
+      errorCount = 0
+      i = 0
+      batchLength = resultsToProcess.length
+    }
   }
 }
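
Note: the rewritten loop above throttles concurrency with a bare counter (current) and a one-second sleep instead of a semaphore utility, and it fires service.processResult without awaiting it, so up to MAX_CONCURRENT_PROMISES results are in flight at once. A minimal standalone sketch of the same pattern, with illustrative names (sleep, processOne, processAll) that are not part of the PR:

const MAX_CONCURRENT = 50

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))

// illustrative stand-in for service.processResult
async function processOne(id: string): Promise<void> {
  // real work would happen here
}

async function processAll(ids: string[]): Promise<void> {
  let inFlight = 0

  for (const id of ids) {
    // wait for a free slot; the PR polls the same way with timeout(1000)
    while (inFlight === MAX_CONCURRENT) {
      await sleep(1000)
    }

    inFlight++
    // deliberately not awaited: the promise settles in the background
    processOne(id)
      .catch((err) => console.error(`failed to process ${id}`, err))
      .finally(() => {
        inFlight--
      })
  }

  // drain: block until the last in-flight promises settle
  while (inFlight > 0) {
    await sleep(1000)
  }
}

The trade-off is simplicity over latency: a freed slot is only noticed on the next poll, up to a second later, which is acceptable for a background backfill job.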
6 changes: 4 additions & 2 deletions services/apps/data_sink_worker/src/main.ts
@@ -20,8 +20,8 @@ import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal'
 const tracer = getServiceTracer()
 const log = getServiceLogger()
 
-const MAX_CONCURRENT_PROCESSING = 3
-const PROCESSING_INTERVAL_MINUTES = 5
+const MAX_CONCURRENT_PROCESSING = 5
+const PROCESSING_INTERVAL_MINUTES = 4
 
 setImmediate(async () => {
   log.info('Starting data sink worker...')
@@ -68,8 +68,10 @@ setImmediate(async () => {
   let processing = false
   setInterval(async () => {
     try {
+      log.info('Checking for old results to process...')
       if (!processing) {
         processing = true
+        log.info('Processing old results...')
         await processOldResultsJob(
           dbConnection,
           redisClient,
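
Note: the processing flag is what actually serializes the job here. setInterval fires every PROCESSING_INTERVAL_MINUTES regardless of whether the previous run has finished, so the flag acts as a re-entrancy guard that skips a tick while a run is still going. The visible part of the diff does not show where the flag is reset; a condensed sketch of the usual shape of this pattern, with doWork as a hypothetical stand-in for processOldResultsJob:

async function doWork(): Promise<void> {
  // hypothetical long-running job, e.g. processOldResultsJob(...)
}

const PROCESSING_INTERVAL_MINUTES = 4

let processing = false
setInterval(async () => {
  if (processing) {
    return // previous run still in progress, skip this tick
  }
  processing = true
  try {
    await doWork()
  } finally {
    processing = false // release the guard even if the job throws
  }
}, PROCESSING_INTERVAL_MINUTES * 60 * 1000)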
3 changes: 2 additions & 1 deletion services/apps/data_sink_worker/src/queue/index.ts
@@ -41,7 +41,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
     this.log.trace({ messageType: message.type }, 'Processing message!')
 
     const service = new DataSinkService(
-      new DbStore(this.log, this.dbConn),
+      new DbStore(this.log, this.dbConn, undefined, false),
       this.nodejsWorkerEmitter,
       this.searchSyncWorkerEmitter,
       this.redisClient,
@@ -52,6 +52,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
 
     switch (message.type) {
       case DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT:
+        // this type of message will be processed by the processOldResultsJob
        await service.processResult((message as ProcessIntegrationResultQueueMessage).resultId)
        break
      case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: {
21 changes: 10 additions & 11 deletions services/apps/data_sink_worker/src/repo/dataSink.repo.ts
@@ -1,6 +1,6 @@
 import { DbStore, RepositoryBase } from '@crowd/database'
 import { Logger } from '@crowd/logging'
-import { IIntegrationResult, IntegrationResultState } from '@crowd/types'
+import { IIntegrationResult, IntegrationResultState, TenantPlans } from '@crowd/types'
 import { IFailedResultData, IResultData } from './dataSink.data'
 
 export default class DataSinkRepository extends RepositoryBase<DataSinkRepository> {
@@ -56,22 +56,21 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
   }
 
   public async getOldResultsToProcess(limit: number): Promise<string[]> {
+    this.ensureTransactional()
+
     try {
       const results = await this.db().any(
         `
-        select id
-        from integration.results
-        where state in ($(pendingState), $(processingState))
-          and "updatedAt" < now() - interval '1 hour'
-        order by case when "webhookId" is not null then 0 else 1 end,
-                 "webhookId" asc,
-                 "updatedAt" desc
-        limit ${limit};
+        select r.id
+        from integration.results r
+        where r.state = $(pendingState)
+          and r."updatedAt" < now() - interval '1 hour'
+        limit ${limit}
+        for update skip locked;
         `,
         {
           pendingState: IntegrationResultState.PENDING,
-          processingState: IntegrationResultState.PROCESSING,
-          maxRetries: 5,
+          plans: [TenantPlans.Growth, TenantPlans.Scale],
         },
       )
 
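
Note: for update skip locked is the heart of this PR. Inside a transaction the selected rows are row-locked, and a second worker running the same query concurrently skips the locked rows instead of blocking on them, so multiple workers claim disjoint batches without any Redis lock. That is also why the method now begins with this.ensureTransactional(): the row locks only live as long as the surrounding transaction. A rough pg-promise-style sketch of the claim step under those assumptions (the Tx shape, claimBatch, and the 'pending' literal are illustrative, not the PR's actual types):

interface Tx {
  any(query: string, params?: Record<string, unknown>): Promise<{ id: string }[]>
}

// must run inside a transaction: FOR UPDATE locks are released
// as soon as the transaction commits or rolls back
async function claimBatch(tx: Tx, limit: number): Promise<string[]> {
  const rows = await tx.any(
    `select r.id
     from integration.results r
     where r.state = $(pendingState)
       and r."updatedAt" < now() - interval '1 hour'
     limit ${limit}
     for update skip locked;`,
    { pendingState: 'pending' }, // assumed value of IntegrationResultState.PENDING
  )
  return rows.map((r) => r.id)
}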
5 changes: 3 additions & 2 deletions services/apps/data_sink_worker/src/repo/organization.repo.ts
@@ -459,6 +459,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
       },
     })
     const updatedAt = new Date()
+    const oneMinuteAgo = new Date(updatedAt.getTime() - 60 * 1000)
     const prepared = RepositoryBase.prepare(
       {
         ...data,
@@ -472,9 +473,9 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
     )
 
     const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
-    const condition = this.format('where id = $(id) and "updatedAt" < $(updatedAt)', {
+    const condition = this.format('where id = $(id) and "updatedAt" <= $(oneMinuteAgo)', {
       id,
-      updatedAt,
+      oneMinuteAgo,
     })
 
     await this.db().result(`${query} ${condition}`)
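
Note: this change makes organization updates self-debouncing (commit a14b868): the WHERE clause only matches when the row has not been written in the last minute, so bursty ingestion cannot rewrite the same organization row over and over. A standalone hedged sketch of the guard, omitting the query-builder plumbing (Db, updateIfStale, and updateSql are illustrative; db.result follows pg-promise's shape):

interface Db {
  result(query: string, params?: Record<string, unknown>): Promise<{ rowCount: number }>
}

// apply `updateSql` (a prebuilt "update ... set ..." fragment) only if
// the row has been idle for at least a minute; returns false when the
// write was skipped because a more recent update already touched it
async function updateIfStale(db: Db, updateSql: string, id: string): Promise<boolean> {
  const oneMinuteAgo = new Date(Date.now() - 60 * 1000)
  const res = await db.result(
    `${updateSql} where id = $(id) and "updatedAt" <= $(oneMinuteAgo)`,
    { id, oneMinuteAgo },
  )
  return res.rowCount > 0
}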
30 changes: 0 additions & 30 deletions services/apps/data_sink_worker/src/service/activity.service.ts
@@ -17,14 +17,10 @@ import IntegrationRepository from '../repo/integration.repo'
 import GithubReposRepository from '../repo/githubRepos.repo'
 import MemberAffiliationService from './memberAffiliation.service'
 import { RedisClient } from '@crowd/redis'
-import { acquireLock, releaseLock } from '@crowd/redis'
 import { Unleash, isFeatureEnabled } from '@crowd/feature-flags'
 import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal'
 import { TEMPORAL_CONFIG } from '../conf'
 
-const MEMBER_LOCK_EXPIRE_AFTER = 10 * 60 // 10 minutes
-const MEMBER_LOCK_TIMEOUT_AFTER = 5 * 60 // 5 minutes
-
 export default class ActivityService extends LoggerBase {
   private readonly conversationService: ConversationService
 
@@ -459,15 +455,6 @@ export default class ActivityService extends LoggerBase {
         this.log.trace({ activityId: dbActivity.id }, 'Found existing activity. Updating it.')
         // process member data
 
-        // acquiring lock for member inside activity exists
-        await acquireLock(
-          this.redisClient,
-          `member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
-          'check-member-inside-activity-exists',
-          MEMBER_LOCK_EXPIRE_AFTER,
-          MEMBER_LOCK_TIMEOUT_AFTER,
-        )
-
         let dbMember = await txMemberRepo.findMember(tenantId, segmentId, platform, username)
         if (dbMember) {
           // we found a member for the identity from the activity
@@ -512,12 +499,6 @@ export default class ActivityService extends LoggerBase {
             },
             dbMember,
             false,
-            async () =>
-              await releaseLock(
-                this.redisClient,
-                `member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
-                'check-member-inside-activity-exists',
-              ),
           )
 
           if (!createActivity) {
@@ -556,12 +537,6 @@ export default class ActivityService extends LoggerBase {
             },
             dbMember,
             false,
-            async () =>
-              await releaseLock(
-                this.redisClient,
-                `member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
-                'check-member-inside-activity-exists',
-              ),
           )
 
           memberId = dbActivity.memberId
@@ -864,11 +839,6 @@ export default class ActivityService extends LoggerBase {
       }
     } finally {
       // release locks matter what
-      await releaseLock(
-        this.redisClient,
-        `member:processing:${tenantId}:${platform}:${username}`,
-        'check-member-inside-activity-exists',
-      )
     }
   })
 
14 changes: 7 additions & 7 deletions services/apps/integration_data_worker/src/jobs/processOldData.ts
@@ -1,9 +1,9 @@
-import IntegrationDataRepository from '../repo/integrationData.repo'
-import IntegrationDataService from '../service/integrationDataService'
 import { DbConnection, DbStore } from '@crowd/database'
 import { Logger } from '@crowd/logging'
-import { RedisClient, processWithLock } from '@crowd/redis'
-import { IntegrationStreamWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs'
+import { RedisClient } from '@crowd/redis'
+import { DataSinkWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs'
+import IntegrationDataRepository from '../repo/integrationData.repo'
+import IntegrationDataService from '../service/integrationDataService'
 
 export const processOldDataJob = async (
   dbConn: DbConnection,
@@ -23,9 +23,9 @@ export const processOldDataJob = async (
   )
 
   const loadNextBatch = async (): Promise<string[]> => {
-    return await processWithLock(redis, 'process-old-data', 5 * 60, 3 * 60, async () => {
-      const dataIds = await repo.getOldDataToProcess(5)
-      await repo.touchUpdatedAt(dataIds)
+    return await repo.transactionally(async (txRepo) => {
+      const dataIds = await txRepo.getOldDataToProcess(5)
+      await txRepo.touchUpdatedAt(dataIds)
       return dataIds
     })
   }
2 changes: 1 addition & 1 deletion services/apps/integration_data_worker/src/main.ts
@@ -10,7 +10,7 @@ import { processOldDataJob } from './jobs/processOldData'
 const tracer = getServiceTracer()
 const log = getServiceLogger()
 
-const MAX_CONCURRENT_PROCESSING = 2
+const MAX_CONCURRENT_PROCESSING = 3
 const PROCESSING_INTERVAL_MINUTES = 5
 
 setImmediate(async () => {
5 changes: 4 additions & 1 deletion services/apps/integration_data_worker/src/repo/integrationData.repo.ts
@@ -61,6 +61,8 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio
   }
 
   public async getOldDataToProcess(limit: number): Promise<string[]> {
+    this.ensureTransactional()
+
     try {
       const results = await this.db().any(
         `
@@ -77,7 +79,8 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio
         order by case when "webhookId" is not null then 0 else 1 end,
                  "webhookId" asc,
                  "updatedAt" desc
-        limit ${limit};
+        limit ${limit}
+        for update skip locked;
         `,
         {
           errorState: IntegrationStreamDataState.ERROR,
20 changes: 10 additions & 10 deletions services/apps/integration_stream_worker/src/jobs/processOldStreams.ts
@@ -1,14 +1,14 @@
-import { IInsertableWebhookStream } from '../repo/integrationStream.data'
-import IntegrationStreamRepository from '../repo/integrationStream.repo'
-import IntegrationStreamService from '../service/integrationStreamService'
 import { DbConnection, DbStore } from '@crowd/database'
 import { Logger } from '@crowd/logging'
-import { RedisClient, processWithLock } from '@crowd/redis'
+import { RedisClient } from '@crowd/redis'
 import {
   IntegrationDataWorkerEmitter,
   IntegrationRunWorkerEmitter,
   IntegrationStreamWorkerEmitter,
 } from '@crowd/sqs'
+import { IInsertableWebhookStream } from '../repo/integrationStream.data'
+import IntegrationStreamRepository from '../repo/integrationStream.repo'
+import IntegrationStreamService from '../service/integrationStreamService'
 
 export const processOldStreamsJob = async (
   dbConn: DbConnection,
@@ -30,8 +30,8 @@ export const processOldStreamsJob = async (
   const repo = new IntegrationStreamRepository(store, log)
 
   const prepareWebhooks = async (): Promise<void> => {
-    await processWithLock(redis, 'prepare-webhooks', 5 * 60, 3 * 60, async () => {
-      const webhooks = await repo.getOldWebhooksToProcess(5)
+    await repo.transactionally(async (txRepo) => {
+      const webhooks = await txRepo.getOldWebhooksToProcess(5)
       const prepared: IInsertableWebhookStream[] = webhooks.map((w) => {
         return {
           identifier: w.id,
@@ -43,15 +43,15 @@ export const processOldStreamsJob = async (
       })
 
       if (prepared.length > 0) {
-        await repo.publishWebhookStreams(prepared)
+        await txRepo.publishWebhookStreams(prepared)
       }
     })
   }
 
   const loadNextBatch = async (): Promise<string[]> => {
-    return await processWithLock(redis, 'process-old-streams', 5 * 60, 3 * 60, async () => {
-      const streams = await repo.getOldStreamsToProcess(5)
-      await repo.touchUpdatedAt(streams)
+    return await repo.transactionally(async (txRepo) => {
+      const streams = await txRepo.getOldStreamsToProcess(5)
+      await txRepo.touchUpdatedAt(streams)
       return streams
     })
   }
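
Note: taken together, the old-data jobs in this PR converge on one claim protocol: open a transaction, select a batch with for update skip locked, bump updatedAt so the claimed rows fall out of the "older than one hour" window, commit, then process the batch. A condensed sketch of that loop (Repo, TxRepo, runOnce, and handle are illustrative; transactionally is assumed to run its callback inside a single database transaction, as the repositories above do):

interface TxRepo {
  getOldStreamsToProcess(limit: number): Promise<string[]>
  touchUpdatedAt(ids: string[]): Promise<void>
}

interface Repo {
  transactionally<T>(fn: (txRepo: TxRepo) => Promise<T>): Promise<T>
}

async function runOnce(repo: Repo, handle: (id: string) => Promise<void>): Promise<void> {
  // 1) claim: the select uses FOR UPDATE SKIP LOCKED, and touching
  //    updatedAt keeps other workers from re-selecting these rows for
  //    another hour once the transaction commits
  const ids = await repo.transactionally(async (txRepo) => {
    const batch = await txRepo.getOldStreamsToProcess(5)
    await txRepo.touchUpdatedAt(batch)
    return batch
  })

  // 2) process outside the transaction, so row locks are held only for
  //    the claim itself, not for the duration of the work
  for (const id of ids) {
    await handle(id)
  }
}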