001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayOutputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.zip.GZIPOutputStream;
026
027import org.apache.flume.event.SimpleEvent;
028import org.apache.logging.log4j.Level;
029import org.apache.logging.log4j.LoggingException;
030import org.apache.logging.log4j.Marker;
031import org.apache.logging.log4j.ThreadContext;
032import org.apache.logging.log4j.core.LogEvent;
033import org.apache.logging.log4j.core.impl.Log4jLogEvent;
034import org.apache.logging.log4j.core.impl.ThrowableProxy;
035import org.apache.logging.log4j.core.time.Instant;
036import org.apache.logging.log4j.core.util.Patterns;
037import org.apache.logging.log4j.core.util.UuidUtil;
038import org.apache.logging.log4j.message.MapMessage;
039import org.apache.logging.log4j.message.Message;
040import org.apache.logging.log4j.message.StructuredDataId;
041import org.apache.logging.log4j.message.StructuredDataMessage;
042import org.apache.logging.log4j.util.Constants;
043import org.apache.logging.log4j.util.ReadOnlyStringMap;
044import org.apache.logging.log4j.util.Strings;
045
046/**
047 * Class that is both a Flume and Log4j Event.
048 */
049public class FlumeEvent extends SimpleEvent implements LogEvent {
050
051    static final String GUID = "guId";
052    /**
053     * Generated serial version ID.
054     */
055    private static final long serialVersionUID = -8988674608627854140L;
056
057    private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
058
059    private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
060
061    private static final String EVENT_TYPE = "eventType";
062
063    private static final String EVENT_ID = "eventId";
064
065    private static final String TIMESTAMP = "timeStamp";
066
067    private final LogEvent event;
068
069    private final Map<String, String> contextMap = new HashMap<>();
070
071    private final boolean compress;
072
073    /**
074     * Construct the FlumeEvent.
075     * @param event The Log4j LogEvent.
076     * @param includes A comma separated list of MDC elements to include.
077     * @param excludes A comma separated list of MDC elements to exclude.
078     * @param required A comma separated list of MDC elements that are required to be defined.
079     * @param mdcPrefix The value to prefix to MDC keys.
080     * @param eventPrefix The value to prefix to event keys.
081     * @param compress If true the event body should be compressed.
082     */
083    public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
084                      String mdcPrefix, String eventPrefix, final boolean compress) {
085        this.event = event;
086        this.compress = compress;
087        final Map<String, String> headers = getHeaders();
088        headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
089        if (mdcPrefix == null) {
090            mdcPrefix = DEFAULT_MDC_PREFIX;
091        }
092        if (eventPrefix == null) {
093            eventPrefix = DEFAULT_EVENT_PREFIX;
094        }
095        final Map<String, String> mdc = event.getContextData().toMap();
096        if (includes != null) {
097            final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
098            if (array.length > 0) {
099                for (String str : array) {
100                    str = str.trim();
101                    if (mdc.containsKey(str)) {
102                        contextMap.put(str, mdc.get(str));
103                    }
104                }
105            }
106        } else if (excludes != null) {
107            final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
108            if (array.length > 0) {
109                final List<String> list = new ArrayList<>(array.length);
110                for (final String value : array) {
111                    list.add(value.trim());
112                }
113                for (final Map.Entry<String, String> entry : mdc.entrySet()) {
114                    if (!list.contains(entry.getKey())) {
115                        contextMap.put(entry.getKey(), entry.getValue());
116                    }
117                }
118            }
119        } else {
120            contextMap.putAll(mdc);
121        }
122
123        if (required != null) {
124            final String[] array = required.split(Patterns.COMMA_SEPARATOR);
125            if (array.length > 0) {
126                for (String str : array) {
127                    str = str.trim();
128                    if (!mdc.containsKey(str)) {
129                        throw new LoggingException("Required key " + str + " is missing from the MDC");
130                    }
131                }
132            }
133        }
134        final String guid =  UuidUtil.getTimeBasedUuid().toString();
135        final Message message = event.getMessage();
136        if (message instanceof MapMessage) {
137            // Add the guid to the Map so that it can be included in the Layout.
138                @SuppressWarnings("unchecked")
139            final
140                        MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message;
141                stringMapMessage.put(GUID, guid);
142            if (message instanceof StructuredDataMessage) {
143                addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
144            }
145            addMapData(eventPrefix, headers, stringMapMessage);
146        } else {
147            headers.put(GUID, guid);
148        }
149
150        addContextData(mdcPrefix, headers, contextMap);
151    }
152
153    protected void addStructuredData(final String prefix, final Map<String, String> fields,
154                                     final StructuredDataMessage msg) {
155        fields.put(prefix + EVENT_TYPE, msg.getType());
156        final StructuredDataId id = msg.getId();
157        fields.put(prefix + EVENT_ID, id.getName());
158    }
159
160    protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) {
161        final Map<String, String> data = msg.getData();
162        for (final Map.Entry<String, String> entry : data.entrySet()) {
163            fields.put(prefix + entry.getKey(), entry.getValue());
164        }
165    }
166
167    protected void addContextData(final String prefix, final Map<String, String> fields,
168                                  final Map<String, String> context) {
169        final Map<String, String> map = new HashMap<>();
170        for (final Map.Entry<String, String> entry : context.entrySet()) {
171            if (entry.getKey() != null && entry.getValue() != null) {
172                fields.put(prefix + entry.getKey(), entry.getValue());
173                map.put(prefix + entry.getKey(), entry.getValue());
174            }
175        }
176        context.clear();
177        context.putAll(map);
178    }
179
180        @Override
181        public LogEvent toImmutable() {
182                return Log4jLogEvent.createMemento(this);
183        }
184
185    /**
186     * Set the body in the event.
187     * @param body The body to add to the event.
188     */
189    @Override
190    public void setBody(final byte[] body) {
191        if (body == null || body.length == 0) {
192            super.setBody(Constants.EMPTY_BYTE_ARRAY);
193            return;
194        }
195        if (compress) {
196            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
197            try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
198                os.write(body);
199            } catch (final IOException ioe) {
200                throw new LoggingException("Unable to compress message", ioe);
201            }
202            super.setBody(baos.toByteArray());
203        } else {
204            super.setBody(body);
205        }
206    }
207
208    /**
209     * Get the Frequently Qualified Class Name.
210     * @return the FQCN String.
211     */
212    @Override
213    public String getLoggerFqcn() {
214        return event.getLoggerFqcn();
215    }
216
217    /**
218     * Returns the logging Level.
219     * @return the Level.
220     */
221    @Override
222    public Level getLevel() {
223        return event.getLevel();
224    }
225
226    /**
227     * Returns the logger name.
228     * @return the logger name.
229     */
230    @Override
231    public String getLoggerName() {
232        return event.getLoggerName();
233    }
234
235    /**
236     * Returns the StackTraceElement for the caller of the logging API.
237     * @return the StackTraceElement of the caller.
238     */
239    @Override
240    public StackTraceElement getSource() {
241        return event.getSource();
242    }
243
244    /**
245     * Returns the Message.
246     * @return the Message.
247     */
248    @Override
249    public Message getMessage() {
250        return event.getMessage();
251    }
252
253    /**
254     * Returns the Marker.
255     * @return the Marker.
256     */
257    @Override
258    public Marker getMarker() {
259        return event.getMarker();
260    }
261
262    /**
263     * Returns the ID of the Thread.
264     * @return the ID of the Thread.
265     */
266    @Override
267    public long getThreadId() {
268        return event.getThreadId();
269    }
270
271    /**
272     * Returns the priority of the Thread.
273     * @return the priority of the Thread.
274     */
275    @Override
276    public int getThreadPriority() {
277        return event.getThreadPriority();
278    }
279
280    /**
281     * Returns the name of the Thread.
282     * @return the name of the Thread.
283     */
284    @Override
285    public String getThreadName() {
286        return event.getThreadName();
287    }
288
289    /**
290     * Returns the event timestamp.
291     * @return the event timestamp.
292     */
293    @Override
294    public long getTimeMillis() {
295        return event.getTimeMillis();
296    }
297
298    /**
299     * {@inheritDoc}
300     * @since 2.11
301     */
302    @Override
303    public Instant getInstant() {
304        return event.getInstant();
305    }
306
307    /**
308     * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
309     * or a dummy value if it is known that this value will not be used downstream.
310     * @return the event nanosecond timestamp.
311     */
312    @Override
313    public long getNanoTime() {
314        return event.getNanoTime();
315    }
316
317    /**
318     * Returns the Throwable associated with the event, if any.
319     * @return the Throwable.
320     */
321    @Override
322    public Throwable getThrown() {
323        return event.getThrown();
324    }
325
326    /**
327     * Returns the Throwable associated with the event, if any.
328     * @return the Throwable.
329     */
330    @Override
331    public ThrowableProxy getThrownProxy() {
332        return event.getThrownProxy();
333    }
334
335    /**
336     * Returns a copy of the context Map.
337     * @return a copy of the context Map.
338     */
339    @Override
340    public Map<String, String> getContextMap() {
341        return contextMap;
342    }
343
344    /**
345     * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
346     * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
347     */
348    @Override
349    public ReadOnlyStringMap getContextData() {
350        return event.getContextData();
351    }
352
353    /**
354     * Returns a copy of the context stack.
355     * @return a copy of the context stack.
356     */
357    @Override
358    public ThreadContext.ContextStack getContextStack() {
359        return event.getContextStack();
360    }
361
362    @Override
363    public boolean isIncludeLocation() {
364        return event.isIncludeLocation();
365    }
366
367    @Override
368    public void setIncludeLocation(final boolean includeLocation) {
369        event.setIncludeLocation(includeLocation);
370    }
371
372    @Override
373    public boolean isEndOfBatch() {
374        return event.isEndOfBatch();
375    }
376
377    @Override
378    public void setEndOfBatch(final boolean endOfBatch) {
379        event.setEndOfBatch(endOfBatch);
380    }
381
382}