1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import org.apache.flume.event.SimpleEvent;
28 import org.apache.logging.log4j.Level;
29 import org.apache.logging.log4j.LoggingException;
30 import org.apache.logging.log4j.Marker;
31 import org.apache.logging.log4j.ThreadContext;
32 import org.apache.logging.log4j.core.LogEvent;
33 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
34 import org.apache.logging.log4j.core.impl.ThrowableProxy;
35 import org.apache.logging.log4j.core.time.Instant;
36 import org.apache.logging.log4j.core.util.Patterns;
37 import org.apache.logging.log4j.core.util.UuidUtil;
38 import org.apache.logging.log4j.message.MapMessage;
39 import org.apache.logging.log4j.message.Message;
40 import org.apache.logging.log4j.message.StructuredDataId;
41 import org.apache.logging.log4j.message.StructuredDataMessage;
42 import org.apache.logging.log4j.util.Constants;
43 import org.apache.logging.log4j.util.ReadOnlyStringMap;
44 import org.apache.logging.log4j.util.Strings;
45
46
47
48
49 public class FlumeEvent extends SimpleEvent implements LogEvent {
50
51 static final String GUID = "guId";
52
53
54
55 private static final long serialVersionUID = -8988674608627854140L;
56
57 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
58
59 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
60
61 private static final String EVENT_TYPE = "eventType";
62
63 private static final String EVENT_ID = "eventId";
64
65 private static final String TIMESTAMP = "timeStamp";
66
67 private final LogEvent event;
68
69 private final Map<String, String> contextMap = new HashMap<>();
70
71 private final boolean compress;
72
73
74
75
76
77
78
79
80
81
82
83 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
84 String mdcPrefix, String eventPrefix, final boolean compress) {
85 this.event = event;
86 this.compress = compress;
87 final Map<String, String> headers = getHeaders();
88 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
89 if (mdcPrefix == null) {
90 mdcPrefix = DEFAULT_MDC_PREFIX;
91 }
92 if (eventPrefix == null) {
93 eventPrefix = DEFAULT_EVENT_PREFIX;
94 }
95 final Map<String, String> mdc = event.getContextData().toMap();
96 if (includes != null) {
97 final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
98 if (array.length > 0) {
99 for (String str : array) {
100 str = str.trim();
101 if (mdc.containsKey(str)) {
102 contextMap.put(str, mdc.get(str));
103 }
104 }
105 }
106 } else if (excludes != null) {
107 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
108 if (array.length > 0) {
109 final List<String> list = new ArrayList<>(array.length);
110 for (final String value : array) {
111 list.add(value.trim());
112 }
113 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
114 if (!list.contains(entry.getKey())) {
115 contextMap.put(entry.getKey(), entry.getValue());
116 }
117 }
118 }
119 } else {
120 contextMap.putAll(mdc);
121 }
122
123 if (required != null) {
124 final String[] array = required.split(Patterns.COMMA_SEPARATOR);
125 if (array.length > 0) {
126 for (String str : array) {
127 str = str.trim();
128 if (!mdc.containsKey(str)) {
129 throw new LoggingException("Required key " + str + " is missing from the MDC");
130 }
131 }
132 }
133 }
134 final String guid = UuidUtil.getTimeBasedUuid().toString();
135 final Message message = event.getMessage();
136 if (message instanceof MapMessage) {
137
138 @SuppressWarnings("unchecked")
139 final
140 MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message;
141 stringMapMessage.put(GUID, guid);
142 if (message instanceof StructuredDataMessage) {
143 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
144 }
145 addMapData(eventPrefix, headers, stringMapMessage);
146 } else {
147 headers.put(GUID, guid);
148 }
149
150 addContextData(mdcPrefix, headers, contextMap);
151 }
152
153 protected void addStructuredData(final String prefix, final Map<String, String> fields,
154 final StructuredDataMessage msg) {
155 fields.put(prefix + EVENT_TYPE, msg.getType());
156 final StructuredDataId id = msg.getId();
157 fields.put(prefix + EVENT_ID, id.getName());
158 }
159
160 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) {
161 final Map<String, String> data = msg.getData();
162 for (final Map.Entry<String, String> entry : data.entrySet()) {
163 fields.put(prefix + entry.getKey(), entry.getValue());
164 }
165 }
166
167 protected void addContextData(final String prefix, final Map<String, String> fields,
168 final Map<String, String> context) {
169 final Map<String, String> map = new HashMap<>();
170 for (final Map.Entry<String, String> entry : context.entrySet()) {
171 if (entry.getKey() != null && entry.getValue() != null) {
172 fields.put(prefix + entry.getKey(), entry.getValue());
173 map.put(prefix + entry.getKey(), entry.getValue());
174 }
175 }
176 context.clear();
177 context.putAll(map);
178 }
179
180 @Override
181 public LogEvent toImmutable() {
182 return Log4jLogEvent.createMemento(this);
183 }
184
185
186
187
188
189 @Override
190 public void setBody(final byte[] body) {
191 if (body == null || body.length == 0) {
192 super.setBody(Constants.EMPTY_BYTE_ARRAY);
193 return;
194 }
195 if (compress) {
196 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
197 try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
198 os.write(body);
199 } catch (final IOException ioe) {
200 throw new LoggingException("Unable to compress message", ioe);
201 }
202 super.setBody(baos.toByteArray());
203 } else {
204 super.setBody(body);
205 }
206 }
207
208
209
210
211
212 @Override
213 public String getLoggerFqcn() {
214 return event.getLoggerFqcn();
215 }
216
217
218
219
220
221 @Override
222 public Level getLevel() {
223 return event.getLevel();
224 }
225
226
227
228
229
230 @Override
231 public String getLoggerName() {
232 return event.getLoggerName();
233 }
234
235
236
237
238
239 @Override
240 public StackTraceElement getSource() {
241 return event.getSource();
242 }
243
244
245
246
247
248 @Override
249 public Message getMessage() {
250 return event.getMessage();
251 }
252
253
254
255
256
257 @Override
258 public Marker getMarker() {
259 return event.getMarker();
260 }
261
262
263
264
265
266 @Override
267 public long getThreadId() {
268 return event.getThreadId();
269 }
270
271
272
273
274
275 @Override
276 public int getThreadPriority() {
277 return event.getThreadPriority();
278 }
279
280
281
282
283
284 @Override
285 public String getThreadName() {
286 return event.getThreadName();
287 }
288
289
290
291
292
293 @Override
294 public long getTimeMillis() {
295 return event.getTimeMillis();
296 }
297
298
299
300
301
302 @Override
303 public Instant getInstant() {
304 return event.getInstant();
305 }
306
307
308
309
310
311
312 @Override
313 public long getNanoTime() {
314 return event.getNanoTime();
315 }
316
317
318
319
320
321 @Override
322 public Throwable getThrown() {
323 return event.getThrown();
324 }
325
326
327
328
329
330 @Override
331 public ThrowableProxy getThrownProxy() {
332 return event.getThrownProxy();
333 }
334
335
336
337
338
339 @Override
340 public Map<String, String> getContextMap() {
341 return contextMap;
342 }
343
344
345
346
347
348 @Override
349 public ReadOnlyStringMap getContextData() {
350 return event.getContextData();
351 }
352
353
354
355
356
357 @Override
358 public ThreadContext.ContextStack getContextStack() {
359 return event.getContextStack();
360 }
361
362 @Override
363 public boolean isIncludeLocation() {
364 return event.isIncludeLocation();
365 }
366
367 @Override
368 public void setIncludeLocation(final boolean includeLocation) {
369 event.setIncludeLocation(includeLocation);
370 }
371
372 @Override
373 public boolean isEndOfBatch() {
374 return event.isEndOfBatch();
375 }
376
377 @Override
378 public void setEndOfBatch(final boolean endOfBatch) {
379 event.setEndOfBatch(endOfBatch);
380 }
381
382 }