001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.concurrent.ThreadFactory; 020import java.util.concurrent.TimeUnit; 021 022import org.apache.logging.log4j.Level; 023import org.apache.logging.log4j.core.AbstractLifeCycle; 024import org.apache.logging.log4j.core.LogEvent; 025import org.apache.logging.log4j.core.impl.Log4jLogEvent; 026import org.apache.logging.log4j.core.impl.LogEventFactory; 027import org.apache.logging.log4j.core.impl.MutableLogEvent; 028import org.apache.logging.log4j.core.impl.ReusableLogEventFactory; 029import org.apache.logging.log4j.core.jmx.RingBufferAdmin; 030import org.apache.logging.log4j.core.util.Log4jThread; 031import org.apache.logging.log4j.core.util.Log4jThreadFactory; 032import org.apache.logging.log4j.core.util.Throwables; 033import org.apache.logging.log4j.message.ReusableMessage; 034 035import com.lmax.disruptor.EventFactory; 036import com.lmax.disruptor.EventTranslatorTwoArg; 037import com.lmax.disruptor.ExceptionHandler; 038import com.lmax.disruptor.RingBuffer; 039import com.lmax.disruptor.Sequence; 040import com.lmax.disruptor.SequenceReportingEventHandler; 041import com.lmax.disruptor.TimeoutException; 042import com.lmax.disruptor.WaitStrategy; 043import com.lmax.disruptor.dsl.Disruptor; 044import com.lmax.disruptor.dsl.ProducerType; 045 046/** 047 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library. 048 * <p> 049 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or 050 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or 051 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in 052 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins 053 * definition file. 054 * <p> 055 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the 056 * {@code AsyncLoggerConfig} is actually used. 057 */ 058public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate { 059 060 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200; 061 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50; 062 063 /** 064 * RingBuffer events contain all information necessary to perform the work in a separate thread. 065 */ 066 public static class Log4jEventWrapper { 067 public Log4jEventWrapper() { 068 } 069 070 public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) { 071 event = mutableLogEvent; 072 } 073 074 private AsyncLoggerConfig loggerConfig; 075 private LogEvent event; 076 077 /** 078 * Release references held by ring buffer to allow objects to be garbage-collected. 079 */ 080 public void clear() { 081 loggerConfig = null; 082 if (event instanceof MutableLogEvent) { 083 ((MutableLogEvent) event).clear(); 084 } else { 085 event = null; 086 } 087 } 088 089 @Override 090 public String toString() { 091 return String.valueOf(event); 092 } 093 } 094 095 /** 096 * EventHandler performs the work in a separate thread. 097 */ 098 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> { 099 private static final int NOTIFY_PROGRESS_THRESHOLD = 50; 100 private Sequence sequenceCallback; 101 private int counter; 102 103 @Override 104 public void setSequenceCallback(final Sequence sequenceCallback) { 105 this.sequenceCallback = sequenceCallback; 106 } 107 108 @Override 109 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch) 110 throws Exception { 111 event.event.setEndOfBatch(endOfBatch); 112 event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event); 113 event.clear(); 114 115 notifyIntermediateProgress(sequence); 116 } 117 118 /** 119 * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not 120 * be progressed until the batch has completely finished. 121 */ 122 private void notifyIntermediateProgress(final long sequence) { 123 if (++counter > NOTIFY_PROGRESS_THRESHOLD) { 124 sequenceCallback.set(sequence); 125 counter = 0; 126 } 127 } 128 } 129 130 /** 131 * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the 132 * RingBuffer. 133 */ 134 private static final EventFactory<Log4jEventWrapper> FACTORY = Log4jEventWrapper::new; 135 136 /** 137 * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the 138 * RingBuffer. 139 */ 140 private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = () -> new Log4jEventWrapper(new MutableLogEvent()); 141 142 /** 143 * Object responsible for passing on data to a specific RingBuffer event. 144 */ 145 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR = 146 (ringBufferElement, sequence, logEvent, loggerConfig) -> { 147 ringBufferElement.event = logEvent; 148 ringBufferElement.loggerConfig = loggerConfig; 149 }; 150 151 /** 152 * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent. 153 */ 154 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR = 155 (ringBufferElement, sequence, logEvent, loggerConfig) -> { 156 ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent); 157 ringBufferElement.loggerConfig = loggerConfig; 158 }; 159 160 private int ringBufferSize; 161 private AsyncQueueFullPolicy asyncQueueFullPolicy; 162 private Boolean mutable = Boolean.FALSE; 163 164 private volatile Disruptor<Log4jEventWrapper> disruptor; 165 private long backgroundThreadId; // LOG4J2-471 166 private EventFactory<Log4jEventWrapper> factory; 167 private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator; 168 private volatile boolean alreadyLoggedWarning; 169 170 private final Object queueFullEnqueueLock = new Object(); 171 172 public AsyncLoggerConfigDisruptor() { 173 } 174 175 // called from AsyncLoggerConfig constructor 176 @Override 177 public void setLogEventFactory(final LogEventFactory logEventFactory) { 178 // if any AsyncLoggerConfig uses a ReusableLogEventFactory 179 // then we need to populate our ringbuffer with MutableLogEvents 180 this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory); 181 } 182 183 /** 184 * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently 185 * exists. 186 * 187 * @see #stop() 188 */ 189 @Override 190 public synchronized void start() { 191 if (disruptor != null) { 192 LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, " 193 + "using existing object."); 194 return; 195 } 196 LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration."); 197 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize"); 198 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy"); 199 200 final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) { 201 @Override 202 public Thread newThread(final Runnable r) { 203 final Thread result = super.newThread(r); 204 backgroundThreadId = result.getId(); 205 return result; 206 } 207 }; 208 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); 209 210 translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR; 211 factory = mutable ? MUTABLE_FACTORY : FACTORY; 212 disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy); 213 214 final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler(); 215 disruptor.setDefaultExceptionHandler(errorHandler); 216 217 final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()}; 218 disruptor.handleEventsWith(handlers); 219 220 LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, " 221 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy 222 .getClass().getSimpleName(), errorHandler); 223 disruptor.start(); 224 super.start(); 225 } 226 227 /** 228 * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are 229 * shut down and their references set to {@code null}. 230 */ 231 @Override 232 public boolean stop(final long timeout, final TimeUnit timeUnit) { 233 final Disruptor<Log4jEventWrapper> temp = disruptor; 234 if (temp == null) { 235 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down."); 236 return true; // disruptor was already shut down by another thread 237 } 238 setStopping(); 239 LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration."); 240 241 // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown(). 242 disruptor = null; // client code fails with NPE if log after stop = OK 243 244 // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, 245 // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, 246 // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. 247 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 248 try { 249 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while 250 } catch (final InterruptedException e) { // ignored 251 } 252 } 253 try { 254 // busy-spins until all events currently in the disruptor have been processed, or timeout 255 temp.shutdown(timeout, timeUnit); 256 } catch (final TimeoutException e) { 257 LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit); 258 temp.halt(); // give up on remaining log events, if any 259 } 260 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down."); 261 262 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { 263 LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy, 264 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); 265 } 266 setStopped(); 267 return true; 268 } 269 270 /** 271 * Returns {@code true} if the specified disruptor still has unprocessed events. 272 */ 273 private static boolean hasBacklog(final Disruptor<?> theDisruptor) { 274 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer(); 275 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize()); 276 } 277 278 @Override 279 public EventRoute getEventRoute(final Level logLevel) { 280 final int remainingCapacity = remainingDisruptorCapacity(); 281 if (remainingCapacity < 0) { 282 return EventRoute.DISCARD; 283 } 284 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel); 285 } 286 287 private int remainingDisruptorCapacity() { 288 final Disruptor<Log4jEventWrapper> temp = disruptor; 289 if (hasLog4jBeenShutDown(temp)) { 290 return -1; 291 } 292 return (int) temp.getRingBuffer().remainingCapacity(); 293 } 294 295 /** 296 * Returns {@code true} if the specified disruptor is null. 297 */ 298 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) { 299 if (aDisruptor == null) { // LOG4J2-639 300 LOGGER.warn("Ignoring log event after log4j was shut down"); 301 return true; 302 } 303 return false; 304 } 305 306 @Override 307 public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { 308 // LOG4J2-639: catch NPE if disruptor field was set to null after our check above 309 try { 310 final LogEvent logEvent = prepareEvent(event); 311 enqueue(logEvent, asyncLoggerConfig); 312 } catch (final NullPointerException npe) { 313 // Note: NPE prevents us from adding a log event to the disruptor after it was shut down, 314 // which could cause the publishEvent method to hang and never return. 315 LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(), 316 event.getLoggerName(), event.getMessage().getFormattedMessage() 317 + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown()))); 318 } 319 } 320 321 private LogEvent prepareEvent(final LogEvent event) { 322 LogEvent logEvent = ensureImmutable(event); 323 if (logEvent.getMessage() instanceof ReusableMessage) { 324 if (logEvent instanceof Log4jLogEvent) { 325 ((Log4jLogEvent) logEvent).makeMessageImmutable(); 326 } else if (logEvent instanceof MutableLogEvent) { 327 // MutableLogEvents need to be translated into the RingBuffer by the MUTABLE_TRANSLATOR. 328 // That translator calls MutableLogEvent.initFrom to copy the event, which will makeMessageImmutable the message. 329 if (translator != MUTABLE_TRANSLATOR) { // should not happen... 330 // TRANSLATOR expects an immutable LogEvent 331 logEvent = ((MutableLogEvent) logEvent).createMemento(); 332 } 333 } else { // custom log event, with a ReusableMessage 334 showWarningAboutCustomLogEventWithReusableMessage(logEvent); 335 } 336 } else { // message is not a ReusableMessage; makeMessageImmutable it to prevent ConcurrentModificationExceptions 337 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // LOG4J2-1988, LOG4J2-1914 338 } 339 return logEvent; 340 } 341 342 private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) { 343 if (!alreadyLoggedWarning) { 344 LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." + 345 " AsyncLoggerConfig does not know how to make an immutable copy of this message." + 346 " This may result in ConcurrentModificationExceptions or incorrect log messages" + 347 " if the application modifies objects in the message while" + 348 " the background thread is writing it to the appenders.", 349 logEvent.getClass().getName(), logEvent.getMessage().getClass().getName()); 350 alreadyLoggedWarning = true; 351 } 352 } 353 354 private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) { 355 if (synchronizeEnqueueWhenQueueFull()) { 356 synchronized (queueFullEnqueueLock) { 357 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig); 358 } 359 } else { 360 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig); 361 } 362 } 363 364 private boolean synchronizeEnqueueWhenQueueFull() { 365 return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL 366 // Background thread must never block 367 && backgroundThreadId != Thread.currentThread().getId() 368 // Threads owned by log4j are most likely to result in 369 // deadlocks because they generally consume events. 370 // This prevents deadlocks between AsyncLoggerContext 371 // disruptors. 372 && !(Thread.currentThread() instanceof Log4jThread); 373 } 374 375 @Override 376 public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { 377 final LogEvent logEvent = prepareEvent(event); 378 return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig); 379 } 380 381 private LogEvent ensureImmutable(final LogEvent event) { 382 LogEvent result = event; 383 if (event instanceof RingBufferLogEvent) { 384 // Deal with special case where both types of Async Loggers are used together: 385 // RingBufferLogEvents are created by the all-loggers-async type, but 386 // this event is also consumed by the some-loggers-async type (this class). 387 // The original event will be re-used and modified in an application thread later, 388 // so take a snapshot of it, which can be safely processed in the 389 // some-loggers-async background thread. 390 result = ((RingBufferLogEvent) event).createMemento(); 391 } 392 return result; 393 } 394 395 /* 396 * (non-Javadoc) 397 * 398 * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String, 399 * java.lang.String) 400 */ 401 @Override 402 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) { 403 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName); 404 } 405}