Skip to content

Commit fe40b07

Browse files
author
Uros Marolt
authored
Detect which channel the thread started from. (#322)
1 parent 5d48924 commit fe40b07

File tree

3 files changed

+89
-57
lines changed

3 files changed

+89
-57
lines changed

backend/src/serverless/integrations/services/integrationProcessor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ export class IntegrationProcessor extends LoggingBase {
386386

387387
if (streams.length > 0 || failedStreams.length > 0) {
388388
logger.warn(
389-
`${failedStreams.length} streams have not been successfully processed or are remaining - retrying them with delay! We have ${streams.length} streams left to process in total!`,
389+
`${failedStreams.length} streams have not been successfully processed - retrying them with delay! We also have ${streams.length} remaining streams left to process!`,
390390
)
391391

392392
const existingRetryStreams = req.retryStreams || []

backend/src/serverless/integrations/services/integrations/discordIntegrationService.ts

Lines changed: 77 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import moment from 'moment/moment'
22
import lodash from 'lodash'
3-
import { DiscordMessages, DiscordMembers, DiscordMention } from '../../types/discordTypes'
3+
import {
4+
DiscordMessages,
5+
DiscordMembers,
6+
DiscordMention,
7+
DiscordStreamProcessResult,
8+
} from '../../types/discordTypes'
49
import { DISCORD_CONFIG } from '../../../../config'
510
import { DiscordMemberAttributes } from '../../../../database/attributes/member/discord'
611
import { MemberAttributeName } from '../../../../database/attributes/member/enums'
@@ -15,15 +20,15 @@ import { IntegrationType, PlatformType } from '../../../../types/integrationEnum
1520
import { timeout } from '../../../../utils/timing'
1621
import Operations from '../../../dbOperations/operations'
1722
import { DiscordGrid } from '../../grid/discordGrid'
18-
import { AddActivitiesSingle } from '../../types/messageTypes'
1923
import { Channels } from '../../types/regularTypes'
2024
import getChannels from '../../usecases/discord/getChannels'
2125
import getMembers from '../../usecases/discord/getMembers'
2226
import getMessages from '../../usecases/discord/getMessages'
23-
import getThreads from '../../usecases/discord/getThreads'
2427
import { IntegrationServiceBase } from '../integrationServiceBase'
2528
import { sendNodeWorkerMessage } from '../../../utils/nodeWorkerSQS'
2629
import { NodeWorkerIntegrationProcessMessage } from '../../../../types/mq/nodeWorkerIntegrationProcessMessage'
30+
import { AddActivitiesSingle } from '../../types/messageTypes'
31+
import { singleOrDefault } from '../../../../utils/arrays'
2732

2833
/* eslint class-methods-use-this: 0 */
2934

@@ -78,13 +83,6 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
7883
async preprocess(context: IStepContext): Promise<void> {
7984
const guildId = context.integration.integrationIdentifier
8085

81-
const threads: Channels = await getThreads(
82-
{
83-
guildId,
84-
token: this.getToken(context),
85-
},
86-
this.logger(context),
87-
)
8886
let channelsFromDiscordAPI: Channels = await getChannels(
8987
{
9088
guildId,
@@ -105,15 +103,12 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
105103
return c
106104
})
107105

108-
const channelsWithThreads = channelsFromDiscordAPI.concat(threads)
109-
110106
context.pipelineData = {
111-
channelsFromDiscordAPI,
112-
channels: channelsWithThreads,
113-
channelsInfo: channelsWithThreads.reduce((acc, channel) => {
107+
settingsChannels: channels,
108+
channels: channelsFromDiscordAPI,
109+
channelsInfo: channelsFromDiscordAPI.reduce((acc, channel) => {
114110
acc[channel.id] = {
115111
name: channel.name,
116-
thread: !!channel.thread,
117112
new: !!channel.new,
118113
}
119114
return acc
@@ -139,8 +134,9 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
139134

140135
return predefined.concat(
141136
context.pipelineData.channels.map((c) => ({
142-
value: c.id,
137+
value: 'channel',
143138
metadata: {
139+
id: c.id,
144140
page: '',
145141
},
146142
})),
@@ -162,7 +158,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
162158
) {
163159
try {
164160
const { fn, arg } = DiscordIntegrationService.getUsecase(
165-
stream.value,
161+
stream,
166162
context.pipelineData.guildId,
167163
)
168164
const { records, nextPage, limit, timeUntilReset } = await fn(
@@ -176,7 +172,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
176172
)
177173

178174
const nextPageStream = nextPage
179-
? { value: stream.value, metadata: { page: nextPage } }
175+
? { value: stream.value, metadata: { ...stream.metadata, page: nextPage } }
180176
: undefined
181177

182178
const sleep = limit <= 1 ? timeUntilReset : undefined
@@ -189,7 +185,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
189185
}
190186
}
191187

192-
const activities = this.parseActivities(stream, context, records)
188+
const { activities, newStreams } = this.parseActivities(stream, context, records)
193189

194190
const lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined
195191
return {
@@ -202,6 +198,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
202198
lastRecord,
203199
lastRecordTimestamp: lastRecord ? lastRecord.timestamp.getTime() : undefined,
204200
nextPageStream,
201+
newStreams,
205202
sleep,
206203
}
207204
} catch (err) {
@@ -244,7 +241,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
244241
)
245242

246243
default:
247-
if (context.pipelineData.channelsInfo[currentStream.value].new) return false
244+
if (context.pipelineData.channelsInfo[currentStream.metadata.id].new) return false
248245

249246
return IntegrationServiceBase.isRetrospectOver(
250247
lastRecordTimestamp,
@@ -259,44 +256,36 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
259256
failedStreams?: IIntegrationStream[],
260257
remainingStreams?: IIntegrationStream[],
261258
): Promise<void> {
262-
context.integration.settings.channels = context.pipelineData.channelsFromDiscordAPI.map(
263-
(ch) => {
264-
const { new: _, ...raw } = ch
265-
return raw
266-
},
267-
)
259+
context.integration.settings.channels = context.pipelineData.channels.map((ch) => {
260+
const { new: _, ...raw } = ch
261+
return raw
262+
})
268263
}
269264

270265
parseActivities(
271266
stream: IIntegrationStream,
272267
context: IStepContext,
273268
records: DiscordMessages | DiscordMembers,
274-
): AddActivitiesSingle[] {
269+
): DiscordStreamProcessResult {
275270
switch (stream.value) {
276271
case 'members':
277-
return this.parseMembers(context.integration.tenantId, records as DiscordMembers)
272+
return this.parseMembers(context, records as DiscordMembers)
278273
default:
279-
return this.parseMessages(
280-
context.pipelineData.guildId,
281-
context.integration.tenantId,
282-
context.pipelineData.channelsInfo,
283-
records as DiscordMessages,
284-
stream,
285-
)
274+
return this.parseMessages(context, records as DiscordMessages, stream)
286275
}
287276
}
288277

289-
parseMembers(tenantId: string, records: Array<any>): Array<AddActivitiesSingle> {
278+
parseMembers(context: IStepContext, records: Array<any>): DiscordStreamProcessResult {
290279
// We only need the members if they are not bots
291-
return records.reduce((acc, record) => {
280+
const activities: AddActivitiesSingle[] = records.reduce((acc, record) => {
292281
if (!record.user.bot) {
293282
let avatarUrl: string | boolean = false
294283

295284
if (record.user.avatar !== null && record.user.avatar !== undefined) {
296285
avatarUrl = `https://cdn.discordapp.com/avatars/${record.user.id}/${record.user.avatar}.png`
297286
}
298287
acc.push({
299-
tenant: tenantId,
288+
tenant: context.integration.tenantId,
300289
platform: PlatformType.DISCORD,
301290
type: 'joined_guild',
302291
sourceId: IntegrationServiceBase.generateSourceIdHash(
@@ -325,26 +314,49 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
325314
}
326315
return acc
327316
}, [])
317+
318+
return {
319+
activities,
320+
newStreams: [],
321+
}
328322
}
329323

330324
parseMessages(
331-
guildId: string,
332-
tenantId: string,
333-
channelsInfo: any,
325+
context: IStepContext,
334326
records: DiscordMessages,
335327
stream: IIntegrationStream,
336-
): Array<AddActivitiesSingle> {
337-
return records.reduce((acc, record) => {
328+
): DiscordStreamProcessResult {
329+
const newStreams: IIntegrationStream[] = []
330+
const activities: AddActivitiesSingle[] = records.reduce((acc, record) => {
338331
let parent = ''
339332

340-
// if we're parsing a thread, mark each message as a child
341-
const channelInfo = channelsInfo[stream.value]
342-
if (channelInfo.thread) {
343-
parent = stream.value
344-
}
333+
const channelInfo = context.pipelineData.channelsInfo[stream.metadata.id]
334+
335+
// is the message starting a thread?
336+
if (record.thread) {
337+
parent = record.thread.id
338+
newStreams.push({
339+
value: 'thread',
340+
metadata: {
341+
id: record.thread.id,
342+
},
343+
})
345344

345+
context.pipelineData.channelsInfo[record.thread.id] = {
346+
name: context.pipelineData.channelsInfo[record.channel_id].name,
347+
new:
348+
singleOrDefault(
349+
context.pipelineData.settingsChannels,
350+
(c) => c.id === record.thread.id,
351+
) === undefined,
352+
}
353+
}
354+
// if we're parsing a thread, mark each message as a child of this thread
355+
else if (stream.value === 'thread') {
356+
parent = stream.metadata.id
357+
}
346358
// record.parentId means that it's a reply
347-
if (record.message_reference && record.message_reference.message_id) {
359+
else if (record.message_reference && record.message_reference.message_id) {
348360
parent = record.message_reference.message_id
349361
}
350362

@@ -355,7 +367,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
355367

356368
if (!record.author.bot) {
357369
const activityObject = {
358-
tenant: tenantId,
370+
tenant: context.integration.tenantId,
359371
platform: PlatformType.DISCORD,
360372
type: 'message',
361373
sourceId: record.id,
@@ -364,10 +376,10 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
364376
body: record.content
365377
? DiscordIntegrationService.replaceMentions(record.content, record.mentions)
366378
: '',
367-
url: `https://discordapp.com/channels/${guildId}/${stream.value}/${record.id}`,
379+
url: `https://discordapp.com/channels/${context.pipelineData.guildId}/${stream.metadata.id}/${record.id}`,
368380
channel: channelInfo.name,
369381
attributes: {
370-
thread: channelInfo.thread ? channelInfo.name : false,
382+
thread: record.thread !== undefined || stream.value === 'thread',
371383
reactions: record.reactions ? record.reactions : [],
372384
attachments: record.attachments ? record.attachments : [],
373385
},
@@ -392,11 +404,17 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
392404
}
393405
return acc
394406
}, [])
407+
408+
return {
409+
activities,
410+
newStreams,
411+
}
395412
}
396413

397414
/**
398415
* Parse mentions
399416
* @param text Message text
417+
* @param mentions
400418
* @returns Message text, swapping mention IDs by mentions
401419
*/
402420
private static replaceMentions(text: string, mentions: [DiscordMention] | undefined): string {
@@ -423,17 +441,20 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
423441
* @returns The function to call, as well as its main argument
424442
*/
425443
private static getUsecase(
426-
stream: string,
444+
stream: IIntegrationStream,
427445
guildId: string,
428446
): {
429447
fn: Function
430448
arg: any
431449
} {
432-
switch (stream) {
450+
switch (stream.value) {
433451
case 'members':
434452
return { fn: getMembers, arg: { guildId } }
453+
case 'channel':
454+
case 'thread':
455+
return { fn: getMessages, arg: { channelId: stream.metadata.id } }
435456
default:
436-
return { fn: getMessages, arg: { channelId: stream } }
457+
throw new Error(`Unknown stream ${stream.value}!`)
437458
}
438459
}
439460
}

backend/src/serverless/integrations/types/discordTypes.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import { AddActivitiesSingle } from './messageTypes'
2+
import { IIntegrationStream } from '../../../types/integration/stepResult'
3+
14
export interface DiscordGetChannelsInput {
25
guildId: string
36
token: string
@@ -53,10 +56,18 @@ export interface DiscordMessage {
5356
guild_id: string
5457
channel_id: string
5558
}
59+
thread: {
60+
id: string
61+
}
5662
}
5763

5864
export type DiscordMessages = DiscordMessage[]
5965

66+
export interface DiscordStreamProcessResult {
67+
activities: AddActivitiesSingle[]
68+
newStreams: IIntegrationStream[]
69+
}
70+
6071
export interface DiscordMember {
6172
user: DiscordAuthor
6273
joined_at: string

0 commit comments

Comments
 (0)