Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"callbackUrl": "CROWD_GITHUB_CALLBACK_URL",
"privateKey": "CROWD_GITHUB_PRIVATE_KEY",
"webhookSecret": "CROWD_GITHUB_WEBHOOK_SECRET",
"isCommitDataEnabled": "CROWD_GITHUB_IS_COMMIT_DATA_ENABLED"
"isCommitDataEnabled": "CROWD_GITHUB_IS_COMMIT_DATA_ENABLED",
"personalAccessTokens": "CROWD_GITHUB_PERSONAL_ACCESS_TOKENS"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ import { graphql } from '@octokit/graphql'
import { GraphQlQueryResponseData } from '@octokit/graphql/dist-types/types'
import { GraphQlQueryResponse } from '@crowd/types'
import { RateLimitError, IConcurrentRequestLimiter } from '@crowd/types'
import { GithubTokenRotator } from '../../tokenRotator'
import { getServiceChildLogger } from '@crowd/logging'

interface Limiter {
const logger = getServiceChildLogger('integrations:github:api:graphql:baseQuery')

export interface Limiter {
integrationId: string
concurrentRequestLimiter: IConcurrentRequestLimiter
}
Expand Down Expand Up @@ -89,26 +93,102 @@ class BaseQuery {
* @param beforeCursor Cursor to paginate records before it
* @returns parsed graphQl result
*/
async getSinglePage(beforeCursor: string, limiter?: Limiter): Promise<GraphQlQueryResponse> {
async getSinglePage(
beforeCursor: string,
limiter?: Limiter,
tokenRotator?: GithubTokenRotator,
): Promise<GraphQlQueryResponse> {
const paginatedQuery = BaseQuery.interpolate(this.query, {
beforeCursor: BaseQuery.getPagination(beforeCursor),
})

const process = async () => {
const result = await this.graphQL(paginatedQuery)
return this.getEventData(result)
}

try {
if (limiter) {
return await limiter.concurrentRequestLimiter.processWithLimit(
limiter.integrationId,
async () => {
return await process()
},
)
} else {
return await process()
}
} catch (err) {
logger.error('Error in getSinglePage')
if (
(err.status === 403 &&
err.message &&
(err.message as string).toLowerCase().includes('secondary rate limit')) ||
(err.errors && err.errors[0].type === 'RATE_LIMITED')
) {
logger.error('Error in getSinglePage: rate limit error. Trying token rotation')
// this is rate limit, let's try token rotation
if (tokenRotator) {
return await this.getSinglePageWithTokenRotation(beforeCursor, tokenRotator, limiter)
} else {
throw BaseQuery.processGraphQLError(err)
}
} else {
logger.error('Error in getSinglePage: other error')
throw BaseQuery.processGraphQLError(err)
}
}
}

private async getSinglePageWithTokenRotation(
beforeCursor: string,
tokenRotator: GithubTokenRotator,
limiter?: Limiter,
): Promise<GraphQlQueryResponse> {
logger.info('getSinglePageWithTokenRotation')

const paginatedQuery = BaseQuery.interpolate(this.query, {
beforeCursor: BaseQuery.getPagination(beforeCursor),
})

const token = await tokenRotator.getToken()

const process = async () => {
const graphqlWithTokenRotation = graphql.defaults({
headers: {
authorization: `token ${token}`,
},
})
const result = await graphqlWithTokenRotation(paginatedQuery)
await tokenRotator.updateRateLimitInfoFromApi(token)
await tokenRotator.returnToken(token)
return this.getEventData(result)
}

try {
if (limiter) {
return limiter.concurrentRequestLimiter.processWithLimit(
return await limiter.concurrentRequestLimiter.processWithLimit(
limiter.integrationId,
async () => {
const result = await this.graphQL(paginatedQuery)
return this.getEventData(result)
return await process()
},
)
} else {
const result = await this.graphQL(paginatedQuery)
return this.getEventData(result)
return await process()
}
} catch (err) {
logger.error('Error in getSinglePageWithTokenRotation')
// we might have exhausted one token, but we let another streams to continue
if (err.headers && err.headers['x-ratelimit-remaining'] && err.headers['x-ratelimit-reset']) {
const remaining = parseInt(err.headers['x-ratelimit-remaining'])
const reset = parseInt(err.headers['x-ratelimit-reset'])
await tokenRotator.updateTokenInfo(token, remaining, reset)
} else {
await tokenRotator.updateRateLimitInfoFromApi(token)
}
throw BaseQuery.processGraphQLError(err)
} finally {
await tokenRotator.returnToken(token)
}
}

Expand Down Expand Up @@ -162,3 +242,11 @@ class BaseQuery {
}

export default BaseQuery

export const process = async () => {
try {
return process()
} catch (err) {
// some logic here
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { graphql } from '@octokit/graphql'
import BaseQuery from './baseQuery'
import { GithubTokenRotator } from '../../tokenRotator'
import { Limiter } from './baseQuery'

/**
* Get information from a member using the GitHub GraphQL API.
* @param username GitHub username
* @param token GitHub personal access token
* @returns Information from member
*/
const getMember = async (username: string, token: string): Promise<any> => {
const getMember = async (
username: string,
token: string,
tokenRotator?: GithubTokenRotator,
limiter?: Limiter,
): Promise<any> => {
let user: string | null
try {
const graphqlWithAuth = graphql.defaults({
Expand All @@ -17,22 +24,99 @@ const getMember = async (username: string, token: string): Promise<any> => {
},
})

user = (
(await graphqlWithAuth(`{
const process = async () => {
user = (
(await graphqlWithAuth(`{
user(login: "${username}") ${BaseQuery.USER_SELECT}
}
`)) as any
).user
).user
}

if (limiter) {
await limiter.concurrentRequestLimiter.processWithLimit(limiter.integrationId, async () => {
await process()
})
} else {
await process()
}
} catch (err) {
// It may be that the user was not found, if for example it is a bot
// In that case we want to return null instead of throwing an error
if (err.errors && err.errors[0].type === 'NOT_FOUND') {
user = null
} else if (
(err.status === 403 &&
err.message &&
(err.message as string).toLowerCase().includes('secondary rate limit')) ||
(err.errors && err.errors[0].type === 'RATE_LIMITED')
) {
// this is rate limit, let's try token rotation
if (tokenRotator) {
user = await getMemberWithTokenRotation(username, tokenRotator, limiter)
}
} else {
throw BaseQuery.processGraphQLError(err)
}
}
return user
}

const getMemberWithTokenRotation = async (
username: string,
tokenRotator: GithubTokenRotator,
limiter?: Limiter,
): Promise<any> => {
let user: string | null
const token = await tokenRotator.getToken()
try {
const graphqlWithTokenRotation = graphql.defaults({
headers: {
authorization: `token ${token}`,
},
})

const process = async () => {
user = (
(await graphqlWithTokenRotation(`{
user(login: "${username}") ${BaseQuery.USER_SELECT}
}
`)) as any
).user
}

if (limiter) {
await limiter.concurrentRequestLimiter.processWithLimit(limiter.integrationId, async () => {
await process()
})
} else {
await process()
}

await tokenRotator.updateRateLimitInfoFromApi(token)
await tokenRotator.returnToken(token)

return user
} catch (err) {
if (err.errors && err.errors[0].type === 'NOT_FOUND') {
return null
}
// we might have exhausted one token, but we let another streams to continue
else if (
err.headers &&
err.headers['x-ratelimit-remaining'] &&
err.headers['x-ratelimit-reset']
) {
const remaining = parseInt(err.headers['x-ratelimit-remaining'])
const reset = parseInt(err.headers['x-ratelimit-reset'])
await tokenRotator.updateTokenInfo(token, remaining, reset)
} else {
await tokenRotator.updateRateLimitInfoFromApi(token)
}
throw BaseQuery.processGraphQLError(err)
} finally {
await tokenRotator.returnToken(token)
}
}

export default getMember
Loading