001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.util.Properties; 020import java.util.concurrent.TimeUnit; 021 022import org.apache.flume.Event; 023import org.apache.flume.api.RpcClient; 024import org.apache.flume.api.RpcClientFactory; 025import org.apache.logging.log4j.core.appender.AppenderLoggingException; 026import org.apache.logging.log4j.core.appender.ManagerFactory; 027 028/** 029 * Manager for FlumeAvroAppenders. 030 */ 031public class FlumeAvroManager extends AbstractFlumeManager { 032 033 private static final int MAX_RECONNECTS = 3; 034 private static final int MINIMUM_TIMEOUT = 1000; 035 036 private static AvroManagerFactory factory = new AvroManagerFactory(); 037 038 private final Agent[] agents; 039 040 private final int batchSize; 041 042 private final long delayNanos; 043 private final int delayMillis; 044 045 private final int retries; 046 047 private final int connectTimeoutMillis; 048 049 private final int requestTimeoutMillis; 050 051 private final int current = 0; 052 053 private volatile RpcClient rpcClient; 054 055 private BatchEvent batchEvent = new BatchEvent(); 056 private long nextSend = 0; 057 058 /** 059 * Constructor 060 * @param name The unique name of this manager. 061 * @param agents An array of Agents. 062 * @param batchSize The number of events to include in a batch. 063 * @param retries The number of times to retry connecting before giving up. 064 * @param connectTimeout The connection timeout in ms. 065 * @param requestTimeout The request timeout in ms. 066 * 067 */ 068 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize, 069 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) { 070 super(name); 071 this.agents = agents; 072 this.batchSize = batchSize; 073 this.delayMillis = delayMillis; 074 this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis); 075 this.retries = retries; 076 this.connectTimeoutMillis = connectTimeout; 077 this.requestTimeoutMillis = requestTimeout; 078 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout); 079 } 080 081 /** 082 * Returns a FlumeAvroManager. 083 * @param name The name of the manager. 084 * @param agents The agents to use. 085 * @param batchSize The number of events to include in a batch. 086 * @param delayMillis The number of milliseconds to wait before sending an incomplete batch. 087 * @param retries The number of times to retry connecting before giving up. 088 * @param connectTimeoutMillis The connection timeout in ms. 089 * @param requestTimeoutMillis The request timeout in ms. 090 * @return A FlumeAvroManager. 091 */ 092 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis, 093 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 094 if (agents == null || agents.length == 0) { 095 throw new IllegalArgumentException("At least one agent is required"); 096 } 097 098 if (batchSize <= 0) { 099 batchSize = 1; 100 } 101 final StringBuilder sb = new StringBuilder(name); 102 sb.append(" FlumeAvro["); 103 boolean first = true; 104 for (final Agent agent : agents) { 105 if (!first) { 106 sb.append(','); 107 } 108 sb.append(agent.getHost()).append(':').append(agent.getPort()); 109 first = false; 110 } 111 sb.append(']'); 112 return getManager(sb.toString(), factory, 113 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis)); 114 } 115 116 /** 117 * Returns the agents. 118 * @return The agent array. 119 */ 120 public Agent[] getAgents() { 121 return agents; 122 } 123 124 /** 125 * Returns the index of the current agent. 126 * @return The index for the current agent. 127 */ 128 public int getCurrent() { 129 return current; 130 } 131 132 public int getRetries() { 133 return retries; 134 } 135 136 public int getConnectTimeoutMillis() { 137 return connectTimeoutMillis; 138 } 139 140 public int getRequestTimeoutMillis() { 141 return requestTimeoutMillis; 142 } 143 144 public int getBatchSize() { 145 return batchSize; 146 } 147 148 public int getDelayMillis() { 149 return delayMillis; 150 } 151 152 public void send(final BatchEvent events) { 153 if (rpcClient == null) { 154 synchronized (this) { 155 if (rpcClient == null) { 156 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); 157 } 158 } 159 } 160 161 if (rpcClient != null) { 162 try { 163 LOGGER.trace("Sending batch of {} events", events.getEvents().size()); 164 rpcClient.appendBatch(events.getEvents()); 165 } catch (final Exception ex) { 166 rpcClient.close(); 167 rpcClient = null; 168 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 169 agents[current].getPort(); 170 LOGGER.warn(msg, ex); 171 throw new AppenderLoggingException("No Flume agents are available"); 172 } 173 } else { 174 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 175 agents[current].getPort(); 176 LOGGER.warn(msg); 177 throw new AppenderLoggingException("No Flume agents are available"); 178 } 179 } 180 181 @Override 182 public void send(final Event event) { 183 if (batchSize == 1) { 184 if (rpcClient == null) { 185 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); 186 } 187 188 if (rpcClient != null) { 189 try { 190 rpcClient.append(event); 191 } catch (final Exception ex) { 192 rpcClient.close(); 193 rpcClient = null; 194 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 195 agents[current].getPort(); 196 LOGGER.warn(msg, ex); 197 throw new AppenderLoggingException("No Flume agents are available"); 198 } 199 } else { 200 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 201 agents[current].getPort(); 202 LOGGER.warn(msg); 203 throw new AppenderLoggingException("No Flume agents are available"); 204 } 205 } else { 206 int eventCount; 207 BatchEvent batch = null; 208 synchronized(batchEvent) { 209 batchEvent.addEvent(event); 210 eventCount = batchEvent.size(); 211 long now = System.nanoTime(); 212 if (eventCount == 1) { 213 nextSend = now + delayNanos; 214 } 215 if (eventCount >= batchSize || now >= nextSend) { 216 batch = batchEvent; 217 batchEvent = new BatchEvent(); 218 } 219 } 220 if (batch != null) { 221 send(batch); 222 } 223 } 224 } 225 226 /** 227 * There is a very good chance that this will always return the first agent even if it isn't available. 228 * @param agents The list of agents to choose from 229 * @return The FlumeEventAvroServer. 230 */ 231 private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 232 try { 233 final Properties props = new Properties(); 234 235 props.put("client.type", "default_failover"); 236 237 int agentCount = 1; 238 final StringBuilder sb = new StringBuilder(); 239 for (final Agent agent : agents) { 240 if (sb.length() > 0) { 241 sb.append(' '); 242 } 243 final String hostName = "host" + agentCount++; 244 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort()); 245 sb.append(hostName); 246 } 247 props.put("hosts", sb.toString()); 248 if (batchSize > 0) { 249 props.put("batch-size", Integer.toString(batchSize)); 250 } 251 if (retries > 1) { 252 if (retries > MAX_RECONNECTS) { 253 retries = MAX_RECONNECTS; 254 } 255 props.put("max-attempts", Integer.toString(retries * agents.length)); 256 } 257 if (requestTimeoutMillis >= MINIMUM_TIMEOUT) { 258 props.put("request-timeout", Integer.toString(requestTimeoutMillis)); 259 } 260 if (connectTimeoutMillis >= MINIMUM_TIMEOUT) { 261 props.put("connect-timeout", Integer.toString(connectTimeoutMillis)); 262 } 263 return RpcClientFactory.getInstance(props); 264 } catch (final Exception ex) { 265 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage()); 266 return null; 267 } 268 } 269 270 @Override 271 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 272 boolean closed = true; 273 if (rpcClient != null) { 274 try { 275 synchronized(this) { 276 try { 277 if (batchSize > 1 && batchEvent.getEvents().size() > 0) { 278 send(batchEvent); 279 } 280 } catch (final Exception ex) { 281 LOGGER.error("Error sending final batch: {}", ex.getMessage()); 282 closed = false; 283 } 284 } 285 rpcClient.close(); 286 } catch (final Exception ex) { 287 LOGGER.error("Attempt to close RPC client failed", ex); 288 closed = false; 289 } 290 } 291 rpcClient = null; 292 return closed; 293 } 294 295 /** 296 * Factory data. 297 */ 298 private static class FactoryData { 299 private final String name; 300 private final Agent[] agents; 301 private final int batchSize; 302 private final int delayMillis; 303 private final int retries; 304 private final int conntectTimeoutMillis; 305 private final int requestTimeoutMillis; 306 307 /** 308 * Constructor. 309 * @param name The name of the Appender. 310 * @param agents The agents. 311 * @param batchSize The number of events to include in a batch. 312 */ 313 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis, 314 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 315 this.name = name; 316 this.agents = agents; 317 this.batchSize = batchSize; 318 this.delayMillis = delayMillis; 319 this.retries = retries; 320 this.conntectTimeoutMillis = connectTimeoutMillis; 321 this.requestTimeoutMillis = requestTimeoutMillis; 322 } 323 } 324 325 /** 326 * Avro Manager Factory. 327 */ 328 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { 329 330 /** 331 * Create the FlumeAvroManager. 332 * @param name The name of the entity to manage. 333 * @param data The data required to create the entity. 334 * @return The FlumeAvroManager. 335 */ 336 @Override 337 public FlumeAvroManager createManager(final String name, final FactoryData data) { 338 try { 339 340 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis, 341 data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis); 342 } catch (final Exception ex) { 343 LOGGER.error("Could not create FlumeAvroManager", ex); 344 } 345 return null; 346 } 347 } 348 349}