From e5349e8e6be1292b93b1330d8c974c36577e83ec Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 11:07:11 +0300 Subject: [PATCH 1/8] add member locks when activity doesn't exist --- .../src/service/activity.service.ts | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index a1b4b67cc0..d69f9f2335 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -658,6 +658,15 @@ export default class ActivityService extends LoggerBase { // release lock for member inside activity exists - this migth be redundant, but just in case } else { + // acquiring lock for member inside activity not exists + await acquireLock( + this.redisClient, + `member:processing:${tenantId}:${platform}:${username}`, + 'check-member-inside-activity-not-exists', + MEMBER_LOCK_EXPIRE_AFTER, + MEMBER_LOCK_TIMEOUT_AFTER, + ) + this.log.trace('We did not find an existing activity. Creating a new one.') createActivity = true @@ -683,6 +692,12 @@ export default class ActivityService extends LoggerBase { }, dbMember, false, + async () => + await releaseLock( + this.redisClient, + `member:processing:${tenantId}:${platform}:${username}`, + 'check-member-inside-activity-not-exists', + ), ) memberId = dbMember.id } else { @@ -705,6 +720,12 @@ export default class ActivityService extends LoggerBase { organizations: member.organizations, }, false, + async () => + await releaseLock( + this.redisClient, + `member:processing:${tenantId}:${platform}:${username}`, + 'check-member-inside-activity-not-exists', + ), ) } @@ -800,12 +821,19 @@ export default class ActivityService extends LoggerBase { ) } } finally { - // release lock no matter that - await releaseLock( - this.redisClient, - `member:processing:${tenantId}:${segmentId}:${platform}:${username}`, - 'check-member-inside-activity-exists', - ) + // release locks in parallel no matter what + await Promise.all([ + releaseLock( + this.redisClient, + `member:processing:${tenantId}:${platform}:${username}`, + 'check-member-inside-activity-exists', + ), + releaseLock( + this.redisClient, + `member:processing:${tenantId}:${platform}:${username}`, + 'check-member-inside-activity-not-exists', + ), + ]) } }) From 9eaa45b90f1f4ed34236467e3f59aa0bf07d615a Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 12:16:56 +0300 Subject: [PATCH 2/8] try to lookup members without segments --- services/apps/data_sink_worker/src/repo/member.repo.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/repo/member.repo.ts b/services/apps/data_sink_worker/src/repo/member.repo.ts index 98f7f8d8aa..d1c4cb52d1 100644 --- a/services/apps/data_sink_worker/src/repo/member.repo.ts +++ b/services/apps/data_sink_worker/src/repo/member.repo.ts @@ -63,7 +63,6 @@ export default class MemberRepository extends RepositoryBase { inner join "memberIdentities" mi on ms."tenantId" = mi."tenantId" and ms."memberId" = mi."memberId" where ms."tenantId" = $(tenantId) - and ms."segmentId" = $(segmentId) and mi.platform = $(platform) and mi.username = $(username)); `, From 9f615a4a3579cafb1a42cd721c27523b6e5fb203 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 12:43:35 +0300 Subject: [PATCH 3/8] remove check on segments when searching organizations --- services/apps/data_sink_worker/src/repo/organization.repo.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/repo/organization.repo.ts b/services/apps/data_sink_worker/src/repo/organization.repo.ts index 5d1d56a71b..522087d755 100644 --- a/services/apps/data_sink_worker/src/repo/organization.repo.ts +++ b/services/apps/data_sink_worker/src/repo/organization.repo.ts @@ -136,10 +136,7 @@ export class OrganizationRepository extends RepositoryBase Date: Fri, 8 Sep 2023 13:52:24 +0300 Subject: [PATCH 4/8] optimize member email updates --- .../apps/data_sink_worker/src/service/member.service.ts | 6 +++--- services/libs/common/src/array.ts | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 0ee2b831b6..d57fe8f105 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -1,6 +1,6 @@ import { IDbMember, IDbMemberUpdateData } from '@/repo/member.data' import MemberRepository from '@/repo/member.repo' -import { areArraysEqual, isObjectEmpty, singleOrDefault } from '@crowd/common' +import { firstArrayContainsSecondArray, isObjectEmpty, singleOrDefault } from '@crowd/common' import { DbStore } from '@crowd/database' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' import { @@ -419,8 +419,8 @@ export default class MemberService extends LoggerBase { } let emails: string[] | undefined - if (member.emails) { - if (!areArraysEqual(member.emails, dbMember.emails)) { + if (member.emails && member.emails.length > 0) { + if (!firstArrayContainsSecondArray(dbMember.emails, member.emails)) { emails = [...new Set([...member.emails, ...dbMember.emails])] } } diff --git a/services/libs/common/src/array.ts b/services/libs/common/src/array.ts index 5a0ee7bd8e..7bb53f773a 100644 --- a/services/libs/common/src/array.ts +++ b/services/libs/common/src/array.ts @@ -125,3 +125,7 @@ export const areArraysEqual = (a: T[], b: T[]): boolean => { return true } + +export const firstArrayContainsSecondArray = (array1: T[], array2: T[]): boolean => { + return array2.every((val) => array1.includes(val)) +} From f55c5099a49032a52c173c0f509342e7ae27e8e7 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 14:01:45 +0300 Subject: [PATCH 5/8] add member to segment even if there is nothing to update --- services/apps/data_sink_worker/src/service/member.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index d57fe8f105..1fcc6355b4 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -185,9 +185,9 @@ export default class MemberService extends LoggerBase { await txRepo.addToSegment(id, tenantId, segmentId) updated = true - await txRepo.addToSegment(id, tenantId, segmentId) } else { this.log.debug({ memberId: id }, 'Nothing to update in a member!') + await txRepo.addToSegment(id, tenantId, segmentId) } if (toUpdate.identities) { From 9f639b4e88b1764f7c8ac4cbd7c537aab39ec8ca Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 15:58:54 +0300 Subject: [PATCH 6/8] remove member locks when activity not exists --- .../src/service/activity.service.ts | 40 +++---------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index d69f9f2335..900f68ee16 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -658,15 +658,6 @@ export default class ActivityService extends LoggerBase { // release lock for member inside activity exists - this migth be redundant, but just in case } else { - // acquiring lock for member inside activity not exists - await acquireLock( - this.redisClient, - `member:processing:${tenantId}:${platform}:${username}`, - 'check-member-inside-activity-not-exists', - MEMBER_LOCK_EXPIRE_AFTER, - MEMBER_LOCK_TIMEOUT_AFTER, - ) - this.log.trace('We did not find an existing activity. Creating a new one.') createActivity = true @@ -692,12 +683,6 @@ export default class ActivityService extends LoggerBase { }, dbMember, false, - async () => - await releaseLock( - this.redisClient, - `member:processing:${tenantId}:${platform}:${username}`, - 'check-member-inside-activity-not-exists', - ), ) memberId = dbMember.id } else { @@ -720,12 +705,6 @@ export default class ActivityService extends LoggerBase { organizations: member.organizations, }, false, - async () => - await releaseLock( - this.redisClient, - `member:processing:${tenantId}:${platform}:${username}`, - 'check-member-inside-activity-not-exists', - ), ) } @@ -821,19 +800,12 @@ export default class ActivityService extends LoggerBase { ) } } finally { - // release locks in parallel no matter what - await Promise.all([ - releaseLock( - this.redisClient, - `member:processing:${tenantId}:${platform}:${username}`, - 'check-member-inside-activity-exists', - ), - releaseLock( - this.redisClient, - `member:processing:${tenantId}:${platform}:${username}`, - 'check-member-inside-activity-not-exists', - ), - ]) + // release locks matter what + await releaseLock( + this.redisClient, + `member:processing:${tenantId}:${platform}:${username}`, + 'check-member-inside-activity-exists', + ) } }) From fcf1da36f6babc90bbaf1e622a87b91150b9922c Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 16:37:12 +0300 Subject: [PATCH 7/8] optimize organizations updates --- .../src/repo/organization.repo.ts | 14 ++- .../src/service/organization.service.ts | 97 +++++++++++-------- 2 files changed, 67 insertions(+), 44 deletions(-) diff --git a/services/apps/data_sink_worker/src/repo/organization.repo.ts b/services/apps/data_sink_worker/src/repo/organization.repo.ts index 522087d755..c3f3cecfe1 100644 --- a/services/apps/data_sink_worker/src/repo/organization.repo.ts +++ b/services/apps/data_sink_worker/src/repo/organization.repo.ts @@ -266,16 +266,24 @@ export class OrganizationRepository extends RepositoryBase { + public async update(id: string, data: Partial): Promise { + const keys = Object.keys(data) + keys.push('updatedAt') + // construct dynamic column set + const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, { + table: { + table: 'organizations', + }, + }) const prepared = RepositoryBase.prepare( { ...data, updatedAt: new Date(), }, - this.updateOrganizationColumnSet, + dynamicColumnSet, ) - const query = this.dbInstance.helpers.update(prepared, this.updateOrganizationColumnSet) + const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet) const condition = this.format('where id = $(id)', { id }) const result = await this.db().result(`${query} ${condition}`) diff --git a/services/apps/data_sink_worker/src/service/organization.service.ts b/services/apps/data_sink_worker/src/service/organization.service.ts index bd9399f167..70dbbd82ca 100644 --- a/services/apps/data_sink_worker/src/service/organization.service.ts +++ b/services/apps/data_sink_worker/src/service/organization.service.ts @@ -36,25 +36,33 @@ export class OrganizationService extends LoggerBase { // if exists in cache update it const updateData: Partial = {} // no need to update name since it's aka primary key - if (data.url) updateData.url = data.url - if (data.description) updateData.description = data.description - if (data.emails) updateData.emails = data.emails - if (data.logo) updateData.logo = data.logo - if (data.tags) updateData.tags = data.tags - if (data.github) updateData.github = data.github as IOrganizationSocial - if (data.twitter) updateData.twitter = data.twitter as IOrganizationSocial - if (data.linkedin) updateData.linkedin = data.linkedin as IOrganizationSocial - if (data.crunchbase) updateData.crunchbase = data.crunchbase as IOrganizationSocial - if (data.employees) updateData.employees = data.employees - if (data.location) updateData.location = data.location - if (data.website) updateData.website = data.website - if (data.type) updateData.type = data.type - if (data.size) updateData.size = data.size - if (data.headline) updateData.headline = data.headline - if (data.industry) updateData.industry = data.industry - if (data.founded) updateData.founded = data.founded + const fields = [ + 'url', + 'description', + 'emails', + 'logo', + 'tags', + 'github', + 'twitter', + 'linkedin', + 'crunchbase', + 'employees', + 'location', + 'website', + 'type', + 'size', + 'headline', + 'industry', + 'founded', + ] + fields.forEach((field) => { + if (data[field] && !isEqual(data[field], cached[field])) { + updateData[field] = data[field] + } + }) if (Object.keys(updateData).length > 0) { await this.repo.updateCache(cached.id, updateData) + cached = { ...cached, ...updateData } // Update the cached data with the new data } } else { // if it doesn't exists in cache create it @@ -91,11 +99,9 @@ export class OrganizationService extends LoggerBase { // now check if exists in this tenant const existing = await this.repo.findByName(tenantId, segmentId, data.name) - const displayName = existing?.displayName ? existing?.displayName : data?.name - let attributes = existing?.attributes - if (data.attributes) { + if (data?.attributes) { const temp = mergeWith({}, existing?.attributes, data?.attributes) if (!isEqual(temp, existing?.attributes)) { attributes = temp @@ -104,28 +110,37 @@ export class OrganizationService extends LoggerBase { if (existing) { // if it does exists update it - await this.repo.update(existing.id, { - name: cached.name, - displayName, - url: cached.url, - description: cached.description, - emails: cached.emails, - logo: cached.logo, - tags: cached.tags, - github: cached.github, - twitter: cached.twitter, - linkedin: cached.linkedin, - crunchbase: cached.crunchbase, - employees: cached.employees, - location: cached.location, - website: cached.website, - type: cached.type, - size: cached.size, - headline: cached.headline, - industry: cached.industry, - founded: cached.founded, - attributes, + const updateData: Partial = {} + const fields = [ + 'name', + 'displayName', + 'url', + 'description', + 'emails', + 'logo', + 'tags', + 'github', + 'twitter', + 'linkedin', + 'crunchbase', + 'employees', + 'location', + 'website', + 'type', + 'size', + 'headline', + 'industry', + 'founded', + 'attributes', + ] + fields.forEach((field) => { + if (cached[field] && !isEqual(cached[field], existing[field])) { + updateData[field] = cached[field] + } }) + if (Object.keys(updateData).length > 0) { + await this.repo.update(existing.id, updateData) + } return existing.id } else { From 3842af987109213ad6549c54bf3887d3c3ab0ee4 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Fri, 8 Sep 2023 18:56:05 +0300 Subject: [PATCH 8/8] simplify member search query --- services/apps/data_sink_worker/src/repo/member.repo.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/services/apps/data_sink_worker/src/repo/member.repo.ts b/services/apps/data_sink_worker/src/repo/member.repo.ts index d1c4cb52d1..37cfa8c1ce 100644 --- a/services/apps/data_sink_worker/src/repo/member.repo.ts +++ b/services/apps/data_sink_worker/src/repo/member.repo.ts @@ -58,11 +58,9 @@ export default class MemberRepository extends RepositoryBase { ): Promise { return await this.db().oneOrNone( `${this.selectMemberQuery} - where m.id in (select ms."memberId" - from "memberSegments" ms - inner join "memberIdentities" mi - on ms."tenantId" = mi."tenantId" and ms."memberId" = mi."memberId" - where ms."tenantId" = $(tenantId) + where m.id in (select mi."memberId" + from "memberIdentities" mi + where mi."tenantId" = $(tenantId) and mi.platform = $(platform) and mi.username = $(username)); `,