Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion services/apps/data_sink_worker/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
inner join "memberIdentities" mi
on ms."tenantId" = mi."tenantId" and ms."memberId" = mi."memberId"
where ms."tenantId" = $(tenantId)
and ms."segmentId" = $(segmentId)
and mi.platform = $(platform)
and mi.username = $(username));
`,
Expand Down
19 changes: 12 additions & 7 deletions services/apps/data_sink_worker/src/repo/organization.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
o.founded,
o.attributes
from organizations o
where o."tenantId" = $(tenantId) and o.name = $(name)
and o.id in (select os."organizationId"
from "organizationSegments" os
where os."segmentId" = $(segmentId))`,
where o."tenantId" = $(tenantId) and o.name = $(name)`,
{ tenantId, name, segmentId },
)

Expand Down Expand Up @@ -269,16 +266,24 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
return id
}

public async update(id: string, data: IDbUpdateOrganizationData): Promise<void> {
public async update(id: string, data: Partial<IDbUpdateOrganizationData>): Promise<void> {
const keys = Object.keys(data)
keys.push('updatedAt')
// construct dynamic column set
const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, {
table: {
table: 'organizations',
},
})
const prepared = RepositoryBase.prepare(
{
...data,
updatedAt: new Date(),
},
this.updateOrganizationColumnSet,
dynamicColumnSet,
)

const query = this.dbInstance.helpers.update(prepared, this.updateOrganizationColumnSet)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)
const condition = this.format('where id = $(id)', { id })

const result = await this.db().result(`${query} ${condition}`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,10 @@ export default class ActivityService extends LoggerBase {
)
}
} finally {
// release lock no matter that
// release locks matter what
await releaseLock(
this.redisClient,
`member:processing:${tenantId}:${segmentId}:${platform}:${username}`,
`member:processing:${tenantId}:${platform}:${username}`,
'check-member-inside-activity-exists',
)
}
Expand Down
8 changes: 4 additions & 4 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IDbMember, IDbMemberUpdateData } from '@/repo/member.data'
import MemberRepository from '@/repo/member.repo'
import { areArraysEqual, isObjectEmpty, singleOrDefault } from '@crowd/common'
import { firstArrayContainsSecondArray, isObjectEmpty, singleOrDefault } from '@crowd/common'
import { DbStore } from '@crowd/database'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
import {
Expand Down Expand Up @@ -185,9 +185,9 @@ export default class MemberService extends LoggerBase {
await txRepo.addToSegment(id, tenantId, segmentId)

updated = true
await txRepo.addToSegment(id, tenantId, segmentId)
} else {
this.log.debug({ memberId: id }, 'Nothing to update in a member!')
await txRepo.addToSegment(id, tenantId, segmentId)
}

if (toUpdate.identities) {
Expand Down Expand Up @@ -419,8 +419,8 @@ export default class MemberService extends LoggerBase {
}

let emails: string[] | undefined
if (member.emails) {
if (!areArraysEqual(member.emails, dbMember.emails)) {
if (member.emails && member.emails.length > 0) {
if (!firstArrayContainsSecondArray(dbMember.emails, member.emails)) {
emails = [...new Set([...member.emails, ...dbMember.emails])]
}
}
Expand Down
97 changes: 56 additions & 41 deletions services/apps/data_sink_worker/src/service/organization.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,33 @@ export class OrganizationService extends LoggerBase {
// if exists in cache update it
const updateData: Partial<IOrganization> = {}
// no need to update name since it's aka primary key
if (data.url) updateData.url = data.url
if (data.description) updateData.description = data.description
if (data.emails) updateData.emails = data.emails
if (data.logo) updateData.logo = data.logo
if (data.tags) updateData.tags = data.tags
if (data.github) updateData.github = data.github as IOrganizationSocial
if (data.twitter) updateData.twitter = data.twitter as IOrganizationSocial
if (data.linkedin) updateData.linkedin = data.linkedin as IOrganizationSocial
if (data.crunchbase) updateData.crunchbase = data.crunchbase as IOrganizationSocial
if (data.employees) updateData.employees = data.employees
if (data.location) updateData.location = data.location
if (data.website) updateData.website = data.website
if (data.type) updateData.type = data.type
if (data.size) updateData.size = data.size
if (data.headline) updateData.headline = data.headline
if (data.industry) updateData.industry = data.industry
if (data.founded) updateData.founded = data.founded
const fields = [
'url',
'description',
'emails',
'logo',
'tags',
'github',
'twitter',
'linkedin',
'crunchbase',
'employees',
'location',
'website',
'type',
'size',
'headline',
'industry',
'founded',
]
fields.forEach((field) => {
if (data[field] && !isEqual(data[field], cached[field])) {
updateData[field] = data[field]
}
})
if (Object.keys(updateData).length > 0) {
await this.repo.updateCache(cached.id, updateData)
cached = { ...cached, ...updateData } // Update the cached data with the new data
}
} else {
// if it doesn't exists in cache create it
Expand Down Expand Up @@ -91,11 +99,9 @@ export class OrganizationService extends LoggerBase {
// now check if exists in this tenant
const existing = await this.repo.findByName(tenantId, segmentId, data.name)

const displayName = existing?.displayName ? existing?.displayName : data?.name

let attributes = existing?.attributes

if (data.attributes) {
if (data?.attributes) {
const temp = mergeWith({}, existing?.attributes, data?.attributes)
if (!isEqual(temp, existing?.attributes)) {
attributes = temp
Expand All @@ -104,28 +110,37 @@ export class OrganizationService extends LoggerBase {

if (existing) {
// if it does exists update it
await this.repo.update(existing.id, {
name: cached.name,
displayName,
url: cached.url,
description: cached.description,
emails: cached.emails,
logo: cached.logo,
tags: cached.tags,
github: cached.github,
twitter: cached.twitter,
linkedin: cached.linkedin,
crunchbase: cached.crunchbase,
employees: cached.employees,
location: cached.location,
website: cached.website,
type: cached.type,
size: cached.size,
headline: cached.headline,
industry: cached.industry,
founded: cached.founded,
attributes,
const updateData: Partial<IOrganization> = {}
const fields = [
'name',
'displayName',
'url',
'description',
'emails',
'logo',
'tags',
'github',
'twitter',
'linkedin',
'crunchbase',
'employees',
'location',
'website',
'type',
'size',
'headline',
'industry',
'founded',
'attributes',
]
fields.forEach((field) => {
if (cached[field] && !isEqual(cached[field], existing[field])) {
updateData[field] = cached[field]
}
})
if (Object.keys(updateData).length > 0) {
await this.repo.update(existing.id, updateData)
}

return existing.id
} else {
Expand Down
4 changes: 4 additions & 0 deletions services/libs/common/src/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,7 @@ export const areArraysEqual = <T>(a: T[], b: T[]): boolean => {

return true
}

export const firstArrayContainsSecondArray = <T>(array1: T[], array2: T[]): boolean => {
return array2.every((val) => array1.includes(val))
}