Skip to content

Commit f410c2b

Browse files
authored
Add concurrent request limiter and implement it in GitHub integration (#1603)
1 parent c748d3f commit f410c2b

File tree

8 files changed

+200
-24
lines changed

8 files changed

+200
-24
lines changed

services/apps/integration_stream_worker/src/service/integrationStreamService.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
IProcessWebhookStreamContext,
77
} from '@crowd/integrations'
88
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
9-
import { RedisCache, RedisClient, RateLimiter } from '@crowd/redis'
9+
import { RedisCache, RedisClient, RateLimiter, ConcurrentRequestLimiter } from '@crowd/redis'
1010
import {
1111
IntegrationDataWorkerEmitter,
1212
IntegrationRunWorkerEmitter,
@@ -349,6 +349,9 @@ export default class IntegrationStreamService extends LoggerBase {
349349
getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) => {
350350
return new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey)
351351
},
352+
getConcurrentRequestLimiter: (maxConcurrentRequests: number, counterKey: string) => {
353+
return new ConcurrentRequestLimiter(globalCache, maxConcurrentRequests, counterKey)
354+
},
352355
}
353356

354357
this.log.debug('Processing webhook stream!')
@@ -520,6 +523,9 @@ export default class IntegrationStreamService extends LoggerBase {
520523
getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) => {
521524
return new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey)
522525
},
526+
getConcurrentRequestLimiter: (maxConcurrentRequests: number, counterKey: string) => {
527+
return new ConcurrentRequestLimiter(globalCache, maxConcurrentRequests, counterKey)
528+
},
523529
}
524530

525531
this.log.debug('Processing stream!')

services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
import { graphql } from '@octokit/graphql'
33
import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types'
44
import { GraphQlQueryResponse } from '@crowd/types'
5-
import { RateLimitError } from '@crowd/types'
5+
import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types'
6+
7+
/**
 * Optional throttling settings accepted by `getSinglePage`: the limiter the
 * request is run through, plus the integration id handed to that limiter.
 */
interface Limiter {
8+
// passed as the first argument to `processWithLimit`
integrationId: string
9+
// its `processWithLimit` wraps the GraphQL call when a limiter is supplied
concurrentRequestLimiter: IConcurrentRequestLimiter
10+
}
611

