Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE integration.results
ADD COLUMN retries INT,
ADD COLUMN "delayedUntil" TIMESTAMP with time zone NULL;
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { processPaginated, singleOrDefault } from '@crowd/common'
import { INTEGRATION_SERVICES } from '@crowd/integrations'
import { LoggerBase, getChildLogger } from '@crowd/logging'
import { IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter } from '@crowd/sqs'
import {
IntegrationRunWorkerEmitter,
IntegrationStreamWorkerEmitter,
DataSinkWorkerEmitter,
} from '@crowd/sqs'
import { IntegrationRunState, IntegrationType } from '@crowd/types'
import SequelizeRepository from '@/database/repositories/sequelizeRepository'
import MicroserviceRepository from '@/database/repositories/microserviceRepository'
Expand All @@ -14,6 +18,7 @@ import { sendNodeWorkerMessage } from '../../utils/nodeWorkerSQS'
import {
getIntegrationRunWorkerEmitter,
getIntegrationStreamWorkerEmitter,
getDataSinkWorkerEmitter,
} from '../../utils/serviceSQS'
import { IntegrationServiceBase } from './integrationServiceBase'

Expand All @@ -26,6 +31,8 @@ export class IntegrationTickProcessor extends LoggerBase {

private intStreamWorkerEmitter: IntegrationStreamWorkerEmitter

private dataSinkWorkerEmitter: DataSinkWorkerEmitter

constructor(
options: IServiceOptions,
private readonly integrationServices: IntegrationServiceBase[],
Expand All @@ -46,6 +53,7 @@ export class IntegrationTickProcessor extends LoggerBase {
if (!this.emittersInitialized) {
this.intRunWorkerEmitter = await getIntegrationRunWorkerEmitter()
this.intStreamWorkerEmitter = await getIntegrationStreamWorkerEmitter()
this.dataSinkWorkerEmitter = await getDataSinkWorkerEmitter()

this.emittersInitialized = true
}
Expand Down Expand Up @@ -220,6 +228,7 @@ export class IntegrationTickProcessor extends LoggerBase {
await this.initEmitters()
await this.intRunWorkerEmitter.checkRuns()
await this.intStreamWorkerEmitter.checkStreams()
await this.dataSinkWorkerEmitter.checkResults()

// TODO check streams as well
this.log.trace('Checking for delayed integration runs!')
Expand Down
10 changes: 10 additions & 0 deletions backend/src/serverless/utils/serviceSQS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
IntegrationStreamWorkerEmitter,
IntegrationSyncWorkerEmitter,
SearchSyncWorkerEmitter,
DataSinkWorkerEmitter,
SqsClient,
getSqsClient,
} from '@crowd/sqs'
Expand Down Expand Up @@ -64,3 +65,12 @@ export const getIntegrationSyncWorkerEmitter = async (): Promise<IntegrationSync
await integrationSyncWorkerEmitter.init()
return integrationSyncWorkerEmitter
}

let dataSinkWorkerEmitter: DataSinkWorkerEmitter
export const getDataSinkWorkerEmitter = async (): Promise<DataSinkWorkerEmitter> => {
if (dataSinkWorkerEmitter) return dataSinkWorkerEmitter

dataSinkWorkerEmitter = new DataSinkWorkerEmitter(getClient(), tracer, log)
await dataSinkWorkerEmitter.init()
return dataSinkWorkerEmitter
}
3 changes: 3 additions & 0 deletions services/apps/data_sink_worker/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@
"unleash": {},
"temporal": {
"automationsTaskQueue": "automations"
},
"worker": {
"maxStreamRetries": 5
}
}
11 changes: 10 additions & 1 deletion services/apps/data_sink_worker/src/bin/process-results.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceTracer } from '@crowd/tracing'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
import {
NodejsWorkerEmitter,
SearchSyncWorkerEmitter,
DataSinkWorkerEmitter,
getSqsClient,
} from '@crowd/sqs'
import { initializeSentimentAnalysis } from '@crowd/sentiment'
import { getUnleashClient } from '@crowd/feature-flags'
import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal'
Expand Down Expand Up @@ -49,13 +54,17 @@ setImmediate(async () => {
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)
await searchSyncWorkerEmitter.init()

const dataSinkWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log)
await dataSinkWorkerEmitter.init()

const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)

