Skip to content

Commit 0d98166

Browse files
authored
Add script for member-to-organization association based on email (#1463)
1 parent 53bf734 commit 0d98166

File tree

5 files changed

+186
-4
lines changed

5 files changed

+186
-4
lines changed

backend/src/services/__tests__/organizationService.test.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,6 @@ describe('OrganizationService tests', () => {
107107
}
108108

109109
const added = await service.createOrUpdate(toAdd)
110-
console.log('added is: ')
111-
console.log(added)
112-
113110
expect(added.identities[0].url).toEqual(toAdd.identities[0].url)
114111
expect(added.identities[0].name).toEqual(toAdd.identities[0].name)
115112
expect(added.description).toEqual(expectedEnriched.description)

services/apps/data_sink_worker/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
"script:restart-all-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-all-failed-results.ts",
1919
"script:restart-x-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-x-failed-results.ts",
2020
"script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts",
21-
"script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts"
21+
"script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts",
22+
"script:assign-member-to-org": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/assign-member-to-org.ts"
2223
},
2324
"dependencies": {
2425
"@crowd/common": "file:../../libs/common",
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
2+
import { DbStore, getDbConnection } from '@crowd/database'
3+
import { getServiceLogger } from '@crowd/logging'
4+
import {
5+
DataSinkWorkerEmitter,
6+
NodejsWorkerEmitter,
7+
SearchSyncWorkerEmitter,
8+
getSqsClient,
9+
} from '@crowd/sqs'
10+
import MemberRepository from '@/repo/member.repo'
11+
import DataSinkRepository from '@/repo/dataSink.repo'
12+
import MemberService from '@/service/member.service'
13+
import { OrganizationService } from '@/service/organization.service'
14+
15+
const log = getServiceLogger()
16+
17+
const processArguments = process.argv.slice(2)
18+
19+
if (processArguments.length !== 1) {
20+
log.error('Expected 1 argument: tenantId')
21+
process.exit(1)
22+
}
23+
24+
const tenantId = processArguments[0]
25+
26+
setImmediate(async () => {
27+
const sqsClient = getSqsClient(SQS_CONFIG())
28+
const emitter = new DataSinkWorkerEmitter(sqsClient, log)
29+
await emitter.init()
30+
31+
const dbConnection = await getDbConnection(DB_CONFIG())
32+
const store = new DbStore(log, dbConnection)
33+
34+
const dataSinkRepo = new DataSinkRepository(store, log)
35+
const memberRepo = new MemberRepository(store, log)
36+
37+
const segmentIds = await dataSinkRepo.getSegmentIds(tenantId)
38+
const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id
39+
40+
const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log)
41+
await nodejsWorkerEmitter.init()
42+
43+
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, log)
44+
await searchSyncWorkerEmitter.init()
45+
46+
const memberService = new MemberService(store, nodejsWorkerEmitter, searchSyncWorkerEmitter, log)
47+
const orgService = new OrganizationService(store, log)
48+
49+
const limit = 100
50+
let offset = 0
51+
let processedMembers = 0
52+
53+
try {
54+
const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, {
55+
limit,
56+
offset,
57+
countOnly: true,
58+
})
59+
60+
log.info({ tenantId }, `Total members found in the tenant: ${totalCount}`)
61+
62+
do {
63+
const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, {
64+
limit,
65+
offset,
66+
})
67+
68+
// member -> organization based on email domain
69+
for (const member of members) {
70+
if (member.emails) {
71+
const orgs = await memberService.assignOrganizationByEmailDomain(
72+
tenantId,
73+
segmentId,
74+
member.emails,
75+
)
76+
77+
if (orgs.length > 0) {
78+
orgService.addToMember(tenantId, segmentId, member.id, orgs)
79+
80+
for (const org of orgs) {
81+
await searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, org.id)
82+
}
83+
84+
await searchSyncWorkerEmitter.triggerMemberSync(tenantId, member.id)
85+
}
86+
}
87+
88+
processedMembers++
89+
log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`)
90+
}
91+
offset += limit
92+
} while (totalCount > offset)
93+
94+
log.info(`Member to organization association completed for the tenant ${tenantId}`)
95+
process.exit(0)
96+
} catch (err) {
97+
log.error(`Failed to assign member to organizations for the tenant ${tenantId}`, err)
98+
process.exit(1)
99+
}
100+
})

services/apps/data_sink_worker/src/repo/dataSink.repo.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,12 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
147147

148148
this.checkUpdateRowCount(result.rowCount, resultIds.length)
149149
}
150+
151+
public async getSegmentIds(tenantId: string): Promise<string[]> {
152+
const result = await this.db().any(`select id from "segments" where "tenantId" = $(tenantId)`, {
153+
tenantId,
154+
})
155+
156+
return result.map((r) => r.id)
157+
}
150158
}

services/apps/data_sink_worker/src/repo/member.repo.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,4 +269,80 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
269269
},
270270
)
271271
}
272+
273+
public async getMemberIdsAndEmailsAndCount(
274+
tenantId: string,
275+
segmentIds: string[],
276+
{ limit = 20, offset = 0, orderBy = 'joinedAt_DESC', countOnly = false },
277+
) {
278+
let orderByString = ''
279+
const orderByParts = orderBy.split('_')
280+
const direction = orderByParts[1].toLowerCase()
281+
282+
switch (orderByParts[0]) {
283+
case 'joinedAt':
284+
orderByString = 'm."joinedAt"'
285+
break
286+
case 'displayName':
287+
orderByString = 'm."displayName"'
288+
break
289+
case 'reach':
290+
orderByString = "(m.reach ->> 'total')::int"
291+
break
292+
case 'score':
293+
orderByString = 'm.score'
294+
break
295+
296+
default:
297+
throw new Error(`Invalid order by: ${orderBy}!`)
298+
}
299+
300+
orderByString = `${orderByString} ${direction}`
301+
302+
const memberCount = await this.db().one(
303+
`
304+
SELECT count(*) FROM (
305+
SELECT m.id
306+
FROM "members" m
307+
JOIN "memberSegments" ms ON ms."memberId" = m.id
308+
WHERE m."tenantId" = $(tenantId)
309+
AND ms."segmentId" = ANY($(segmentIds)::uuid[])
310+
) as count
311+
`,
312+
{
313+
tenantId,
314+
segmentIds,
315+
},
316+
)
317+
318+
if (countOnly) {
319+
return {
320+
totalCount: Number(memberCount.count),
321+
members: [],
322+
}
323+
}
324+
325+
const members = await this.db().any(
326+
`
327+
SELECT m.id, m.emails
328+
FROM "members" m
329+
JOIN "memberSegments" ms ON ms."memberId" = m.id
330+
WHERE m."tenantId" = $(tenantId)
331+
AND ms."segmentId" = ANY($(segmentIds)::uuid[])
332+
ORDER BY ${orderByString}
333+
LIMIT $(limit) OFFSET $(offset)
334+
`,
335+
{
336+
tenantId,
337+
segmentIds,
338+
limit,
339+
offset,
340+
},
341+
)
342+
343+
return {
344+
totalCount: Number(memberCount.count),
345+
members: members,
346+
}
347+
}
272348
}

0 commit comments

Comments
 (0)