001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.appender; 018 019import org.apache.logging.log4j.core.Appender; 020import org.apache.logging.log4j.core.Core; 021import org.apache.logging.log4j.core.Filter; 022import org.apache.logging.log4j.core.LogEvent; 023import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory; 024import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil; 025import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy; 026import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory; 027import org.apache.logging.log4j.core.async.BlockingQueueFactory; 028import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy; 029import org.apache.logging.log4j.core.async.EventRoute; 030import org.apache.logging.log4j.core.async.InternalAsyncUtil; 031import org.apache.logging.log4j.core.config.AppenderControl; 032import org.apache.logging.log4j.core.config.AppenderRef; 033import org.apache.logging.log4j.core.config.Configuration; 034import org.apache.logging.log4j.core.config.ConfigurationException; 035import org.apache.logging.log4j.core.config.Property; 036import org.apache.logging.log4j.core.config.plugins.Plugin; 037import org.apache.logging.log4j.core.config.plugins.PluginAliases; 038import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; 039import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; 040import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 041import org.apache.logging.log4j.core.config.plugins.PluginElement; 042import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 043import org.apache.logging.log4j.core.filter.AbstractFilterable; 044import org.apache.logging.log4j.core.impl.Log4jLogEvent; 045import org.apache.logging.log4j.spi.AbstractLogger; 046 047import java.util.ArrayList; 048import java.util.List; 049import java.util.Map; 050import java.util.concurrent.BlockingQueue; 051import java.util.concurrent.TimeUnit; 052import java.util.concurrent.TransferQueue; 053 054/** 055 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an 056 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender 057 * references. 058 */ 059@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true) 060public final class AsyncAppender extends AbstractAppender { 061 062 private static final int DEFAULT_QUEUE_SIZE = 1024; 063 064 private final BlockingQueue<LogEvent> queue; 065 private final int queueSize; 066 private final boolean blocking; 067 private final long shutdownTimeout; 068 private final Configuration config; 069 private final AppenderRef[] appenderRefs; 070 private final String errorRef; 071 private final boolean includeLocation; 072 private AppenderControl errorAppender; 073 private AsyncAppenderEventDispatcher dispatcher; 074 private AsyncQueueFullPolicy asyncQueueFullPolicy; 075 076 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 077 final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions, 078 final long shutdownTimeout, final Configuration config, final boolean includeLocation, 079 final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) { 080 super(name, filter, null, ignoreExceptions, properties); 081 this.queue = blockingQueueFactory.create(queueSize); 082 this.queueSize = queueSize; 083 this.blocking = blocking; 084 this.shutdownTimeout = shutdownTimeout; 085 this.config = config; 086 this.appenderRefs = appenderRefs; 087 this.errorRef = errorRef; 088 this.includeLocation = includeLocation; 089 } 090 091 @Override 092 public void start() { 093 final Map<String, Appender> map = config.getAppenders(); 094 final List<AppenderControl> appenders = new ArrayList<>(); 095 for (final AppenderRef appenderRef : appenderRefs) { 096 final Appender appender = map.get(appenderRef.getRef()); 097 if (appender != null) { 098 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter())); 099 } else { 100 LOGGER.error("No appender named {} was configured", appenderRef); 101 } 102 } 103 if (errorRef != null) { 104 final Appender appender = map.get(errorRef); 105 if (appender != null) { 106 errorAppender = new AppenderControl(appender, null, null); 107 } else { 108 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 109 } 110 } 111 if (appenders.size() > 0) { 112 dispatcher = new AsyncAppenderEventDispatcher( 113 getName(), errorAppender, appenders, queue); 114 } else if (errorRef == null) { 115 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); 116 } 117 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); 118 119 dispatcher.start(); 120 super.start(); 121 } 122 123 @Override 124 public boolean stop(final long timeout, final TimeUnit timeUnit) { 125 setStopping(); 126 super.stop(timeout, timeUnit, false); 127 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); 128 try { 129 dispatcher.stop(shutdownTimeout); 130 } catch (final InterruptedException ignored) { 131 // Restore the interrupted flag cleared when the exception is caught. 132 Thread.currentThread().interrupt(); 133 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); 134 } 135 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); 136 137 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { 138 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy, 139 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); 140 } 141 setStopped(); 142 return true; 143 } 144 145 /** 146 * Actual writing occurs here. 147 * 148 * @param logEvent The LogEvent. 149 */ 150 @Override 151 public void append(final LogEvent logEvent) { 152 if (!isStarted()) { 153 throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); 154 } 155 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); 156 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); 157 if (!transfer(memento)) { 158 if (blocking) { 159 if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 160 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock 161 AsyncQueueFullMessageUtil.logWarningToStatusLogger(); 162 logMessageInCurrentThread(logEvent); 163 } else { 164 // delegate to the event router (which may discard, enqueue and block, or log in current thread) 165 final EventRoute route = asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel()); 166 route.logMessage(this, memento); 167 } 168 } else { 169 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 170 logToErrorAppenderIfNecessary(false, memento); 171 } 172 } 173 } 174 175 private boolean transfer(final LogEvent memento) { 176 return queue instanceof TransferQueue 177 ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento) 178 : queue.offer(memento); 179 } 180 181 /** 182 * FOR INTERNAL USE ONLY. 183 * 184 * @param logEvent the event to log 185 */ 186 public void logMessageInCurrentThread(final LogEvent logEvent) { 187 logEvent.setEndOfBatch(queue.isEmpty()); 188 dispatcher.dispatch(logEvent); 189 } 190 191 /** 192 * FOR INTERNAL USE ONLY. 193 * 194 * @param logEvent the event to log 195 */ 196 public void logMessageInBackgroundThread(final LogEvent logEvent) { 197 try { 198 // wait for free slots in the queue 199 queue.put(logEvent); 200 } catch (final InterruptedException ignored) { 201 final boolean appendSuccessful = handleInterruptedException(logEvent); 202 logToErrorAppenderIfNecessary(appendSuccessful, logEvent); 203 } 204 } 205 206 // LOG4J2-1049: Some applications use Thread.interrupt() to send 207 // messages between application threads. This does not necessarily 208 // mean that the queue is full. To prevent dropping a log message, 209 // quickly try to offer the event to the queue again. 210 // (Yes, this means there is a possibility the same event is logged twice.) 211 // 212 // Finally, catching the InterruptedException means the 213 // interrupted flag has been cleared on the current thread. 214 // This may interfere with the application's expectation of 215 // being interrupted, so when we are done, we set the interrupted 216 // flag again. 217 private boolean handleInterruptedException(final LogEvent memento) { 218 final boolean appendSuccessful = queue.offer(memento); 219 if (!appendSuccessful) { 220 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", 221 getName()); 222 } 223 // set the interrupted flag again. 224 Thread.currentThread().interrupt(); 225 return appendSuccessful; 226 } 227 228 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) { 229 if (!appendSuccessful && errorAppender != null) { 230 errorAppender.callAppender(logEvent); 231 } 232 } 233 234 /** 235 * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the 236 * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior 237 * pre-2.7. 238 * 239 * @param appenderRefs The Appenders to reference. 240 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 241 * @param blocking True if the Appender should wait when the queue is full. The default is true. 242 * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events 243 * in the queue on shutdown. The default is zero which means to wait forever. 244 * @param size The size of the event queue. The default is 128. 245 * @param name The name of the Appender. 246 * @param includeLocation whether to include location information. The default is false. 247 * @param filter The Filter or null. 248 * @param config The Configuration. 249 * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged; 250 * otherwise they are propagated to the caller. 251 * @return The AsyncAppender. 252 * @deprecated use {@link Builder} instead 253 */ 254 @Deprecated 255 public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef, 256 final boolean blocking, final long shutdownTimeout, final int size, 257 final String name, final boolean includeLocation, final Filter filter, 258 final Configuration config, final boolean ignoreExceptions) { 259 if (name == null) { 260 LOGGER.error("No name provided for AsyncAppender"); 261 return null; 262 } 263 if (appenderRefs == null) { 264 LOGGER.error("No appender references provided to AsyncAppender {}", name); 265 } 266 267 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions, 268 shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null); 269 } 270 271 @PluginBuilderFactory 272 public static Builder newBuilder() { 273 return new Builder(); 274 } 275 276 public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B> 277 implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> { 278 279 @PluginElement("AppenderRef") 280 @Required(message = "No appender references provided to AsyncAppender") 281 private AppenderRef[] appenderRefs; 282 283 @PluginBuilderAttribute 284 @PluginAliases("error-ref") 285 private String errorRef; 286 287 @PluginBuilderAttribute 288 private boolean blocking = true; 289 290 @PluginBuilderAttribute 291 private long shutdownTimeout = 0L; 292 293 @PluginBuilderAttribute 294 private int bufferSize = DEFAULT_QUEUE_SIZE; 295 296 @PluginBuilderAttribute 297 @Required(message = "No name provided for AsyncAppender") 298 private String name; 299 300 @PluginBuilderAttribute 301 private boolean includeLocation = false; 302 303 @PluginConfiguration 304 private Configuration configuration; 305 306 @PluginBuilderAttribute 307 private boolean ignoreExceptions = true; 308 309 @PluginElement(BlockingQueueFactory.ELEMENT_TYPE) 310 private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>(); 311 312 public Builder setAppenderRefs(final AppenderRef[] appenderRefs) { 313 this.appenderRefs = appenderRefs; 314 return this; 315 } 316 317 public Builder setErrorRef(final String errorRef) { 318 this.errorRef = errorRef; 319 return this; 320 } 321 322 public Builder setBlocking(final boolean blocking) { 323 this.blocking = blocking; 324 return this; 325 } 326 327 public Builder setShutdownTimeout(final long shutdownTimeout) { 328 this.shutdownTimeout = shutdownTimeout; 329 return this; 330 } 331 332 public Builder setBufferSize(final int bufferSize) { 333 this.bufferSize = bufferSize; 334 return this; 335 } 336 337 public Builder setName(final String name) { 338 this.name = name; 339 return this; 340 } 341 342 public Builder setIncludeLocation(final boolean includeLocation) { 343 this.includeLocation = includeLocation; 344 return this; 345 } 346 347 public Builder setConfiguration(final Configuration configuration) { 348 this.configuration = configuration; 349 return this; 350 } 351 352 public Builder setIgnoreExceptions(final boolean ignoreExceptions) { 353 this.ignoreExceptions = ignoreExceptions; 354 return this; 355 } 356 357 public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) { 358 this.blockingQueueFactory = blockingQueueFactory; 359 return this; 360 } 361 362 @Override 363 public AsyncAppender build() { 364 return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions, 365 shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray()); 366 } 367 } 368 369 /** 370 * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings. 371 * 372 * @return the names of the sink appenders 373 */ 374 public String[] getAppenderRefStrings() { 375 final String[] result = new String[appenderRefs.length]; 376 for (int i = 0; i < result.length; i++) { 377 result[i] = appenderRefs[i].getRef(); 378 } 379 return result; 380 } 381 382 /** 383 * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine 384 * the class and method where the logging call was made. 385 * 386 * @return {@code true} if location is included with every event, {@code false} otherwise 387 */ 388 public boolean isIncludeLocation() { 389 return includeLocation; 390 } 391 392 /** 393 * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are 394 * dropped when the queue is full. 395 * 396 * @return whether this AsyncAppender will block or drop events when the queue is full. 397 */ 398 public boolean isBlocking() { 399 return blocking; 400 } 401 402 /** 403 * Returns the name of the appender that any errors are logged to or {@code null}. 404 * 405 * @return the name of the appender that any errors are logged to or {@code null} 406 */ 407 public String getErrorRef() { 408 return errorRef; 409 } 410 411 public int getQueueCapacity() { 412 return queueSize; 413 } 414 415 public int getQueueRemainingCapacity() { 416 return queue.remainingCapacity(); 417 } 418 419 /** 420 * Returns the number of elements in the queue. 421 * 422 * @return the number of elements in the queue. 423 * @since 2.11.1 424 */ 425 public int getQueueSize() { 426 return queue.size(); 427 } 428 429}