001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.Serializable; 020import java.util.Locale; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.logging.log4j.core.Appender; 024import org.apache.logging.log4j.core.Filter; 025import org.apache.logging.log4j.core.Layout; 026import org.apache.logging.log4j.core.LogEvent; 027import org.apache.logging.log4j.core.appender.AbstractAppender; 028import org.apache.logging.log4j.core.config.Property; 029import org.apache.logging.log4j.core.config.plugins.Plugin; 030import org.apache.logging.log4j.core.config.plugins.PluginAliases; 031import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 032import org.apache.logging.log4j.core.config.plugins.PluginElement; 033import org.apache.logging.log4j.core.config.plugins.PluginFactory; 034import org.apache.logging.log4j.core.layout.Rfc5424Layout; 035import org.apache.logging.log4j.core.net.Facility; 036import org.apache.logging.log4j.core.util.Booleans; 037import org.apache.logging.log4j.core.util.Integers; 038import org.apache.logging.log4j.util.Timer; 039 040/** 041 * An Appender that uses the Avro protocol to route events to Flume. 042 */ 043@Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true) 044public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { 045 046 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"}; 047 private static final int DEFAULT_MAX_DELAY = 60000; 048 049 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5; 050 051 private final AbstractFlumeManager manager; 052 053 private final String mdcIncludes; 054 private final String mdcExcludes; 055 private final String mdcRequired; 056 057 private final String eventPrefix; 058 059 private final String mdcPrefix; 060 061 private final boolean compressBody; 062 063 private final FlumeEventFactory factory; 064 065 private Timer timer = new Timer("FlumeEvent", 5000); 066 private volatile long count; 067 068 /** 069 * Which Manager will be used by the appender instance. 070 */ 071 private enum ManagerType { 072 AVRO, EMBEDDED, PERSISTENT; 073 074 public static ManagerType getType(final String type) { 075 return valueOf(type.toUpperCase(Locale.US)); 076 } 077 } 078 079 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 080 final boolean ignoreExceptions, final String includes, final String excludes, final String required, 081 final String mdcPrefix, final String eventPrefix, final boolean compress, final FlumeEventFactory factory, 082 final Property[] properties, final AbstractFlumeManager manager) { 083 super(name, filter, layout, ignoreExceptions, properties); 084 this.manager = manager; 085 this.mdcIncludes = includes; 086 this.mdcExcludes = excludes; 087 this.mdcRequired = required; 088 this.eventPrefix = eventPrefix; 089 this.mdcPrefix = mdcPrefix; 090 this.compressBody = compress; 091 this.factory = factory == null ? this : factory; 092 } 093 094 /** 095 * Publish the event. 096 * @param event The LogEvent. 097 */ 098 @Override 099 public void append(final LogEvent event) { 100 final String name = event.getLoggerName(); 101 if (name != null) { 102 for (final String pkg : EXCLUDED_PACKAGES) { 103 if (name.startsWith(pkg)) { 104 return; 105 } 106 } 107 } 108 timer.startOrResume(); 109 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 110 eventPrefix, compressBody); 111 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 112 if (update()) { 113 String msg = timer.stop(); 114 LOGGER.debug(msg); 115 } else { 116 timer.pause(); 117 } 118 manager.send(flumeEvent); 119 } 120 121 private synchronized boolean update() { 122 if (++count == 5000) { 123 count = 0; 124 return true; 125 } 126 return false; 127 } 128 129 @Override 130 public boolean stop(final long timeout, final TimeUnit timeUnit) { 131 setStopping(); 132 boolean stopped = super.stop(timeout, timeUnit, false); 133 stopped &= manager.stop(timeout, timeUnit); 134 setStopped(); 135 return stopped; 136 } 137 138 /** 139 * Create a Flume event. 140 * @param event The Log4j LogEvent. 141 * @param includes comma separated list of mdc elements to include. 142 * @param excludes comma separated list of mdc elements to exclude. 143 * @param required comma separated list of mdc elements that must be present with a value. 144 * @param mdcPrefix The prefix to add to MDC key names. 145 * @param eventPrefix The prefix to add to event fields. 146 * @param compress If true the body will be compressed. 147 * @return A Flume Event. 148 */ 149 @Override 150 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, 151 final String required, final String mdcPrefix, final String eventPrefix, 152 final boolean compress) { 153 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 154 eventPrefix, compressBody); 155 } 156 157 /** 158 * Create a Flume Avro Appender. 159 * @param agents An array of Agents. 160 * @param properties Properties to pass to the embedded agent. 161 * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. 162 * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i> 163 * @param type Avro (default), Embedded, or Persistent. 164 * @param dataDir The directory where the Flume FileChannel should write its data. 165 * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is 166 * 1000. 167 * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000. 168 * @param agentRetries The number of times to retry an agent before failing to the next agent. 169 * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch. 170 * @param name The name of the Appender. 171 * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise 172 * they are propagated to the caller. 173 * @param excludes A comma separated list of MDC elements to exclude. 174 * @param includes A comma separated list of MDC elements to include. 175 * @param required A comma separated list of MDC elements that are required. 176 * @param mdcPrefix The prefix to add to MDC key names. 177 * @param eventPrefix The prefix to add to event key names. 178 * @param compressBody If true the event body will be compressed. 179 * @param batchSize Number of events to include in a batch. Defaults to 1. 180 * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB. 181 * @param factory The factory to use to create Flume events. 182 * @param layout The layout to format the event. 183 * @param filter A Filter to filter events. 184 * 185 * @return A Flume Avro Appender. 186 */ 187 @PluginFactory 188 public static FlumeAppender createAppender(@PluginElement("Agents") final Agent[] agents, 189 @PluginElement("Properties") final Property[] properties, 190 @PluginAttribute("hosts") final String hosts, 191 @PluginAttribute("embedded") final String embedded, 192 @PluginAttribute("type") final String type, 193 @PluginAttribute("dataDir") final String dataDir, 194 @PluginAliases("connectTimeout") 195 @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis, 196 @PluginAliases("requestTimeout") 197 @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis, 198 @PluginAttribute("agentRetries") final String agentRetries, 199 @PluginAliases("maxDelay") // deprecated 200 @PluginAttribute("maxDelayMillis") final String maxDelayMillis, 201 @PluginAttribute("name") final String name, 202 @PluginAttribute("ignoreExceptions") final String ignore, 203 @PluginAttribute("mdcExcludes") final String excludes, 204 @PluginAttribute("mdcIncludes") final String includes, 205 @PluginAttribute("mdcRequired") final String required, 206 @PluginAttribute("mdcPrefix") final String mdcPrefix, 207 @PluginAttribute("eventPrefix") final String eventPrefix, 208 @PluginAttribute("compress") final String compressBody, 209 @PluginAttribute("batchSize") final String batchSize, 210 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries, 211 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory, 212 @PluginElement("Layout") Layout<? extends Serializable> layout, 213 @PluginElement("Filter") final Filter filter) { 214 215 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) : 216 (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0; 217 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true); 218 final boolean compress = Booleans.parseBoolean(compressBody, true); 219 ManagerType managerType; 220 if (type != null) { 221 if (embed && embedded != null) { 222 try { 223 managerType = ManagerType.getType(type); 224 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); 225 } catch (final Exception ex) { 226 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + 227 " is invalid."); 228 managerType = ManagerType.EMBEDDED; 229 } 230 } else { 231 try { 232 managerType = ManagerType.getType(type); 233 } catch (final Exception ex) { 234 LOGGER.warn("Type " + type + " is invalid."); 235 managerType = ManagerType.EMBEDDED; 236 } 237 } 238 } else if (embed) { 239 managerType = ManagerType.EMBEDDED; 240 } else { 241 managerType = ManagerType.AVRO; 242 } 243 244 final int batchCount = Integers.parseInt(batchSize, 1); 245 final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0); 246 final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0); 247 final int retries = Integers.parseInt(agentRetries, 0); 248 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); 249 final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY); 250 251 if (layout == null) { 252 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER; 253 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID, 254 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null, 255 null); 256 } 257 258 if (name == null) { 259 LOGGER.error("No name provided for Appender"); 260 return null; 261 } 262 263 AbstractFlumeManager manager; 264 265 switch (managerType) { 266 case EMBEDDED: 267 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 268 break; 269 case AVRO: 270 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis); 271 break; 272 case PERSISTENT: 273 manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries, 274 connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir); 275 break; 276 default: 277 LOGGER.debug("No manager type specified. Defaulting to AVRO"); 278 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis); 279 } 280 281 if (manager == null) { 282 return null; 283 } 284 285 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes, 286 excludes, required, mdcPrefix, eventPrefix, compress, factory, Property.EMPTY_ARRAY, manager); 287 } 288 289 private static Agent[] getAgents(Agent[] agents, final String hosts) { 290 if (agents == null || agents.length == 0) { 291 if (hosts != null && !hosts.isEmpty()) { 292 LOGGER.debug("Parsing agents from hosts parameter"); 293 final String[] hostports = hosts.split(","); 294 agents = new Agent[hostports.length]; 295 for(int i = 0; i < hostports.length; ++i) { 296 final String[] h = hostports[i].split(":"); 297 agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null); 298 } 299 } else { 300 LOGGER.debug("No agents provided, using defaults"); 301 agents = new Agent[] {Agent.createAgent(null, null)}; 302 } 303 } 304 305 LOGGER.debug("Using agents {}", agents); 306 return agents; 307 } 308}