001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.concurrent.ThreadFactory;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.logging.log4j.Level;
023import org.apache.logging.log4j.core.AbstractLifeCycle;
024import org.apache.logging.log4j.core.LogEvent;
025import org.apache.logging.log4j.core.impl.Log4jLogEvent;
026import org.apache.logging.log4j.core.impl.LogEventFactory;
027import org.apache.logging.log4j.core.impl.MutableLogEvent;
028import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
029import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
030import org.apache.logging.log4j.core.util.Log4jThread;
031import org.apache.logging.log4j.core.util.Log4jThreadFactory;
032import org.apache.logging.log4j.core.util.Throwables;
033import org.apache.logging.log4j.message.ReusableMessage;
034
035import com.lmax.disruptor.EventFactory;
036import com.lmax.disruptor.EventTranslatorTwoArg;
037import com.lmax.disruptor.ExceptionHandler;
038import com.lmax.disruptor.RingBuffer;
039import com.lmax.disruptor.Sequence;
040import com.lmax.disruptor.SequenceReportingEventHandler;
041import com.lmax.disruptor.TimeoutException;
042import com.lmax.disruptor.WaitStrategy;
043import com.lmax.disruptor.dsl.Disruptor;
044import com.lmax.disruptor.dsl.ProducerType;
045
046/**
047 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
048 * <p>
049 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
050 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
051 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
052 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
053 * definition file.
054 * <p>
055 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
056 * {@code AsyncLoggerConfig} is actually used.
057 */
058public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
059
060    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
061    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
062
063    /**
064     * RingBuffer events contain all information necessary to perform the work in a separate thread.
065     */
066    public static class Log4jEventWrapper {
067        public Log4jEventWrapper() {
068        }
069
070        public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
071            event = mutableLogEvent;
072        }
073
074        private AsyncLoggerConfig loggerConfig;
075        private LogEvent event;
076
077        /**
078         * Release references held by ring buffer to allow objects to be garbage-collected.
079         */
080        public void clear() {
081            loggerConfig = null;
082            if (event instanceof MutableLogEvent) {
083                ((MutableLogEvent) event).clear();
084            } else {
085                event = null;
086            }
087        }
088
089        @Override
090        public String toString() {
091            return String.valueOf(event);
092        }
093    }
094
095    /**
096     * EventHandler performs the work in a separate thread.
097     */
098    private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
099        private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
100        private Sequence sequenceCallback;
101        private int counter;
102
103        @Override
104        public void setSequenceCallback(final Sequence sequenceCallback) {
105            this.sequenceCallback = sequenceCallback;
106        }
107
108        @Override
109        public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
110                throws Exception {
111            event.event.setEndOfBatch(endOfBatch);
112            event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event);
113            event.clear();
114
115            notifyIntermediateProgress(sequence);
116        }
117
118        /**
119         * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
120         * be progressed until the batch has completely finished.
121         */
122        private void notifyIntermediateProgress(final long sequence) {
123            if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
124                sequenceCallback.set(sequence);
125                counter = 0;
126            }
127        }
128    }
129
130    /**
131     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
132     * RingBuffer.
133     */
134    private static final EventFactory<Log4jEventWrapper> FACTORY = Log4jEventWrapper::new;
135
136    /**
137     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
138     * RingBuffer.
139     */
140    private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = () -> new Log4jEventWrapper(new MutableLogEvent());
141
142    /**
143     * Object responsible for passing on data to a specific RingBuffer event.
144     */
145    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
146            (ringBufferElement, sequence, logEvent, loggerConfig) -> {
147         ringBufferElement.event = logEvent;
148         ringBufferElement.loggerConfig = loggerConfig;
149      };
150
151    /**
152     * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent.
153     */
154    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
155            (ringBufferElement, sequence, logEvent, loggerConfig) -> {
156         ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
157         ringBufferElement.loggerConfig = loggerConfig;
158      };
159
160    private int ringBufferSize;
161    private AsyncQueueFullPolicy asyncQueueFullPolicy;
162    private Boolean mutable = Boolean.FALSE;
163
164    private volatile Disruptor<Log4jEventWrapper> disruptor;
165    private long backgroundThreadId; // LOG4J2-471
166    private EventFactory<Log4jEventWrapper> factory;
167    private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
168    private volatile boolean alreadyLoggedWarning;
169
170    private final Object queueFullEnqueueLock = new Object();
171
172    public AsyncLoggerConfigDisruptor() {
173    }
174
175    // called from AsyncLoggerConfig constructor
176    @Override
177    public void setLogEventFactory(final LogEventFactory logEventFactory) {
178        // if any AsyncLoggerConfig uses a ReusableLogEventFactory
179        // then we need to populate our ringbuffer with MutableLogEvents
180        this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
181    }
182
183    /**
184     * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
185     * exists.
186     *
187     * @see #stop()
188     */
189    @Override
190    public synchronized void start() {
191        if (disruptor != null) {
192            LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
193                    + "using existing object.");
194            return;
195        }
196        LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
197        ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
198        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
199
200        final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) {
201            @Override
202            public Thread newThread(final Runnable r) {
203                final Thread result = super.newThread(r);
204                backgroundThreadId = result.getId();
205                return result;
206            }
207        };
208        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
209
210        translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
211        factory = mutable ? MUTABLE_FACTORY : FACTORY;
212        disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
213
214        final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
215        disruptor.setDefaultExceptionHandler(errorHandler);
216
217        final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
218        disruptor.handleEventsWith(handlers);
219
220        LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
221                + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
222                .getClass().getSimpleName(), errorHandler);
223        disruptor.start();
224        super.start();
225    }
226
227    /**
228     * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
229     * shut down and their references set to {@code null}.
230     */
231    @Override
232    public boolean stop(final long timeout, final TimeUnit timeUnit) {
233        final Disruptor<Log4jEventWrapper> temp = disruptor;
234        if (temp == null) {
235            LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
236            return true; // disruptor was already shut down by another thread
237        }
238        setStopping();
239        LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
240
241        // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
242        disruptor = null; // client code fails with NPE if log after stop = OK
243
244        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
245        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
246        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
247        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
248            try {
249                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
250            } catch (final InterruptedException e) { // ignored
251            }
252        }
253        try {
254            // busy-spins until all events currently in the disruptor have been processed, or timeout
255            temp.shutdown(timeout, timeUnit);
256        } catch (final TimeoutException e) {
257            LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit);
258            temp.halt(); // give up on remaining log events, if any
259        }
260        LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down.");
261
262        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
263            LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
264                    DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
265        }
266        setStopped();
267        return true;
268    }
269
270    /**
271     * Returns {@code true} if the specified disruptor still has unprocessed events.
272     */
273    private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
274        final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
275        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
276    }
277
278    @Override
279    public EventRoute getEventRoute(final Level logLevel) {
280        final int remainingCapacity = remainingDisruptorCapacity();
281        if (remainingCapacity < 0) {
282            return EventRoute.DISCARD;
283        }
284        return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
285    }
286
287    private int remainingDisruptorCapacity() {
288        final Disruptor<Log4jEventWrapper> temp = disruptor;
289        if (hasLog4jBeenShutDown(temp)) {
290            return -1;
291        }
292        return (int) temp.getRingBuffer().remainingCapacity();
293    }
294
295    /**
296     * Returns {@code true} if the specified disruptor is null.
297     */
298    private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
299        if (aDisruptor == null) { // LOG4J2-639
300            LOGGER.warn("Ignoring log event after log4j was shut down");
301            return true;
302        }
303        return false;
304    }
305
306    @Override
307    public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
308        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
309        try {
310            final LogEvent logEvent = prepareEvent(event);
311            enqueue(logEvent, asyncLoggerConfig);
312        } catch (final NullPointerException npe) {
313            // Note: NPE prevents us from adding a log event to the disruptor after it was shut down,
314            // which could cause the publishEvent method to hang and never return.
315            LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(),
316                    event.getLoggerName(), event.getMessage().getFormattedMessage()
317                            + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown())));
318        }
319    }
320
321    private LogEvent prepareEvent(final LogEvent event) {
322        LogEvent logEvent = ensureImmutable(event);
323        if (logEvent.getMessage() instanceof ReusableMessage) {
324            if (logEvent instanceof Log4jLogEvent) {
325                ((Log4jLogEvent) logEvent).makeMessageImmutable();
326            } else if (logEvent instanceof MutableLogEvent) {
327                // MutableLogEvents need to be translated into the RingBuffer by the MUTABLE_TRANSLATOR.
328                // That translator calls MutableLogEvent.initFrom to copy the event, which will makeMessageImmutable the message.
329                if (translator != MUTABLE_TRANSLATOR) { // should not happen...
330                    // TRANSLATOR expects an immutable LogEvent
331                    logEvent = ((MutableLogEvent) logEvent).createMemento();
332                }
333            } else { // custom log event, with a ReusableMessage
334                showWarningAboutCustomLogEventWithReusableMessage(logEvent);
335            }
336        } else { // message is not a ReusableMessage; makeMessageImmutable it to prevent ConcurrentModificationExceptions
337            InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // LOG4J2-1988, LOG4J2-1914
338        }
339        return logEvent;
340    }
341
342    private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) {
343        if (!alreadyLoggedWarning) {
344            LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." +
345                            " AsyncLoggerConfig does not know how to make an immutable copy of this message." +
346                            " This may result in ConcurrentModificationExceptions or incorrect log messages" +
347                            " if the application modifies objects in the message while" +
348                            " the background thread is writing it to the appenders.",
349                    logEvent.getClass().getName(), logEvent.getMessage().getClass().getName());
350            alreadyLoggedWarning = true;
351        }
352    }
353
354    private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
355        if (synchronizeEnqueueWhenQueueFull()) {
356            synchronized (queueFullEnqueueLock) {
357                disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
358            }
359        } else {
360            disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
361        }
362    }
363
364    private boolean synchronizeEnqueueWhenQueueFull() {
365        return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
366                // Background thread must never block
367                && backgroundThreadId != Thread.currentThread().getId()
368                // Threads owned by log4j are most likely to result in
369                // deadlocks because they generally consume events.
370                // This prevents deadlocks between AsyncLoggerContext
371                // disruptors.
372                && !(Thread.currentThread() instanceof Log4jThread);
373    }
374
375    @Override
376    public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
377        final LogEvent logEvent = prepareEvent(event);
378        return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
379    }
380
381    private LogEvent ensureImmutable(final LogEvent event) {
382        LogEvent result = event;
383        if (event instanceof RingBufferLogEvent) {
384            // Deal with special case where both types of Async Loggers are used together:
385            // RingBufferLogEvents are created by the all-loggers-async type, but
386            // this event is also consumed by the some-loggers-async type (this class).
387            // The original event will be re-used and modified in an application thread later,
388            // so take a snapshot of it, which can be safely processed in the
389            // some-loggers-async background thread.
390            result = ((RingBufferLogEvent) event).createMemento();
391        }
392        return result;
393    }
394
395    /*
396     * (non-Javadoc)
397     *
398     * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
399     * java.lang.String)
400     */
401    @Override
402    public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
403        return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
404    }
405}