Skip to content

Commit 46dc4bd

Browse files
author
Uros Marolt
authored
Added audit log clean up job and modified the db pool settings a bit (#946)
1 parent de7a030 commit 46dc4bd

File tree

12 files changed

+62
-14
lines changed

12 files changed

+62
-14
lines changed

backend/src/bin/jobs/cleanUp.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import IncomingWebhookRepository from '../../database/repositories/incomingWebho
33
import IntegrationRunRepository from '../../database/repositories/integrationRunRepository'
44
import SequelizeRepository from '../../database/repositories/sequelizeRepository'
55
import { CrowdJob } from '../../types/jobTypes'
6+
import AuditLogRepository from '../../database/repositories/auditLogRepository'
67

78
const MAX_MONTHS_TO_KEEP = 3
89

@@ -38,6 +39,13 @@ export const cleanUpOldWebhooks = async () => {
3839
await repo.cleanUpOldWebhooks(MAX_MONTHS_TO_KEEP)
3940
}
4041

42+
export const cleanUpOldAuditLogs = async () => {
43+
const dbOptions = await SequelizeRepository.getDefaultIRepositoryOptions()
44+
45+
log.info(`Cleaning up audit logs that are older than 1 month!`)
46+
await AuditLogRepository.cleanUpOldAuditLogs(1, dbOptions)
47+
}
48+
4149
const job: CrowdJob = {
4250
name: 'Clean up old data',
4351
// run once every week on Sunday at 1AM

backend/src/database/models/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ function models() {
6767
write: { host: DB_CONFIG.writeHost },
6868
},
6969
pool: {
70-
max: SERVICE === configTypes.ServiceType.API ? 100 : 10,
70+
max: SERVICE === configTypes.ServiceType.API ? 20 : 10,
7171
min: 0,
72-
acquire: 30000,
72+
acquire: 50000,
7373
idle: 10000,
7474
},
7575
logging: DB_CONFIG.logging

backend/src/database/repositories/auditLogRepository.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import Sequelize from 'sequelize'
1+
import Sequelize, { QueryTypes } from 'sequelize'
22
import SequelizeRepository from './sequelizeRepository'
33
import SequelizeFilterUtils from '../utils/sequelizeFilterUtils'
44
import { IRepositoryOptions } from './IRepositoryOptions'
@@ -54,6 +54,22 @@ export default class AuditLogRepository {
5454
return log
5555
}
5656

57+
static async cleanUpOldAuditLogs(
58+
maxMonthsToKeep: number,
59+
options: IRepositoryOptions,
60+
): Promise<void> {
61+
const seq = SequelizeRepository.getSequelize(options)
62+
63+
await seq.query(
64+
`
65+
delete from "auditLogs" where timestamp < now() - interval '${maxMonthsToKeep} months'
66+
`,
67+
{
68+
type: QueryTypes.DELETE,
69+
},
70+
)
71+
}
72+
5773
static async findAndCountAll(
5874
{ filter, limit = 0, offset = 0, orderBy = '' },
5975
options: IRepositoryOptions,

services/apps/data_sink_worker/src/main.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,28 @@ import { initializeSentimentAnalysis } from '@crowd/sentiment'
77

88
const log = getServiceLogger()
99

10+
const MAX_CONCURRENT_PROCESSING = 2
11+
1012
setImmediate(async () => {
1113
log.info('Starting data sink worker...')
1214

1315
const sqsClient = getSqsClient(SQS_CONFIG())
1416

15-
const dbConnection = getDbConnection(DB_CONFIG())
17+
const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING)
1618

1719
if (SENTIMENT_CONFIG()) {
1820
initializeSentimentAnalysis(SENTIMENT_CONFIG())
1921
}
2022

2123
const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log)
2224

23-
const queue = new WorkerQueueReceiver(sqsClient, dbConnection, nodejsWorkerEmitter, log)
25+
const queue = new WorkerQueueReceiver(
26+
sqsClient,
27+
dbConnection,
28+
nodejsWorkerEmitter,
29+
log,
30+
MAX_CONCURRENT_PROCESSING,
31+
)
2432

2533
try {
2634
await nodejsWorkerEmitter.init()

services/apps/data_sink_worker/src/queue/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
1919
private readonly dbConn: DbConnection,
2020
private readonly nodejsWorkerEmitter: NodejsWorkerEmitter,
2121
parentLog: Logger,
22+
maxConcurrentProcessing: number,
2223
) {
23-
super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, 2, parentLog)
24+
super(client, DATA_SINK_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog)
2425
}
2526

2627
override async processMessage(message: IQueueMessage): Promise<void> {

services/apps/integration_data_worker/src/main.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import { WorkerQueueReceiver } from './queue'
77

88
const log = getServiceLogger()
99

10+
const MAX_CONCURRENT_PROCESSING = 2
11+
1012
setImmediate(async () => {
1113
log.info('Starting integration data worker...')
1214

1315
const sqsClient = getSqsClient(SQS_CONFIG())
1416

15-
const dbConnection = getDbConnection(DB_CONFIG())
17+
const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING)
1618
const redisClient = await getRedisClient(REDIS_CONFIG(), true)
1719

1820
const streamWorkerEmitter = new IntegrationStreamWorkerEmitter(sqsClient, log)
@@ -25,6 +27,7 @@ setImmediate(async () => {
2527
streamWorkerEmitter,
2628
dataSinkWorkerEmitter,
2729
log,
30+
MAX_CONCURRENT_PROCESSING,
2831
)
2932

3033
try {

services/apps/integration_data_worker/src/queue/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
2323
private readonly streamWorkerEmitter: IntegrationStreamWorkerEmitter,
2424
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
2525
parentLog: Logger,
26+
maxConcurrentProcessing: number,
2627
) {
27-
super(client, INTEGRATION_DATA_WORKER_QUEUE_SETTINGS, 2, parentLog)
28+
super(client, INTEGRATION_DATA_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog)
2829
}
2930

3031
override async processMessage(message: IQueueMessage): Promise<void> {

services/apps/integration_run_worker/src/main.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ import { ApiPubSubEmitter, getRedisClient } from '@crowd/redis'
1111

1212
const log = getServiceLogger()
1313

14+
const MAX_CONCURRENT_PROCESSING = 2
15+
1416
setImmediate(async () => {
1517
log.info('Starting integration run worker...')
1618

1719
const sqsClient = getSqsClient(SQS_CONFIG())
1820

19-
const dbConnection = getDbConnection(DB_CONFIG())
21+
const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING)
2022
const redisClient = await getRedisClient(REDIS_CONFIG(), true)
2123

2224
const runWorkerEmitter = new IntegrationRunWorkerEmitter(sqsClient, log)
@@ -32,6 +34,7 @@ setImmediate(async () => {
3234
runWorkerEmitter,
3335
apiPubSubEmitter,
3436
log,
37+
MAX_CONCURRENT_PROCESSING,
3538
)
3639

3740
try {

services/apps/integration_run_worker/src/queue/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
2828
private readonly runWorkerEmitter: IntegrationRunWorkerEmitter,
2929
private readonly apiPubSubEmitter: ApiPubSubEmitter,
3030
parentLog: Logger,
31+
maxConcurrentProcessing: number,
3132
) {
32-
super(client, INTEGRATION_RUN_WORKER_QUEUE_SETTINGS, 2, parentLog)
33+
super(client, INTEGRATION_RUN_WORKER_QUEUE_SETTINGS, maxConcurrentProcessing, parentLog)
3334
}
3435

3536
override async processMessage(message: IQueueMessage): Promise<void> {

services/apps/integration_stream_worker/src/main.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ import { WorkerQueueReceiver } from './queue'
1212

1313
const log = getServiceLogger()
1414

15+
const MAX_CONCURRENT_PROCESSING = 2
16+
1517
setImmediate(async () => {
1618
log.info('Starting integration stream worker...')
1719

1820
const sqsClient = getSqsClient(SQS_CONFIG())
1921

20-
const dbConnection = getDbConnection(DB_CONFIG())
22+
const dbConnection = getDbConnection(DB_CONFIG(), MAX_CONCURRENT_PROCESSING)
2123
const redisClient = await getRedisClient(REDIS_CONFIG(), true)
2224

2325
const runWorkerEmiiter = new IntegrationRunWorkerEmitter(sqsClient, log)
@@ -32,6 +34,7 @@ setImmediate(async () => {
3234
dataWorkerEmitter,
3335
streamWorkerEmitter,
3436
log,
37+
MAX_CONCURRENT_PROCESSING,
3538
)
3639

3740
try {

0 commit comments

Comments
 (0)