Skip to content

Commit 1ea4963

Browse files
committed
working version
1 parent d47c658 commit 1ea4963

File tree

15 files changed

+162
-9
lines changed

15 files changed

+162
-9
lines changed

backend/src/database/migrations/U1699626027__add-retries-to-results.sql

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE integration.results
2+
ADD COLUMN retries INT,
3+
ADD COLUMN "delayedUntil" TIMESTAMP with time zone NULL;

backend/src/serverless/integrations/services/integrationTickProcessor.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { processPaginated, singleOrDefault } from '@crowd/common'
22
import { INTEGRATION_SERVICES } from '@crowd/integrations'
33
import { LoggerBase, getChildLogger } from '@crowd/logging'
4-
import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs'
4+
import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs'
55
import { IntegrationRunState, IntegrationType } from '@crowd/types'
66
import SequelizeRepository from '@/database/repositories/sequelizeRepository'
77
import MicroserviceRepository from '@/database/repositories/microserviceRepository'
@@ -14,6 +14,7 @@ import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS'
1414
import {
1515
getIntegrationRunWorkerEmitter,
1616
getIntegrationStreamWorkerEmitter,
17+
getDataSinkWorkerEmitter,
1718
} from '../../utils/serviceSQS'
1819
import { IntegrationServiceBase } from './integrationServiceBase'
1920

