|
1 |
| -import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging' |
| 1 | +import { Logger, getChildLogger, getServiceLogger, logExecutionTimeV2 } from '@crowd/logging' |
2 | 2 | import { DeleteMessageRequest, Message, ReceiveMessageRequest } from 'aws-sdk/clients/sqs'
|
3 | 3 | import moment from 'moment'
|
4 | 4 | import { timeout } from '@crowd/common'
|
@@ -119,7 +119,19 @@ async function handleMessages() {
|
119 | 119 | })
|
120 | 120 |
|
121 | 121 | try {
|
122 |
| - messageLogger.debug('Received a new queue message!') |
| 122 | + if ( |
| 123 | + msg.type === NodeWorkerMessageType.DB_OPERATIONS && |
| 124 | + (msg as any).operation === 'update_members' |
| 125 | + ) { |
| 126 | + messageLogger.warn('Skipping update_members message! TEMPORARY MEASURE!') |
| 127 | + await removeFromQueue(message.ReceiptHandle) |
| 128 | + return |
| 129 | + } |
| 130 | + |
| 131 | + messageLogger.info( |
| 132 | + { messageType: msg.type, messagePayload: JSON.stringify(msg) }, |
| 133 | + 'Received a new queue message!', |
| 134 | + ) |
123 | 135 |
|
124 | 136 | let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise<void>
|
125 | 137 | let keep = false
|
@@ -152,7 +164,13 @@ async function handleMessages() {
|
152 | 164 | await removeFromQueue(message.ReceiptHandle)
|
153 | 165 | messagesInProgress.set(message.MessageId, msg)
|
154 | 166 | try {
|
155 |
| - await processFunction(msg, messageLogger) |
| 167 | + await logExecutionTimeV2( |
| 168 | + async () => { |
| 169 | + await processFunction(msg, messageLogger) |
| 170 | + }, |
| 171 | + messageLogger, |
| 172 | + 'queueMessageProcessingTime', |
| 173 | + ) |
156 | 174 | } catch (err) {
|
157 | 175 | messageLogger.error(err, 'Error while processing queue message!')
|
158 | 176 | } finally {
|
|
0 commit comments