@@ -33,15 +33,19 @@ export class RateLimiter implements IRateLimiter {
33
33
export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter {
34
34
constructor (
35
35
private readonly cache : ICache ,
36
+ // max concurrent requests per integrationId
36
37
private readonly maxConcurrentRequests : number ,
37
38
private readonly requestKey : string ,
39
+ // cache key will be deleted after this time since last increment / decrement
40
+ private readonly maxLockTimeSeconds = 30 ,
38
41
) {
39
42
this . cache = cache
40
43
this . maxConcurrentRequests = maxConcurrentRequests
41
44
this . requestKey = requestKey
45
+ this . maxLockTimeSeconds = maxLockTimeSeconds
42
46
}
43
47
44
- public async checkConcurrentRequestLimit ( integrationId : string , retries = 5 , sleepTimeMs = 1000 ) {
48
+ public async checkConcurrentRequestLimit ( integrationId : string , retries = 200 , sleepTimeMs = 50 ) {
45
49
const key = this . getRequestKey ( integrationId )
46
50
const value = await this . cache . get ( key )
47
51
const currentRequests = value === null ? 0 : parseInt ( value )
@@ -60,12 +64,12 @@ export class ConcurrentRequestLimiter implements IConcurrentRequestLimiter {
60
64
61
65
public async incrementConcurrentRequest ( integrationId : string ) {
62
66
const key = this . getRequestKey ( integrationId )
63
- await this . cache . increment ( key , 1 )
67
+ await this . cache . increment ( key , 1 , this . maxLockTimeSeconds )
64
68
}
65
69
66
70
public async decrementConcurrentRequest ( integrationId : string ) {
67
71
const key = this . getRequestKey ( integrationId )
68
- await this . cache . decrement ( key , 1 )
72
+ await this . cache . decrement ( key , 1 , this . maxLockTimeSeconds )
69
73
}
70
74
71
75
public async processWithLimit < T > ( integrationId : string , func : ( ) => Promise < T > ) : Promise < T > {
0 commit comments