Skip to content

Commit 673a025

Browse files
committed
Merge branch 'main' into script/member-to-org
2 parents cc8e07d + 970d708 commit 673a025

File tree

17 files changed

+260
-77
lines changed

17 files changed

+260
-77
lines changed

backend/src/types/webhooks.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { BaseError } from './baseError'
33
export enum WebhookState {
44
PENDING = 'PENDING',
55
PROCESSED = 'PROCESSED',
6+
PROCESSING = 'PROCESSING',
67
ERROR = 'ERROR',
78
}
89

services/apps/data_sink_worker/src/repo/member.repo.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,9 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
5858
): Promise<IDbMember | null> {
5959
return await this.db().oneOrNone(
6060
`${this.selectMemberQuery}
61-
where m.id in (select ms."memberId"
62-
from "memberSegments" ms
63-
inner join "memberIdentities" mi
64-
on ms."tenantId" = mi."tenantId" and ms."memberId" = mi."memberId"
65-
where ms."tenantId" = $(tenantId)
66-
and ms."segmentId" = $(segmentId)
61+
where m.id in (select mi."memberId"
62+
from "memberIdentities" mi
63+
where mi."tenantId" = $(tenantId)
6764
and mi.platform = $(platform)
6865
and mi.username = $(username));
6966
`,

services/apps/data_sink_worker/src/repo/organization.repo.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
136136
o.founded,
137137
o.attributes
138138
from organizations o
139-
where o."tenantId" = $(tenantId) and o.name = $(name)
140-
and o.id in (select os."organizationId"
141-
from "organizationSegments" os
142-
where os."segmentId" = $(segmentId))`,
139+
where o."tenantId" = $(tenantId) and o.name = $(name)`,
143140
{ tenantId, name, segmentId },
144141
)
145142

@@ -269,16 +266,24 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
269266
return id
270267
}
271268

