diff --git a/services/libs/redis/src/rateLimiter.ts b/services/libs/redis/src/rateLimiter.ts index 87b8169cff..f310dfac7b 100644 --- a/services/libs/redis/src/rateLimiter.ts +++ b/services/libs/redis/src/rateLimiter.ts @@ -33,15 +33,19 @@ export class RateLimiter implements IRateLimiter { export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter { constructor( private readonly cache: ICache, + // max concurrent requests per integrationId private readonly maxConcurrentRequests: number, private readonly requestKey: string, + // cache key will be deleted after this time since last increment / decrement + private readonly maxLockTimeSeconds = 30, ) { this.cache = cache this.maxConcurrentRequests = maxConcurrentRequests this.requestKey = requestKey + this.maxLockTimeSeconds = maxLockTimeSeconds } - public async checkConcurrentRequestLimit(integrationId: string, retries = 5, sleepTimeMs = 1000) { + public async checkConcurrentRequestLimit(integrationId: string, retries = 200, sleepTimeMs = 50) { const key = this.getRequestKey(integrationId) const value = await this.cache.get(key) const currentRequests = value === null ? 0 : parseInt(value) @@ -60,12 +64,12 @@ export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter { public async incrementConcurrentRequest(integrationId: string) { const key = this.getRequestKey(integrationId) - await this.cache.increment(key, 1) + await this.cache.increment(key, 1, this.maxLockTimeSeconds) } public async decrementConcurrentRequest(integrationId: string) { const key = this.getRequestKey(integrationId) - await this.cache.decrement(key, 1) + await this.cache.decrement(key, 1, this.maxLockTimeSeconds) } public async processWithLimit(integrationId: string, func: () => Promise): Promise {