Skip to content

Commit 3c9e332

Browse files
authored
Improve locks and upserts in data-sink-worker (#1451)
1 parent e945f2d commit 3c9e332

File tree

6 files changed

+81
-60
lines changed

6 files changed

+81
-60
lines changed

services/apps/data_sink_worker/src/repo/member.repo.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,9 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
5858
): Promise<IDbMember | null> {
5959
return await this.db().oneOrNone(
6060
`${this.selectMemberQuery}
61-
where m.id in (select ms."memberId"
62-
from "memberSegments" ms
63-
inner join "memberIdentities" mi
64-
on ms."tenantId" = mi."tenantId" and ms."memberId" = mi."memberId"
65-
where ms."tenantId" = $(tenantId)
66-
and ms."segmentId" = $(segmentId)
61+
where m.id in (select mi."memberId"
62+
from "memberIdentities" mi
63+
where mi."tenantId" = $(tenantId)
6764
and mi.platform = $(platform)
6865
and mi.username = $(username));
6966
`,

services/apps/data_sink_worker/src/repo/organization.repo.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
136136
o.founded,
137137
o.attributes
138138
from organizations o
139-
where o."tenantId" = $(tenantId) and o.name = $(name)
140-
and o.id in (select os."organizationId"
141-
from "organizationSegments" os
142-
where os."segmentId" = $(segmentId))`,
139+
where o."tenantId" = $(tenantId) and o.name = $(name)`,
143140
{ tenantId, name, segmentId },
144141
)
145142

@@ -269,16 +266,24 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
269266
return id
270267
}
271268

