Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions services/apps/data_sink_worker/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,9 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
): Promise<IDbMember | null> {
return await this.db().oneOrNone(
`${this.selectMemberQuery}
where m.id in (select ms."memberId"
from "memberSegments" ms
inner join "memberIdentities" mi
on ms."tenantId" = mi."tenantId" and ms."memberId" = mi."memberId"
where ms."tenantId" = $(tenantId)
and ms."segmentId" = $(segmentId)
where m.id in (select mi."memberId"
from "memberIdentities" mi
where mi."tenantId" = $(tenantId)
and mi.platform = $(platform)
and mi.username = $(username));
`,
Expand Down
19 changes: 12 additions & 7 deletions services/apps/data_sink_worker/src/repo/organization.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
o.founded,
o.attributes
from organizations o
where o."tenantId" = $(tenantId) and o.name = $(name)
and o.id in (select os."organizationId"
from "organizationSegments" os
where os."segmentId" = $(segmentId))`,
where o."tenantId" = $(tenantId) and o.name = $(name)`,
{ tenantId, name, segmentId },
)

Expand Down Expand Up @@ -269,16 +266,24 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
return id
}

public async update(id: string, data: IDbUpdateOrganizationData): Promise<void> {
public async update(id: string, data: Partial<IDbUpdateOrganizationData>): Promise<void> {
const keys = Object.keys(data)
keys.push('updatedAt')
// construct dynamic column set
const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, {
table: {
table: 'organizations',
},
})
const prepared = RepositoryBase.prepare(
{
...data,
updatedAt: new Date(),
},
this.updateOrganizationColumnSet,
dynamicColumnSet,
)

const query = this.dbInstance.helpers.update(prepared, this.updateOrganizationColumnSet)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
const condition = this.format('where id = $(id)', { id })

const result = await this.db().result(`${query} ${condition}`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,10 @@ export default class ActivityService extends LoggerBase {
)
}
} finally {
// release lock no matter that
// release locks matter what
await releaseLock(
this.redisClient,
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
`member:processing:${tenantId}:${platform}:${username}`,
'check-member-inside-activity-exists',
)
}
Expand Down
8 changes: 4 additions & 4 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IDbMember, IDbMemberUpdateData } from '@/repo/member.data'
import MemberRepository from '@/repo/member.repo'
import { areArraysEqual, isObjectEmpty, singleOrDefault } from '@crowd/common'
import { firstArrayContainsSecondArray, isObjectEmpty, singleOrDefault } from '@crowd/common'
import { DbStore } from '@crowd/database'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
import {
Expand Down Expand Up @@ -185,9 +185,9 @@ export default class MemberService extends LoggerBase {
await txRepo.addToSegment(id, tenantId, segmentId)

updated = true
await txRepo.addToSegment(id, tenantId, segmentId)
} else {
this.log.debug({ memberId: id }, 'Nothing to update in a member!')
await txRepo.addToSegment(id, tenantId, segmentId)
}

if (toUpdate.identities) {
Expand Down Expand Up @@ -419,8 +419,8 @@ export default class MemberService extends LoggerBase {
}

let emails: string[] | undefined
if (member.emails) {
if (!areArraysEqual(member.emails, dbMember.emails)) {
if (member.emails && member.emails.length > 0) {
if (!firstArrayContainsSecondArray(dbMember.emails, member.emails)) {
emails = [...new Set([...member.emails, ...dbMember.emails])]
}
}
Expand Down
97 changes: 56 additions & 41 deletions services/apps/data_sink_worker/src/service/organization.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,33 @@ export class OrganizationService extends LoggerBase {
// if exists in cache update it
const updateData: Partial<IOrganization> = {}
// no need to update name since it's aka primary key
if (data.url) updateData.url = data.url
if (data.description) updateData.description = data.description
if (data.emails) updateData.emails = data.emails
if (data.logo) updateData.logo = data.logo
if (data.tags) updateData.tags = data.tags
if (data.github) updateData.github = data.github as IOrganizationSocial
if (data.twitter) updateData.twitter = data.twitter as IOrganizationSocial
if (data.linkedin) updateData.linkedin = data.linkedin as IOrganizationSocial
if (data.crunchbase) updateData.crunchbase = data.crunchbase as IOrganizationSocial
if (data.employees) updateData.employees = data.employees
if (data.location) updateData.location = data.location
if (data.website) updateData.website = data.website
if (data.type) updateData.type = data.type
if (data.size) updateData.size = data.size
if (data.headline) updateData.headline = data.headline
if (data.industry) updateData.industry = data.industry
if (data.founded) updateData.founded = data.founded
const fields = [
'url',
'description',
'emails',
'logo',
'tags',
'github',
'twitter',
'linkedin',
'crunchbase',
'employees',
'location',
'website',
'type',
'size',
'headline',
'industry',
'founded',
]
fields.forEach((field) => {
if (data[field] && !isEqual(data[field], cached[field])) {
updateData[field] = data[field]
}
})
if (Object.keys(updateData).length > 0) {
await this.repo.updateCache(cached.id, updateData)
cached = { ...cached, ...updateData } // Update the cached data with the new data
}
} else {
// if it doesn't exists in cache create it
Expand Down Expand Up @@ -91,11 +99,9 @@ export class OrganizationService extends LoggerBase {
// now check if exists in this tenant
const existing = await this.repo.findByName(tenantId, segmentId, data.name)

const displayName = existing?.displayName ? existing?.displayName : data?.name

let attributes = existing?.attributes

if (data.attributes) {
if (data?.attributes) {
const temp = mergeWith({}, existing?.attributes, data?.attributes)
if (!isEqual(temp, existing?.attributes)) {
attributes = temp
Expand All @@ -104,28 +110,37 @@ export class OrganizationService extends LoggerBase {

if (existing) {
// if it does exists update it
await this.repo.update(existing.id, {
name: cached.name,
displayName,
url: cached.url,
description: cached.description,
emails: cached.emails,
logo: cached.logo,
tags: cached.tags,
github: cached.github,
twitter: cached.twitter,
linkedin: cached.linkedin,
crunchbase: cached.crunchbase,
employees: cached.employees,
location: cached.location,
website: cached.website,
type: cached.type,
size: cached.size,
headline: cached.headline,
industry: cached.industry,
founded: cached.founded,
attributes,
const updateData: Partial<IOrganization> = {}
const fields = [
'name',
'displayName',
'url',
'description',
'emails',
'logo',
'tags',
'github',
'twitter',
'linkedin',
'crunchbase',
'employees',
'location',
'website',
'type',
'size',
'headline',
'industry',
'founded',
'attributes',
]
fields.forEach((field) => {
if (cached[field] && !isEqual(cached[field], existing[field])) {
updateData[field] = cached[field]
}
})
if (Object.keys(updateData).length > 0) {
await this.repo.update(existing.id, updateData)
}

return existing.id
} else {
Expand Down
4 changes: 4 additions & 0 deletions services/libs/common/src/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,7 @@ export const areArraysEqual = <T>(a: T[], b: T[]): boolean => {

return true
}

export const firstArrayContainsSecondArray = <T>(array1: T[], array2: T[]): boolean => {
return array2.every((val) => array1.includes(val))
}