View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.zip.GZIPOutputStream;
26  
27  import org.apache.flume.event.SimpleEvent;
28  import org.apache.logging.log4j.Level;
29  import org.apache.logging.log4j.LoggingException;
30  import org.apache.logging.log4j.Marker;
31  import org.apache.logging.log4j.ThreadContext;
32  import org.apache.logging.log4j.core.LogEvent;
33  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
34  import org.apache.logging.log4j.core.impl.ThrowableProxy;
35  import org.apache.logging.log4j.core.time.Instant;
36  import org.apache.logging.log4j.core.util.Patterns;
37  import org.apache.logging.log4j.core.util.UuidUtil;
38  import org.apache.logging.log4j.message.MapMessage;
39  import org.apache.logging.log4j.message.Message;
40  import org.apache.logging.log4j.message.StructuredDataId;
41  import org.apache.logging.log4j.message.StructuredDataMessage;
42  import org.apache.logging.log4j.util.Constants;
43  import org.apache.logging.log4j.util.ReadOnlyStringMap;
44  import org.apache.logging.log4j.util.Strings;
45  
46  /**
47   * Class that is both a Flume and Log4j Event.
48   */
49  public class FlumeEvent extends SimpleEvent implements LogEvent {
50  
51      static final String GUID = "guId";
52      /**
53       * Generated serial version ID.
54       */
55      private static final long serialVersionUID = -8988674608627854140L;
56  
57      private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
58  
59      private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
60  
61      private static final String EVENT_TYPE = "eventType";
62  
63      private static final String EVENT_ID = "eventId";
64  
65      private static final String TIMESTAMP = "timeStamp";
66  
67      private final LogEvent event;
68  
69      private final Map<String, String> contextMap = new HashMap<>();
70  
71      private final boolean compress;
72  
73      /**
74       * Construct the FlumeEvent.
75       * @param event The Log4j LogEvent.
76       * @param includes A comma separated list of MDC elements to include.
77       * @param excludes A comma separated list of MDC elements to exclude.
78       * @param required A comma separated list of MDC elements that are required to be defined.
79       * @param mdcPrefix The value to prefix to MDC keys.
80       * @param eventPrefix The value to prefix to event keys.
81       * @param compress If true the event body should be compressed.
82       */
83      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
84                        String mdcPrefix, String eventPrefix, final boolean compress) {
85          this.event = event;
86          this.compress = compress;
87          final Map<String, String> headers = getHeaders();
88          headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
89          if (mdcPrefix == null) {
90              mdcPrefix = DEFAULT_MDC_PREFIX;
91          }
92          if (eventPrefix == null) {
93              eventPrefix = DEFAULT_EVENT_PREFIX;
94          }
95          final Map<String, String> mdc = event.getContextData().toMap();
96          if (includes != null) {
97              final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
98              if (array.length > 0) {
99                  for (String str : array) {
100                     str = str.trim();
101                     if (mdc.containsKey(str)) {
102                         contextMap.put(str, mdc.get(str));
103                     }
104                 }
105             }
106         } else if (excludes != null) {
107             final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
108             if (array.length > 0) {
109                 final List<String> list = new ArrayList<>(array.length);
110                 for (final String value : array) {
111                     list.add(value.trim());
112                 }
113                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
114                     if (!list.contains(entry.getKey())) {
115                         contextMap.put(entry.getKey(), entry.getValue());
116                     }
117                 }
118             }
119         } else {
120             contextMap.putAll(mdc);
121         }
122 
123         if (required != null) {
124             final String[] array = required.split(Patterns.COMMA_SEPARATOR);
125             if (array.length > 0) {
126                 for (String str : array) {
127                     str = str.trim();
128                     if (!mdc.containsKey(str)) {
129                         throw new LoggingException("Required key " + str + " is missing from the MDC");
130                     }
131                 }
132             }
133         }
134         final String guid =  UuidUtil.getTimeBasedUuid().toString();
135         final Message message = event.getMessage();
136         if (message instanceof MapMessage) {
137             // Add the guid to the Map so that it can be included in the Layout.
138         	@SuppressWarnings("unchecked")
139             final
140 			MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message;
141         	stringMapMessage.put(GUID, guid);
142             if (message instanceof StructuredDataMessage) {
143                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
144             }
145             addMapData(eventPrefix, headers, stringMapMessage);
146         } else {
147             headers.put(GUID, guid);
148         }
149 
150         addContextData(mdcPrefix, headers, contextMap);
151     }
152 
153     protected void addStructuredData(final String prefix, final Map<String, String> fields,
154                                      final StructuredDataMessage msg) {
155         fields.put(prefix + EVENT_TYPE, msg.getType());
156         final StructuredDataId id = msg.getId();
157         fields.put(prefix + EVENT_ID, id.getName());
158     }
159 
160     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) {
161         final Map<String, String> data = msg.getData();
162         for (final Map.Entry<String, String> entry : data.entrySet()) {
163             fields.put(prefix + entry.getKey(), entry.getValue());
164         }
165     }
166 
167     protected void addContextData(final String prefix, final Map<String, String> fields,
168                                   final Map<String, String> context) {
169         final Map<String, String> map = new HashMap<>();
170         for (final Map.Entry<String, String> entry : context.entrySet()) {
171             if (entry.getKey() != null && entry.getValue() != null) {
172                 fields.put(prefix + entry.getKey(), entry.getValue());
173                 map.put(prefix + entry.getKey(), entry.getValue());
174             }
175         }
176         context.clear();
177         context.putAll(map);
178     }
179 
180 	@Override
181 	public LogEvent toImmutable() {
182 		return Log4jLogEvent.createMemento(this);
183 	}
184 
185     /**
186      * Set the body in the event.
187      * @param body The body to add to the event.
188      */
189     @Override
190     public void setBody(final byte[] body) {
191         if (body == null || body.length == 0) {
192             super.setBody(Constants.EMPTY_BYTE_ARRAY);
193             return;
194         }
195         if (compress) {
196             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
197             try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
198                 os.write(body);
199             } catch (final IOException ioe) {
200                 throw new LoggingException("Unable to compress message", ioe);
201             }
202             super.setBody(baos.toByteArray());
203         } else {
204             super.setBody(body);
205         }
206     }
207 
208     /**
209      * Get the Frequently Qualified Class Name.
210      * @return the FQCN String.
211      */
212     @Override
213     public String getLoggerFqcn() {
214         return event.getLoggerFqcn();
215     }
216 
217     /**
218      * Returns the logging Level.
219      * @return the Level.
220      */
221     @Override
222     public Level getLevel() {
223         return event.getLevel();
224     }
225 
226     /**
227      * Returns the logger name.
228      * @return the logger name.
229      */
230     @Override
231     public String getLoggerName() {
232         return event.getLoggerName();
233     }
234 
235     /**
236      * Returns the StackTraceElement for the caller of the logging API.
237      * @return the StackTraceElement of the caller.
238      */
239     @Override
240     public StackTraceElement getSource() {
241         return event.getSource();
242     }
243 
244     /**
245      * Returns the Message.
246      * @return the Message.
247      */
248     @Override
249     public Message getMessage() {
250         return event.getMessage();
251     }
252 
253     /**
254      * Returns the Marker.
255      * @return the Marker.
256      */
257     @Override
258     public Marker getMarker() {
259         return event.getMarker();
260     }
261 
262     /**
263      * Returns the ID of the Thread.
264      * @return the ID of the Thread.
265      */
266     @Override
267     public long getThreadId() {
268         return event.getThreadId();
269     }
270 
271     /**
272      * Returns the priority of the Thread.
273      * @return the priority of the Thread.
274      */
275     @Override
276     public int getThreadPriority() {
277         return event.getThreadPriority();
278     }
279 
280     /**
281      * Returns the name of the Thread.
282      * @return the name of the Thread.
283      */
284     @Override
285     public String getThreadName() {
286         return event.getThreadName();
287     }
288 
289     /**
290      * Returns the event timestamp.
291      * @return the event timestamp.
292      */
293     @Override
294     public long getTimeMillis() {
295         return event.getTimeMillis();
296     }
297 
298     /**
299      * {@inheritDoc}
300      * @since 2.11
301      */
302     @Override
303     public Instant getInstant() {
304         return event.getInstant();
305     }
306 
307     /**
308      * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
309      * or a dummy value if it is known that this value will not be used downstream.
310      * @return the event nanosecond timestamp.
311      */
312     @Override
313     public long getNanoTime() {
314         return event.getNanoTime();
315     }
316 
317     /**
318      * Returns the Throwable associated with the event, if any.
319      * @return the Throwable.
320      */
321     @Override
322     public Throwable getThrown() {
323         return event.getThrown();
324     }
325 
326     /**
327      * Returns the Throwable associated with the event, if any.
328      * @return the Throwable.
329      */
330     @Override
331     public ThrowableProxy getThrownProxy() {
332         return event.getThrownProxy();
333     }
334 
335     /**
336      * Returns a copy of the context Map.
337      * @return a copy of the context Map.
338      */
339     @Override
340     public Map<String, String> getContextMap() {
341         return contextMap;
342     }
343 
344     /**
345      * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
346      * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
347      */
348     @Override
349     public ReadOnlyStringMap getContextData() {
350         return event.getContextData();
351     }
352 
353     /**
354      * Returns a copy of the context stack.
355      * @return a copy of the context stack.
356      */
357     @Override
358     public ThreadContext.ContextStack getContextStack() {
359         return event.getContextStack();
360     }
361 
362     @Override
363     public boolean isIncludeLocation() {
364         return event.isIncludeLocation();
365     }
366 
367     @Override
368     public void setIncludeLocation(final boolean includeLocation) {
369         event.setIncludeLocation(includeLocation);
370     }
371 
372     @Override
373     public boolean isEndOfBatch() {
374         return event.isEndOfBatch();
375     }
376 
377     @Override
378     public void setEndOfBatch(final boolean endOfBatch) {
379         event.setEndOfBatch(endOfBatch);
380     }
381 
382 }