272-
public async update(id: string, data: IDbUpdateOrganizationData): Promise<void> {
269+
public async update(id: string, data: Partial<IDbUpdateOrganizationData>): Promise<void> {
270+
const keys = Object.keys(data)
271+
keys.push('updatedAt')
272+
// construct dynamic column set
273+
const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, {
274+
table: {
275+
table: 'organizations',
276+
},
277+
})
273278
const prepared = RepositoryBase.prepare(
274279
{
275280
...data,
276281
updatedAt: new Date(),
277282
},
278-
this.updateOrganizationColumnSet,
283+
dynamicColumnSet,
279284
)
280285

281-
const query = this.dbInstance.helpers.update(prepared, this.updateOrganizationColumnSet)
286+
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
282287
const condition = this.format('where id = $(id)', { id })
283288

284289
const result = await this.db().result(`${query} ${condition}`)

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -800,10 +800,10 @@ export default class ActivityService extends LoggerBase {
800800
)
801801
}
802802
} finally {
803-
// release lock no matter that
803+
// release locks matter what
804804
await releaseLock(
805805
this.redisClient,
806-
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
806+
`member:processing:${tenantId}:${platform}:${username}`,
807807
'check-member-inside-activity-exists',
808808
)
809809
}

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { IDbMember, IDbMemberUpdateData } from '@/repo/member.data'
22
import MemberRepository from '@/repo/member.repo'
3-
import { areArraysEqual, isObjectEmpty, singleOrDefault } from '@crowd/common'
3+
import { firstArrayContainsSecondArray, isObjectEmpty, singleOrDefault } from '@crowd/common'
44
import { DbStore } from '@crowd/database'
55
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
66
import {
@@ -185,9 +185,9 @@ export default class MemberService extends LoggerBase {
185185
await txRepo.addToSegment(id, tenantId, segmentId)
186186

187187
updated = true
188-
await txRepo.addToSegment(id, tenantId, segmentId)
189188
} else {
190189
this.log.debug({ memberId: id }, 'Nothing to update in a member!')
190+
await txRepo.addToSegment(id, tenantId, segmentId)
191191
}
192192

193193
if (toUpdate.identities) {
@@ -419,8 +419,8 @@ export default class MemberService extends LoggerBase {
419419
}
420420

421421
let emails: string[] | undefined
422-
if (member.emails) {
423-
if (!areArraysEqual(member.emails, dbMember.emails)) {
422+
if (member.emails && member.emails.length > 0) {
423+
if (!firstArrayContainsSecondArray(dbMember.emails, member.emails)) {
424424
emails = [...new Set([...member.emails, ...dbMember.emails])]
425425
}
426426
}

services/apps/data_sink_worker/src/service/organization.service.ts

Lines changed: 56 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,33 @@ export class OrganizationService extends LoggerBase {
3636
// if exists in cache update it
3737
const updateData: Partial<IOrganization> = {}
3838
// no need to update name since it's aka primary key
39-
if (data.url) updateData.url = data.url
40-
if (data.description) updateData.description = data.description
41-
if (data.emails) updateData.emails = data.emails
42-
if (data.logo) updateData.logo = data.logo
43-
if (data.tags) updateData.tags = data.tags
44-
if (data.github) updateData.github = data.github as IOrganizationSocial
45-
if (data.twitter) updateData.twitter = data.twitter as IOrganizationSocial
46-
if (data.linkedin) updateData.linkedin = data.linkedin as IOrganizationSocial
47-
if (data.crunchbase) updateData.crunchbase = data.crunchbase as IOrganizationSocial
48-
if (data.employees) updateData.employees = data.employees
49-
if (data.location) updateData.location = data.location
50-
if (data.website) updateData.website = data.website
51-
if (data.type) updateData.type = data.type
52-
if (data.size) updateData.size = data.size
53-
if (data.headline) updateData.headline = data.headline
54-
if (data.industry) updateData.industry = data.industry
55-
if (data.founded) updateData.founded = data.founded
39+
const fields = [
40+
'url',
41+
'description',
42+
'emails',
43+
'logo',
44+
'tags',
45+
'github',
46+
'twitter',
47+
'linkedin',
48+
'crunchbase',
49+
'employees',
50+
'location',
51+
'website',
52+
'type',
53+
'size',
54+
'headline',
55+
'industry',
56+
'founded',
57+
]
58+
fields.forEach((field) => {
59+
if (data[field] && !isEqual(data[field], cached[field])) {
60+
updateData[field] = data[field]
61+
}
62+
})
5663
if (Object.keys(updateData).length > 0) {
5764
await this.repo.updateCache(cached.id, updateData)
65+
cached = { ...cached, ...updateData } // Update the cached data with the new data
5866
}
5967
} else {
6068
// if it doesn't exists in cache create it
@@ -91,11 +99,9 @@ export class OrganizationService extends LoggerBase {
9199
// now check if exists in this tenant
92100
const existing = await this.repo.findByName(tenantId, segmentId, data.name)
93101

94-
const displayName = existing?.displayName ? existing?.displayName : data?.name
95-
96102
let attributes = existing?.attributes
97103

98-
if (data.attributes) {
104+
if (data?.attributes) {
99105
const temp = mergeWith({}, existing?.attributes, data?.attributes)
100106
if (!isEqual(temp, existing?.attributes)) {
101107
attributes = temp
@@ -104,28 +110,37 @@ export class OrganizationService extends LoggerBase {
104110

105111
if (existing) {
106112
// if it does exists update it
107-
await this.repo.update(existing.id, {
108-
name: cached.name,
109-
displayName,
110-
url: cached.url,
111-
description: cached.description,
112-
emails: cached.emails,
113-
logo: cached.logo,
114-
tags: cached.tags,
115-
github: cached.github,
116-
twitter: cached.twitter,
117-
linkedin: cached.linkedin,
118-
crunchbase: cached.crunchbase,
119-
employees: cached.employees,
120-
location: cached.location,
121-
website: cached.website,
122-
type: cached.type,
123-
size: cached.size,
124-
headline: cached.headline,
125-
industry: cached.industry,
126-
founded: cached.founded,
127-
attributes,
113+
const updateData: Partial<IOrganization> = {}
114+
const fields = [
115+
'name',
116+
'displayName',
117+
'url',
118+
'description',
119+
'emails',
120+
'logo',
121+
'tags',
122+
'github',
123+
'twitter',
124+
'linkedin',
125+
'crunchbase',
126+
'employees',
127+
'location',
128+
'website',
129+
'type',
130+
'size',
131+
'headline',
132+
'industry',
133+
'founded',
134+
'attributes',
135+
]
136+
fields.forEach((field) => {
137+
if (cached[field] && !isEqual(cached[field], existing[field])) {
138+
updateData[field] = cached[field]
139+
}
128140
})
141+
if (Object.keys(updateData).length > 0) {
142+
await this.repo.update(existing.id, updateData)
143+
}
129144

130145
return existing.id
131146
} else {

services/apps/integration_stream_worker/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
"format": "./node_modules/.bin/prettier --write \"src/**/*.ts\"",
1515
"format-check": "./node_modules/.bin/prettier --check .",
1616
"tsc-check": "./node_modules/.bin/tsc --noEmit",
17-
"script:process-stream": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-stream.ts"
17+
"script:process-stream": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-stream.ts",
18+
"script:process-all-failed-webhooks": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-all-failed-webhooks.ts",
19+
"script:process-webhook": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-webhook.ts"
1820
},
1921
"dependencies": {
2022
"@crowd/common": "file:../../libs/common",
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
2+
import IncomingWebhookRepository from '@/repo/incomingWebhook.repo'
3+
import { DbStore, getDbConnection } from '@crowd/database'
4+
import { getServiceLogger } from '@crowd/logging'
5+
import { IntegrationStreamWorkerEmitter, getSqsClient } from '@crowd/sqs'
6+
7+
const batchSize = 500
8+
9+
const log = getServiceLogger()
10+
11+
setImmediate(async () => {
12+
const sqsClient = getSqsClient(SQS_CONFIG())
13+
const emitter = new IntegrationStreamWorkerEmitter(sqsClient, log)
14+
await emitter.init()
15+
16+
const dbConnection = await getDbConnection(DB_CONFIG())
17+
const store = new DbStore(log, dbConnection)
18+
19+
const repo = new IncomingWebhookRepository(store, log)
20+
let count = 0
21+
22+
let results = await repo.getFailedWebhooks(batchSize)
23+
while (results.length > 0) {
24+
for (const result of results) {
25+
await emitter.triggerWebhookProcessing(result.tenantId, result.platform, result.id)
26+
}
27+
28+
await repo.markWebhooksPendingBatch(results.map((r) => r.id))
29+
30+
count += results.length
31+
log.info(`Triggered total of ${count} failed webhooks!`)
32+
33+
results = await repo.getFailedWebhooks(batchSize)
34+
}
35+
})
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
2+
import IncomingWebhookRepository from '@/repo/incomingWebhook.repo'
3+
import { DbStore, getDbConnection } from '@crowd/database'
4+
import { getServiceLogger } from '@crowd/logging'
5+
import { IntegrationStreamWorkerEmitter, getSqsClient } from '@crowd/sqs'
6+
import { WebhookState, WebhookType } from '@crowd/types'
7+
8+
const log = getServiceLogger()
9+
10+
const processArguments = process.argv.slice(2)
11+
12+
if (processArguments.length !== 1) {
13+
log.error('Expected 1 argument: webhookId')
14+
process.exit(1)
15+
}
16+
17+
const webhookIds = processArguments[0].split(',')
18+
19+
setImmediate(async () => {
20+
const sqsClient = getSqsClient(SQS_CONFIG())
21+
const emitter = new IntegrationStreamWorkerEmitter(sqsClient, log)
22+
await emitter.init()
23+
24+
const dbConnection = await getDbConnection(DB_CONFIG())
25+
const store = new DbStore(log, dbConnection)
26+
const repo = new IncomingWebhookRepository(store, log)
27+
28+
for (const webhookId of webhookIds) {
29+
const info = await repo.getWebhookById(webhookId)
30+
31+
if (info) {
32+
log.info({ webhookId }, 'Found webhook!')
33+
34+
if (![WebhookType.GITHUB, WebhookType.GROUPSIO, WebhookType.DISCORD].includes(info.type)) {
35+
log.error({ webhookId }, 'Webhook is not a supported type!')
36+
process.exit(1)
37+
}
38+
39+
if (info.state !== WebhookState.PENDING) {
40+
log.info({ webhookId }, 'Webhook is not pending, resetting...')
41+
await repo.markWebhookPending(webhookId)
42+
}
43+
44+
log.info({ webhookId }, 'Triggering webhook processing...')
45+
await emitter.triggerWebhookProcessing(info.tenantId, info.platform, info.id)
46+
log.info({ webhookId }, 'Triggered webhook processing!')
47+
} else {
48+
log.error({ webhookId }, 'Webhook not found!')
49+
process.exit(1)
50+
}
51+
}
52+
})

services/apps/integration_stream_worker/src/repo/incomingWebhook.data.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export interface IWebhookData {
66
integrationId: string
77
state: WebhookState
88
type: WebhookType
9+
platform: string
910
// eslint-disable-next-line @typescript-eslint/no-explicit-any
1011
payload: any
1112
}

0 commit comments

Comments
 (0)