From b480ecd95d50b332b6a9f736d44e563b6d0f0f18 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Wed, 4 Oct 2023 15:44:16 +0300 Subject: [PATCH 01/12] implement personal token rotation for GitHub when we hit rate limits --- .../config/custom-environment-variables.json | 3 +- .../github/api/graphql/baseQuery.ts | 69 +++++++- .../src/integrations/github/processStream.ts | 167 ++++++++++++------ .../src/integrations/github/tokenRotator.ts | 109 ++++++++++++ .../src/integrations/github/types.ts | 1 + services/libs/redis/src/cache.ts | 15 ++ services/libs/types/src/caching.ts | 3 + 7 files changed, 309 insertions(+), 58 deletions(-) create mode 100644 services/libs/integrations/src/integrations/github/tokenRotator.ts diff --git a/services/apps/integration_stream_worker/config/custom-environment-variables.json b/services/apps/integration_stream_worker/config/custom-environment-variables.json index ec8ede48cf..7d5182800d 100644 --- a/services/apps/integration_stream_worker/config/custom-environment-variables.json +++ b/services/apps/integration_stream_worker/config/custom-environment-variables.json @@ -37,6 +37,7 @@ "callbackUrl": "CROWD_GITHUB_CALLBACK_URL", "privateKey": "CROWD_GITHUB_PRIVATE_KEY", "webhookSecret": "CROWD_GITHUB_WEBHOOK_SECRET", - "isCommitDataEnabled": "CROWD_GITHUB_IS_COMMIT_DATA_ENABLED" + "isCommitDataEnabled": "CROWD_GITHUB_IS_COMMIT_DATA_ENABLED", + "personalAccessTokens": "CROWD_GITHUB_PERSONAL_ACCESS_TOKENS" } } diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index 21ae1001fb..3fdbc0f81b 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -3,6 +3,7 @@ import { graphql } from '@octokit/graphql' import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types' import { GraphQlQueryResponse } from '@crowd/types' import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types' +import { GithubTokenRotator } from '../../tokenRotator' interface Limiter { integrationId: string @@ -89,26 +90,84 @@ class BaseQuery { * @param beforeCursor Cursor to paginate records before it * @returns parsed graphQl result */ - async getSinglePage(beforeCursor: string, limiter?: Limiter): Promise { + async getSinglePage( + beforeCursor: string, + limiter?: Limiter, + tokenRotator?: GithubTokenRotator, + ): Promise { const paginatedQuery = BaseQuery.interpolate(this.query, { beforeCursor: BaseQuery.getPagination(beforeCursor), }) + const process = async () => { + const result = await this.graphQL(paginatedQuery) + return this.getEventData(result) + } + try { if (limiter) { return limiter.concurrentRequestLimiter.processWithLimit( limiter.integrationId, async () => { - const result = await this.graphQL(paginatedQuery) - return this.getEventData(result) + return await process() }, ) } else { - const result = await this.graphQL(paginatedQuery) - return this.getEventData(result) + return await process() } } catch (err) { + if ( + (err.status === 403 && + err.message && + (err.message as string).toLowerCase().includes('secondary rate limit')) || + (err.errors && err.errors[0].type === 'RATE_LIMITED') + ) { + // this is rate limit, let's try token rotation + if (tokenRotator) { + return await this.getSinglePageWithTokenRotation(beforeCursor, tokenRotator) + } + } else { + throw BaseQuery.processGraphQLError(err) + } + } + } + + private async getSinglePageWithTokenRotation( + beforeCursor: string, + tokenRotator: GithubTokenRotator, + ): Promise { + const paginatedQuery = BaseQuery.interpolate(this.query, { + beforeCursor: BaseQuery.getPagination(beforeCursor), + }) + + const token = await tokenRotator.getToken() + + const process = async () => { + const graphqlWithTokenRotation = graphql.defaults({ + headers: { + authorization: `token ${token}`, + }, + }) + const result = await graphqlWithTokenRotation(paginatedQuery) + await tokenRotator.updateRateLimitInfoFromApi(token) + await tokenRotator.returnToken(token) + return this.getEventData(result) + } + + try { + return await process() + } catch (err) { + // we might have exhausted one token, but we let another streams to continue + if (err.headers && err.headers['x-ratelimit-remaining'] && err.headers['x-ratelimit-reset']) { + const remaining = parseInt(err.headers['x-ratelimit-remaining']) + const reset = parseInt(err.headers['x-ratelimit-reset']) + await tokenRotator.updateTokenInfo(token, remaining, reset) + } else { + await tokenRotator.updateRateLimitInfoFromApi(token) + } throw BaseQuery.processGraphQLError(err) + } finally { + await tokenRotator.returnToken(token) } } diff --git a/services/libs/integrations/src/integrations/github/processStream.ts b/services/libs/integrations/src/integrations/github/processStream.ts index f444c8e1c8..de3c3b0d9c 100644 --- a/services/libs/integrations/src/integrations/github/processStream.ts +++ b/services/libs/integrations/src/integrations/github/processStream.ts @@ -39,11 +39,13 @@ import { Repo, Repos, } from './types' +import { GithubTokenRotator } from './tokenRotator' const IS_TEST_ENV: boolean = process.env.NODE_ENV === 'test' let githubAuthenticator: AuthInterface | undefined = undefined let concurrentRequestLimiter: IConcurrentRequestLimiter | undefined = undefined +let tokenRotator: GithubTokenRotator | undefined = undefined function getAuth(ctx: IProcessStreamContext): AuthInterface | undefined { const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings @@ -76,6 +78,14 @@ export function getConcurrentRequestLimiter( return concurrentRequestLimiter } +export function getTokenRotator(ctx: IProcessStreamContext): GithubTokenRotator { + const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings + if (tokenRotator === undefined) { + tokenRotator = new GithubTokenRotator(ctx.cache, GITHUB_CONFIG.personalAccessTokens.split(',')) + } + return tokenRotator +} + export async function getGithubToken(ctx: IProcessStreamContext): Promise { const auth = getAuth(ctx) if (auth) { @@ -204,10 +214,14 @@ const processRootStream: ProcessStreamHandler = async (ctx) => { try { // we don't need to get default 100 item per page, just 1 is enough to check if repo is available const stargazersQuery = new StargazersQuery(repo, await getGithubToken(ctx), 1) - await stargazersQuery.getSinglePage('', { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + await stargazersQuery.getSinglePage( + '', + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) repos.push(repo) } catch (e) { if (e.rateLimitResetSeconds) { @@ -251,10 +265,15 @@ const processRootStream: ProcessStreamHandler = async (ctx) => { const processStargazersStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const stargazersQuery = new StargazersQuery(data.repo, await getGithubToken(ctx)) - const result = await stargazersQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + + const result = await stargazersQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) result.data = result.data.filter((i) => (i as any).node?.login) // handle next page @@ -276,10 +295,14 @@ const processStargazersStream: ProcessStreamHandler = async (ctx) => { const processForksStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const forksQuery = new ForksQuery(data.repo, await getGithubToken(ctx)) - const result = await forksQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await forksQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) // filter out activities without authors (such as bots) -- may not the case for forks, but filter out anyway result.data = result.data.filter((i) => (i as any).owner?.login) @@ -303,10 +326,14 @@ const processForksStream: ProcessStreamHandler = async (ctx) => { const processPullsStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const forksQuery = new PullRequestsQuery(data.repo, await getGithubToken(ctx)) - const result = await forksQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await forksQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) // filter out activities without authors (such as bots) result.data = result.data.filter((i) => (i as any).author?.login) @@ -481,10 +508,14 @@ const processPullCommentsStream: ProcessStreamHandler = async (ctx) => { await getGithubToken(ctx), ) - const result = await pullRequestCommentsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await pullRequestCommentsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) result.data = result.data.filter((i) => (i as any).author?.login) // handle next page @@ -513,10 +544,14 @@ const processPullReviewThreadsStream: ProcessStreamHandler = async (ctx) => { await getGithubToken(ctx), ) - const result = await pullRequestReviewThreadsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await pullRequestReviewThreadsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) // handle next page await publishNextPageStream(ctx, result) @@ -544,10 +579,14 @@ const processPullReviewThreadCommentsStream: ProcessStreamHandler = async (ctx) await getGithubToken(ctx), ) - const result = await pullRequestReviewThreadCommentsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await pullRequestReviewThreadCommentsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) // filter out activities without authors (such as bots) result.data = result.data.filter((i) => (i as any).author?.login) @@ -580,10 +619,14 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => { const pullRequestCommitsQuery = new PullRequestCommitsQuery(data.repo, pullRequestNumber, token) try { - result = await pullRequestCommitsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + result = await pullRequestCommitsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) } catch (err) { ctx.log.warn( { @@ -598,10 +641,14 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => { pullRequestNumber, await getGithubToken(ctx), ) - result = await pullRequestCommitsQueryNoAdditions.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + result = await pullRequestCommitsQueryNoAdditions.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) } // handle next page @@ -636,10 +683,14 @@ export const processPullCommitsStream: ProcessStreamHandler = async (ctx) => { const processIssuesStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const issuesQuery = new IssuesQuery(data.repo, await getGithubToken(ctx)) - const result = await issuesQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await issuesQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) // filter out activities without authors (such as bots) result.data = result.data.filter((i) => (i as any).author?.login) @@ -702,10 +753,14 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => { issueNumber, await getGithubToken(ctx), ) - const result = await issueCommentsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await issueCommentsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) result.data = result.data.filter((i) => (i as any).author?.login) // handle next page @@ -728,10 +783,14 @@ const processIssueCommentsStream: ProcessStreamHandler = async (ctx) => { const processDiscussionsStream: ProcessStreamHandler = async (ctx) => { const data = ctx.stream.data as GithubBasicStream const discussionsQuery = new DiscussionsQuery(data.repo, await getGithubToken(ctx)) - const result = await discussionsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await discussionsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) result.data = result.data.filter((i) => (i as any).author?.login) @@ -771,10 +830,14 @@ const processDiscussionCommentsStream: ProcessStreamHandler = async (ctx) => { data.discussionNumber, await getGithubToken(ctx), ) - const result = await discussionCommentsQuery.getSinglePage(data.page, { - concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), - integrationId: ctx.integration.id, - }) + const result = await discussionCommentsQuery.getSinglePage( + data.page, + { + concurrentRequestLimiter: getConcurrentRequestLimiter(ctx), + integrationId: ctx.integration.id, + }, + getTokenRotator(ctx), + ) result.data = result.data.filter((i) => (i as any).author?.login) // handle next page diff --git a/services/libs/integrations/src/integrations/github/tokenRotator.ts b/services/libs/integrations/src/integrations/github/tokenRotator.ts new file mode 100644 index 0000000000..b99cbaa14f --- /dev/null +++ b/services/libs/integrations/src/integrations/github/tokenRotator.ts @@ -0,0 +1,109 @@ +import { ICache } from '@crowd/types' +import axios from 'axios' + +interface TokenInfo { + token: string + remaining: number + reset: number + inUse: boolean +} + +export class GithubTokenRotator { + constructor(private cache: ICache, private tokens: string[]) { + this.cache = cache + this.tokens = [...new Set(tokens)] + this.initializeTokens() + } + + private async initializeTokens(): Promise { + for (const token of this.tokens) { + const tokenInfo: TokenInfo = JSON.parse(await this.cache.hget('github:tokens', token)) + if (!tokenInfo) { + const newTokenInfo: TokenInfo = { + token, + remaining: 0, + reset: 0, + inUse: false, + } + await this.cache.hset('github:tokens', token, JSON.stringify(newTokenInfo)) + } + } + } + + public async getToken(): Promise { + const tokens = await this.cache.hgetall('github:tokens') + for (const token in tokens) { + const tokenInfo: TokenInfo = JSON.parse(tokens[token]) + if ( + !tokenInfo.inUse && + (tokenInfo.remaining > 0 || tokenInfo.reset < Math.floor(Date.now() / 1000)) + ) { + tokenInfo.inUse = true + await this.cache.hset('github:tokens', token, JSON.stringify(tokenInfo)) + return token + } + } + throw new Error('No available tokens in GitHubTokenRotator') + } + + public async returnToken(token: string): Promise { + const tokenInfo: TokenInfo = JSON.parse((await this.cache.hget('github:tokens', token)) || '') + if (tokenInfo) { + tokenInfo.inUse = false + await this.cache.hset('github:tokens', token, JSON.stringify(tokenInfo)) + } + } + + public async updateTokenInfo(token: string, remaining: number, reset: number): Promise { + const tokenInfo: TokenInfo = JSON.parse((await this.cache.hget('github:tokens', token)) || '') + if (tokenInfo) { + tokenInfo.remaining = remaining + tokenInfo.reset = reset + await this.cache.hset('github:tokens', token, JSON.stringify(tokenInfo)) + } + } + + public async updateRateLimitInfoFromApi(token: string): Promise { + // let's make API call to get the latest rate limit info + const tokenInfo: TokenInfo = JSON.parse((await this.cache.hget('github:tokens', token)) || '') + if (tokenInfo) { + const response = await axios({ + url: 'https://api.github.com/rate_limit', + method: 'get', + headers: { Authorization: `token ${token}` }, + }) + + const remaining = parseInt(response.data.resources.core.remaining) + const reset = parseInt(response.data.resources.core.reset) + await this.updateTokenInfo(token, remaining, reset) + } + } + + // public async apiRequest( + // url: string, + // method: 'get' | 'post' | 'put' | 'delete', + // data?: any, + // ): Promise { + // const token = await this.getToken() + // if (!token) { + // throw new Error('No available tokens') + // } + + // try { + // const response = await axios({ + // url, + // method, + // data, + // headers: { Authorization: `token ${token}` }, + // }) + + // const remaining = parseInt(response.headers['x-ratelimit-remaining']) + // const reset = parseInt(response.headers['x-ratelimit-reset']) + // await this.updateTokenInfo(token, remaining, reset) + + // return response.data + // } finally { + // await this.returnToken(token) + // } + // } +} diff --git a/services/libs/integrations/src/integrations/github/types.ts b/services/libs/integrations/src/integrations/github/types.ts index 1747b0cb80..a1425dd835 100644 --- a/services/libs/integrations/src/integrations/github/types.ts +++ b/services/libs/integrations/src/integrations/github/types.ts @@ -163,6 +163,7 @@ export interface GithubPlatformSettings { isCommitDataEnabled: string globalLimit?: number callbackUrl: string + personalAccessTokens: string } export interface GithubIntegrationSettings { diff --git a/services/libs/redis/src/cache.ts b/services/libs/redis/src/cache.ts index 9139d51fe1..562fb21e12 100644 --- a/services/libs/redis/src/cache.ts +++ b/services/libs/redis/src/cache.ts @@ -84,6 +84,21 @@ export class RedisCache extends LoggerBase implements ICache { return this.client.del(actualKey) } + public async hget(key: string, field: string): Promise { + const actualKey = this.prefixer(key) + return this.client.hGet(actualKey, field) + } + + public async hset(key: string, field: string, value: string): Promise { + const actualKey = this.prefixer(key) + return this.client.hSet(actualKey, field, value) + } + + public async hgetall(key: string): Promise<{ [key: string]: string }> { + const actualKey = this.prefixer(key) + return this.client.hGetAll(actualKey) + } + private async deleteByPattern(pattern: string): Promise { const script = ` local delpattern = ARGV[1] diff --git a/services/libs/types/src/caching.ts b/services/libs/types/src/caching.ts index d5cc759584..97e69e7a31 100644 --- a/services/libs/types/src/caching.ts +++ b/services/libs/types/src/caching.ts @@ -4,6 +4,9 @@ export interface ICache { delete(key: string): Promise increment(key: string, incrementBy?: number, ttlSeconds?: number): Promise decrement(key: string, decrementBy?: number, ttlSeconds?: number): Promise + hget(key: string, field: string): Promise + hgetall(key: string): Promise<{ [key: string]: string }> + hset(key: string, field: string, value: string): Promise } export interface IRateLimiter { From 9ac630774511e8c4acf28c6de498de9788f58f62 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Wed, 4 Oct 2023 15:58:40 +0300 Subject: [PATCH 02/12] check rate limits for graphql --- .../libs/integrations/src/integrations/github/tokenRotator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/libs/integrations/src/integrations/github/tokenRotator.ts b/services/libs/integrations/src/integrations/github/tokenRotator.ts index b99cbaa14f..37b3be57de 100644 --- a/services/libs/integrations/src/integrations/github/tokenRotator.ts +++ b/services/libs/integrations/src/integrations/github/tokenRotator.ts @@ -73,8 +73,8 @@ export class GithubTokenRotator { headers: { Authorization: `token ${token}` }, }) - const remaining = parseInt(response.data.resources.core.remaining) - const reset = parseInt(response.data.resources.core.reset) + const remaining = parseInt(response.data.resources.graphql.remaining) + const reset = parseInt(response.data.resources.graphql.reset) await this.updateTokenInfo(token, remaining, reset) } } From a0fb9b7893c6d3b300b85809aa7b9136a2110a09 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Wed, 4 Oct 2023 16:07:54 +0300 Subject: [PATCH 03/12] switch to global cache and change cache key --- .../src/integrations/github/processStream.ts | 5 +++- .../src/integrations/github/tokenRotator.ts | 27 ++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/services/libs/integrations/src/integrations/github/processStream.ts b/services/libs/integrations/src/integrations/github/processStream.ts index de3c3b0d9c..898b7b7b4e 100644 --- a/services/libs/integrations/src/integrations/github/processStream.ts +++ b/services/libs/integrations/src/integrations/github/processStream.ts @@ -81,7 +81,10 @@ export function getConcurrentRequestLimiter( export function getTokenRotator(ctx: IProcessStreamContext): GithubTokenRotator { const GITHUB_CONFIG = ctx.platformSettings as GithubPlatformSettings if (tokenRotator === undefined) { - tokenRotator = new GithubTokenRotator(ctx.cache, GITHUB_CONFIG.personalAccessTokens.split(',')) + tokenRotator = new GithubTokenRotator( + ctx.globalCache, + GITHUB_CONFIG.personalAccessTokens.split(','), + ) } return tokenRotator } diff --git a/services/libs/integrations/src/integrations/github/tokenRotator.ts b/services/libs/integrations/src/integrations/github/tokenRotator.ts index 37b3be57de..f556bdaefc 100644 --- a/services/libs/integrations/src/integrations/github/tokenRotator.ts +++ b/services/libs/integrations/src/integrations/github/tokenRotator.ts @@ -9,6 +9,7 @@ interface TokenInfo { } export class GithubTokenRotator { + static CACHE_KEY = 'integration-cache:github-token-rotator:tokens' constructor(private cache: ICache, private tokens: string[]) { this.cache = cache this.tokens = [...new Set(tokens)] @@ -17,7 +18,9 @@ export class GithubTokenRotator { private async initializeTokens(): Promise { for (const token of this.tokens) { - const tokenInfo: TokenInfo = JSON.parse(await this.cache.hget('github:tokens', token)) + const tokenInfo: TokenInfo = JSON.parse( + await this.cache.hget(GithubTokenRotator.CACHE_KEY, token), + ) if (!tokenInfo) { const newTokenInfo: TokenInfo = { token, @@ -25,13 +28,13 @@ export class GithubTokenRotator { reset: 0, inUse: false, } - await this.cache.hset('github:tokens', token, JSON.stringify(newTokenInfo)) + await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(newTokenInfo)) } } } public async getToken(): Promise { - const tokens = await this.cache.hgetall('github:tokens') + const tokens = await this.cache.hgetall(GithubTokenRotator.CACHE_KEY) for (const token in tokens) { const tokenInfo: TokenInfo = JSON.parse(tokens[token]) if ( @@ -39,7 +42,7 @@ export class GithubTokenRotator { (tokenInfo.remaining > 0 || tokenInfo.reset < Math.floor(Date.now() / 1000)) ) { tokenInfo.inUse = true - await this.cache.hset('github:tokens', token, JSON.stringify(tokenInfo)) + await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(tokenInfo)) return token } } @@ -47,25 +50,31 @@ export class GithubTokenRotator { } public async returnToken(token: string): Promise { - const tokenInfo: TokenInfo = JSON.parse((await this.cache.hget('github:tokens', token)) || '') + const tokenInfo: TokenInfo = JSON.parse( + (await this.cache.hget(GithubTokenRotator.CACHE_KEY, token)) || '', + ) if (tokenInfo) { tokenInfo.inUse = false - await this.cache.hset('github:tokens', token, JSON.stringify(tokenInfo)) + await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(tokenInfo)) } } public async updateTokenInfo(token: string, remaining: number, reset: number): Promise { - const tokenInfo: TokenInfo = JSON.parse((await this.cache.hget('github:tokens', token)) || '') + const tokenInfo: TokenInfo = JSON.parse( + (await this.cache.hget(GithubTokenRotator.CACHE_KEY, token)) || '', + ) if (tokenInfo) { tokenInfo.remaining = remaining tokenInfo.reset = reset - await this.cache.hset('github:tokens', token, JSON.stringify(tokenInfo)) + await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(tokenInfo)) } } public async updateRateLimitInfoFromApi(token: string): Promise { // let's make API call to get the latest rate limit info - const tokenInfo: TokenInfo = JSON.parse((await this.cache.hget('github:tokens', token)) || '') + const tokenInfo: TokenInfo = JSON.parse( + (await this.cache.hget(GithubTokenRotator.CACHE_KEY, token)) || '', + ) if (tokenInfo) { const response = await axios({ url: 'https://api.github.com/rate_limit', From 13779bc965d8020bc227f333cddc315ca85931fe Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Wed, 4 Oct 2023 17:57:13 +0300 Subject: [PATCH 04/12] add logging --- .../src/integrations/github/api/graphql/baseQuery.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index 3fdbc0f81b..4f633e85cd 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -4,6 +4,9 @@ import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types' import { GraphQlQueryResponse } from '@crowd/types' import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types' import { GithubTokenRotator } from '../../tokenRotator' +import { getServiceChildLogger } from '@crowd/logging' + +const logger = getServiceChildLogger('integrations:github:api:graphql:baseQuery') interface Limiter { integrationId: string @@ -116,17 +119,20 @@ class BaseQuery { return await process() } } catch (err) { + logger.error('Error in getSinglePage') if ( (err.status === 403 && err.message && (err.message as string).toLowerCase().includes('secondary rate limit')) || (err.errors && err.errors[0].type === 'RATE_LIMITED') ) { + logger.error('Error in getSinglePage: rate limit error. Trying token rotation') // this is rate limit, let's try token rotation if (tokenRotator) { return await this.getSinglePageWithTokenRotation(beforeCursor, tokenRotator) } } else { + logger.error('Error in getSinglePage: other error') throw BaseQuery.processGraphQLError(err) } } @@ -136,6 +142,8 @@ class BaseQuery { beforeCursor: string, tokenRotator: GithubTokenRotator, ): Promise { + logger.info('getSinglePageWithTokenRotation') + const paginatedQuery = BaseQuery.interpolate(this.query, { beforeCursor: BaseQuery.getPagination(beforeCursor), }) @@ -157,6 +165,7 @@ class BaseQuery { try { return await process() } catch (err) { + logger.error('Error in getSinglePageWithTokenRotation') // we might have exhausted one token, but we let another streams to continue if (err.headers && err.headers['x-ratelimit-remaining'] && err.headers['x-ratelimit-reset']) { const remaining = parseInt(err.headers['x-ratelimit-remaining']) From cda4ff213143a311b216ad7248c24feb539d9e1d Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Wed, 4 Oct 2023 19:09:43 +0300 Subject: [PATCH 05/12] add token rotation for user and organization endpoints --- .../github/api/graphql/members.ts | 63 ++++++++++++++++- .../github/api/graphql/organizations.ts | 70 ++++++++++++++++++- .../src/integrations/github/processStream.ts | 4 +- 3 files changed, 133 insertions(+), 4 deletions(-) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/members.ts b/services/libs/integrations/src/integrations/github/api/graphql/members.ts index b5faed4353..6af5a62933 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/members.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/members.ts @@ -1,6 +1,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { graphql } from '@octokit/graphql' import BaseQuery from './baseQuery' +import { GithubTokenRotator } from '../../tokenRotator' /** * Get information from a member using the GitHub GraphQL API. @@ -8,7 +9,11 @@ import BaseQuery from './baseQuery' * @param token GitHub personal access token * @returns Information from member */ -const getMember = async (username: string, token: string): Promise => { +const getMember = async ( + username: string, + token: string, + tokenRotator?: GithubTokenRotator, +): Promise => { let user: string | null try { const graphqlWithAuth = graphql.defaults({ @@ -28,6 +33,16 @@ const getMember = async (username: string, token: string): Promise => { // In that case we want to return null instead of throwing an error if (err.errors && err.errors[0].type === 'NOT_FOUND') { user = null + } else if ( + (err.status === 403 && + err.message && + (err.message as string).toLowerCase().includes('secondary rate limit')) || + (err.errors && err.errors[0].type === 'RATE_LIMITED') + ) { + // this is rate limit, let's try token rotation + if (tokenRotator) { + user = await getMemberWithTokenRotation(username, tokenRotator) + } } else { throw BaseQuery.processGraphQLError(err) } @@ -35,4 +50,50 @@ const getMember = async (username: string, token: string): Promise => { return user } +const getMemberWithTokenRotation = async ( + username: string, + tokenRotator: GithubTokenRotator, +): Promise => { + let user: string | null + const token = await tokenRotator.getToken() + try { + const graphqlWithTokenRotation = graphql.defaults({ + headers: { + authorization: `token ${token}`, + }, + }) + + user = ( + (await graphqlWithTokenRotation(`{ + user(login: "${username}") ${BaseQuery.USER_SELECT} + } + `)) as any + ).user + + await tokenRotator.updateRateLimitInfoFromApi(token) + await tokenRotator.returnToken(token) + + return user + } catch (err) { + if (err.errors && err.errors[0].type === 'NOT_FOUND') { + return null + } + // we might have exhausted one token, but we let another streams to continue + else if ( + err.headers && + err.headers['x-ratelimit-remaining'] && + err.headers['x-ratelimit-reset'] + ) { + const remaining = parseInt(err.headers['x-ratelimit-remaining']) + const reset = parseInt(err.headers['x-ratelimit-reset']) + await tokenRotator.updateTokenInfo(token, remaining, reset) + } else { + await tokenRotator.updateRateLimitInfoFromApi(token) + } + throw BaseQuery.processGraphQLError(err) + } finally { + await tokenRotator.returnToken(token) + } +} + export default getMember diff --git a/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts b/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts index 70c821602a..9f203ce203 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts @@ -2,6 +2,7 @@ import { getServiceChildLogger } from '@crowd/logging' import { graphql } from '@octokit/graphql' import BaseQuery from './baseQuery' +import { GithubTokenRotator } from '../../tokenRotator' const logger = getServiceChildLogger('github.getOrganization') @@ -11,7 +12,11 @@ const logger = getServiceChildLogger('github.getOrganization') * @param token GitHub personal access token * @returns Information from organization */ -const getOrganization = async (name: string, token: string): Promise => { +const getOrganization = async ( + name: string, + token: string, + tokenRotator?: GithubTokenRotator, +): Promise => { let organization: string | null try { const graphqlWithAuth = graphql.defaults({ @@ -46,6 +51,16 @@ const getOrganization = async (name: string, token: string): Promise => { // In that case we want to return null instead of throwing an error if (err.errors && err.errors[0].type === 'NOT_FOUND') { organization = null + } else if ( + (err.status === 403 && + err.message && + (err.message as string).toLowerCase().includes('secondary rate limit')) || + (err.errors && err.errors[0].type === 'RATE_LIMITED') + ) { + // this is rate limit, let's try token rotation + if (tokenRotator) { + organization = await getOrganizationWithTokenRotation(name, tokenRotator) + } } else { throw BaseQuery.processGraphQLError(err) } @@ -53,4 +68,57 @@ const getOrganization = async (name: string, token: string): Promise => { return organization } +const getOrganizationWithTokenRotation = async ( + name: string, + tokenRotator: GithubTokenRotator, +): Promise => { + const token = await tokenRotator.getToken() + try { + const graphqlWithTokenRotation = graphql.defaults({ + headers: { + authorization: `token ${token}`, + }, + }) + + const sanitizedName = name.replaceAll('\\', '').replaceAll('"', '') + + const organizationsQuery = `{ + search(query: "type:org ${sanitizedName}", type: USER, first: 10) { + nodes { + ... on Organization ${BaseQuery.ORGANIZATION_SELECT} + } + } + rateLimit { + limit + cost + remaining + resetAt + } + }` + + const organization = (await graphqlWithTokenRotation(organizationsQuery)) as any + + return (organization as any).search.nodes.length > 0 + ? (organization as any).search.nodes[0] + : null + } catch (err) { + if (err.errors && err.errors[0].type === 'NOT_FOUND') { + return null + } else if ( + err.headers && + err.headers['x-ratelimit-remaining'] && + err.headers['x-ratelimit-reset'] + ) { + const remaining = parseInt(err.headers['x-ratelimit-remaining']) + const reset = parseInt(err.headers['x-ratelimit-reset']) + await tokenRotator.updateTokenInfo(token, remaining, reset) + } else { + await tokenRotator.updateRateLimitInfoFromApi(token) + } + throw BaseQuery.processGraphQLError(err) + } finally { + await tokenRotator.returnToken(token) + } +} + export default getOrganization diff --git a/services/libs/integrations/src/integrations/github/processStream.ts b/services/libs/integrations/src/integrations/github/processStream.ts index 898b7b7b4e..3440fbc439 100644 --- a/services/libs/integrations/src/integrations/github/processStream.ts +++ b/services/libs/integrations/src/integrations/github/processStream.ts @@ -107,7 +107,7 @@ export async function getGithubToken(ctx: IProcessStreamContext): Promise { const appToken = await getGithubToken(ctx) - return getMember(login, appToken) + return getMember(login, appToken, getTokenRotator(ctx)) } async function getMemberEmail(ctx: IProcessStreamContext, login: string): Promise { @@ -170,7 +170,7 @@ export const prepareMember = async ( } else { const company = memberFromApi.company.replace('@', '').trim() const token = await getGithubToken(ctx) - const fromAPI = await getOrganization(company, token) + const fromAPI = await getOrganization(company, token, getTokenRotator(ctx)) orgs = fromAPI } From 21faffc50daeee266fb23973a44c236e0326f4de Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Wed, 4 Oct 2023 19:35:37 +0300 Subject: [PATCH 06/12] remove inuse parameter in token rotator --- .../integrations/src/integrations/github/tokenRotator.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/services/libs/integrations/src/integrations/github/tokenRotator.ts b/services/libs/integrations/src/integrations/github/tokenRotator.ts index f556bdaefc..c8cb509c63 100644 --- a/services/libs/integrations/src/integrations/github/tokenRotator.ts +++ b/services/libs/integrations/src/integrations/github/tokenRotator.ts @@ -5,7 +5,6 @@ interface TokenInfo { token: string remaining: number reset: number - inUse: boolean } export class GithubTokenRotator { @@ -26,7 +25,6 @@ export class GithubTokenRotator { token, remaining: 0, reset: 0, - inUse: false, } await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(newTokenInfo)) } @@ -37,11 +35,7 @@ export class GithubTokenRotator { const tokens = await this.cache.hgetall(GithubTokenRotator.CACHE_KEY) for (const token in tokens) { const tokenInfo: TokenInfo = JSON.parse(tokens[token]) - if ( - !tokenInfo.inUse && - (tokenInfo.remaining > 0 || tokenInfo.reset < Math.floor(Date.now() / 1000)) - ) { - tokenInfo.inUse = true + if (tokenInfo.remaining > 0 || tokenInfo.reset < Math.floor(Date.now() / 1000)) { await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(tokenInfo)) return token } @@ -54,7 +48,6 @@ export class GithubTokenRotator { (await this.cache.hget(GithubTokenRotator.CACHE_KEY, token)) || '', ) if (tokenInfo) { - tokenInfo.inUse = false await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(tokenInfo)) } } From ada7c2e2c3781631d2f23ff0cee5dbe702b1774a Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Thu, 5 Oct 2023 12:02:21 +0300 Subject: [PATCH 07/12] await promise so the error will be catched in the closed try catch --- .../src/integrations/github/api/graphql/baseQuery.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index 4f633e85cd..c618876f63 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -109,7 +109,7 @@ class BaseQuery { try { if (limiter) { - return limiter.concurrentRequestLimiter.processWithLimit( + return await limiter.concurrentRequestLimiter.processWithLimit( limiter.integrationId, async () => { return await process() From 52d3073c94a5e63352218774234fbb253d2c6887 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Thu, 5 Oct 2023 13:19:40 +0300 Subject: [PATCH 08/12] descreas number of allowed concurrent requests to 3 --- .../src/integrations/github/api/graphql/baseQuery.ts | 8 ++++++++ .../integrations/src/integrations/github/processStream.ts | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index c618876f63..feb7313d83 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -230,3 +230,11 @@ class BaseQuery { } export default BaseQuery + +export const process = async () => { + try { + return process() + } catch (err) { + // some logic here + } +} diff --git a/services/libs/integrations/src/integrations/github/processStream.ts b/services/libs/integrations/src/integrations/github/processStream.ts index 3440fbc439..f20d9d70cf 100644 --- a/services/libs/integrations/src/integrations/github/processStream.ts +++ b/services/libs/integrations/src/integrations/github/processStream.ts @@ -71,7 +71,7 @@ export function getConcurrentRequestLimiter( ): IConcurrentRequestLimiter { if (concurrentRequestLimiter === undefined) { concurrentRequestLimiter = ctx.getConcurrentRequestLimiter( - 4, // max 2 concurrent requests + 3, // max 2 concurrent requests 'github-concurrent-request-limiter', ) } From c34733af20a41ba432d6a003f449f98c95a10638 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Thu, 5 Oct 2023 13:46:36 +0300 Subject: [PATCH 09/12] raise rate limits in token rotator --- .../src/integrations/github/tokenRotator.ts | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/services/libs/integrations/src/integrations/github/tokenRotator.ts b/services/libs/integrations/src/integrations/github/tokenRotator.ts index c8cb509c63..93f3dbbc17 100644 --- a/services/libs/integrations/src/integrations/github/tokenRotator.ts +++ b/services/libs/integrations/src/integrations/github/tokenRotator.ts @@ -1,5 +1,6 @@ import { ICache } from '@crowd/types' import axios from 'axios' +import { RateLimitError } from '@crowd/types' interface TokenInfo { token: string @@ -33,14 +34,34 @@ export class GithubTokenRotator { public async getToken(): Promise { const tokens = await this.cache.hgetall(GithubTokenRotator.CACHE_KEY) + let minResetTime = Infinity + for (const token in tokens) { const tokenInfo: TokenInfo = JSON.parse(tokens[token]) if (tokenInfo.remaining > 0 || tokenInfo.reset < Math.floor(Date.now() / 1000)) { await this.cache.hset(GithubTokenRotator.CACHE_KEY, token, JSON.stringify(tokenInfo)) return token } + minResetTime = Math.min(minResetTime, tokenInfo.reset) } - throw new Error('No available tokens in GitHubTokenRotator') + + const waitTime = + minResetTime - Math.floor(Date.now() / 1000) + Math.floor(Math.random() * 120) + 60 + + // if we have to wait less than 60 seconds, let's wait and try again + if (waitTime <= 60) { + return new Promise((resolve) => { + setTimeout(async () => { + resolve(await this.getToken()) + }, waitTime * 1000) + }) + } + + throw new RateLimitError( + waitTime + Math.floor(Math.random() * 120) + 60, + 'token-rotator', + `No available tokens in GitHubTokenRotator. Please wait for ${waitTime} seconds`, + ) } public async returnToken(token: string): Promise { From ff5e29750f893347adfdfd5279c831d79b2b0b62 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Thu, 5 Oct 2023 16:23:09 +0300 Subject: [PATCH 10/12] add request limiter everywhere --- .../github/api/graphql/baseQuery.ts | 16 +++++-- .../github/api/graphql/members.ts | 43 ++++++++++++++----- .../github/api/graphql/organizations.ts | 42 ++++++++++++++---- 3 files changed, 80 insertions(+), 21 deletions(-) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index feb7313d83..2f15b29d92 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -8,7 +8,7 @@ import { getServiceChildLogger } from '@crowd/logging' const logger = getServiceChildLogger('integrations:github:api:graphql:baseQuery') -interface Limiter { +export interface Limiter { integrationId: string concurrentRequestLimiter: IConcurrentRequestLimiter } @@ -129,7 +129,7 @@ class BaseQuery { logger.error('Error in getSinglePage: rate limit error. Trying token rotation') // this is rate limit, let's try token rotation if (tokenRotator) { - return await this.getSinglePageWithTokenRotation(beforeCursor, tokenRotator) + return await this.getSinglePageWithTokenRotation(beforeCursor, tokenRotator, limiter) } } else { logger.error('Error in getSinglePage: other error') @@ -141,6 +141,7 @@ class BaseQuery { private async getSinglePageWithTokenRotation( beforeCursor: string, tokenRotator: GithubTokenRotator, + limiter?: Limiter, ): Promise { logger.info('getSinglePageWithTokenRotation') @@ -163,7 +164,16 @@ class BaseQuery { } try { - return await process() + if (limiter) { + return await limiter.concurrentRequestLimiter.processWithLimit( + limiter.integrationId, + async () => { + return await process() + }, + ) + } else { + return await process() + } } catch (err) { logger.error('Error in getSinglePageWithTokenRotation') // we might have exhausted one token, but we let another streams to continue diff --git a/services/libs/integrations/src/integrations/github/api/graphql/members.ts b/services/libs/integrations/src/integrations/github/api/graphql/members.ts index 6af5a62933..d3a4ae99ce 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/members.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/members.ts @@ -2,6 +2,7 @@ import { graphql } from '@octokit/graphql' import BaseQuery from './baseQuery' import { GithubTokenRotator } from '../../tokenRotator' +import { Limiter } from './baseQuery' /** * Get information from a member using the GitHub GraphQL API. @@ -13,6 +14,7 @@ const getMember = async ( username: string, token: string, tokenRotator?: GithubTokenRotator, + limiter?: Limiter, ): Promise => { let user: string | null try { @@ -22,12 +24,22 @@ const getMember = async ( }, }) - user = ( - (await graphqlWithAuth(`{ + const process = async () => { + user = ( + (await graphqlWithAuth(`{ user(login: "${username}") ${BaseQuery.USER_SELECT} } `)) as any - ).user + ).user + } + + if (limiter) { + await limiter.concurrentRequestLimiter.processWithLimit(limiter.integrationId, async () => { + await process() + }) + } else { + await process() + } } catch (err) { // It may be that the user was not found, if for example it is a bot // In that case we want to return null instead of throwing an error @@ -41,7 +53,7 @@ const getMember = async ( ) { // this is rate limit, let's try token rotation if (tokenRotator) { - user = await getMemberWithTokenRotation(username, tokenRotator) + user = await getMemberWithTokenRotation(username, tokenRotator, limiter) } } else { throw BaseQuery.processGraphQLError(err) @@ -53,6 +65,7 @@ const getMember = async ( const getMemberWithTokenRotation = async ( username: string, tokenRotator: GithubTokenRotator, + limiter?: Limiter, ): Promise => { let user: string | null const token = await tokenRotator.getToken() @@ -63,12 +76,22 @@ const getMemberWithTokenRotation = async ( }, }) - user = ( - (await graphqlWithTokenRotation(`{ - user(login: "${username}") ${BaseQuery.USER_SELECT} - } - `)) as any - ).user + const process = async () => { + user = ( + (await graphqlWithTokenRotation(`{ + user(login: "${username}") ${BaseQuery.USER_SELECT} + } + `)) as any + ).user + } + + if (limiter) { + await limiter.concurrentRequestLimiter.processWithLimit(limiter.integrationId, async () => { + await process() + }) + } else { + await process() + } await tokenRotator.updateRateLimitInfoFromApi(token) await tokenRotator.returnToken(token) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts b/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts index 9f203ce203..25b3bc8cdb 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/organizations.ts @@ -3,6 +3,7 @@ import { getServiceChildLogger } from '@crowd/logging' import { graphql } from '@octokit/graphql' import BaseQuery from './baseQuery' import { GithubTokenRotator } from '../../tokenRotator' +import { Limiter } from './baseQuery' const logger = getServiceChildLogger('github.getOrganization') @@ -16,6 +17,7 @@ const getOrganization = async ( name: string, token: string, tokenRotator?: GithubTokenRotator, + limiter?: Limiter, ): Promise => { let organization: string | null try { @@ -41,10 +43,20 @@ const getOrganization = async ( } }` - organization = (await graphqlWithAuth(organizationsQuery)) as any + const process = async () => { + organization = (await graphqlWithAuth(organizationsQuery)) as any - organization = - (organization as any).search.nodes.length > 0 ? (organization as any).search.nodes[0] : null + organization = + (organization as any).search.nodes.length > 0 ? (organization as any).search.nodes[0] : null + } + + if (limiter) { + await limiter.concurrentRequestLimiter.processWithLimit(limiter.integrationId, async () => { + await process() + }) + } else { + await process() + } } catch (err) { logger.error(err, { name }, 'Error getting organization!') // It may be that the organization was not found, if for example it is a bot @@ -59,7 +71,7 @@ const getOrganization = async ( ) { // this is rate limit, let's try token rotation if (tokenRotator) { - organization = await getOrganizationWithTokenRotation(name, tokenRotator) + organization = await getOrganizationWithTokenRotation(name, tokenRotator, limiter) } } else { throw BaseQuery.processGraphQLError(err) @@ -71,6 +83,7 @@ const getOrganization = async ( const getOrganizationWithTokenRotation = async ( name: string, tokenRotator: GithubTokenRotator, + limiter?: Limiter, ): Promise => { const token = await tokenRotator.getToken() try { @@ -96,11 +109,24 @@ const getOrganizationWithTokenRotation = async ( } }` - const organization = (await graphqlWithTokenRotation(organizationsQuery)) as any + const process = async () => { + const organization = (await graphqlWithTokenRotation(organizationsQuery)) as any - return (organization as any).search.nodes.length > 0 - ? (organization as any).search.nodes[0] - : null + return (organization as any).search.nodes.length > 0 + ? (organization as any).search.nodes[0] + : null + } + + if (limiter) { + return await limiter.concurrentRequestLimiter.processWithLimit( + limiter.integrationId, + async () => { + return await process() + }, + ) + } else { + return await process() + } } catch (err) { if (err.errors && err.errors[0].type === 'NOT_FOUND') { return null From 0cd5990d37d4e17b40f142e156f88efee8d607a1 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Thu, 5 Oct 2023 16:42:41 +0300 Subject: [PATCH 11/12] raise error if we have rate limit but no token rotator provided --- .../src/integrations/github/api/graphql/baseQuery.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts index 2f15b29d92..06ae91e6ca 100644 --- a/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts +++ b/services/libs/integrations/src/integrations/github/api/graphql/baseQuery.ts @@ -130,6 +130,8 @@ class BaseQuery { // this is rate limit, let's try token rotation if (tokenRotator) { return await this.getSinglePageWithTokenRotation(beforeCursor, tokenRotator, limiter) + } else { + throw BaseQuery.processGraphQLError(err) } } else { logger.error('Error in getSinglePage: other error') From fa19e42f57ddbd9aee3effde93aecc11e4dd12f2 Mon Sep 17 00:00:00 2001 From: garrrikkotua Date: Thu, 5 Oct 2023 17:08:13 +0300 Subject: [PATCH 12/12] descrease concurrent requets to two --- .../libs/integrations/src/integrations/github/processStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/integrations/src/integrations/github/processStream.ts b/services/libs/integrations/src/integrations/github/processStream.ts index f20d9d70cf..0e73dbfed7 100644 --- a/services/libs/integrations/src/integrations/github/processStream.ts +++ b/services/libs/integrations/src/integrations/github/processStream.ts @@ -71,7 +71,7 @@ export function getConcurrentRequestLimiter( ): IConcurrentRequestLimiter { if (concurrentRequestLimiter === undefined) { concurrentRequestLimiter = ctx.getConcurrentRequestLimiter( - 3, // max 2 concurrent requests + 2, // max 2 concurrent requests 'github-concurrent-request-limiter', ) }