Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion scripts/services/docker/Dockerfile.script_executor
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ COPY ./services/libs/redis/package.json ./services/libs/redis/package-lock.json
COPY ./services/libs/sentiment/package.json ./services/libs/sentiment/package-lock.json ./services/libs/sentiment/
COPY ./services/libs/sqs/package.json ./services/libs/sqs/package-lock.json ./services/libs/sqs/
COPY ./services/libs/types/package.json ./services/libs/types/package-lock.json ./services/libs/types/
RUN ./services/scripts/install_lib_packages.sh && ./services/scripts/build_libs.sh
RUN ./services/scripts/install_lib_packages.sh

# prepare new services
COPY ./services/apps/data_sink_worker/package.json ./services/apps/data_sink_worker/package-lock.json ./services/apps/data_sink_worker/
Expand All @@ -37,6 +37,7 @@ RUN (cd backend && npm ci)

COPY ./backend ./backend
COPY ./services/libs ./services/libs
RUN ./services/scripts/build_libs.sh
COPY ./services/apps ./services/apps

RUN npm cache clean --force
8 changes: 6 additions & 2 deletions services/apps/integration_stream_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
"format-check": "./node_modules/.bin/prettier --check .",
"tsc-check": "./node_modules/.bin/tsc --noEmit",
"script:process-stream": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-stream.ts",
"script:process-all-failed-webhooks": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-all-failed-webhooks.ts",
"script:process-webhook": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-webhook.ts"
"script:process-webhook": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-webhook.ts",
"script:process-all-streams": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-all-streams.ts",
"script:process-all-webhooks": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-all-webhooks.ts",
"script:trigger-all-failed-webhooks": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-all-failed-webhooks.ts",
"script:trigger-webhook": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-webhook.ts",
"script:trigger-all-streams": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/trigger-all-streams.ts"
},
"dependencies": {
"@crowd/common": "file:../../libs/common",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '@/conf'
import IntegrationStreamService from '@/service/integrationStreamService'
import { timeout } from '@crowd/common'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import {
IntegrationDataWorkerEmitter,
IntegrationRunWorkerEmitter,
IntegrationStreamWorkerEmitter,
getSqsClient,
} from '@crowd/sqs'
import { IntegrationStreamState } from '@crowd/types'

// Number of stream ids fetched from the database per query.
const BATCH_SIZE = 100
// Upper bound on the number of streams being processed at the same time.
const MAX_CONCURRENT = 3

const log = getServiceLogger()

/**
 * Script entry point.
 *
 * Walks over every row of `integration.streams` that is in the ERROR or
 * PENDING state (in id order, BATCH_SIZE ids at a time) and hands each one to
 * `IntegrationStreamService.processStream`, keeping at most MAX_CONCURRENT
 * calls in flight. Exits the process with code 0 once every started call has
 * settled.
 */
setImmediate(async () => {
  const sqsClient = getSqsClient(SQS_CONFIG())

  const redisClient = await getRedisClient(REDIS_CONFIG(), true)
  const runWorkerEmitter = new IntegrationRunWorkerEmitter(sqsClient, log)
  const dataWorkerEmitter = new IntegrationDataWorkerEmitter(sqsClient, log)
  const streamWorkerEmitter = new IntegrationStreamWorkerEmitter(sqsClient, log)

  await runWorkerEmitter.init()
  await dataWorkerEmitter.init()
  await streamWorkerEmitter.init()

  const dbConnection = await getDbConnection(DB_CONFIG())
  const store = new DbStore(log, dbConnection)

  const service = new IntegrationStreamService(
    redisClient,
    runWorkerEmitter,
    dataWorkerEmitter,
    streamWorkerEmitter,
    store,
    log,
  )

  // Keyset pagination over the id column: the first call has no cursor,
  // every following call continues strictly after the last id already seen.
  const fetchBatch = (lastId?: string) =>
    dbConnection.any(
      `
      select id
      from integration.streams
      where state in ('${IntegrationStreamState.ERROR}', '${IntegrationStreamState.PENDING}')
        ${lastId === undefined ? '' : 'and id > $(lastId)'}
      order by id
      limit ${BATCH_SIZE};
      `,
      lastId === undefined ? undefined : { lastId },
    )

  let inFlight = 0

  // Processes a single stream. The counter is incremented synchronously
  // (before the first await) and released in `finally`, so it is decremented
  // exactly once no matter how the call ends.
  const processOne = async (streamId: string): Promise<void> => {
    inFlight++
    try {
      await service.processStream(streamId)
      log.info(`Processed stream ${streamId}!`)
    } catch (err) {
      log.error(err, `Error processing stream ${streamId}!`)
    } finally {
      inFlight--
    }
  }

  let batch = await fetchBatch()
  while (batch.length > 0) {
    for (const { id: streamId } of batch) {
      // Poll until a processing slot frees up.
      while (inFlight >= MAX_CONCURRENT) {
        await timeout(1000)
      }

      log.info(`Processing stream ${streamId}!`)

      // Deliberately not awaited: processOne never rejects and tracks its own
      // completion through `inFlight`.
      void processOne(streamId)
    }

    batch = await fetchBatch(batch[batch.length - 1].id)
  }

  // Let the remaining in-flight streams finish before exiting.
  while (inFlight > 0) {
    await timeout(1000)
  }

  process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '@/conf'
import IntegrationStreamService from '@/service/integrationStreamService'
import { timeout } from '@crowd/common'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import {
IntegrationDataWorkerEmitter,
IntegrationRunWorkerEmitter,
IntegrationStreamWorkerEmitter,
getSqsClient,
} from '@crowd/sqs'
import { WebhookType } from '@crowd/types'

// Number of webhook ids fetched per query (used as the SQL `limit`).
const BATCH_SIZE = 100
// Number of webhooks allowed to be in flight before the loop waits.
const MAX_CONCURRENT = 3

// Logger shared by the whole script.
const log = getServiceLogger()

/**
 * Processes a single webhook through the stream service without ever
 * rejecting, so callers can fire it off without their own error handling.
 *
 * @param webhookId id of the webhook to process
 * @param service stream service used to do the processing
 * @returns whatever `processWebhookStream` resolves to, or `false` when it
 *          throws (the error is logged together with the webhook id)
 */
async function processWebhook(
  webhookId: string,
  service: IntegrationStreamService,
): Promise<boolean> {
  try {
    return await service.processWebhookStream(webhookId)
  } catch (err) {
    // Do not swallow the failure silently: without this the only trace of an
    // error would be the aggregated error counter of the caller.
    log.error({ webhookId, err }, 'Failed to process webhook stream!')
    return false
  }
}

/**
 * Script entry point.
 *
 * Walks over every row of "incomingWebhooks" that is in the PENDING or ERROR
 * state (excluding the DISCOURSE and CROWD_GENERATED types), PENDING rows
 * first, and processes each one through `processWebhook`, keeping at most
 * MAX_CONCURRENT calls in flight. Exits the process with code 0 once every
 * started call has settled.
 */
setImmediate(async () => {
  const sqsClient = getSqsClient(SQS_CONFIG())

  const redisClient = await getRedisClient(REDIS_CONFIG(), true)
  const runWorkerEmitter = new IntegrationRunWorkerEmitter(sqsClient, log)
  const dataWorkerEmitter = new IntegrationDataWorkerEmitter(sqsClient, log)
  const streamWorkerEmitter = new IntegrationStreamWorkerEmitter(sqsClient, log)

  await runWorkerEmitter.init()
  await dataWorkerEmitter.init()
  await streamWorkerEmitter.init()

  const dbConnection = await getDbConnection(DB_CONFIG())
  const store = new DbStore(log, dbConnection)

  const service = new IntegrationStreamService(
    redisClient,
    runWorkerEmitter,
    dataWorkerEmitter,
    streamWorkerEmitter,
    store,
    log,
  )

  // Sort key that puts PENDING webhooks ahead of ERROR ones. The very same
  // expression is used for ordering and for the pagination cursor.
  const PRIORITY = `case when state = 'PENDING' then 1 else 2 end`

  // Keyset pagination over (priority, id). The cursor has to contain both sort
  // columns: paginating on `id` alone while ordering by priority first skips
  // every ERROR webhook whose id is lower than the last PENDING id of a full
  // batch.
  // NOTE(review): a webhook that moves from PENDING to ERROR while the script
  // is running can be picked up once more in the ERROR phase.
  const fetchBatch = (cursor?: { priority: number; id: string }) =>
    dbConnection.any(
      `
      select id, ${PRIORITY} as priority
      from "incomingWebhooks"
      where state in ('PENDING', 'ERROR') and type not in ('${WebhookType.DISCOURSE}', '${WebhookType.CROWD_GENERATED}')
        ${
          cursor
            ? `and (${PRIORITY} > $(priority) or (${PRIORITY} = $(priority) and id > $(id)))`
            : ''
        }
      order by ${PRIORITY}, id
      limit ${BATCH_SIZE};
      `,
      cursor,
    )

  let inFlight = 0
  let total = 0
  let errors = 0

  let batch = await fetchBatch()
  while (batch.length > 0) {
    for (const { id: webhookId } of batch) {
      // Poll until a processing slot frees up.
      while (inFlight >= MAX_CONCURRENT) {
        await timeout(1000)
      }

      inFlight++
      // Deliberately not awaited: processWebhook never rejects and completion
      // is tracked through `inFlight`.
      void processWebhook(webhookId, service).then((res) => {
        inFlight--

        if (res) {
          total++
        } else {
          errors++
        }

        log.info({ res }, `Processed ${total} webhooks successfully so far! ${errors} errors!`)
      })
    }

    const last = batch[batch.length - 1]
    batch = await fetchBatch({ priority: last.priority, id: last.id })
  }

  // Let the remaining in-flight webhooks finish before exiting.
  while (inFlight > 0) {
    await timeout(1000)
  }

  process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -1,52 +1,57 @@
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
import IncomingWebhookRepository from '@/repo/incomingWebhook.repo'
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '@/conf'
import IntegrationStreamService from '@/service/integrationStreamService'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { IntegrationStreamWorkerEmitter, getSqsClient } from '@crowd/sqs'
import { WebhookState, WebhookType } from '@crowd/types'
import { getRedisClient } from '@crowd/redis'
import {
IntegrationDataWorkerEmitter,
IntegrationRunWorkerEmitter,
IntegrationStreamWorkerEmitter,
getSqsClient,
} from '@crowd/sqs'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: webhookId')
log.error('Expected 1 argument: webhookIds')
process.exit(1)
}

const webhookIds = processArguments[0].split(',')

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new IntegrationStreamWorkerEmitter(sqsClient, log)
await emitter.init()

const redisClient = await getRedisClient(REDIS_CONFIG(), true)
const runWorkerEmiiter = new IntegrationRunWorkerEmitter(sqsClient, log)
const dataWorkerEmitter = new IntegrationDataWorkerEmitter(sqsClient, log)
const streamWorkerEmitter = new IntegrationStreamWorkerEmitter(sqsClient, log)

await runWorkerEmiiter.init()
await dataWorkerEmitter.init()
await streamWorkerEmitter.init()

const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)
const repo = new IncomingWebhookRepository(store, log)

const service = new IntegrationStreamService(
redisClient,
runWorkerEmiiter,
dataWorkerEmitter,
streamWorkerEmitter,
store,
log,
)

for (const webhookId of webhookIds) {
const info = await repo.getWebhookById(webhookId)

if (info) {
log.info({ webhookId }, 'Found webhook!')

if (![WebhookType.GITHUB, WebhookType.GROUPSIO, WebhookType.DISCORD].includes(info.type)) {
log.error({ webhookId }, 'Webhook is not a supported type!')
process.exit(1)
}

if (info.state !== WebhookState.PENDING) {
log.info({ webhookId }, 'Webhook is not pending, resetting...')
await repo.markWebhookPending(webhookId)
}

log.info({ webhookId }, 'Triggering webhook processing...')
await emitter.triggerWebhookProcessing(info.tenantId, info.platform, info.id)
log.info({ webhookId }, 'Triggered webhook processing!')
} else {
log.error({ webhookId }, 'Webhook not found!')
process.exit(1)
try {
await service.processWebhookStream(webhookId)
} catch (err) {
log.error({ webhookId, err }, 'Failed to process webhook stream!')
}
}

process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
import IncomingWebhookRepository from '@/repo/incomingWebhook.repo'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { IntegrationStreamWorkerEmitter, getSqsClient } from '@crowd/sqs'
import { WebhookState, WebhookType } from '@crowd/types'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

// The single argument is a comma-separated list of webhook ids.
if (processArguments.length !== 1) {
  log.error('Expected 1 argument: webhookIds')
  process.exit(1)
}

const webhookIds = processArguments[0].split(',')

/**
 * Script entry point.
 *
 * For every webhook id given on the command line: looks the webhook up,
 * verifies that its type is GITHUB, GROUPSIO or DISCORD, resets it to PENDING
 * when it is in any other state and emits a "process this webhook" message
 * through the stream worker emitter. A missing webhook or an unsupported type
 * aborts the whole run with exit code 1.
 */
setImmediate(async () => {
  const sqsClient = getSqsClient(SQS_CONFIG())
  const emitter = new IntegrationStreamWorkerEmitter(sqsClient, log)
  await emitter.init()

  const dbConnection = await getDbConnection(DB_CONFIG())
  const store = new DbStore(log, dbConnection)
  const repo = new IncomingWebhookRepository(store, log)

  for (const webhookId of webhookIds) {
    const info = await repo.getWebhookById(webhookId)

    if (!info) {
      log.error({ webhookId }, 'Webhook not found!')
      process.exit(1)
    }

    log.info({ webhookId }, 'Found webhook!')

    if (![WebhookType.GITHUB, WebhookType.GROUPSIO, WebhookType.DISCORD].includes(info.type)) {
      log.error({ webhookId }, 'Webhook is not a supported type!')
      process.exit(1)
    }

    if (info.state !== WebhookState.PENDING) {
      log.info({ webhookId }, 'Webhook is not pending, resetting...')
      await repo.markWebhookPending(webhookId)
    }

    log.info({ webhookId }, 'Triggering webhook processing...')
    await emitter.triggerWebhookProcessing(info.tenantId, info.platform, info.id)
    log.info({ webhookId }, 'Triggered webhook processing!')
  }

  // Exit explicitly once every trigger has been sent instead of relying on
  // the event loop draining while the database connection is still open.
  process.exit(0)
})
Loading