5959 */
6060class MessageDispatcher {
6161 private static final Logger logger = Logger .getLogger (MessageDispatcher .class .getName ());
62+ private static final Logger slowAckLogger = Logger .getLogger ("slow-ack" );
63+ private static final Logger callbackDeliveryLogger = Logger .getLogger ("callback-delivery" );
64+ private static final Logger expiryLogger = Logger .getLogger ("expiry" );
65+ private static final Logger callbackExceptionsLogger = Logger .getLogger ("callback-exceptions" );
66+ private static final Logger ackBatchLogger = Logger .getLogger ("ack-batch" );
67+ private static final Logger subscriberFlowControlLogger = Logger .getLogger ("subscriber-flow-control" );
68+ private static final Logger ackNackLogger = Logger .getLogger ("ack-nack" );
6269
6370 @ InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9 ;
6471 @ InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration .ofMillis (100 );
@@ -159,14 +166,11 @@ private void forget() {
159166
160167 @ Override
161168 public void onFailure (Throwable t ) {
162- logger .log (
163- Level .WARNING ,
164- "pubsub:callback-exception - MessageReceiver failed to process ack ID: "
165- + this .ackRequestData .getAckId ()
166- + ", the message will be nacked."
167- + " Message ID: "
168- + this .ackRequestData .getMessageWrapper ().getPubsubMessage ().getMessageId (),
169- t );
169+ if (callbackExceptionsLogger .isLoggable (Level .WARNING )) {
170+ String prefix = LoggingUtil .getLogPrefix (this .ackRequestData .getMessageWrapper (),
171+ this .getAckRequestData ().getAckId (), exactlyOnceDeliveryEnabled .get ());
172+ callbackExceptionsLogger .log (Level .WARNING , "pubsub:callback-exceptions - MessageReceiver exception. " + prefix , t );
173+ }
170174 this .ackRequestData .setResponse (AckResponse .OTHER , false );
171175 pendingNacks .add (this .ackRequestData );
172176 tracer .endSubscribeProcessSpan (this .ackRequestData .getMessageWrapper (), "nack" );
@@ -177,9 +181,13 @@ public void onFailure(Throwable t) {
177181 public void onSuccess (AckReply reply ) {
178182 int ackLatency =
179183 Ints .saturatedCast ((long ) Math .ceil ((clock .millisTime () - receivedTimeMillis ) / 1000D ));
180- String messageId = this .ackRequestData .getMessageWrapper ().getPubsubMessage ().getMessageId ();
184+ String logPrefix = "" ;
185+ if (slowAckLogger .isLoggable (Level .FINE ) || ackNackLogger .isLoggable (Level .FINE )) {
186+ logPrefix = LoggingUtil .getLogPrefix (this .ackRequestData .getMessageWrapper (),
187+ this .ackRequestData .getAckId (), exactlyOnceDeliveryEnabled .get ());
188+ }
181189 if (ackLatency >= ackLatencyDistribution .getPercentile (slowAckPercentile )) {
182- logger .log (Level .FINE , "pubsub:slow-ack - Message ID: {0}" , messageId );
190+ slowAckLogger .log (Level .FINE , "pubsub:slow-ack - " + logPrefix );
183191 }
184192
185193 switch (reply ) {
@@ -194,12 +202,12 @@ public void onSuccess(AckReply reply) {
194202 ackLatencyDistribution .record (ackLatency );
195203 tracer .endSubscribeProcessSpan (this .ackRequestData .getMessageWrapper (), "ack" );
196204 }
197- logger .log (Level .FINE , "pubsub:ack-nack - Action: ACK - Message ID: {0}" , messageId );
205+ ackNackLogger .log (Level .FINE , "pubsub:ack-nack - " + logPrefix + " - Action: ACK" );
198206 break ;
199207 case NACK :
200208 pendingNacks .add (this .ackRequestData );
201209 tracer .endSubscribeProcessSpan (this .ackRequestData .getMessageWrapper (), "nack" );
202- logger .log (Level .FINE , "pubsub:ack-nack - Action: NACK - Message ID: {0}" , messageId );
210+ ackNackLogger .log (Level .FINE , "pubsub:ack-nack - " + logPrefix + " - Action: NACK" );
203211 break ;
204212 default :
205213 throw new IllegalArgumentException (String .format ("AckReply: %s not supported" , reply ));
@@ -577,17 +585,20 @@ private void processBatch(List<OutstandingMessage> batch) {
577585 for (OutstandingMessage message : batch ) {
578586 // This is a blocking flow controller. We have already incremented messagesWaiter, so
579587 // shutdown will block on processing of all these messages anyway.
588+ String logPrefix = "" ;
589+ if (subscriberFlowControlLogger .isLoggable (Level .FINE )) {
590+ logPrefix = LoggingUtil .getLogPrefix (message .messageWrapper (),
591+ message .messageWrapper ().getAckId (), exactlyOnceDeliveryEnabled .get ());
592+ }
580593 tracer .startSubscribeConcurrencyControlSpan (message .messageWrapper ());
581594 try {
582- logger .log (Level .FINE , "pubsub:subscriber-flow-control - Flow controller is blocking." );
595+ subscriberFlowControlLogger .log (Level .FINE , "pubsub:subscriber-flow-control - " + logPrefix + " - Flow controller is blocking." );
583596 flowController .reserve (1 , message .messageWrapper ().getPubsubMessage ().getSerializedSize ());
584- logger .log (
585- Level .FINE , "pubsub:subscriber-flow-control - Flow controller is done blocking." );
597+ subscriberFlowControlLogger .log (Level .FINE , "pubsub:subscriber-flow-control - " + logPrefix + " - Flow controller is done blocking." );
586598 tracer .endSubscribeConcurrencyControlSpan (message .messageWrapper ());
587599 } catch (FlowControlException unexpectedException ) {
588600 // This should be a blocking flow controller and never throw an exception.
589- logger .log (
590- Level .FINE , "pubsub:subscriber-flow-control - Flow controller unexpected exception." );
601+ subscriberFlowControlLogger .log (Level .FINE , "pubsub:subscriber-flow-control - " + logPrefix + " - Flow controller unexpected exception." );
591602 tracer .setSubscribeConcurrencyControlSpanException (
592603 message .messageWrapper (), unexpectedException );
593604 throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
@@ -626,6 +637,11 @@ private void processOutstandingMessage(final AckHandler ackHandler) {
626637 @ Override
627638 public void run () {
628639 try {
640+ String logPrefix = "" ;
641+ if (expiryLogger .isLoggable (Level .FINE ) || callbackDeliveryLogger .isLoggable (Level .FINE )) {
642+ logPrefix = LoggingUtil .getLogPrefix (messageWrapper ,
643+ ackHandler .ackRequestData .getAckId (), exactlyOnceDeliveryEnabled .get ());
644+ }
629645 if (ackHandler
630646 .totalExpiration
631647 .plusSeconds (messageDeadlineSeconds .get ())
@@ -635,12 +651,11 @@ public void run() {
635651 // Don't nack it either, because we'd be nacking someone else's message.
636652 ackHandler .forget ();
637653 tracer .setSubscriberSpanExpirationResult (messageWrapper );
638- logger .log (Level .FINE , "pubsub:expiry - Message ID: {0}" , message . getMessageId () );
654+ expiryLogger .log (Level .FINE , "pubsub:expiry - " + logPrefix );
639655 return ;
640656 }
641657 tracer .startSubscribeProcessSpan (messageWrapper );
642- logger .log (
643- Level .FINE , "pubsub:callback-delivery - Message ID: {0}" , message .getMessageId ());
658+ callbackDeliveryLogger .log (Level .FINE , "pubsub:callback-delivery - " + logPrefix );
644659 if (shouldSetMessageFuture ()) {
645660 // This is the message future that is propagated to the user
646661 SettableApiFuture <AckResponse > messageFuture =
@@ -744,7 +759,6 @@ void processOutstandingOperations() {
744759 if (!nackRequestDataList .isEmpty ()) {
745760 modackRequestData .add (new ModackRequestData (0 , nackRequestDataList ));
746761 }
747- logger .log (Level .FINER , "pubsub:ack-batch - Sending {0} nacks" , nackRequestDataList .size ());
748762
749763 List <AckRequestData > ackRequestDataReceipts = new ArrayList <AckRequestData >();
750764 pendingReceipts .drainTo (ackRequestDataReceipts );
@@ -754,13 +768,13 @@ void processOutstandingOperations() {
754768 receiptModack .setIsReceiptModack (true );
755769 modackRequestData .add (receiptModack );
756770 }
757- logger .log (Level .FINER , "Sending {0} receipts" , ackRequestDataReceipts .size ());
758771
759772 ackProcessor .sendModackOperations (modackRequestData );
760773
761774 List <AckRequestData > ackRequestDataList = new ArrayList <AckRequestData >();
762775 pendingAcks .drainTo (ackRequestDataList );
763- logger .log (Level .FINER , "pubsub:ack-bbatch - Sending {0} acks" , ackRequestDataList .size ());
776+ ackBatchLogger .log (Level .FINE , "pubsub:ack-batch - Sending {0} ACKs, {1} NACKs, {2} receipts. Exactly Once Delivery: {3}" ,
777+ new Object []{ackRequestDataList .size (), nackRequestDataList .size (), ackRequestDataList .size (), exactlyOnceDeliveryEnabled .get ()});
764778
765779 ackProcessor .sendAckOperations (ackRequestDataList );
766780 }
0 commit comments