@@ -42,64 +42,70 @@ local schema = {
42
42
}
43
43
44
44
45
- local function schedule_func_exec (batch_processor , delay , batch )
46
- local hdl , err = timer_at (delay , execute_func , batch_processor , batch )
45
+ local function schedule_func_exec (self , delay , batch )
46
+ local hdl , err = timer_at (delay , execute_func , self , batch )
47
47
if not hdl then
48
48
core .log .error (" failed to create process timer: " , err )
49
49
return
50
50
end
51
51
end
52
52
53
53
54
- function execute_func (premature , batch_processor , batch )
54
+ function execute_func (premature , self , batch )
55
55
if premature then
56
56
return
57
57
end
58
58
59
- local ok , err = batch_processor .func (batch .entries , batch_processor .batch_max_size )
59
+ local ok , err = self .func (batch .entries , self .batch_max_size )
60
60
if not ok then
61
- core .log .error (" Batch Processor[" , batch_processor .name , " ] failed to process entries: " , err )
61
+ core .log .error (" Batch Processor[" , self .name ,
62
+ " ] failed to process entries: " , err )
62
63
batch .retry_count = batch .retry_count + 1
63
- if batch .retry_count <= batch_processor .max_retry_count then
64
- schedule_func_exec (batch_processor , batch_processor .retry_delay , batch )
64
+ if batch .retry_count <= self .max_retry_count then
65
+ schedule_func_exec (self , self .retry_delay ,
66
+ batch )
65
67
else
66
- core .log .error (" Batch Processor[" , batch_processor .name ," ] exceeded " ,
67
- " the max_retry_count[" , batch .retry_count ," ] dropping the entries" )
68
+ core .log .error (" Batch Processor[" , self .name ," ] exceeded " ,
69
+ " the max_retry_count[" , batch .retry_count ,
70
+ " ] dropping the entries" )
68
71
end
69
72
return
70
73
end
71
74
72
- core .log .debug (" Batch Processor[" , batch_processor .name ," ] successfully processed the entries" )
75
+ core .log .debug (" Batch Processor[" , self .name ,
76
+ " ] successfully processed the entries" )
73
77
end
74
78
75
79
76
- local function flush_buffer (premature , batch_processor )
80
+ local function flush_buffer (premature , self )
77
81
if premature then
78
82
return
79
83
end
80
84
81
- if now () - batch_processor .last_entry_t >= batch_processor .inactive_timeout or
82
- now () - batch_processor .first_entry_t >= batch_processor .buffer_duration then
83
- core .log .debug (" Batch Processor[" , batch_processor .name ," ] buffer " ,
85
+ if now () - self .last_entry_t >= self .inactive_timeout or
86
+ now () - self .first_entry_t >= self .buffer_duration
87
+ then
88
+ core .log .debug (" Batch Processor[" , self .name ," ] buffer " ,
84
89
" duration exceeded, activating buffer flush" )
85
- batch_processor :process_buffer ()
86
- batch_processor .is_timer_running = false
90
+ self :process_buffer ()
91
+ self .is_timer_running = false
87
92
return
88
93
end
89
94
90
- -- buffer duration did not exceed or the buffer is active, extending the timer
91
- core .log .debug (" Batch Processor[" , batch_processor .name ," ] extending buffer timer" )
92
- create_buffer_timer (batch_processor )
95
+ -- buffer duration did not exceed or the buffer is active,
96
+ -- extending the timer
97
+ core .log .debug (" Batch Processor[" , self .name ," ] extending buffer timer" )
98
+ create_buffer_timer (self )
93
99
end
94
100
95
101
96
- function create_buffer_timer (batch_processor )
97
- local hdl , err = timer_at (batch_processor .inactive_timeout , flush_buffer , batch_processor )
102
+ function create_buffer_timer (self )
103
+ local hdl , err = timer_at (self .inactive_timeout , flush_buffer , self )
98
104
if not hdl then
99
105
core .log .error (" failed to create buffer timer: " , err )
100
106
return
101
107
end
102
- batch_processor .is_timer_running = true
108
+ self .is_timer_running = true
103
109
end
104
110
105
111
@@ -149,7 +155,8 @@ function Batch_Processor:push(entry)
149
155
self .last_entry_t = now ()
150
156
151
157
if self .batch_max_size <= # entries then
152
- core .log .debug (" Batch Processor[" , self .name ," ] batch max size has exceeded" )
158
+ core .log .debug (" Batch Processor[" , self .name ,
159
+ " ] batch max size has exceeded" )
153
160
self :process_buffer ()
154
161
end
155
162
0 commit comments