const service = new DataSinkService(
store,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
dataSinkWorkerEmitter,
redisClient,
unleash,
temporal,
Expand Down
12 changes: 12 additions & 0 deletions services/apps/data_sink_worker/src/conf/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ export interface ISlackAlertingConfig {
url: string
}

export interface IWorkerConfig {
maxStreamRetries: number
}

let workerSettings: IWorkerConfig
export const WORKER_SETTINGS = (): IWorkerConfig => {
if (workerSettings) return workerSettings

workerSettings = config.get<IWorkerConfig>('worker')
return workerSettings
}

let redisConfig: IRedisConfiguration
export const REDIS_CONFIG = (): IRedisConfiguration => {
if (redisConfig) return redisConfig
Expand Down
4 changes: 3 additions & 1 deletion services/apps/data_sink_worker/src/jobs/processOldResults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { DbConnection, DbStore } from '@crowd/database'
import { Unleash } from '@crowd/feature-flags'
import { Logger } from '@crowd/logging'
import { RedisClient } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/sqs'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, DataSinkWorkerEmitter } from '@crowd/sqs'
import { Client as TemporalClient } from '@crowd/temporal'
import DataSinkRepository from '../repo/dataSink.repo'
import DataSinkService from '../service/dataSink.service'
Expand All @@ -16,6 +16,7 @@ export const processOldResultsJob = async (
redis: RedisClient,
nodejsWorkerEmitter: NodejsWorkerEmitter,
searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
dataSinkWorkerEmitter: DataSinkWorkerEmitter,
unleash: Unleash | undefined,
temporal: TemporalClient,
log: Logger,
Expand All @@ -26,6 +27,7 @@ export const processOldResultsJob = async (
store,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
dataSinkWorkerEmitter,
redis,
unleash,
temporal,
Expand Down
12 changes: 11 additions & 1 deletion services/apps/data_sink_worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { getDbConnection } from '@crowd/database'
import { getServiceTracer } from '@crowd/tracing'
import { getServiceLogger } from '@crowd/logging'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'
import {
NodejsWorkerEmitter,
SearchSyncWorkerEmitter,
DataSinkWorkerEmitter,
getSqsClient,
} from '@crowd/sqs'
import {
DB_CONFIG,
SENTIMENT_CONFIG,
Expand Down Expand Up @@ -48,11 +53,14 @@ setImmediate(async () => {

const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, tracer, log)

const dataWorkerEmitter = new DataSinkWorkerEmitter(sqsClient, tracer, log)

const queue = new WorkerQueueReceiver(
sqsClient,
dbConnection,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
dataWorkerEmitter,
redisClient,
unleash,
temporal,
Expand All @@ -64,6 +72,7 @@ setImmediate(async () => {
try {
await nodejsWorkerEmitter.init()
await searchSyncWorkerEmitter.init()
await dataWorkerEmitter.init()

let processing = false
setInterval(async () => {
Expand All @@ -77,6 +86,7 @@ setImmediate(async () => {
redisClient,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
dataWorkerEmitter,
unleash,
temporal,
log,
Expand Down
7 changes: 7 additions & 0 deletions services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
DATA_SINK_WORKER_QUEUE_SETTINGS,
NodejsWorkerEmitter,
SearchSyncWorkerEmitter,
DataSinkWorkerEmitter,
SqsClient,
SqsQueueReceiver,
} from '@crowd/sqs'
Expand All @@ -25,6 +26,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
private readonly dbConn: DbConnection,
private readonly nodejsWorkerEmitter: NodejsWorkerEmitter,
private readonly searchSyncWorkerEmitter: SearchSyncWorkerEmitter,
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
private readonly redisClient: RedisClient,
private readonly unleash: Unleash | undefined,
private readonly temporal: TemporalClient,
Expand All @@ -44,6 +46,7 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
new DbStore(this.log, this.dbConn, undefined, false),
this.nodejsWorkerEmitter,
this.searchSyncWorkerEmitter,
this.dataSinkWorkerEmitter,
this.redisClient,
this.unleash,
this.temporal,
Expand All @@ -65,6 +68,10 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
)
break
}
case DataSinkWorkerQueueMessageType.CHECK_RESULTS: {
await service.checkResults()
break
}

default:
throw new Error(`Unknown message type: ${message.type}`)
Expand Down
3 changes: 3 additions & 0 deletions services/apps/data_sink_worker/src/repo/dataSink.data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ export interface IResultData {
plan: string
isTrialPlan: boolean
name: string

retries: number
delayedUntil: string | null
}

export interface IFailedResultData {
Expand Down
53 changes: 51 additions & 2 deletions services/apps/data_sink_worker/src/repo/dataSink.repo.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DbStore, RepositoryBase } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IIntegrationResult, IntegrationResultState, TenantPlans } from '@crowd/types'
import { IIntegrationResult, IntegrationResultState, PlatformType, TenantPlans } from '@crowd/types'
import { IFailedResultData, IResultData } from './dataSink.data'

export default class DataSinkRepository extends RepositoryBase<DataSinkRepository> {
Expand All @@ -18,6 +18,8 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
r."streamId",
r."apiDataId",
r."integrationId",
r.retries,
r."delayedUntil",
i.platform,
t."hasSampleData",
t."plan",
Expand Down Expand Up @@ -206,7 +208,8 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
`update integration.results
set state = $(newState),
error = null,
"updatedAt" = now()
"updatedAt" = now(),
"delayedUntil" = null
where id in ($(resultIds:csv))`,
{
resultIds,
Expand All @@ -224,4 +227,50 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor

return result.map((r) => r.id)
}

public async delayResult(resultId: string, until: Date): Promise<void> {
const result = await this.db().result(
`update integration.results
set state = $(state),
"delayedUntil" = $(until),
retries = coalesce(retries, 0) + 1,
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
until,
state: IntegrationResultState.DELAYED,
},
)

this.checkUpdateRowCount(result.rowCount, 1)
}

public async getDelayedResults(
limit: number,
): Promise<{ id: string; tenantId: string; platform: PlatformType }[]> {
this.ensureTransactional()

try {
const results = await this.db().any(
`
select r.id, r."tenantId", i.platform
from integration.results r
join integrations i on r."integrationId" = i.id
where r.state = $(delayedState)
and r."delayedUntil" < now()
limit ${limit}
for update skip locked;
`,
{
delayedState: IntegrationResultState.DELAYED,
},
)

return results.map((s) => ({ id: s.id, tenantId: s.tenantId, platform: s.platform }))
} catch (err) {
this.log.error(err, 'Failed to get delayed results!')
throw err
}
}
}
Loading