@@ -26,6 +27,8 @@ export class IntegrationTickProcessor extends LoggerBase {
2627

2728
private intStreamWorkerEmitter: IntegrationStreamWorkerEmitter
2829

30+
private dataSinkWorkerEmitter: DataSinkWorkerEmitter
31+
2932
constructor(
3033
options: IServiceOptions,
3134
private readonly integrationServices: IntegrationServiceBase[],
@@ -46,6 +49,7 @@ export class IntegrationTickProcessor extends LoggerBase {
4649
if (!this.emittersInitialized) {
4750
this.intRunWorkerEmitter = await getIntegrationRunWorkerEmitter()
4851
this.intStreamWorkerEmitter = await getIntegrationStreamWorkerEmitter()
52+
this.dataSinkWorkerEmitter = await getDataSinkWorkerEmitter()
4953

5054
this.emittersInitialized = true
5155
}
@@ -220,6 +224,7 @@ export class IntegrationTickProcessor extends LoggerBase {
220224
await this.initEmitters()
221225
await this.intRunWorkerEmitter.checkRuns()
222226
await this.intStreamWorkerEmitter.checkStreams()
227+
await this.dataSinkWorkerEmitter.checkResults()
223228

224229
// TODO check streams as well
225230
this.log.trace('Checking for delayed integration runs!')

backend/src/serverless/utils/serviceSQS.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
IntegrationStreamWorkerEmitter,
44
IntegrationSyncWorkerEmitter,
55
SearchSyncWorkerEmitter,
6+
DataSinkWorkerEmitter,
67
SqsClient,
78
getSqsClient,
89
} from '@crowd/sqs'
@@ -64,3 +65,12 @@ export const getIntegrationSyncWorkerEmitter = async (): Promise<IntegrationSync
6465
await integrationSyncWorkerEmitter.init()
6566
return integrationSyncWorkerEmitter
6667
}
68+
69+
let dataSinkWorkerEmitter: DataSinkWorkerEmitter
70+
export const getDataSinkWorkerEmitter = async (): Promise<DataSinkWorkerEmitter> => {
71+
if (dataSinkWorkerEmitter) return dataSinkWorkerEmitter
72+
73+
dataSinkWorkerEmitter = new DataSinkWorkerEmitter(getClient(), tracer, log)
74+
await dataSinkWorkerEmitter.init()
75+
return dataSinkWorkerEmitter
76+
}

services/apps/data_sink_worker/config/default.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,8 @@
55
"unleash": {},
66
"temporal": {
77
"automationsTaskQueue": "automations"
8+
},
9+
"worker": {
10+
"maxStreamRetries": 5
811
}
912
}

services/apps/data_sink_worker/src/conf/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ export interface ISlackAlertingConfig {
1111
url: string
1212
}
1313

14+
export interface IWorkerConfig {
15+
maxStreamRetries: number
16+
}
17+
18+
let workerSettings: IWorkerConfig
19+
export const WORKER_SETTINGS = (): IWorkerConfig => {
20+
if (workerSettings) return workerSettings
21+
22+
workerSettings = config.get<IWorkerConfig>('worker')
23+
return workerSettings
24+
}
25+
1426
let redisConfig: IRedisConfiguration
1527
export const REDIS_CONFIG = (): IRedisConfiguration => {
1628
if (redisConfig) return redisConfig

services/apps/data_sink_worker/src/jobs/processOldResults.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { DbConnection, DbStore } from '@crowd/database'
33
import { Unleash } from '@crowd/feature-flags'
44
import { Logger } from '@crowd/logging'
55
import { RedisClient } from '@crowd/redis'
6-
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs'
6+
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs'
77
import { Client as TemporalClient } from '@crowd/temporal'
88
import DataSinkRepository from '../repo/dataSink.repo'
99
import DataSinkService from '../service/dataSink.service'
@@ -16,6 +16,7 @@ export const processOldResultsJob = async (
1616
redis: RedisClient,
1717
nodejsWorkerEmitter: NodejsWorkerEmitter,
1818
searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
19+
dataSinkWorkerEmitter: DataSinkWorkerEmitter,
1920
unleash: Unleash | undefined,
2021
temporal: TemporalClient,
2122
log: Logger,
@@ -26,6 +27,7 @@ export const processOldResultsJob = async (
2627
store,
2728
nodejsWorkerEmitter,
2829
searchSyncWorkerEmitter,
30+
dataSinkWorkerEmitter,
2931
redis,
3032
unleash,
3133
temporal,

services/apps/data_sink_worker/src/main.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { getDbConnection } from '@crowd/database'
22
import { getServiceTracer } from '@crowd/tracing'
33
import { getServiceLogger } from '@crowd/logging'
4-
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
4+
import {
5+
NodejsWorkerEmitter,
6+
SearchSyncWorkerEmitter,
7+
DataSinkWorkerEmitter,
8+
getSqsClient,
9+
} from '@crowd/sqs'
510
import {
611
DB_CONFIG,
712
SENTIMENT_CONFIG,
@@ -48,11 +53,14 @@ setImmediate(async () => {
4853

4954
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)
5055

56+
const dataWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log)
57+
5158
const queue = new WorkerQueueReceiver(
5259
sqsClient,
5360
dbConnection,
5461
nodejsWorkerEmitter,
5562
searchSyncWorkerEmitter,
63+
dataWorkerEmitter,
5664
redisClient,
5765
unleash,
5866
temporal,
@@ -64,6 +72,7 @@ setImmediate(async () => {
6472
try {
6573
await nodejsWorkerEmitter.init()
6674
await searchSyncWorkerEmitter.init()
75+
await dataWorkerEmitter.init()
6776

6877
let processing = false
6978
setInterval(async () => {
@@ -77,6 +86,7 @@ setImmediate(async () => {
7786
redisClient,
7887
nodejsWorkerEmitter,
7988
searchSyncWorkerEmitter,
89+
dataWorkerEmitter,
8090
unleash,
8191
temporal,
8292
log,

services/apps/data_sink_worker/src/queue/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
DATA_SINK_WORKER_QUEUE_SETTINGS,
66
NodejsWorkerEmitter,
77
SearchSyncWorkerEmitter,
8+
DataSinkWorkerEmitter,
89
SqsClient,
910
SqsQueueReceiver,
1011
} from '@crowd/sqs'
@@ -25,6 +26,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
2526
private readonly dbConn: DbConnection,
2627
private readonly nodejsWorkerEmitter: NodejsWorkerEmitter,
2728
private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
29+
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
2830
private readonly redisClient: RedisClient,
2931
private readonly unleash: Unleash | undefined,
3032
private readonly temporal: TemporalClient,
@@ -44,6 +46,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
4446
new DbStore(this.log, this.dbConn, undefined, false),
4547
this.nodejsWorkerEmitter,
4648
this.searchSyncWorkerEmitter,
49+
this.dataSinkWorkerEmitter,
4750
this.redisClient,
4851
this.unleash,
4952
this.temporal,
@@ -65,6 +68,10 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
6568
)
6669
break
6770
}
71+
case DataSinkWorkerQueueMessageType.CHECK_RESULTS: {
72+
await service.checkResults()
73+
break
74+
}
6875

6976
default:
7077
throw new Error(`Unknown message type: ${message.type}`)

services/apps/data_sink_worker/src/repo/dataSink.data.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ export interface IResultData {
1717
plan: string
1818
isTrialPlan: boolean
1919
name: string
20+
21+
retries: number
22+
delayedUntil: string | null
2023
}
2124

2225
export interface IFailedResultData {

0 commit comments

Comments
 (0)