1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.util.HashMap;
20 import java.util.Locale;
21 import java.util.Map;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.flume.Event;
25 import org.apache.flume.EventDeliveryException;
26 import org.apache.flume.agent.embedded.EmbeddedAgent;
27 import org.apache.logging.log4j.LoggingException;
28 import org.apache.logging.log4j.core.appender.ManagerFactory;
29 import org.apache.logging.log4j.core.config.ConfigurationException;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.util.NameUtil;
32 import org.apache.logging.log4j.util.PropertiesUtil;
33 import org.apache.logging.log4j.util.Strings;
34
35 public class FlumeEmbeddedManager extends AbstractFlumeManager {
36
37 private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
38
39 private static final String IN_MEMORY = "InMemory";
40
41 private static final FlumeManagerFactory FACTORY = new FlumeManagerFactory();
42
43 private final EmbeddedAgent agent;
44
45 private final String shortName;
46
47
48
49
50
51
52
53
54 protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
55 super(name);
56 this.agent = agent;
57 this.shortName = shortName;
58 }
59
60
61
62
63
64
65
66
67
68
69 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
70 int batchSize, final String dataDir) {
71
72 if (batchSize <= 0) {
73 batchSize = 1;
74 }
75
76 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
77 throw new IllegalArgumentException("Either an Agent or properties are required");
78 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
79 throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
80 }
81
82 final String extendedName = extendManagerName(name, agents, properties);
83 return getManager(extendedName, FACTORY,
84 new FactoryData(name, agents, properties, batchSize, dataDir));
85
86 }
87
88 private static String extendManagerName(
89 final String name,
90 final Agent[] agents,
91 final Property[] properties) {
92
93 final StringBuilder sb = new StringBuilder();
94 boolean first = true;
95
96 if (agents != null && agents.length > 0) {
97 sb.append(name).append('[');
98 for (final Agent agent : agents) {
99 if (!first) {
100 sb.append('_');
101 }
102 sb.append(agent.getHost()).append('-').append(agent.getPort());
103 first = false;
104 }
105 sb.append(']');
106 } else {
107 String sep = Strings.EMPTY;
108 sb.append(name).append('-');
109 final StringBuilder props = new StringBuilder();
110 for (final Property prop : properties) {
111 props.append(sep);
112 props.append(prop.getName()).append('=').append(prop.getValue());
113 sep = "_";
114 }
115 sb.append(NameUtil.md5(props.toString()));
116 }
117
118 return sb.toString();
119
120 }
121
122 @Override
123 public void send(final Event event) {
124 try {
125 agent.put(event);
126 } catch (final EventDeliveryException ex) {
127 throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
128 }
129 }
130
131 @Override
132 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
133 agent.stop();
134 return true;
135 }
136
137
138
139
140 private static class FactoryData {
141 private final Agent[] agents;
142 private final Property[] properties;
143 private final int batchSize;
144 private final String dataDir;
145 private final String name;
146
147
148
149
150
151
152
153
154
155 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
156 final String dataDir) {
157 this.name = name;
158 this.agents = agents;
159 this.batchSize = batchSize;
160 this.properties = properties;
161 this.dataDir = dataDir;
162 }
163 }
164
165
166
167
168 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
169
170
171
172
173
174
175
176 @Override
177 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
178 try {
179 final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
180 data.batchSize, data.dataDir);
181 final EmbeddedAgent agent = new EmbeddedAgent(name);
182 agent.configure(props);
183 agent.start();
184 LOGGER.debug("Created Agent " + name);
185 return new FlumeEmbeddedManager(name, data.name, agent);
186 } catch (final Exception ex) {
187 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
188 }
189 return null;
190 }
191
192 private Map<String, String> createProperties(final String name, final Agent[] agents,
193 final Property[] properties, final int batchSize, String dataDir) {
194 final Map<String, String> props = new HashMap<>();
195
196 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
197 LOGGER.error("No Flume configuration provided");
198 throw new ConfigurationException("No Flume configuration provided");
199 }
200
201 if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
202 LOGGER.error("Agents and Flume configuration cannot both be specified");
203 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
204 }
205
206 if (agents != null && agents.length > 0) {
207
208 if (Strings.isNotEmpty(dataDir)) {
209 if (dataDir.equals(IN_MEMORY)) {
210 props.put("channel.type", "memory");
211 } else {
212 props.put("channel.type", "file");
213
214 if (!dataDir.endsWith(FILE_SEP)) {
215 dataDir = dataDir + FILE_SEP;
216 }
217
218 props.put("channel.checkpointDir", dataDir + "checkpoint");
219 props.put("channel.dataDirs", dataDir + "data");
220 }
221
222 } else {
223 props.put("channel.type", "file");
224 }
225
226 final StringBuilder sb = new StringBuilder();
227 String leading = Strings.EMPTY;
228 final int priority = agents.length;
229 for (int i = 0; i < priority; ++i) {
230 sb.append(leading).append("agent").append(i);
231 leading = " ";
232 final String prefix = "agent" + i;
233 props.put(prefix + ".type", "avro");
234 props.put(prefix + ".hostname", agents[i].getHost());
235 props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
236 props.put(prefix + ".batch-size", Integer.toString(batchSize));
237 props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
238 }
239 props.put("sinks", sb.toString());
240 props.put("processor.type", "failover");
241 } else {
242 String[] sinks = null;
243
244 for (final Property property : properties) {
245 final String key = property.getName();
246
247 if (Strings.isEmpty(key)) {
248 final String msg = "A property name must be provided";
249 LOGGER.error(msg);
250 throw new ConfigurationException(msg);
251 }
252
253 final String upperKey = key.toUpperCase(Locale.ENGLISH);
254
255 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
256 final String msg =
257 "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
258 LOGGER.error(msg);
259 throw new ConfigurationException(msg);
260 }
261
262 final String value = property.getValue();
263 if (Strings.isEmpty(value)) {
264 final String msg = "A value for property " + key + " must be provided";
265 LOGGER.error(msg);
266 throw new ConfigurationException(msg);
267 }
268
269 if (upperKey.equals("SINKS")) {
270 sinks = value.trim().split(" ");
271 }
272
273 props.put(key, value);
274 }
275
276 if (sinks == null || sinks.length == 0) {
277 final String msg = "At least one Sink must be specified";
278 LOGGER.error(msg);
279 throw new ConfigurationException(msg);
280 }
281 }
282 return props;
283 }
284
285 }
286
287 }