Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions backend/src/services/__tests__/organizationService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ describe('OrganizationService tests', () => {
}

const added = await service.createOrUpdate(toAdd)
console.log('added is: ')
console.log(added)

expect(added.identities[0].url).toEqual(toAdd.identities[0].url)
expect(added.identities[0].name).toEqual(toAdd.identities[0].name)
expect(added.description).toEqual(expectedEnriched.description)
Expand Down
3 changes: 2 additions & 1 deletion services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"script:restart-all-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-all-failed-results.ts",
"script:restart-x-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-x-failed-results.ts",
"script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts",
"script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts"
"script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts",
"script:assign-member-to-org": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/assign-member-to-org.ts"
},
"dependencies": {
"@crowd/common": "file:../../libs/common",
Expand Down
100 changes: 100 additions & 0 deletions services/apps/data_sink_worker/src/bin/assign-member-to-org.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import {
DataSinkWorkerEmitter,
NodejsWorkerEmitter,
SearchSyncWorkerEmitter,
getSqsClient,
} from '@crowd/sqs'
import MemberRepository from '@/repo/member.repo'
import DataSinkRepository from '@/repo/dataSink.repo'
import MemberService from '@/service/member.service'
import { OrganizationService } from '@/service/organization.service'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: tenantId')
process.exit(1)
}

const tenantId = processArguments[0]

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new DataSinkWorkerEmitter(sqsClient, log)
await emitter.init()

const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)

const dataSinkRepo = new DataSinkRepository(store, log)
const memberRepo = new MemberRepository(store, log)

const segmentIds = await dataSinkRepo.getSegmentIds(tenantId)
const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id

const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log)
await nodejsWorkerEmitter.init()

const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, log)
await searchSyncWorkerEmitter.init()

const memberService = new MemberService(store, nodejsWorkerEmitter, searchSyncWorkerEmitter, log)
const orgService = new OrganizationService(store, log)

const limit = 100
let offset = 0
let processedMembers = 0

try {
const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, {
limit,
offset,
countOnly: true,
})

log.info({ tenantId }, `Total members found in the tenant: ${totalCount}`)

do {
const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, {
limit,
offset,
})

// member -> organization based on email domain
for (const member of members) {
if (member.emails) {
const orgs = await memberService.assignOrganizationByEmailDomain(
tenantId,
segmentId,
member.emails,
)

if (orgs.length > 0) {
orgService.addToMember(tenantId, segmentId, member.id, orgs)

for (const org of orgs) {
await searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, org.id)
}

await searchSyncWorkerEmitter.triggerMemberSync(tenantId, member.id)
}
}

processedMembers++
log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`)
}
offset += limit
} while (totalCount > offset)

log.info(`Member to organization association completed for the tenant ${tenantId}`)
process.exit(0)
} catch (err) {
log.error(`Failed to assign member to organizations for the tenant ${tenantId}`, err)
process.exit(1)
}
})
8 changes: 8 additions & 0 deletions services/apps/data_sink_worker/src/repo/dataSink.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,12 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor

this.checkUpdateRowCount(result.rowCount, resultIds.length)
}

public async getSegmentIds(tenantId: string): Promise<string[]> {
const result = await this.db().any(`select id from "segments" where "tenantId" = $(tenantId)`, {
tenantId,
})

return result.map((r) => r.id)
}
}
76 changes: 76 additions & 0 deletions services/apps/data_sink_worker/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,80 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
},
)
}

public async getMemberIdsAndEmailsAndCount(
tenantId: string,
segmentIds: string[],
{ limit = 20, offset = 0, orderBy = 'joinedAt_DESC', countOnly = false },
) {
let orderByString = ''
const orderByParts = orderBy.split('_')
const direction = orderByParts[1].toLowerCase()

switch (orderByParts[0]) {
case 'joinedAt':
orderByString = 'm."joinedAt"'
break
case 'displayName':
orderByString = 'm."displayName"'
break
case 'reach':
orderByString = "(m.reach ->> 'total')::int"
break
case 'score':
orderByString = 'm.score'
break

default:
throw new Error(`Invalid order by: ${orderBy}!`)
}

orderByString = `${orderByString} ${direction}`

const memberCount = await this.db().one(
`
SELECT count(*) FROM (
SELECT m.id
FROM "members" m
JOIN "memberSegments" ms ON ms."memberId" = m.id
WHERE m."tenantId" = $(tenantId)
AND ms."segmentId" = ANY($(segmentIds)::uuid[])
) as count
`,
{
tenantId,
segmentIds,
},
)

if (countOnly) {
return {
totalCount: Number(memberCount.count),
members: [],
}
}

const members = await this.db().any(
`
SELECT m.id, m.emails
FROM "members" m
JOIN "memberSegments" ms ON ms."memberId" = m.id
WHERE m."tenantId" = $(tenantId)
AND ms."segmentId" = ANY($(segmentIds)::uuid[])
ORDER BY ${orderByString}
LIMIT $(limit) OFFSET $(offset)
`,
{
tenantId,
segmentIds,
limit,
offset,
},
)

return {
totalCount: Number(memberCount.count),
members: members,
}
}
}