712
class BaseQuery {
813
static BASE_URL = 'https://api.github.com/graphql'
@@ -84,14 +89,24 @@ class BaseQuery {
8489
* @param beforeCursor Cursor to paginate records before it
8590
* @returns parsed graphQl result
8691
*/
87-
async getSinglePage(beforeCursor: string): Promise<GraphQlQueryResponse> {
92+
async getSinglePage(beforeCursor: string, limiter?: Limiter): Promise<GraphQlQueryResponse> {
8893
const paginatedQuery = BaseQuery.interpolate(this.query, {
8994
beforeCursor: BaseQuery.getPagination(beforeCursor),
9095
})
9196

9297
try {
93-
const result = await this.graphQL(paginatedQuery)
94-
return this.getEventData(result)
98+
if (limiter) {
99+
return limiter.concurrentRequestLimiter.processWithLimit(
100+
limiter.integrationId,
101+
async () => {
102+
const result = await this.graphQL(paginatedQuery)
103+
return this.getEventData(result)
104+
},
105+
)
106+
} else {
107+
const result = await this.graphQL(paginatedQuery)
108+
return this.getEventData(result)
109+
}
95110
} catch (err) {
96111
throw BaseQuery.processGraphQLError(err)
97112
}

services/libs/integrations/src/integrations/github/processStream.ts

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
/* eslint-disable @typescript-eslint/no-explicit-any */
22
// processStream.ts content
33
import { singleOrDefault, timeout } from '@crowd/common'
4-
import { GraphQlQueryResponse } from '@crowd/types'
4+
import { GraphQlQueryResponse, IConcurrentRequestLimiter } from '@crowd/types'
55
import { createAppAuth } from '@octokit/auth-app'
66
import { AuthInterface } from '@octokit/auth-app/dist-types/types'
7-
import { IProcessStreamContext, ProcessStreamHandler } from '../../types'
7+
import {
8+
IProcessStreamContext,
9+
ProcessStreamHandler,
10+
IProcessWebhookStreamContext,
11+
} from '../../types'
812
import DiscussionCommentsQuery from './api/graphql/discussionComments'
913
import DiscussionsQuery from './api/graphql/discussions'
1014
import ForksQuery from './api/graphql/forks'
@@ -40,6 +44,7 @@ import {
4044
const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test'
4145

4246
let githubAuthenticator: AuthInterface | undefined = undefined
47+
let concurrentRequestLimiter: IConcurrentRequestLimiter | undefined = undefined
4348

4449
function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {
4550
const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings
@@ -60,6 +65,18 @@ function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined {
6065
return githubAuthenticator
6166
}
6267

68+
/**
 * Returns the module-level concurrent request limiter, creating it on first use.
 *
 * The instance is built through the `getConcurrentRequestLimiter` factory of
 * whichever `ctx` is passed on the first call and is then reused for every
 * later call, regardless of the context those calls supply.
 *
 * @param ctx stream or webhook stream context that provides the limiter factory
 * @returns the shared limiter instance
 */
export function getConcurrentRequestLimiter(
69+
ctx: IProcessStreamContext | IProcessWebhookStreamContext,
70+
): IConcurrentRequestLimiter {
71+
// create the limiter only once; later calls fall through to the cached instance
if (concurrentRequestLimiter === undefined) {
72+
concurrentRequestLimiter = ctx.getConcurrentRequestLimiter(
73+
2, // max 2 concurrent requests
74+
// second factory argument (declared as `cacheKey` on the context interfaces)
'github-concurrent-request-limiter',
75+
)
76+
}
77+
return concurrentRequestLimiter
78+
}
79+
6380
// const getTokenFromCache = async (ctx: IProcessStreamContext) => {
6481
// const key = 'github-token-cache'
6582
// const cache = ctx.integrationCache // this cache is tied up with integrationId
@@ -219,7 +236,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => {
219236
try {
220237
// we don't need the default 100 items per page; 1 is enough to check whether the repo is available
221238
const stargazersQuery = new StargazersQuery(repo, ctx.integration.token, 1)
222-
await stargazersQuery.getSinglePage('')
239+
await stargazersQuery.getSinglePage('', {
240+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
241+
integrationId: ctx.integration.id,
242+
})
223243
repos.push(repo)
224244
} catch (e) {
225245
if (e.rateLimitResetSeconds) {
@@ -263,7 +283,10 @@ const processRootStream: ProcessStreamHandler = async (ctx) => {
263283
const processStargazersStream: ProcessStreamHandler = async (ctx) => {
264284
const data = ctx.stream.data as GithubBasicStream
265285
const stargazersQuery = new StargazersQuery(data.repo, ctx.integration.token)
266-
const result = await stargazersQuery.getSinglePage(data.page)
286+
const result = await stargazersQuery.getSinglePage(data.page, {
287+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
288+
integrationId: ctx.integration.id,
289+
})
267290
result.data = result.data.filter((i) => (i as any).node?.login)
268291

269292
// handle next page
@@ -285,7 +308,10 @@ const processStargazersStream: ProcessStreamHandler = async (ctx) => {
285308
const processForksStream: ProcessStreamHandler = async (ctx) => {
286309
const data = ctx.stream.data as GithubBasicStream
287310
const forksQuery = new ForksQuery(data.repo, ctx.integration.token)
288-
const result = await forksQuery.getSinglePage(data.page)
311+
const result = await forksQuery.getSinglePage(data.page, {
312+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
313+
integrationId: ctx.integration.id,
314+
})
289315

290316
// filter out activities without authors (such as bots) -- may not be the case for forks, but filter out anyway
291317
result.data = result.data.filter((i) => (i as any).owner?.login)
@@ -309,7 +335,10 @@ const processForksStream: ProcessStreamHandler = async (ctx) => {
309335
const processPullsStream: ProcessStreamHandler = async (ctx) => {
310336
const data = ctx.stream.data as GithubBasicStream
311337
const forksQuery = new PullRequestsQuery(data.repo, ctx.integration.token)
312-
const result = await forksQuery.getSinglePage(data.page)
338+
const result = await forksQuery.getSinglePage(data.page, {
339+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
340+
integrationId: ctx.integration.id,
341+
})
313342

314343
// filter out activities without authors (such as bots)
315344
result.data = result.data.filter((i) => (i as any).author?.login)
@@ -484,7 +513,10 @@ const processPullCommentsStream: ProcessStreamHandler = async (ctx) => {
484513
ctx.integration.token,
485514
)
486515

487-
const result = await pullRequestCommentsQuery.getSinglePage(data.page)
516+
const result = await pullRequestCommentsQuery.getSinglePage(data.page, {
517+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
518+
integrationId: ctx.integration.id,
519+
})
488520
result.data = result.data.filter((i) => (i as any).author?.login)
489521

490522
// handle next page
@@ -513,7 +545,10 @@ const processPullReviewThreadsStream: ProcessStreamHandler = async (ctx) => {
513545
ctx.integration.token,
514546
)
515547

516-
const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page)
548+
const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page, {
549+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
550+
integrationId: ctx.integration.id,
551+
})
517552

518553
// handle next page
519554
await publishNextPageStream(ctx, result)
@@ -541,7 +576,10 @@ const processPullReviewThreadCommentsStream: ProcessStreamHandler = async (ctx)
541576
ctx.integration.token,
542577
)
543578

544-
const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page)
579+
const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page, {
580+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
581+
integrationId: ctx.integration.id,
582+
})
545583

546584
// filter out activities without authors (such as bots)
547585
result.data = result.data.filter((i) => (i as any).author?.login)
@@ -574,7 +612,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
574612
const pullRequestCommitsQuery = new PullRequestCommitsQuery(data.repo, pullRequestNumber, token)
575613

576614
try {
577-
result = await pullRequestCommitsQuery.getSinglePage(data.page)
615+
result = await pullRequestCommitsQuery.getSinglePage(data.page, {
616+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
617+
integrationId: ctx.integration.id,
618+
})
578619
} catch (err) {
579620
ctx.log.warn(
580621
{
@@ -589,7 +630,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
589630
pullRequestNumber,
590631
ctx.integration.token,
591632
)
592-
result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page)
633+
result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page, {
634+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
635+
integrationId: ctx.integration.id,
636+
})
593637
}
594638

595639
// handle next page
@@ -624,7 +668,10 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => {
624668
const processIssuesStream: ProcessStreamHandler = async (ctx) => {
625669
const data = ctx.stream.data as GithubBasicStream
626670
const issuesQuery = new IssuesQuery(data.repo, ctx.integration.token)
627-
const result = await issuesQuery.getSinglePage(data.page)
671+
const result = await issuesQuery.getSinglePage(data.page, {
672+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
673+
integrationId: ctx.integration.id,
674+
})
628675

629676
// filter out activities without authors (such as bots)
630677
result.data = result.data.filter((i) => (i as any).author?.login)
@@ -683,7 +730,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => {
683730
const data = ctx.stream.data as GithubBasicStream
684731
const issueNumber = data.issueNumber
685732
const issueCommentsQuery = new IssueCommentsQuery(data.repo, issueNumber, ctx.integration.token)
686-
const result = await issueCommentsQuery.getSinglePage(data.page)
733+
const result = await issueCommentsQuery.getSinglePage(data.page, {
734+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
735+
integrationId: ctx.integration.id,
736+
})
687737
result.data = result.data.filter((i) => (i as any).author?.login)
688738

689739
// handle next page
@@ -706,7 +756,10 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => {
706756
const processDiscussionsStream: ProcessStreamHandler = async (ctx) => {
707757
const data = ctx.stream.data as GithubBasicStream
708758
const discussionsQuery = new DiscussionsQuery(data.repo, ctx.integration.token)
709-
const result = await discussionsQuery.getSinglePage(data.page)
759+
const result = await discussionsQuery.getSinglePage(data.page, {
760+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
761+
integrationId: ctx.integration.id,
762+
})
710763

711764
result.data = result.data.filter((i) => (i as any).author?.login)
712765

@@ -746,7 +799,10 @@ const processDiscussionCommentsStream: ProcessStreamHandler = async (ctx) => {
746799
data.discussionNumber,
747800
ctx.integration.token,
748801
)
749-
const result = await discussionCommentsQuery.getSinglePage(data.page)
802+
const result = await discussionCommentsQuery.getSinglePage(data.page, {
803+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
804+
integrationId: ctx.integration.id,
805+
})
750806
result.data = result.data.filter((i) => (i as any).author?.login)
751807

752808
// handle next page

services/libs/integrations/src/integrations/github/processWebhookStream.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ import getMember from './api/graphql/members'
2020
import { prepareMember } from './processStream'
2121
import TeamsQuery from './api/graphql/teams'
2222
import { GithubWebhookTeam } from './api/graphql/types'
23-
import { processPullCommitsStream, getGithubToken } from './processStream'
23+
import {
24+
processPullCommitsStream,
25+
getGithubToken,
26+
getConcurrentRequestLimiter,
27+
} from './processStream'
2428

2529
const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test'
2630

@@ -198,7 +202,10 @@ const parseWebhookPullRequest = async (payload: any, ctx: IProcessWebhookStreamC
198202
// a team sent as reviewer, first we need to find members in this team
199203
const team: GithubWebhookTeam = payload.requested_team
200204
const token = await getGithubToken(ctx as IProcessStreamContext)
201-
const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('')
205+
const teamMembers = await new TeamsQuery(team.node_id, token).getSinglePage('', {
206+
concurrentRequestLimiter: getConcurrentRequestLimiter(ctx),
207+
integrationId: ctx.integration.id,
208+
})
202209

203210
for (const teamMember of teamMembers.data) {
204211
await parseWebhookPullRequestEvents({ ...payload, requested_reviewer: teamMember }, ctx)

services/libs/integrations/src/types.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@ import {
66
IAutomation,
77
} from '@crowd/types'
88
import { Logger } from '@crowd/logging'
9-
import { ICache, IIntegration, IIntegrationStream, IRateLimiter } from '@crowd/types'
9+
import {
10+
ICache,
11+
IIntegration,
12+
IIntegrationStream,
13+
IRateLimiter,
14+
IConcurrentRequestLimiter,
15+
} from '@crowd/types'
1016

1117
import { IntegrationSyncWorkerEmitter } from '@crowd/sqs'
1218
import { IBatchOperationResult } from './integrations/premium/hubspot/api/types'
@@ -71,6 +77,10 @@ export interface IProcessStreamContext extends IIntegrationContext {
7177
integrationCache: ICache
7278

7379
getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
80+
getConcurrentRequestLimiter: (
81+
maxConcurrentRequests: number,
82+
cacheKey: string,
83+
) => IConcurrentRequestLimiter
7484
}
7585

7686
export interface IProcessWebhookStreamContext {
@@ -99,6 +109,10 @@ export interface IProcessWebhookStreamContext {
99109
integrationCache: ICache
100110

101111
getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
112+
getConcurrentRequestLimiter: (
113+
maxConcurrentRequests: number,
114+
cacheKey: string,
115+
) => IConcurrentRequestLimiter
102116
}
103117

104118
export interface IProcessDataContext extends IIntegrationContext {

services/libs/redis/src/cache.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,22 @@ export class RedisCache extends LoggerBase implements ICache {
5858
return result
5959
}
6060

61+
/**
 * Decrements the numeric value stored under `key` in this cache.
 *
 * @param key cache key; it is run through `this.prefixer` before being sent to the client
 * @param decrementBy amount to subtract, defaults to 1
 * @param ttlSeconds when provided, the key's expiry is set to this many seconds
 *   in the same `multi()` batch as the decrement
 * @returns the value reported by the client's `decrBy` call
 */
async decrement(key: string, decrementBy = 1, ttlSeconds?: number): Promise<number> {
62+
const actualKey = this.prefixer(key)
63+
64+
// with a TTL: queue the decrement and the expiry together and execute them as one batch
if (ttlSeconds !== undefined) {
65+
// only the first reply (from decrBy) is needed; the expire reply is ignored
const [decrResult] = await this.client
66+
.multi()
67+
.decrBy(actualKey, decrementBy)
68+
.expire(actualKey, ttlSeconds)
69+
.exec()
70+
return decrResult as number
71+
}
72+
73+
// without a TTL: plain decrement, any existing expiry is not touched by this call
const result = await this.client.decrBy(actualKey, decrementBy)
74+
return result
75+
}
76+
6177
public setIfNotExistsAlready(key: string, value: string): Promise<boolean> {
6278
const actualKey = this.prefixer(key)
6379
return this.client.setNX(actualKey, value)

0 commit comments

Comments
 (0)