Skip to content

Commit ce64e09

Browse files
authored
Add automatic retries to data-sink-worker (#1842)
1 parent fa714f3 commit ce64e09

File tree

16 files changed

+183
-11
lines changed

16 files changed

+183
-11
lines changed

backend/src/database/migrations/U1699626027__add-retries-to-results.sql

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE integration.results
2+
ADD COLUMN retries INT,
3+
ADD COLUMN "delayedUntil" TIMESTAMP with time zone NULL;

backend/src/serverless/integrations/services/integrationTickProcessor.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { processPaginated, singleOrDefault } from '@crowd/common'
22
import { INTEGRATION_SERVICES } from '@crowd/integrations'
33
import { LoggerBase, getChildLogger } from '@crowd/logging'
4-
import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs'
4+
import {
5+
IntegrationRunWorkerEmitter,
6+
IntegrationStreamWorkerEmitter,
7+
DataSinkWorkerEmitter,
8+
} from '@crowd/sqs'
59
import { IntegrationRunState, IntegrationType } from '@crowd/types'
610
import SequelizeRepository from '@/database/repositories/sequelizeRepository'
711
import MicroserviceRepository from '@/database/repositories/microserviceRepository'
@@ -14,6 +18,7 @@ import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS'
1418
import {
1519
getIntegrationRunWorkerEmitter,
1620
getIntegrationStreamWorkerEmitter,
21+
getDataSinkWorkerEmitter,
1722
} from '../../utils/serviceSQS'
1823
import { IntegrationServiceBase } from './integrationServiceBase'
1924

@@ -26,6 +31,8 @@ export class IntegrationTickProcessor extends LoggerBase {
2631

2732
private intStreamWorkerEmitter: IntegrationStreamWorkerEmitter
2833

34+
private dataSinkWorkerEmitter: DataSinkWorkerEmitter
35+
2936
constructor(
3037
options: IServiceOptions,
3138
private readonly integrationServices: IntegrationServiceBase[],
@@ -46,6 +53,7 @@ export class IntegrationTickProcessor extends LoggerBase {
4653
if (!this.emittersInitialized) {
4754
this.intRunWorkerEmitter = await getIntegrationRunWorkerEmitter()
4855
this.intStreamWorkerEmitter = await getIntegrationStreamWorkerEmitter()
56+
this.dataSinkWorkerEmitter = await getDataSinkWorkerEmitter()
4957

5058
this.emittersInitialized = true
5159
}
@@ -220,6 +228,7 @@ export class IntegrationTickProcessor extends LoggerBase {
220228
await this.initEmitters()
221229
await this.intRunWorkerEmitter.checkRuns()
222230
await this.intStreamWorkerEmitter.checkStreams()
231+
await this.dataSinkWorkerEmitter.checkResults()
223232

224233
// TODO check streams as well
225234
this.log.trace('Checking for delayed integration runs!')

backend/src/serverless/utils/serviceSQS.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
IntegrationStreamWorkerEmitter,
44
IntegrationSyncWorkerEmitter,
55
SearchSyncWorkerEmitter,
6+
DataSinkWorkerEmitter,
67
SqsClient,
78
getSqsClient,
89
} from '@crowd/sqs'
@@ -64,3 +65,12 @@ export const getIntegrationSyncWorkerEmitter = async (): Promise<IntegrationSync
6465
await integrationSyncWorkerEmitter.init()
6566
return integrationSyncWorkerEmitter
6667
}
68+
69+
let dataSinkWorkerEmitter: DataSinkWorkerEmitter
70+
export const getDataSinkWorkerEmitter = async (): Promise<DataSinkWorkerEmitter> => {
71+
if (dataSinkWorkerEmitter) return dataSinkWorkerEmitter
72+
73+
dataSinkWorkerEmitter = new DataSinkWorkerEmitter(getClient(), tracer, log)
74+
await dataSinkWorkerEmitter.init()
75+
return dataSinkWorkerEmitter
76+
}

services/apps/data_sink_worker/config/default.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,8 @@
55
"unleash": {},
66
"temporal": {
77
"automationsTaskQueue": "automations"
8+
},
9+
"worker": {
10+
"maxStreamRetries": 5
811
}
912
}

services/apps/data_sink_worker/src/bin/process-results.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ import { DbStore, getDbConnection } from '@crowd/database'
1212
import { getServiceTracer } from '@crowd/tracing'
1313
import { getServiceLogger } from '@crowd/logging'
1414
import { getRedisClient } from '@crowd/redis'
15-
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
15+
import {
16+
NodejsWorkerEmitter,
17+
SearchSyncWorkerEmitter,
18+
DataSinkWorkerEmitter,
19+
getSqsClient,
20+
} from '@crowd/sqs'
1621
import { initializeSentimentAnalysis } from '@crowd/sentiment'
1722
import { getUnleashClient } from '@crowd/feature-flags'
1823
import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal'
@@ -49,13 +54,17 @@ setImmediate(async () => {
4954
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)
5055
await searchSyncWorkerEmitter.init()
5156

57+
const dataSinkWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log)
58+
await dataSinkWorkerEmitter.init()
59+
5260
const dbConnection = await getDbConnection(DB_CONFIG())
5361
const store = new DbStore(log, dbConnection)
5462

5563
const service = new DataSinkService(
5664
store,
5765
nodejsWorkerEmitter,
5866
searchSyncWorkerEmitter,
67+
dataSinkWorkerEmitter,
5968
redisClient,
6069
unleash,
6170
temporal,

services/apps/data_sink_worker/src/conf/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ export interface ISlackAlertingConfig {
1111
url: string
1212
}
1313

14+
export interface IWorkerConfig {
15+
maxStreamRetries: number
16+
}
17+
18+
let workerSettings: IWorkerConfig
19+
export const WORKER_SETTINGS = (): IWorkerConfig => {
20+
if (workerSettings) return workerSettings
21+
22+
workerSettings = config.get<IWorkerConfig>('worker')
23+
return workerSettings
24+
}
25+
1426
let redisConfig: IRedisConfiguration
1527
export const REDIS_CONFIG = (): IRedisConfiguration => {
1628
if (redisConfig) return redisConfig

services/apps/data_sink_worker/src/jobs/processOldResults.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { DbConnection, DbStore } from '@crowd/database'
33
import { Unleash } from '@crowd/feature-flags'
44
import { Logger } from '@crowd/logging'
55
import { RedisClient } from '@crowd/redis'
6-
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs'
6+
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs'
77
import { Client as TemporalClient } from '@crowd/temporal'
88
import DataSinkRepository from '../repo/dataSink.repo'
99
import DataSinkService from '../service/dataSink.service'
@@ -16,6 +16,7 @@ export const processOldResultsJob = async (
1616
redis: RedisClient,
1717
nodejsWorkerEmitter: NodejsWorkerEmitter,
1818
searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
19+
dataSinkWorkerEmitter: DataSinkWorkerEmitter,
1920
unleash: Unleash | undefined,
2021
temporal: TemporalClient,
2122
log: Logger,
@@ -26,6 +27,7 @@ export const processOldResultsJob = async (
2627
store,
2728
nodejsWorkerEmitter,
2829
searchSyncWorkerEmitter,
30+
dataSinkWorkerEmitter,
2931
redis,
3032
unleash,
3133
temporal,

services/apps/data_sink_worker/src/main.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { getDbConnection } from '@crowd/database'
22
import { getServiceTracer } from '@crowd/tracing'
33
import { getServiceLogger } from '@crowd/logging'
4-
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
4+
import {
5+
NodejsWorkerEmitter,
6+
SearchSyncWorkerEmitter,
7+
DataSinkWorkerEmitter,
8+
getSqsClient,
9+
} from '@crowd/sqs'
510
import {
611
DB_CONFIG,
712
SENTIMENT_CONFIG,
@@ -48,11 +53,14 @@ setImmediate(async () => {
4853

4954
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)
5055

56+
const dataWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log)
57+
5158
const queue = new WorkerQueueReceiver(
5259
sqsClient,
5360
dbConnection,
5461
nodejsWorkerEmitter,
5562
searchSyncWorkerEmitter,
63+
dataWorkerEmitter,
5664
redisClient,
5765
unleash,
5866
temporal,
@@ -64,6 +72,7 @@ setImmediate(async () => {
6472
try {
6573
await nodejsWorkerEmitter.init()
6674
await searchSyncWorkerEmitter.init()
75+
await dataWorkerEmitter.init()
6776

6877
let processing = false
6978
setInterval(async () => {
@@ -77,6 +86,7 @@ setImmediate(async () => {
7786
redisClient,
7887
nodejsWorkerEmitter,
7988
searchSyncWorkerEmitter,
89+
dataWorkerEmitter,
8090
unleash,
8191
temporal,
8292
log,

services/apps/data_sink_worker/src/queue/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
DATA_SINK_WORKER_QUEUE_SETTINGS,
66
NodejsWorkerEmitter,
77
SearchSyncWorkerEmitter,
8+
DataSinkWorkerEmitter,
89
SqsClient,
910
SqsQueueReceiver,
1011
} from '@crowd/sqs'
@@ -25,6 +26,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
2526
private readonly dbConn: DbConnection,
2627
private readonly nodejsWorkerEmitter: NodejsWorkerEmitter,
2728
private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
29+
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
2830
private readonly redisClient: RedisClient,
2931
private readonly unleash: Unleash | undefined,
3032
private readonly temporal: TemporalClient,
@@ -44,6 +46,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
4446
new DbStore(this.log, this.dbConn, undefined, false),
4547
this.nodejsWorkerEmitter,
4648
this.searchSyncWorkerEmitter,
49+
this.dataSinkWorkerEmitter,
4750
this.redisClient,
4851
this.unleash,
4952
this.temporal,
@@ -65,6 +68,10 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
6568
)
6669
break
6770
}
71+
case DataSinkWorkerQueueMessageType.CHECK_RESULTS: {
72+
await service.checkResults()
73+
break
74+
}
6875

6976
default:
7077
throw new Error(`Unknown message type: ${message.type}`)

0 commit comments

Comments
 (0)