001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.util.HashMap; 020import java.util.Locale; 021import java.util.Map; 022import java.util.concurrent.TimeUnit; 023 024import org.apache.flume.Event; 025import org.apache.flume.EventDeliveryException; 026import org.apache.flume.agent.embedded.EmbeddedAgent; 027import org.apache.logging.log4j.LoggingException; 028import org.apache.logging.log4j.core.appender.ManagerFactory; 029import org.apache.logging.log4j.core.config.ConfigurationException; 030import org.apache.logging.log4j.core.config.Property; 031import org.apache.logging.log4j.core.util.NameUtil; 032import org.apache.logging.log4j.util.PropertiesUtil; 033import org.apache.logging.log4j.util.Strings; 034 035public class FlumeEmbeddedManager extends AbstractFlumeManager { 036 037 private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator"); 038 039 private static final String IN_MEMORY = "InMemory"; 040 041 private static final FlumeManagerFactory FACTORY = new FlumeManagerFactory(); 042 043 private final EmbeddedAgent agent; 044 045 private final String shortName; 046 047 048 /** 049 * Constructor 050 * @param name The unique name of this manager. 051 * @param shortName The short version of the agent name. 052 * @param agent The embedded agent. 053 */ 054 protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) { 055 super(name); 056 this.agent = agent; 057 this.shortName = shortName; 058 } 059 060 /** 061 * Returns a FlumeEmbeddedManager. 062 * @param name The name of the manager. 063 * @param agents The agents to use. 064 * @param properties Properties for the embedded manager. 065 * @param batchSize The number of events to include in a batch. 066 * @param dataDir The directory where the Flume FileChannel should write to. 067 * @return A FlumeAvroManager. 068 */ 069 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties, 070 int batchSize, final String dataDir) { 071 072 if (batchSize <= 0) { 073 batchSize = 1; 074 } 075 076 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 077 throw new IllegalArgumentException("Either an Agent or properties are required"); 078 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { 079 throw new IllegalArgumentException("Cannot configure both Agents and Properties."); 080 } 081 082 final String extendedName = extendManagerName(name, agents, properties); 083 return getManager(extendedName, FACTORY, 084 new FactoryData(name, agents, properties, batchSize, dataDir)); 085 086 } 087 088 private static String extendManagerName( 089 final String name, 090 final Agent[] agents, 091 final Property[] properties) { 092 093 final StringBuilder sb = new StringBuilder(); 094 boolean first = true; 095 096 if (agents != null && agents.length > 0) { 097 sb.append(name).append('['); 098 for (final Agent agent : agents) { 099 if (!first) { 100 sb.append('_'); 101 } 102 sb.append(agent.getHost()).append('-').append(agent.getPort()); 103 first = false; 104 } 105 sb.append(']'); 106 } else { 107 String sep = Strings.EMPTY; 108 sb.append(name).append('-'); 109 final StringBuilder props = new StringBuilder(); 110 for (final Property prop : properties) { 111 props.append(sep); 112 props.append(prop.getName()).append('=').append(prop.getValue()); 113 sep = "_"; 114 } 115 sb.append(NameUtil.md5(props.toString())); 116 } 117 118 return sb.toString(); 119 120 } 121 122 @Override 123 public void send(final Event event) { 124 try { 125 agent.put(event); 126 } catch (final EventDeliveryException ex) { 127 throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex); 128 } 129 } 130 131 @Override 132 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 133 agent.stop(); 134 return true; 135 } 136 137 /** 138 * Factory data. 139 */ 140 private static class FactoryData { 141 private final Agent[] agents; 142 private final Property[] properties; 143 private final int batchSize; 144 private final String dataDir; 145 private final String name; 146 147 /** 148 * Constructor. 149 * @param name The name of the Appender. 150 * @param agents The agents. 151 * @param properties The Flume configuration properties. 152 * @param batchSize The number of events to include in a batch. 153 * @param dataDir The directory where Flume should write to. 154 */ 155 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize, 156 final String dataDir) { 157 this.name = name; 158 this.agents = agents; 159 this.batchSize = batchSize; 160 this.properties = properties; 161 this.dataDir = dataDir; 162 } 163 } 164 165 /** 166 * Avro Manager Factory. 167 */ 168 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> { 169 170 /** 171 * Create the FlumeAvroManager. 172 * @param name The name of the entity to manage. 173 * @param data The data required to create the entity. 174 * @return The FlumeAvroManager. 175 */ 176 @Override 177 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) { 178 try { 179 final Map<String, String> props = createProperties(data.name, data.agents, data.properties, 180 data.batchSize, data.dataDir); 181 final EmbeddedAgent agent = new EmbeddedAgent(name); 182 agent.configure(props); 183 agent.start(); 184 LOGGER.debug("Created Agent " + name); 185 return new FlumeEmbeddedManager(name, data.name, agent); 186 } catch (final Exception ex) { 187 LOGGER.error("Could not create FlumeEmbeddedManager", ex); 188 } 189 return null; 190 } 191 192 private Map<String, String> createProperties(final String name, final Agent[] agents, 193 final Property[] properties, final int batchSize, String dataDir) { 194 final Map<String, String> props = new HashMap<>(); 195 196 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 197 LOGGER.error("No Flume configuration provided"); 198 throw new ConfigurationException("No Flume configuration provided"); 199 } 200 201 if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { 202 LOGGER.error("Agents and Flume configuration cannot both be specified"); 203 throw new ConfigurationException("Agents and Flume configuration cannot both be specified"); 204 } 205 206 if (agents != null && agents.length > 0) { 207 208 if (Strings.isNotEmpty(dataDir)) { 209 if (dataDir.equals(IN_MEMORY)) { 210 props.put("channel.type", "memory"); 211 } else { 212 props.put("channel.type", "file"); 213 214 if (!dataDir.endsWith(FILE_SEP)) { 215 dataDir = dataDir + FILE_SEP; 216 } 217 218 props.put("channel.checkpointDir", dataDir + "checkpoint"); 219 props.put("channel.dataDirs", dataDir + "data"); 220 } 221 222 } else { 223 props.put("channel.type", "file"); 224 } 225 226 final StringBuilder sb = new StringBuilder(); 227 String leading = Strings.EMPTY; 228 final int priority = agents.length; 229 for (int i = 0; i < priority; ++i) { 230 sb.append(leading).append("agent").append(i); 231 leading = " "; 232 final String prefix = "agent" + i; 233 props.put(prefix + ".type", "avro"); 234 props.put(prefix + ".hostname", agents[i].getHost()); 235 props.put(prefix + ".port", Integer.toString(agents[i].getPort())); 236 props.put(prefix + ".batch-size", Integer.toString(batchSize)); 237 props.put("processor.priority." + prefix, Integer.toString(agents.length - i)); 238 } 239 props.put("sinks", sb.toString()); 240 props.put("processor.type", "failover"); 241 } else { 242 String[] sinks = null; 243 244 for (final Property property : properties) { 245 final String key = property.getName(); 246 247 if (Strings.isEmpty(key)) { 248 final String msg = "A property name must be provided"; 249 LOGGER.error(msg); 250 throw new ConfigurationException(msg); 251 } 252 253 final String upperKey = key.toUpperCase(Locale.ENGLISH); 254 255 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) { 256 final String msg = 257 "Specification of the agent name is not allowed in Flume Appender configuration: " + key; 258 LOGGER.error(msg); 259 throw new ConfigurationException(msg); 260 } 261 262 final String value = property.getValue(); 263 if (Strings.isEmpty(value)) { 264 final String msg = "A value for property " + key + " must be provided"; 265 LOGGER.error(msg); 266 throw new ConfigurationException(msg); 267 } 268 269 if (upperKey.equals("SINKS")) { 270 sinks = value.trim().split(" "); 271 } 272 273 props.put(key, value); 274 } 275 276 if (sinks == null || sinks.length == 0) { 277 final String msg = "At least one Sink must be specified"; 278 LOGGER.error(msg); 279 throw new ConfigurationException(msg); 280 } 281 } 282 return props; 283 } 284 285 } 286 287}