Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions services/libs/redis/src/rateLimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ export class RateLimiter implements IRateLimiter {
export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter {
constructor(
private readonly cache: ICache,
// max concurrent requests per integrationId
private readonly maxConcurrentRequests: number,
private readonly requestKey: string,
// cache key will be deleted after this time since last increment / decrement
private readonly maxLockTimeSeconds = 30,
) {
this.cache = cache
this.maxConcurrentRequests = maxConcurrentRequests
this.requestKey = requestKey
this.maxLockTimeSeconds = maxLockTimeSeconds
}

public async checkConcurrentRequestLimit(integrationId: string, retries = 5, sleepTimeMs = 1000) {
public async checkConcurrentRequestLimit(integrationId: string, retries = 200, sleepTimeMs = 50) {
const key = this.getRequestKey(integrationId)
const value = await this.cache.get(key)
const currentRequests = value === null ? 0 : parseInt(value)
Expand All @@ -60,12 +64,12 @@ export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter {

public async incrementConcurrentRequest(integrationId: string) {
const key = this.getRequestKey(integrationId)
await this.cache.increment(key, 1)
await this.cache.increment(key, 1, this.maxLockTimeSeconds)
}

public async decrementConcurrentRequest(integrationId: string) {
const key = this.getRequestKey(integrationId)
await this.cache.decrement(key, 1)
await this.cache.decrement(key, 1, this.maxLockTimeSeconds)
}

public async processWithLimit<T>(integrationId: string, func: () => Promise<T>): Promise<T> {
Expand Down