272-
public async update(id: string, data: IDbUpdateOrganizationData): Promise<void> {
269+
public async update(id: string, data: Partial<IDbUpdateOrganizationData>): Promise<void> {
270+
const keys = Object.keys(data)
271+
keys.push('updatedAt')
272+
// construct dynamic column set
273+
const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, {
274+
table: {
275+
table: 'organizations',
276+
},
277+
})
273278
const prepared = RepositoryBase.prepare(
274279
{
275280
...data,
276281
updatedAt: new Date(),
277282
},
278-
this.updateOrganizationColumnSet,
283+
dynamicColumnSet,
279284
)
280285

281-
const query = this.dbInstance.helpers.update(prepared, this.updateOrganizationColumnSet)
286+
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
282287
const condition = this.format('where id = $(id)', { id })
283288

284289
const result = await this.db().result(`${query} ${condition}`)

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -800,10 +800,10 @@ export default class ActivityService extends LoggerBase {
800800
)
801801
}
802802
} finally {
803-
// release lock no matter that
803+
// release locks matter what
804804
await releaseLock(
805805
this.redisClient,
806-
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
806+
`member:processing:${tenantId}:${platform}:${username}`,
807807
'check-member-inside-activity-exists',
808808
)
809809
}

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { IDbMember, IDbMemberUpdateData } from '@/repo/member.data'
22
import MemberRepository from '@/repo/member.repo'
3-
import { areArraysEqual, isObjectEmpty, singleOrDefault } from '@crowd/common'
3+
import { firstArrayContainsSecondArray, isObjectEmpty, singleOrDefault } from '@crowd/common'
44
import { DbStore } from '@crowd/database'
55
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
66
import {
@@ -185,9 +185,9 @@ export default class MemberService extends LoggerBase {
185185
await txRepo.addToSegment(id, tenantId, segmentId)
186186

187187
updated = true
188-
await txRepo.addToSegment(id, tenantId, segmentId)
189188
} else {
190189
this.log.debug({ memberId: id }, 'Nothing to update in a member!')
190+
await txRepo.addToSegment(id, tenantId, segmentId)
191191
}
192192

193193
if (toUpdate.identities) {
@@ -419,8 +419,8 @@ export default class MemberService extends LoggerBase {
419419
}
420420

421421
let emails: string[] | undefined
422-
if (member.emails) {
423-
if (!areArraysEqual(member.emails, dbMember.emails)) {
422+
if (member.emails && member.emails.length > 0) {
423+
if (!firstArrayContainsSecondArray(dbMember.emails, member.emails)) {
424424
emails = [...new Set([...member.emails, ...dbMember.emails])]
425425
}
426426
}

services/apps/data_sink_worker/src/service/organization.service.ts

Lines changed: 56 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,33 @@ export class OrganizationService extends LoggerBase {
3636
// if exists in cache update it
3737
const updateData: Partial<IOrganization> = {}
3838
// no need to update name since it's aka primary key
39-
if (data.url) updateData.url = data.url
40-
if (data.description) updateData.description = data.description
41-
if (data.emails) updateData.emails = data.emails
42-
if (data.logo) updateData.logo = data.logo
43-
if (data.tags) updateData.tags = data.tags
44-
if (data.github) updateData.github = data.github as IOrganizationSocial
45-
if (data.twitter) updateData.twitter = data.twitter as IOrganizationSocial
46-
if (data.linkedin) updateData.linkedin = data.linkedin as IOrganizationSocial
47-
if (data.crunchbase) updateData.crunchbase = data.crunchbase as IOrganizationSocial
48-
if (data.employees) updateData.employees = data.employees
49-
if (data.location) updateData.location = data.location
50-
if (data.website) updateData.website = data.website
51-
if (data.type) updateData.type = data.type
52-
if (data.size) updateData.size = data.size
53-
if (data.headline) updateData.headline = data.headline
54-
if (data.industry) updateData.industry = data.industry
55-
if (data.founded) updateData.founded = data.founded
39+
const fields = [
40+
'url',
41+
'description',
42+
'emails',
43+
'logo',
44+
'tags',
45+
'github',
46+
'twitter',
47+
'linkedin',
48+
'crunchbase',
49+
'employees',
50+
'location',
51+
'website',
52+
'type',
53+
'size',
54+
'headline',
55+
'industry',
56+
'founded',
57+
]
58+
fields.forEach((field) => {
59+
if (data[field] && !isEqual(data[field], cached[field])) {
60+
updateData[field] = data[field]
61+
}
62+
})
5663
if (Object.keys(updateData).length > 0) {
5764
await this.repo.updateCache(cached.id, updateData)
65+
cached = { ...cached, ...updateData } // Update the cached data with the new data
5866
}
5967
} else {
6068
// if it doesn't exists in cache create it
@@ -91,11 +99,9 @@ export class OrganizationService extends LoggerBase {
9199
// now check if exists in this tenant
92100
const existing = await this.repo.findByName(tenantId, segmentId, data.name)
93101

94-
const displayName = existing?.displayName ? existing?.displayName : data?.name
95-
96102
let attributes = existing?.attributes
97103

98-
if (data.attributes) {
104+
if (data?.attributes) {
99105
const temp = mergeWith({}, existing?.attributes, data?.attributes)
100106
if (!isEqual(temp, existing?.attributes)) {
101107
attributes = temp
@@ -104,28 +110,37 @@ export class OrganizationService extends LoggerBase {
104110

105111
if (existing) {
106112
// if it does exists update it
107-
await this.repo.update(existing.id, {
108-
name: cached.name,
109-
displayName,
110-
url: cached.url,
111-
description: cached.description,
112-
emails: cached.emails,
113-
logo: cached.logo,
114-
tags: cached.tags,
115-
github: cached.github,
116-
twitter: cached.twitter,
117-
linkedin: cached.linkedin,
118-
crunchbase: cached.crunchbase,
119-
employees: cached.employees,
120-
location: cached.location,
121-
website: cached.website,
122-
type: cached.type,
123-
size: cached.size,
124-
headline: cached.headline,
125-
industry: cached.industry,
126-
founded: cached.founded,
127-
attributes,
113+
const updateData: Partial<IOrganization> = {}
114+
const fields = [
115+
'name',
116+
'displayName',
117+
'url',
118+
'description',
119+
'emails',
120+
'logo',
121+
'tags',
122+
'github',
123+
'twitter',
124+
'linkedin',
125+
'crunchbase',
126+
'employees',
127+
'location',
128+
'website',
129+
'type',
130+
'size',
131+
'headline',
132+
'industry',
133+
'founded',
134+
'attributes',
135+
]
136+
fields.forEach((field) => {
137+
if (cached[field] && !isEqual(cached[field], existing[field])) {
138+
updateData[field] = cached[field]
139+
}
128140
})
141+
if (Object.keys(updateData).length > 0) {
142+
await this.repo.update(existing.id, updateData)
143+
}
129144

130145
return existing.id
131146
} else {

services/libs/common/src/array.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,7 @@ export const areArraysEqual = <T>(a: T[], b: T[]): boolean => {
125125

126126
return true
127127
}
128+
129+
export const firstArrayContainsSecondArray = <T>(array1: T[], array2: T[]): boolean => {
130+
return array2.every((val) => array1.includes(val))
131+
}

0 commit comments

Comments
 (0)