001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.util.HashMap;
020import java.util.Locale;
021import java.util.Map;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.flume.Event;
025import org.apache.flume.EventDeliveryException;
026import org.apache.flume.agent.embedded.EmbeddedAgent;
027import org.apache.logging.log4j.LoggingException;
028import org.apache.logging.log4j.core.appender.ManagerFactory;
029import org.apache.logging.log4j.core.config.ConfigurationException;
030import org.apache.logging.log4j.core.config.Property;
031import org.apache.logging.log4j.core.util.NameUtil;
032import org.apache.logging.log4j.util.PropertiesUtil;
033import org.apache.logging.log4j.util.Strings;
034
035public class FlumeEmbeddedManager extends AbstractFlumeManager {
036
037    private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
038
039    private static final String IN_MEMORY = "InMemory";
040
041    private static final FlumeManagerFactory FACTORY = new FlumeManagerFactory();
042
043    private final EmbeddedAgent agent;
044
045    private final String shortName;
046
047
048    /**
049     * Constructor
050     * @param name The unique name of this manager.
051     * @param shortName The short version of the agent name.
052     * @param agent The embedded agent.
053     */
054    protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
055        super(name);
056        this.agent = agent;
057        this.shortName = shortName;
058    }
059
060    /**
061     * Returns a FlumeEmbeddedManager.
062     * @param name The name of the manager.
063     * @param agents The agents to use.
064     * @param properties Properties for the embedded manager.
065     * @param batchSize The number of events to include in a batch.
066     * @param dataDir The directory where the Flume FileChannel should write to.
067     * @return A FlumeAvroManager.
068     */
069    public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
070                                                  int batchSize, final String dataDir) {
071
072        if (batchSize <= 0) {
073            batchSize = 1;
074        }
075
076        if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
077            throw new IllegalArgumentException("Either an Agent or properties are required");
078        } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
079            throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
080        }
081
082        final String extendedName = extendManagerName(name, agents, properties);
083        return getManager(extendedName, FACTORY,
084                new FactoryData(name, agents, properties, batchSize, dataDir));
085
086    }
087
088    private static String extendManagerName(
089            final String name,
090            final Agent[] agents,
091            final Property[] properties) {
092
093        final StringBuilder sb = new StringBuilder();
094        boolean first = true;
095
096        if (agents != null && agents.length > 0) {
097            sb.append(name).append('[');
098            for (final Agent agent : agents) {
099                if (!first) {
100                    sb.append('_');
101                }
102                sb.append(agent.getHost()).append('-').append(agent.getPort());
103                first = false;
104            }
105            sb.append(']');
106        } else {
107            String sep = Strings.EMPTY;
108            sb.append(name).append('-');
109            final StringBuilder props = new StringBuilder();
110            for (final Property prop : properties) {
111                props.append(sep);
112                props.append(prop.getName()).append('=').append(prop.getValue());
113                sep = "_";
114            }
115            sb.append(NameUtil.md5(props.toString()));
116        }
117
118        return sb.toString();
119
120    }
121
122    @Override
123    public void send(final Event event) {
124        try {
125            agent.put(event);
126        } catch (final EventDeliveryException ex) {
127            throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
128        }
129    }
130
131    @Override
132    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
133        agent.stop();
134        return true;
135    }
136
137    /**
138     * Factory data.
139     */
140    private static class FactoryData {
141        private final Agent[] agents;
142        private final Property[] properties;
143        private final int batchSize;
144        private final String dataDir;
145        private final String name;
146
147        /**
148         * Constructor.
149         * @param name The name of the Appender.
150         * @param agents The agents.
151         * @param properties The Flume configuration properties.
152         * @param batchSize The number of events to include in a batch.
153         * @param dataDir The directory where Flume should write to.
154         */
155        public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
156                           final String dataDir) {
157            this.name = name;
158            this.agents = agents;
159            this.batchSize = batchSize;
160            this.properties = properties;
161            this.dataDir = dataDir;
162        }
163    }
164
165    /**
166     * Avro Manager Factory.
167     */
168    private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
169
170        /**
171         * Create the FlumeAvroManager.
172         * @param name The name of the entity to manage.
173         * @param data The data required to create the entity.
174         * @return The FlumeAvroManager.
175         */
176        @Override
177        public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
178            try {
179                final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
180                    data.batchSize, data.dataDir);
181                final EmbeddedAgent agent = new EmbeddedAgent(name);
182                agent.configure(props);
183                agent.start();
184                LOGGER.debug("Created Agent " + name);
185                return new FlumeEmbeddedManager(name, data.name, agent);
186            } catch (final Exception ex) {
187                LOGGER.error("Could not create FlumeEmbeddedManager", ex);
188            }
189            return null;
190        }
191
192        private Map<String, String> createProperties(final String name, final Agent[] agents,
193                                                     final Property[] properties, final int batchSize, String dataDir) {
194            final Map<String, String> props = new HashMap<>();
195
196            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
197                LOGGER.error("No Flume configuration provided");
198                throw new ConfigurationException("No Flume configuration provided");
199            }
200
201            if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
202                LOGGER.error("Agents and Flume configuration cannot both be specified");
203                throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
204            }
205
206            if (agents != null && agents.length > 0) {
207
208                if (Strings.isNotEmpty(dataDir)) {
209                    if (dataDir.equals(IN_MEMORY)) {
210                        props.put("channel.type", "memory");
211                    } else {
212                        props.put("channel.type", "file");
213
214                        if (!dataDir.endsWith(FILE_SEP)) {
215                            dataDir = dataDir + FILE_SEP;
216                        }
217
218                        props.put("channel.checkpointDir", dataDir + "checkpoint");
219                        props.put("channel.dataDirs", dataDir + "data");
220                    }
221
222                } else {
223                    props.put("channel.type", "file");
224                }
225
226                final StringBuilder sb = new StringBuilder();
227                String leading = Strings.EMPTY;
228                final int priority = agents.length;
229                for (int i = 0; i < priority; ++i) {
230                    sb.append(leading).append("agent").append(i);
231                    leading = " ";
232                    final String prefix = "agent" + i;
233                    props.put(prefix + ".type", "avro");
234                    props.put(prefix + ".hostname", agents[i].getHost());
235                    props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
236                    props.put(prefix + ".batch-size", Integer.toString(batchSize));
237                    props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
238                }
239                props.put("sinks", sb.toString());
240                props.put("processor.type", "failover");
241            } else {
242                String[] sinks = null;
243
244                for (final Property property : properties) {
245                    final String key = property.getName();
246
247                    if (Strings.isEmpty(key)) {
248                        final String msg = "A property name must be provided";
249                        LOGGER.error(msg);
250                        throw new ConfigurationException(msg);
251                    }
252
253                    final String upperKey = key.toUpperCase(Locale.ENGLISH);
254
255                    if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
256                        final String msg =
257                            "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
258                        LOGGER.error(msg);
259                        throw new ConfigurationException(msg);
260                    }
261
262                    final String value = property.getValue();
263                    if (Strings.isEmpty(value)) {
264                        final String msg = "A value for property " + key + " must be provided";
265                        LOGGER.error(msg);
266                        throw new ConfigurationException(msg);
267                    }
268
269                    if (upperKey.equals("SINKS")) {
270                        sinks = value.trim().split(" ");
271                    }
272
273                    props.put(key, value);
274                }
275
276                if (sinks == null || sinks.length == 0) {
277                    final String msg = "At least one Sink must be specified";
278                    LOGGER.error(msg);
279                    throw new ConfigurationException(msg);
280                }
281            }
282            return props;
283        }
284
285    }
286
287}