diff --git a/backend/src/services/__tests__/organizationService.test.ts b/backend/src/services/__tests__/organizationService.test.ts index 47106986e0..ba8cf1bbdc 100644 --- a/backend/src/services/__tests__/organizationService.test.ts +++ b/backend/src/services/__tests__/organizationService.test.ts @@ -107,9 +107,6 @@ describe('OrganizationService tests', () => { } const added = await service.createOrUpdate(toAdd) - console.log('added is: ') - console.log(added) - expect(added.identities[0].url).toEqual(toAdd.identities[0].url) expect(added.identities[0].name).toEqual(toAdd.identities[0].name) expect(added.description).toEqual(expectedEnriched.description) diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index 64d235cb60..79b55520f0 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -18,7 +18,8 @@ "script:restart-all-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-all-failed-results.ts", "script:restart-x-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-x-failed-results.ts", "script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts", - "script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts" + "script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts", + "script:assign-member-to-org": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/assign-member-to-org.ts" }, "dependencies": { "@crowd/common": "file:../../libs/common", diff --git a/services/apps/data_sink_worker/src/bin/assign-member-to-org.ts b/services/apps/data_sink_worker/src/bin/assign-member-to-org.ts new file mode 100644 index 0000000000..9d65556e0f --- /dev/null +++ b/services/apps/data_sink_worker/src/bin/assign-member-to-org.ts @@ -0,0 +1,100 @@ +import { DB_CONFIG, SQS_CONFIG } from '@/conf' +import { DbStore, getDbConnection } from '@crowd/database' +import { getServiceLogger } from '@crowd/logging' +import { + DataSinkWorkerEmitter, + NodejsWorkerEmitter, + SearchSyncWorkerEmitter, + getSqsClient, +} from '@crowd/sqs' +import MemberRepository from '@/repo/member.repo' +import DataSinkRepository from '@/repo/dataSink.repo' +import MemberService from '@/service/member.service' +import { OrganizationService } from '@/service/organization.service' + +const log = getServiceLogger() + +const processArguments = process.argv.slice(2) + +if (processArguments.length !== 1) { + log.error('Expected 1 argument: tenantId') + process.exit(1) +} + +const tenantId = processArguments[0] + +setImmediate(async () => { + const sqsClient = getSqsClient(SQS_CONFIG()) + const emitter = new DataSinkWorkerEmitter(sqsClient, log) + await emitter.init() + + const dbConnection = await getDbConnection(DB_CONFIG()) + const store = new DbStore(log, dbConnection) + + const dataSinkRepo = new DataSinkRepository(store, log) + const memberRepo = new MemberRepository(store, log) + + const segmentIds = await dataSinkRepo.getSegmentIds(tenantId) + const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id + + const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log) + await nodejsWorkerEmitter.init() + + const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, log) + await searchSyncWorkerEmitter.init() + + const memberService = new MemberService(store, nodejsWorkerEmitter, searchSyncWorkerEmitter, log) + const orgService = new OrganizationService(store, log) + + const limit = 100 + let offset = 0 + let processedMembers = 0 + + try { + const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, { + limit, + offset, + countOnly: true, + }) + + log.info({ tenantId }, `Total members found in the tenant: ${totalCount}`) + + do { + const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, { + limit, + offset, + }) + + // member -> organization based on email domain + for (const member of members) { + if (member.emails) { + const orgs = await memberService.assignOrganizationByEmailDomain( + tenantId, + segmentId, + member.emails, + ) + + if (orgs.length > 0) { + orgService.addToMember(tenantId, segmentId, member.id, orgs) + + for (const org of orgs) { + await searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, org.id) + } + + await searchSyncWorkerEmitter.triggerMemberSync(tenantId, member.id) + } + } + + processedMembers++ + log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`) + } + offset += limit + } while (totalCount > offset) + + log.info(`Member to organization association completed for the tenant ${tenantId}`) + process.exit(0) + } catch (err) { + log.error(`Failed to assign member to organizations for the tenant ${tenantId}`, err) + process.exit(1) + } +}) diff --git a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts index 53668acc32..a50a8f7f83 100644 --- a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts +++ b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts @@ -147,4 +147,12 @@ export default class DataSinkRepository extends RepositoryBase { + const result = await this.db().any(`select id from "segments" where "tenantId" = $(tenantId)`, { + tenantId, + }) + + return result.map((r) => r.id) + } } diff --git a/services/apps/data_sink_worker/src/repo/member.repo.ts b/services/apps/data_sink_worker/src/repo/member.repo.ts index 37cfa8c1ce..1a19af6675 100644 --- a/services/apps/data_sink_worker/src/repo/member.repo.ts +++ b/services/apps/data_sink_worker/src/repo/member.repo.ts @@ -269,4 +269,80 @@ export default class MemberRepository extends RepositoryBase { }, ) } + + public async getMemberIdsAndEmailsAndCount( + tenantId: string, + segmentIds: string[], + { limit = 20, offset = 0, orderBy = 'joinedAt_DESC', countOnly = false }, + ) { + let orderByString = '' + const orderByParts = orderBy.split('_') + const direction = orderByParts[1].toLowerCase() + + switch (orderByParts[0]) { + case 'joinedAt': + orderByString = 'm."joinedAt"' + break + case 'displayName': + orderByString = 'm."displayName"' + break + case 'reach': + orderByString = "(m.reach ->> 'total')::int" + break + case 'score': + orderByString = 'm.score' + break + + default: + throw new Error(`Invalid order by: ${orderBy}!`) + } + + orderByString = `${orderByString} ${direction}` + + const memberCount = await this.db().one( + ` + SELECT count(*) FROM ( + SELECT m.id + FROM "members" m + JOIN "memberSegments" ms ON ms."memberId" = m.id + WHERE m."tenantId" = $(tenantId) + AND ms."segmentId" = ANY($(segmentIds)::uuid[]) + ) as count + `, + { + tenantId, + segmentIds, + }, + ) + + if (countOnly) { + return { + totalCount: Number(memberCount.count), + members: [], + } + } + + const members = await this.db().any( + ` + SELECT m.id, m.emails + FROM "members" m + JOIN "memberSegments" ms ON ms."memberId" = m.id + WHERE m."tenantId" = $(tenantId) + AND ms."segmentId" = ANY($(segmentIds)::uuid[]) + ORDER BY ${orderByString} + LIMIT $(limit) OFFSET $(offset) + `, + { + tenantId, + segmentIds, + limit, + offset, + }, + ) + + return { + totalCount: Number(memberCount.count), + members: members, + } + } }