001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import org.apache.logging.log4j.core.Appender;
020import org.apache.logging.log4j.core.Core;
021import org.apache.logging.log4j.core.Filter;
022import org.apache.logging.log4j.core.LogEvent;
023import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
024import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
025import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
026import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
027import org.apache.logging.log4j.core.async.BlockingQueueFactory;
028import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
029import org.apache.logging.log4j.core.async.EventRoute;
030import org.apache.logging.log4j.core.async.InternalAsyncUtil;
031import org.apache.logging.log4j.core.config.AppenderControl;
032import org.apache.logging.log4j.core.config.AppenderRef;
033import org.apache.logging.log4j.core.config.Configuration;
034import org.apache.logging.log4j.core.config.ConfigurationException;
035import org.apache.logging.log4j.core.config.Property;
036import org.apache.logging.log4j.core.config.plugins.Plugin;
037import org.apache.logging.log4j.core.config.plugins.PluginAliases;
038import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
039import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
040import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
041import org.apache.logging.log4j.core.config.plugins.PluginElement;
042import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
043import org.apache.logging.log4j.core.filter.AbstractFilterable;
044import org.apache.logging.log4j.core.impl.Log4jLogEvent;
045import org.apache.logging.log4j.spi.AbstractLogger;
046
047import java.util.ArrayList;
048import java.util.List;
049import java.util.Map;
050import java.util.concurrent.BlockingQueue;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.TransferQueue;
053
054/**
055 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
056 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
057 * references.
058 */
059@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
060public final class AsyncAppender extends AbstractAppender {
061
062    private static final int DEFAULT_QUEUE_SIZE = 1024;
063
064    private final BlockingQueue<LogEvent> queue;
065    private final int queueSize;
066    private final boolean blocking;
067    private final long shutdownTimeout;
068    private final Configuration config;
069    private final AppenderRef[] appenderRefs;
070    private final String errorRef;
071    private final boolean includeLocation;
072    private AppenderControl errorAppender;
073    private AsyncAppenderEventDispatcher dispatcher;
074    private AsyncQueueFullPolicy asyncQueueFullPolicy;
075
076    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
077            final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
078            final long shutdownTimeout, final Configuration config, final boolean includeLocation,
079            final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
080        super(name, filter, null, ignoreExceptions, properties);
081        this.queue = blockingQueueFactory.create(queueSize);
082        this.queueSize = queueSize;
083        this.blocking = blocking;
084        this.shutdownTimeout = shutdownTimeout;
085        this.config = config;
086        this.appenderRefs = appenderRefs;
087        this.errorRef = errorRef;
088        this.includeLocation = includeLocation;
089    }
090
091    @Override
092    public void start() {
093        final Map<String, Appender> map = config.getAppenders();
094        final List<AppenderControl> appenders = new ArrayList<>();
095        for (final AppenderRef appenderRef : appenderRefs) {
096            final Appender appender = map.get(appenderRef.getRef());
097            if (appender != null) {
098                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
099            } else {
100                LOGGER.error("No appender named {} was configured", appenderRef);
101            }
102        }
103        if (errorRef != null) {
104            final Appender appender = map.get(errorRef);
105            if (appender != null) {
106                errorAppender = new AppenderControl(appender, null, null);
107            } else {
108                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
109            }
110        }
111        if (appenders.size() > 0) {
112            dispatcher = new AsyncAppenderEventDispatcher(
113                    getName(), errorAppender, appenders, queue);
114        } else if (errorRef == null) {
115            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
116        }
117        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
118
119        dispatcher.start();
120        super.start();
121    }
122
123    @Override
124    public boolean stop(final long timeout, final TimeUnit timeUnit) {
125        setStopping();
126        super.stop(timeout, timeUnit, false);
127        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
128        try {
129            dispatcher.stop(shutdownTimeout);
130        } catch (final InterruptedException ignored) {
131            // Restore the interrupted flag cleared when the exception is caught.
132            Thread.currentThread().interrupt();
133            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
134        }
135        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
136
137        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
138            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
139                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
140        }
141        setStopped();
142        return true;
143    }
144
145    /**
146     * Actual writing occurs here.
147     *
148     * @param logEvent The LogEvent.
149     */
150    @Override
151    public void append(final LogEvent logEvent) {
152        if (!isStarted()) {
153            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
154        }
155        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
156        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
157        if (!transfer(memento)) {
158            if (blocking) {
159                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
160                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
161                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
162                    logMessageInCurrentThread(logEvent);
163                } else {
164                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
165                    final EventRoute route = asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel());
166                    route.logMessage(this, memento);
167                }
168            } else {
169                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
170                logToErrorAppenderIfNecessary(false, memento);
171            }
172        }
173    }
174
175    private boolean transfer(final LogEvent memento) {
176        return queue instanceof TransferQueue
177            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
178            : queue.offer(memento);
179    }
180
181    /**
182     * FOR INTERNAL USE ONLY.
183     *
184     * @param logEvent the event to log
185     */
186    public void logMessageInCurrentThread(final LogEvent logEvent) {
187        logEvent.setEndOfBatch(queue.isEmpty());
188        dispatcher.dispatch(logEvent);
189    }
190
191    /**
192     * FOR INTERNAL USE ONLY.
193     *
194     * @param logEvent the event to log
195     */
196    public void logMessageInBackgroundThread(final LogEvent logEvent) {
197        try {
198            // wait for free slots in the queue
199            queue.put(logEvent);
200        } catch (final InterruptedException ignored) {
201            final boolean appendSuccessful = handleInterruptedException(logEvent);
202            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
203        }
204    }
205
206    // LOG4J2-1049: Some applications use Thread.interrupt() to send
207    // messages between application threads. This does not necessarily
208    // mean that the queue is full. To prevent dropping a log message,
209    // quickly try to offer the event to the queue again.
210    // (Yes, this means there is a possibility the same event is logged twice.)
211    //
212    // Finally, catching the InterruptedException means the
213    // interrupted flag has been cleared on the current thread.
214    // This may interfere with the application's expectation of
215    // being interrupted, so when we are done, we set the interrupted
216    // flag again.
217    private boolean handleInterruptedException(final LogEvent memento) {
218        final boolean appendSuccessful = queue.offer(memento);
219        if (!appendSuccessful) {
220            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
221                getName());
222        }
223        // set the interrupted flag again.
224        Thread.currentThread().interrupt();
225        return appendSuccessful;
226    }
227
228    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
229        if (!appendSuccessful && errorAppender != null) {
230            errorAppender.callAppender(logEvent);
231        }
232    }
233
234    /**
235     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
236     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
237     * pre-2.7.
238     *
239     * @param appenderRefs     The Appenders to reference.
240     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
241     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
242     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
243     *                         in the queue on shutdown. The default is zero which means to wait forever.
244     * @param size             The size of the event queue. The default is 128.
245     * @param name             The name of the Appender.
246     * @param includeLocation  whether to include location information. The default is false.
247     * @param filter           The Filter or null.
248     * @param config           The Configuration.
249     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
250     *                         otherwise they are propagated to the caller.
251     * @return The AsyncAppender.
252     * @deprecated use {@link Builder} instead
253     */
254    @Deprecated
255    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
256                                               final boolean blocking, final long shutdownTimeout, final int size,
257                                               final String name, final boolean includeLocation, final Filter filter,
258                                               final Configuration config, final boolean ignoreExceptions) {
259        if (name == null) {
260            LOGGER.error("No name provided for AsyncAppender");
261            return null;
262        }
263        if (appenderRefs == null) {
264            LOGGER.error("No appender references provided to AsyncAppender {}", name);
265        }
266
267        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
268            shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null);
269    }
270
271    @PluginBuilderFactory
272    public static Builder newBuilder() {
273        return new Builder();
274    }
275
276    public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B>
277            implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
278
279        @PluginElement("AppenderRef")
280        @Required(message = "No appender references provided to AsyncAppender")
281        private AppenderRef[] appenderRefs;
282
283        @PluginBuilderAttribute
284        @PluginAliases("error-ref")
285        private String errorRef;
286
287        @PluginBuilderAttribute
288        private boolean blocking = true;
289
290        @PluginBuilderAttribute
291        private long shutdownTimeout = 0L;
292
293        @PluginBuilderAttribute
294        private int bufferSize = DEFAULT_QUEUE_SIZE;
295
296        @PluginBuilderAttribute
297        @Required(message = "No name provided for AsyncAppender")
298        private String name;
299
300        @PluginBuilderAttribute
301        private boolean includeLocation = false;
302
303        @PluginConfiguration
304        private Configuration configuration;
305
306        @PluginBuilderAttribute
307        private boolean ignoreExceptions = true;
308
309        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
310        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
311
312        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
313            this.appenderRefs = appenderRefs;
314            return this;
315        }
316
317        public Builder setErrorRef(final String errorRef) {
318            this.errorRef = errorRef;
319            return this;
320        }
321
322        public Builder setBlocking(final boolean blocking) {
323            this.blocking = blocking;
324            return this;
325        }
326
327        public Builder setShutdownTimeout(final long shutdownTimeout) {
328            this.shutdownTimeout = shutdownTimeout;
329            return this;
330        }
331
332        public Builder setBufferSize(final int bufferSize) {
333            this.bufferSize = bufferSize;
334            return this;
335        }
336
337        public Builder setName(final String name) {
338            this.name = name;
339            return this;
340        }
341
342        public Builder setIncludeLocation(final boolean includeLocation) {
343            this.includeLocation = includeLocation;
344            return this;
345        }
346
347        public Builder setConfiguration(final Configuration configuration) {
348            this.configuration = configuration;
349            return this;
350        }
351
352        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
353            this.ignoreExceptions = ignoreExceptions;
354            return this;
355        }
356
357        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
358            this.blockingQueueFactory = blockingQueueFactory;
359            return this;
360        }
361
362        @Override
363        public AsyncAppender build() {
364            return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
365                shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray());
366        }
367    }
368
369    /**
370     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
371     *
372     * @return the names of the sink appenders
373     */
374    public String[] getAppenderRefStrings() {
375        final String[] result = new String[appenderRefs.length];
376        for (int i = 0; i < result.length; i++) {
377            result[i] = appenderRefs[i].getRef();
378        }
379        return result;
380    }
381
382    /**
383     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
384     * the class and method where the logging call was made.
385     *
386     * @return {@code true} if location is included with every event, {@code false} otherwise
387     */
388    public boolean isIncludeLocation() {
389        return includeLocation;
390    }
391
392    /**
393     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
394     * dropped when the queue is full.
395     *
396     * @return whether this AsyncAppender will block or drop events when the queue is full.
397     */
398    public boolean isBlocking() {
399        return blocking;
400    }
401
402    /**
403     * Returns the name of the appender that any errors are logged to or {@code null}.
404     *
405     * @return the name of the appender that any errors are logged to or {@code null}
406     */
407    public String getErrorRef() {
408        return errorRef;
409    }
410
411    public int getQueueCapacity() {
412        return queueSize;
413    }
414
415    public int getQueueRemainingCapacity() {
416        return queue.remainingCapacity();
417    }
418
419    /**
420     * Returns the number of elements in the queue.
421     *
422     * @return the number of elements in the queue.
423     * @since 2.11.1
424     */
425    public int getQueueSize() {
426        return queue.size();
427    }
428
429}