Skip to content
55 changes: 55 additions & 0 deletions backend/src/bin/scripts/generate-merge-suggestions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import commandLineArgs from 'command-line-args'
import commandLineUsage from 'command-line-usage'
import * as fs from 'fs'
import path from 'path'
import { mergeSuggestionsWorker } from '@/serverless/microservices/nodejs/merge-suggestions/mergeSuggestionsWorker'

/* eslint-disable no-console */

const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8')

const options = [
{
name: 'tenant',
alias: 't',
type: String,
description:
'The unique ID of that tenant that you would like to generate merge suggestions for.',
},
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Print this usage guide.',
},
]
const sections = [
{
content: banner,
raw: true,
},
{
header: 'Generate merge suggestions for a tenant',
content: 'Generate merge suggestions for a tenant',
},
{
header: 'Options',
optionList: options,
},
]

const usage = commandLineUsage(sections)
const parameters = commandLineArgs(options)

if (parameters.help || !parameters.tenant) {
console.log(usage)
} else {
setImmediate(async () => {
const tenantIds = parameters.tenant.split(',')

for (const tenantId of tenantIds) {
await mergeSuggestionsWorker(tenantId)
}
process.exit(0)
})
}
142 changes: 93 additions & 49 deletions backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,18 @@ class OrganizationRepository {

let yieldChunk: IOrganizationMergeSuggestion[] = []

const prefixLength = (string: string) => {
if (string.length > 5 && string.length < 8) {
return 6
}

if (string.length > 8 && string.length < 12) {
return 9
}

return 10
}

const normalizeScore = (max: number, min: number, score: number): number => {
if (score > 100) {
return 1
Expand All @@ -1064,32 +1076,52 @@ class OrganizationRepository {
const queryBody = {
from: 0,
size: BATCH_SIZE,
query: {
bool: {
must: [
{
term: {
uuid_tenantId: tenant.id,
},
},
],
},
},
query: {},
sort: {
[`date_createdAt`]: 'desc',
[`uuid_organizationId`]: 'asc',
},
collapse: {
field: 'uuid_organizationId',
},
_source: ['uuid_organizationId', 'nested_identities', 'uuid_arr_noMergeIds'],
}

let organizations: IOrganizationPartialAggregatesOpensearch[]
let offset: number
let organizations: IOrganizationPartialAggregatesOpensearch[] = []
let lastUuid: string

do {
offset = organizations ? offset + BATCH_SIZE : 0
queryBody.from = offset
if (organizations.length > 0) {
queryBody.query = {
bool: {
filter: [
{
term: {
uuid_tenantId: tenant.id,
},
},
{
range: {
uuid_organizationId: {
gt: lastUuid,
},
},
},
],
},
}
} else {
queryBody.query = {
bool: {
filter: [
{
term: {
uuid_tenantId: tenant.id,
},
},
],
},
}
}

organizations =
(
Expand All @@ -1099,6 +1131,10 @@ class OrganizationRepository {
})
).body?.hits?.hits || []

if (organizations.length > 0) {
lastUuid = organizations[organizations.length - 1]._source.uuid_organizationId
}

for (const organization of organizations) {
if (
organization._source.nested_identities &&
Expand Down Expand Up @@ -1149,47 +1185,55 @@ class OrganizationRepository {
}

for (const identity of organization._source.nested_identities) {
// weak identity search
identitiesPartialQuery.should[0].nested.query.bool.should.push({
bool: {
must: [
{ match: { [`nested_weakIdentities.string_name`]: identity.string_name } },
{
match: { [`nested_weakIdentities.string_platform`]: identity.string_platform },
},
],
},
})

// fuzzy search for identities
identitiesPartialQuery.should[1].nested.query.bool.should.push({
match: {
[`nested_identities.string_name`]: {
query: identity.string_name,
prefix_length: 1,
fuzziness: 'auto',
if (identity.string_name.length > 0) {
// weak identity search
identitiesPartialQuery.should[0].nested.query.bool.should.push({
bool: {
must: [
{ match: { [`nested_weakIdentities.string_name`]: identity.string_name } },
{
match: {
[`nested_weakIdentities.string_platform`]: identity.string_platform,
},
},
],
},
},
})
})

// wildcard search for identities
identitiesPartialQuery.should[1].nested.query.bool.should.push({
wildcard: {
[`nested_identities.string_name`]: {
value: `${identity.string_name}*`,
// some identities have https? in the beginning, resulting in false positive suggestions
// remove these when making fuzzy, wildcard and prefix searches
const cleanedIdentityName = identity.string_name.replace(/^https?:\/\//, '')

// fuzzy search for identities
identitiesPartialQuery.should[1].nested.query.bool.should.push({
match: {
[`nested_identities.string_name`]: {
query: cleanedIdentityName,
prefix_length: 1,
fuzziness: 'auto',
},
},
},
})
})

// also check for prefix of 5 if identity is longer then 5 characters
if (identity.string_name.length > 5) {
// wildcard search for identities
identitiesPartialQuery.should[1].nested.query.bool.should.push({
prefix: {
wildcard: {
[`nested_identities.string_name`]: {
value: identity.string_name.slice(0, 5),
value: `${cleanedIdentityName}*`,
},
},
})

// also check for prefix for identities that has more than 5 characters
if (identity.string_name.length > 5) {
identitiesPartialQuery.should[1].nested.query.bool.should.push({
prefix: {
[`nested_identities.string_name`]: {
value: cleanedIdentityName.slice(0, prefixLength(cleanedIdentityName)),
},
},
})
}
}
}

Expand Down Expand Up @@ -1285,7 +1329,7 @@ class OrganizationRepository {
AND os."segmentId" IN (:segmentIds)
) AS "organizationsToMerge"
ORDER BY
"organizationsToMerge"."createdAt" DESC
"organizationsToMerge"."similarity" DESC
LIMIT :limit OFFSET :offset
`,
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { getOpensearchClient } from '@crowd/opensearch'
// import { OrganizationMergeSuggestionType } from '@crowd/types'
import { getServiceChildLogger } from '@crowd/logging'
import { OrganizationMergeSuggestionType } from '@crowd/types'
import getUserContext from '../../../../database/utils/getUserContext'
import MemberService from '../../../../services/memberService'
import { IRepositoryOptions } from '../../../../database/repositories/IRepositoryOptions'
Expand All @@ -8,16 +9,27 @@ import {
IMemberMergeSuggestion,
} from '../../../../database/repositories/types/memberTypes'
import SegmentService from '../../../../services/segmentService'
// import OrganizationService from '@/services/organizationService'
import OrganizationService from '@/services/organizationService'
import { OPENSEARCH_CONFIG } from '@/conf'

const log = getServiceChildLogger('mergeSuggestionsWorker')

async function mergeSuggestionsWorker(tenantId): Promise<void> {
const userContext: IRepositoryOptions = await getUserContext(tenantId)
const segmentService = new SegmentService(userContext)
const { rows: segments } = await segmentService.querySubprojects({})
userContext.currentSegments = segments
userContext.opensearch = getOpensearchClient(OPENSEARCH_CONFIG)

log.info(`Generating organization merge suggestions for tenant ${tenantId}!`)

const organizationService = new OrganizationService(userContext)
await organizationService.generateMergeSuggestions(OrganizationMergeSuggestionType.BY_IDENTITY)

log.info(`Done generating organization merge suggestions for tenant ${tenantId}!`)

log.info(`Generating member merge suggestions for tenant ${tenantId}!`)

const memberService = new MemberService(userContext)
// Splitting these because in the near future we will be treating them differently
const byUsername: IMemberMergeSuggestion[] = await memberService.getMergeSuggestions(
Expand All @@ -32,9 +44,7 @@ async function mergeSuggestionsWorker(tenantId): Promise<void> {
IMemberMergeSuggestionsType.SIMILARITY,
)
await memberService.addToMerge(bySimilarity)

// const organizationService = new OrganizationService(userContext)
// await organizationService.generateMergeSuggestions(OrganizationMergeSuggestionType.BY_IDENTITY)
log.info(`Done generating member merge suggestions for tenant ${tenantId}!`)
}

export { mergeSuggestionsWorker }
13 changes: 13 additions & 0 deletions backend/src/services/organizationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,13 @@ export default class OrganizationService extends LoggerBase {
}

async generateMergeSuggestions(type: OrganizationMergeSuggestionType): Promise<void> {
this.log.trace(`Generating merge suggestions for: ${this.options.currentTenant.id}`)
const transaction = await SequelizeRepository.createTransaction(this.options)

try {
if (type === OrganizationMergeSuggestionType.BY_IDENTITY) {
let mergeSuggestions
let hasSuggestions = false

const generator = OrganizationRepository.getMergeSuggestions({
...this.options,
Expand All @@ -256,6 +258,17 @@ export default class OrganizationService extends LoggerBase {
do {
mergeSuggestions = await generator.next()

if (mergeSuggestions.value) {
this.log.trace(
`Tenant: ${this.options.currentTenant.id}, adding ${mergeSuggestions.value.length} organizations to suggestions!`,
)
hasSuggestions = true
} else if (!hasSuggestions) {
this.log.trace(`Tenant doesn't have any merge suggestions`)
} else {
this.log.trace(`Finished going tru all suggestions!`)
}

await OrganizationRepository.addToMerge(mergeSuggestions.value, this.options)
} while (!mergeSuggestions.done)
}
Expand Down