/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.util.Properties;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.flume.Event;
023import org.apache.flume.api.RpcClient;
024import org.apache.flume.api.RpcClientFactory;
025import org.apache.logging.log4j.core.appender.AppenderLoggingException;
026import org.apache.logging.log4j.core.appender.ManagerFactory;
027
028/**
029 * Manager for FlumeAvroAppenders.
030 */
031public class FlumeAvroManager extends AbstractFlumeManager {
032
033    private static final int MAX_RECONNECTS = 3;
034    private static final int MINIMUM_TIMEOUT = 1000;
035
036    private static AvroManagerFactory factory = new AvroManagerFactory();
037
038    private final Agent[] agents;
039
040    private final int batchSize;
041
042    private final long delayNanos;
043    private final int delayMillis;
044
045    private final int retries;
046
047    private final int connectTimeoutMillis;
048
049    private final int requestTimeoutMillis;
050
051    private final int current = 0;
052
053    private volatile RpcClient rpcClient;
054
055    private BatchEvent batchEvent = new BatchEvent();
056    private long nextSend = 0;
057
058    /**
059     * Constructor
060     * @param name The unique name of this manager.
061     * @param agents An array of Agents.
062     * @param batchSize The number of events to include in a batch.
063     * @param retries The number of times to retry connecting before giving up.
064     * @param connectTimeout The connection timeout in ms.
065     * @param requestTimeout The request timeout in ms.
066     *
067     */
068    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
069                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
070        super(name);
071        this.agents = agents;
072        this.batchSize = batchSize;
073        this.delayMillis = delayMillis;
074        this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
075        this.retries = retries;
076        this.connectTimeoutMillis = connectTimeout;
077        this.requestTimeoutMillis = requestTimeout;
078        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
079    }
080
081    /**
082     * Returns a FlumeAvroManager.
083     * @param name The name of the manager.
084     * @param agents The agents to use.
085     * @param batchSize The number of events to include in a batch.
086     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
087     * @param retries The number of times to retry connecting before giving up.
088     * @param connectTimeoutMillis The connection timeout in ms.
089     * @param requestTimeoutMillis The request timeout in ms.
090     * @return A FlumeAvroManager.
091     */
092    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
093                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
094        if (agents == null || agents.length == 0) {
095            throw new IllegalArgumentException("At least one agent is required");
096        }
097
098        if (batchSize <= 0) {
099            batchSize = 1;
100        }
101        final StringBuilder sb = new StringBuilder(name);
102        sb.append(" FlumeAvro[");
103        boolean first = true;
104        for (final Agent agent : agents) {
105            if (!first) {
106                sb.append(',');
107            }
108            sb.append(agent.getHost()).append(':').append(agent.getPort());
109            first = false;
110        }
111        sb.append(']');
112        return getManager(sb.toString(), factory,
113                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
114    }
115
116    /**
117     * Returns the agents.
118     * @return The agent array.
119     */
120    public Agent[] getAgents() {
121        return agents;
122    }
123
124    /**
125     * Returns the index of the current agent.
126     * @return The index for the current agent.
127     */
128    public int getCurrent() {
129        return current;
130    }
131
132    public int getRetries() {
133        return retries;
134    }
135
136    public int getConnectTimeoutMillis() {
137        return connectTimeoutMillis;
138    }
139
140    public int getRequestTimeoutMillis() {
141        return requestTimeoutMillis;
142    }
143
144    public int getBatchSize() {
145        return batchSize;
146    }
147
148    public int getDelayMillis() {
149        return delayMillis;
150    }
151
152    public void send(final BatchEvent events) {
153        if (rpcClient == null) {
154            synchronized (this) {
155                if (rpcClient == null) {
156                    rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
157                }
158            }
159        }
160
161        if (rpcClient != null) {
162            try {
163                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
164                rpcClient.appendBatch(events.getEvents());
165            } catch (final Exception ex) {
166                rpcClient.close();
167                rpcClient = null;
168                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
169                    agents[current].getPort();
170                LOGGER.warn(msg, ex);
171                throw new AppenderLoggingException("No Flume agents are available");
172            }
173        }  else {
174            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
175                agents[current].getPort();
176            LOGGER.warn(msg);
177            throw new AppenderLoggingException("No Flume agents are available");
178        }
179    }
180
181    @Override
182    public void send(final Event event)  {
183        if (batchSize == 1) {
184            if (rpcClient == null) {
185                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
186            }
187
188            if (rpcClient != null) {
189                try {
190                    rpcClient.append(event);
191                } catch (final Exception ex) {
192                    rpcClient.close();
193                    rpcClient = null;
194                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
195                            agents[current].getPort();
196                    LOGGER.warn(msg, ex);
197                    throw new AppenderLoggingException("No Flume agents are available");
198                }
199            } else {
200                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
201                        agents[current].getPort();
202                LOGGER.warn(msg);
203                throw new AppenderLoggingException("No Flume agents are available");
204            }
205        } else {
206            int eventCount;
207            BatchEvent batch = null;
208            synchronized(batchEvent) {
209                batchEvent.addEvent(event);
210                eventCount = batchEvent.size();
211                long now = System.nanoTime();
212                if (eventCount == 1) {
213                    nextSend = now + delayNanos;
214                }
215                if (eventCount >= batchSize || now >= nextSend) {
216                    batch = batchEvent;
217                    batchEvent = new BatchEvent();
218                }
219            }
220            if (batch != null) {
221                send(batch);
222            }
223        }
224    }
225
226    /**
227     * There is a very good chance that this will always return the first agent even if it isn't available.
228     * @param agents The list of agents to choose from
229     * @return The FlumeEventAvroServer.
230     */
231    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
232        try {
233            final Properties props = new Properties();
234
235            props.put("client.type", "default_failover");
236
237            int agentCount = 1;
238            final StringBuilder sb = new StringBuilder();
239            for (final Agent agent : agents) {
240                if (sb.length() > 0) {
241                    sb.append(' ');
242                }
243                final String hostName = "host" + agentCount++;
244                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
245                sb.append(hostName);
246            }
247            props.put("hosts", sb.toString());
248            if (batchSize > 0) {
249                props.put("batch-size", Integer.toString(batchSize));
250            }
251            if (retries > 1) {
252                if (retries > MAX_RECONNECTS) {
253                    retries = MAX_RECONNECTS;
254                }
255                props.put("max-attempts", Integer.toString(retries * agents.length));
256            }
257            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
258                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
259            }
260            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
261                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
262            }
263            return RpcClientFactory.getInstance(props);
264        } catch (final Exception ex) {
265            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
266            return null;
267        }
268    }
269
270    @Override
271    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
272        boolean closed = true;
273        if (rpcClient != null) {
274            try {
275                synchronized(this) {
276                    try {
277                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
278                            send(batchEvent);
279                        }
280                    } catch (final Exception ex) {
281                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
282                        closed = false;
283                    }
284                }
285                rpcClient.close();
286            } catch (final Exception ex) {
287                LOGGER.error("Attempt to close RPC client failed", ex);
288                closed = false;
289            }
290        }
291        rpcClient = null;
292        return closed;
293    }
294
295    /**
296     * Factory data.
297     */
298    private static class FactoryData {
299        private final String name;
300        private final Agent[] agents;
301        private final int batchSize;
302        private final int delayMillis;
303        private final int retries;
304        private final int conntectTimeoutMillis;
305        private final int requestTimeoutMillis;
306
307        /**
308         * Constructor.
309         * @param name The name of the Appender.
310         * @param agents The agents.
311         * @param batchSize The number of events to include in a batch.
312         */
313        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
314                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
315            this.name = name;
316            this.agents = agents;
317            this.batchSize = batchSize;
318            this.delayMillis = delayMillis;
319            this.retries = retries;
320            this.conntectTimeoutMillis = connectTimeoutMillis;
321            this.requestTimeoutMillis = requestTimeoutMillis;
322        }
323    }
324
325    /**
326     * Avro Manager Factory.
327     */
328    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
329
330        /**
331         * Create the FlumeAvroManager.
332         * @param name The name of the entity to manage.
333         * @param data The data required to create the entity.
334         * @return The FlumeAvroManager.
335         */
336        @Override
337        public FlumeAvroManager createManager(final String name, final FactoryData data) {
338            try {
339
340                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
341                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
342            } catch (final Exception ex) {
343                LOGGER.error("Could not create FlumeAvroManager", ex);
344            }
345            return null;
346        }
347    }
348
349}