View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.HashMap;
20  import java.util.Locale;
21  import java.util.Map;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.flume.Event;
25  import org.apache.flume.EventDeliveryException;
26  import org.apache.flume.agent.embedded.EmbeddedAgent;
27  import org.apache.logging.log4j.LoggingException;
28  import org.apache.logging.log4j.core.appender.ManagerFactory;
29  import org.apache.logging.log4j.core.config.ConfigurationException;
30  import org.apache.logging.log4j.core.config.Property;
31  import org.apache.logging.log4j.core.util.NameUtil;
32  import org.apache.logging.log4j.util.PropertiesUtil;
33  import org.apache.logging.log4j.util.Strings;
34  
35  public class FlumeEmbeddedManager extends AbstractFlumeManager {
36  
37      private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
38  
39      private static final String IN_MEMORY = "InMemory";
40  
41      private static final FlumeManagerFactory FACTORY = new FlumeManagerFactory();
42  
43      private final EmbeddedAgent agent;
44  
45      private final String shortName;
46  
47  
48      /**
49       * Constructor
50       * @param name The unique name of this manager.
51       * @param shortName The short version of the agent name.
52       * @param agent The embedded agent.
53       */
54      protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
55          super(name);
56          this.agent = agent;
57          this.shortName = shortName;
58      }
59  
60      /**
61       * Returns a FlumeEmbeddedManager.
62       * @param name The name of the manager.
63       * @param agents The agents to use.
64       * @param properties Properties for the embedded manager.
65       * @param batchSize The number of events to include in a batch.
66       * @param dataDir The directory where the Flume FileChannel should write to.
67       * @return A FlumeAvroManager.
68       */
69      public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
70                                                    int batchSize, final String dataDir) {
71  
72          if (batchSize <= 0) {
73              batchSize = 1;
74          }
75  
76          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
77              throw new IllegalArgumentException("Either an Agent or properties are required");
78          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
79              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
80          }
81  
82          final String extendedName = extendManagerName(name, agents, properties);
83          return getManager(extendedName, FACTORY,
84                  new FactoryData(name, agents, properties, batchSize, dataDir));
85  
86      }
87  
88      private static String extendManagerName(
89              final String name,
90              final Agent[] agents,
91              final Property[] properties) {
92  
93          final StringBuilder sb = new StringBuilder();
94          boolean first = true;
95  
96          if (agents != null && agents.length > 0) {
97              sb.append(name).append('[');
98              for (final Agent agent : agents) {
99                  if (!first) {
100                     sb.append('_');
101                 }
102                 sb.append(agent.getHost()).append('-').append(agent.getPort());
103                 first = false;
104             }
105             sb.append(']');
106         } else {
107             String sep = Strings.EMPTY;
108             sb.append(name).append('-');
109             final StringBuilder props = new StringBuilder();
110             for (final Property prop : properties) {
111                 props.append(sep);
112                 props.append(prop.getName()).append('=').append(prop.getValue());
113                 sep = "_";
114             }
115             sb.append(NameUtil.md5(props.toString()));
116         }
117 
118         return sb.toString();
119 
120     }
121 
122     @Override
123     public void send(final Event event) {
124         try {
125             agent.put(event);
126         } catch (final EventDeliveryException ex) {
127             throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
128         }
129     }
130 
131     @Override
132     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
133         agent.stop();
134         return true;
135     }
136 
137     /**
138      * Factory data.
139      */
140     private static class FactoryData {
141         private final Agent[] agents;
142         private final Property[] properties;
143         private final int batchSize;
144         private final String dataDir;
145         private final String name;
146 
147         /**
148          * Constructor.
149          * @param name The name of the Appender.
150          * @param agents The agents.
151          * @param properties The Flume configuration properties.
152          * @param batchSize The number of events to include in a batch.
153          * @param dataDir The directory where Flume should write to.
154          */
155         public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
156                            final String dataDir) {
157             this.name = name;
158             this.agents = agents;
159             this.batchSize = batchSize;
160             this.properties = properties;
161             this.dataDir = dataDir;
162         }
163     }
164 
165     /**
166      * Avro Manager Factory.
167      */
168     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
169 
170         /**
171          * Create the FlumeAvroManager.
172          * @param name The name of the entity to manage.
173          * @param data The data required to create the entity.
174          * @return The FlumeAvroManager.
175          */
176         @Override
177         public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
178             try {
179                 final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
180                     data.batchSize, data.dataDir);
181                 final EmbeddedAgent agent = new EmbeddedAgent(name);
182                 agent.configure(props);
183                 agent.start();
184                 LOGGER.debug("Created Agent " + name);
185                 return new FlumeEmbeddedManager(name, data.name, agent);
186             } catch (final Exception ex) {
187                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
188             }
189             return null;
190         }
191 
192         private Map<String, String> createProperties(final String name, final Agent[] agents,
193                                                      final Property[] properties, final int batchSize, String dataDir) {
194             final Map<String, String> props = new HashMap<>();
195 
196             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
197                 LOGGER.error("No Flume configuration provided");
198                 throw new ConfigurationException("No Flume configuration provided");
199             }
200 
201             if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
202                 LOGGER.error("Agents and Flume configuration cannot both be specified");
203                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
204             }
205 
206             if (agents != null && agents.length > 0) {
207 
208                 if (Strings.isNotEmpty(dataDir)) {
209                     if (dataDir.equals(IN_MEMORY)) {
210                         props.put("channel.type", "memory");
211                     } else {
212                         props.put("channel.type", "file");
213 
214                         if (!dataDir.endsWith(FILE_SEP)) {
215                             dataDir = dataDir + FILE_SEP;
216                         }
217 
218                         props.put("channel.checkpointDir", dataDir + "checkpoint");
219                         props.put("channel.dataDirs", dataDir + "data");
220                     }
221 
222                 } else {
223                     props.put("channel.type", "file");
224                 }
225 
226                 final StringBuilder sb = new StringBuilder();
227                 String leading = Strings.EMPTY;
228                 final int priority = agents.length;
229                 for (int i = 0; i < priority; ++i) {
230                     sb.append(leading).append("agent").append(i);
231                     leading = " ";
232                     final String prefix = "agent" + i;
233                     props.put(prefix + ".type", "avro");
234                     props.put(prefix + ".hostname", agents[i].getHost());
235                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
236                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
237                     props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
238                 }
239                 props.put("sinks", sb.toString());
240                 props.put("processor.type", "failover");
241             } else {
242                 String[] sinks = null;
243 
244                 for (final Property property : properties) {
245                     final String key = property.getName();
246 
247                     if (Strings.isEmpty(key)) {
248                         final String msg = "A property name must be provided";
249                         LOGGER.error(msg);
250                         throw new ConfigurationException(msg);
251                     }
252 
253                     final String upperKey = key.toUpperCase(Locale.ENGLISH);
254 
255                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
256                         final String msg =
257                             "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
258                         LOGGER.error(msg);
259                         throw new ConfigurationException(msg);
260                     }
261 
262                     final String value = property.getValue();
263                     if (Strings.isEmpty(value)) {
264                         final String msg = "A value for property " + key + " must be provided";
265                         LOGGER.error(msg);
266                         throw new ConfigurationException(msg);
267                     }
268 
269                     if (upperKey.equals("SINKS")) {
270                         sinks = value.trim().split(" ");
271                     }
272 
273                     props.put(key, value);
274                 }
275 
276                 if (sinks == null || sinks.length == 0) {
277                     final String msg = "At least one Sink must be specified";
278                     LOGGER.error(msg);
279                     throw new ConfigurationException(msg);
280                 }
281             }
282             return props;
283         }
284 
285     }
286 
287 }