Skip to content

Commit a9a53ae

Browse files
committed
search sync worker syncOrganizationMembers queue messages
1 parent 6553bd8 commit a9a53ae

File tree

7 files changed

+122
-24
lines changed

7 files changed

+122
-24
lines changed

backend/src/database/repositories/organizationRepository.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ class OrganizationRepository {
808808
fromOrganizationId: string,
809809
toOrganizationId: string,
810810
options: IRepositoryOptions,
811-
): Promise<string[]> {
811+
): Promise<void> {
812812
const seq = SequelizeRepository.getSequelize(options)
813813

814814
const transaction = SequelizeRepository.getTransaction(options)
@@ -817,8 +817,6 @@ class OrganizationRepository {
817817

818818
let addRoles: IMemberOrganization[] = []
819819

820-
const updatedMembers = new Set<string>()
821-
822820
// first, handle members that belong to both organizations,
823821
// then make a full update on remaining org2 members (that doesn't belong to o1)
824822
const memberRolesWithBothOrganizations = await this.findMembersBelongToBothOrganizations(
@@ -917,50 +915,38 @@ class OrganizationRepository {
917915

918916
for (const removeRole of removeRoles) {
919917
await this.removeMemberRole(removeRole, options)
920-
updatedMembers.add(removeRole.memberId)
921918
}
922919

923920
for (const addRole of addRoles) {
924921
await this.addMemberRole(addRole, options)
925-
updatedMembers.add(addRole.memberId)
926922
}
927923

928924
addRoles = []
929925
removeRoles = []
930926
}
931927

932928
// update rest of the o2 members
933-
const results = await seq.query(
929+
await seq.query(
934930
`
935-
WITH updated AS (
936-
UPDATE "memberOrganizations"
931+
UPDATE "memberOrganizations"
937932
SET "organizationId" = :toOrganizationId
938933
WHERE "organizationId" = :fromOrganizationId
939934
AND "deletedAt" IS NULL
940935
AND "memberId" NOT IN (
941936
SELECT "memberId"
942937
FROM "memberOrganizations"
943938
WHERE "organizationId" = :toOrganizationId
944-
)
945-
RETURNING "memberId"
946-
)
947-
SELECT "memberId" FROM updated;
939+
);
948940
`,
949941
{
950942
replacements: {
951943
toOrganizationId,
952944
fromOrganizationId,
953945
},
954-
type: QueryTypes.SELECT,
946+
type: QueryTypes.UPDATE,
955947
transaction,
956948
},
957949
)
958-
959-
for (const result of results) {
960-
updatedMembers.add((result as any).memberId)
961-
}
962-
963-
return Array.from(updatedMembers)
964950
}
965951

966952
static async getOrganizationSegments(

backend/src/services/organizationService.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export default class OrganizationService extends LoggerBase {
104104
await txService.update(originalId, toUpdate, repoOptions.transaction)
105105

106106
// update members that belong to source organization to destination org
107-
const updatedMembers = await OrganizationRepository.moveMembersBetweenOrganizations(
107+
await OrganizationRepository.moveMembersBetweenOrganizations(
108108
toMergeId,
109109
originalId,
110110
repoOptions,
@@ -136,10 +136,8 @@ export default class OrganizationService extends LoggerBase {
136136
await searchSyncEmitter.triggerOrganizationSync(this.options.currentTenant.id, originalId)
137137
await searchSyncEmitter.triggerRemoveOrganization(this.options.currentTenant.id, toMergeId)
138138

139-
while (updatedMembers.length > 0) {
140-
const updatedMember = updatedMembers.shift()
141-
await searchSyncEmitter.triggerMemberSync(this.options.currentTenant.id, updatedMember)
142-
}
139+
// sync organization members
140+
await searchSyncEmitter.triggerOrganizationMembersSync(originalId)
143141

144142
// sync organization activities
145143
await searchSyncEmitter.triggerOrganizationActivitiesSync(originalId)

services/apps/search_sync_worker/src/queue/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {
127127
.catch((err) => this.log.error(err, 'Error while syncing tenant members!'))
128128
}
129129

130+
break
131+
case SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION_MEMBERS:
132+
if (data.organizationId) {
133+
this.initMemberService()
134+
.syncOrganizationMembers(data.organizationId)
135+
.catch((err) => this.log.error(err, 'Error while syncing organization members!'))
136+
}
137+
130138
break
131139
// this one taks a while so we can't relly on it to be finished in time and the queue message might pop up again so we immediatelly return
132140
case SearchSyncWorkerQueueMessageType.CLEANUP_TENANT_MEMBERS:

services/apps/search_sync_worker/src/repo/member.repo.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,57 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
8989
return results.map((r) => r.id)
9090
}
9191

92+
public async getOrganizationMembersForSync(
93+
organizationId: string,
94+
perPage: number,
95+
lastId?: string,
96+
): Promise<string[]> {
97+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
98+
let results: any[]
99+
100+
if (lastId) {
101+
results = await this.db().any(
102+
`
103+
select distinct mo."memberId"
104+
from "memberOrganizations" mo
105+
where mo."organizationId" = $(organizationId) and
106+
mo."deletedAt" is null and
107+
mo."memberId" > $(lastId) and
108+
(
109+
exists (select 1 from activities where "memberId" = mo."memberId") or
110+
exists (select 1 from members where "memberId" = mo."memberId" and "manuallyCreated")
111+
) and
112+
exists (select 1 from "memberIdentities" where "memberId" = mo."memberId")
113+
order by mo."memberId"
114+
limit ${perPage};`,
115+
{
116+
organizationId,
117+
lastId,
118+
},
119+
)
120+
} else {
121+
results = await this.db().any(
122+
`
123+
select distinct mo."memberId"
124+
from "memberOrganizations" mo
125+
where mo."organizationId" = $(organizationId) and
126+
mo."deletedAt" is null and
127+
(
128+
exists (select 1 from activities where "memberId" = mo."memberId") or
129+
exists (select 1 from members where "memberId" = mo."memberId" and "manuallyCreated")
130+
) and
131+
exists (select 1 from "memberIdentities" where "memberId" = mo."memberId")
132+
order by mo."memberId"
133+
limit ${perPage};`,
134+
{
135+
organizationId,
136+
},
137+
)
138+
}
139+
140+
return results.map((r) => r.memberId)
141+
}
142+
92143
public async getRemainingTenantMembersForSync(
93144
tenantId: string,
94145
page: number,

services/apps/search_sync_worker/src/service/member.sync.service.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,50 @@ export class MemberSyncService extends LoggerBase {
265265
)
266266
}
267267

268+
public async syncOrganizationMembers(organizationId: string, batchSize = 200): Promise<void> {
269+
this.log.debug({ organizationId }, 'Syncing all organization members!')
270+
let docCount = 0
271+
let memberCount = 0
272+
273+
const now = new Date()
274+
275+
await logExecutionTime(
276+
async () => {
277+
let memberIds = await this.memberRepo.getOrganizationMembersForSync(
278+
organizationId,
279+
batchSize,
280+
)
281+
282+
while (memberIds.length > 0) {
283+
const { membersSynced, documentsIndexed } = await this.syncMembers(memberIds)
284+
285+
docCount += documentsIndexed
286+
memberCount += membersSynced
287+
288+
const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000
289+
this.log.info(
290+
{ organizationId },
291+
`Synced ${memberCount} members! Speed: ${Math.round(
292+
memberCount / diffInSeconds,
293+
)} members/second!`,
294+
)
295+
memberIds = await this.memberRepo.getOrganizationMembersForSync(
296+
organizationId,
297+
batchSize,
298+
memberIds[memberIds.length - 1],
299+
)
300+
}
301+
},
302+
this.log,
303+
'sync-organization-members',
304+
)
305+
306+
this.log.info(
307+
{ organizationId },
308+
`Synced total of ${memberCount} members with ${docCount} documents!`,
309+
)
310+
}
311+
268312
public async syncMembers(memberIds: string[]): Promise<IMemberSyncResult> {
269313
this.log.debug({ memberIds }, 'Syncing members!')
270314

services/libs/sqs/src/instances/searchSyncWorker.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ export class SearchSyncWorkerEmitter extends SqsQueueEmitter {
3535
})
3636
}
3737

38+
public async triggerOrganizationMembersSync(organizationId: string) {
39+
if (!organizationId) {
40+
throw new Error('organizationId is required!')
41+
}
42+
await this.sendMessage(organizationId, {
43+
type: SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION_MEMBERS,
44+
organizationId,
45+
})
46+
}
47+
3848
public async triggerRemoveMember(tenantId: string, memberId: string) {
3949
if (!tenantId) {
4050
throw new Error('tenantId is required!')

services/libs/types/src/queue/search_sync_worker/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export enum SearchSyncWorkerQueueMessageType {
22
SYNC_MEMBER = 'sync_member',
33
SYNC_TENANT_MEMBERS = 'sync_tenant_members',
4+
SYNC_ORGANIZATION_MEMBERS = 'sync_organization_members',
45
REMOVE_MEMBER = 'remove_member',
56
CLEANUP_TENANT_MEMBERS = 'cleanup_tenant_members',
67

0 commit comments

Comments
 (0)