001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.ByteArrayOutputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.zip.GZIPOutputStream; 026 027import org.apache.flume.event.SimpleEvent; 028import org.apache.logging.log4j.Level; 029import org.apache.logging.log4j.LoggingException; 030import org.apache.logging.log4j.Marker; 031import org.apache.logging.log4j.ThreadContext; 032import org.apache.logging.log4j.core.LogEvent; 033import org.apache.logging.log4j.core.impl.Log4jLogEvent; 034import org.apache.logging.log4j.core.impl.ThrowableProxy; 035import org.apache.logging.log4j.core.time.Instant; 036import org.apache.logging.log4j.core.util.Patterns; 037import org.apache.logging.log4j.core.util.UuidUtil; 038import org.apache.logging.log4j.message.MapMessage; 039import org.apache.logging.log4j.message.Message; 040import org.apache.logging.log4j.message.StructuredDataId; 041import org.apache.logging.log4j.message.StructuredDataMessage; 042import org.apache.logging.log4j.util.Constants; 043import org.apache.logging.log4j.util.ReadOnlyStringMap; 044import org.apache.logging.log4j.util.Strings; 045 046/** 047 * Class that is both a Flume and Log4j Event. 048 */ 049public class FlumeEvent extends SimpleEvent implements LogEvent { 050 051 static final String GUID = "guId"; 052 /** 053 * Generated serial version ID. 054 */ 055 private static final long serialVersionUID = -8988674608627854140L; 056 057 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY; 058 059 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY; 060 061 private static final String EVENT_TYPE = "eventType"; 062 063 private static final String EVENT_ID = "eventId"; 064 065 private static final String TIMESTAMP = "timeStamp"; 066 067 private final LogEvent event; 068 069 private final Map<String, String> contextMap = new HashMap<>(); 070 071 private final boolean compress; 072 073 /** 074 * Construct the FlumeEvent. 075 * @param event The Log4j LogEvent. 076 * @param includes A comma separated list of MDC elements to include. 077 * @param excludes A comma separated list of MDC elements to exclude. 078 * @param required A comma separated list of MDC elements that are required to be defined. 079 * @param mdcPrefix The value to prefix to MDC keys. 080 * @param eventPrefix The value to prefix to event keys. 081 * @param compress If true the event body should be compressed. 082 */ 083 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 084 String mdcPrefix, String eventPrefix, final boolean compress) { 085 this.event = event; 086 this.compress = compress; 087 final Map<String, String> headers = getHeaders(); 088 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis())); 089 if (mdcPrefix == null) { 090 mdcPrefix = DEFAULT_MDC_PREFIX; 091 } 092 if (eventPrefix == null) { 093 eventPrefix = DEFAULT_EVENT_PREFIX; 094 } 095 final Map<String, String> mdc = event.getContextData().toMap(); 096 if (includes != null) { 097 final String[] array = includes.split(Patterns.COMMA_SEPARATOR); 098 if (array.length > 0) { 099 for (String str : array) { 100 str = str.trim(); 101 if (mdc.containsKey(str)) { 102 contextMap.put(str, mdc.get(str)); 103 } 104 } 105 } 106 } else if (excludes != null) { 107 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR); 108 if (array.length > 0) { 109 final List<String> list = new ArrayList<>(array.length); 110 for (final String value : array) { 111 list.add(value.trim()); 112 } 113 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 114 if (!list.contains(entry.getKey())) { 115 contextMap.put(entry.getKey(), entry.getValue()); 116 } 117 } 118 } 119 } else { 120 contextMap.putAll(mdc); 121 } 122 123 if (required != null) { 124 final String[] array = required.split(Patterns.COMMA_SEPARATOR); 125 if (array.length > 0) { 126 for (String str : array) { 127 str = str.trim(); 128 if (!mdc.containsKey(str)) { 129 throw new LoggingException("Required key " + str + " is missing from the MDC"); 130 } 131 } 132 } 133 } 134 final String guid = UuidUtil.getTimeBasedUuid().toString(); 135 final Message message = event.getMessage(); 136 if (message instanceof MapMessage) { 137 // Add the guid to the Map so that it can be included in the Layout. 138 @SuppressWarnings("unchecked") 139 final 140 MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message; 141 stringMapMessage.put(GUID, guid); 142 if (message instanceof StructuredDataMessage) { 143 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 144 } 145 addMapData(eventPrefix, headers, stringMapMessage); 146 } else { 147 headers.put(GUID, guid); 148 } 149 150 addContextData(mdcPrefix, headers, contextMap); 151 } 152 153 protected void addStructuredData(final String prefix, final Map<String, String> fields, 154 final StructuredDataMessage msg) { 155 fields.put(prefix + EVENT_TYPE, msg.getType()); 156 final StructuredDataId id = msg.getId(); 157 fields.put(prefix + EVENT_ID, id.getName()); 158 } 159 160 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) { 161 final Map<String, String> data = msg.getData(); 162 for (final Map.Entry<String, String> entry : data.entrySet()) { 163 fields.put(prefix + entry.getKey(), entry.getValue()); 164 } 165 } 166 167 protected void addContextData(final String prefix, final Map<String, String> fields, 168 final Map<String, String> context) { 169 final Map<String, String> map = new HashMap<>(); 170 for (final Map.Entry<String, String> entry : context.entrySet()) { 171 if (entry.getKey() != null && entry.getValue() != null) { 172 fields.put(prefix + entry.getKey(), entry.getValue()); 173 map.put(prefix + entry.getKey(), entry.getValue()); 174 } 175 } 176 context.clear(); 177 context.putAll(map); 178 } 179 180 @Override 181 public LogEvent toImmutable() { 182 return Log4jLogEvent.createMemento(this); 183 } 184 185 /** 186 * Set the body in the event. 187 * @param body The body to add to the event. 188 */ 189 @Override 190 public void setBody(final byte[] body) { 191 if (body == null || body.length == 0) { 192 super.setBody(Constants.EMPTY_BYTE_ARRAY); 193 return; 194 } 195 if (compress) { 196 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 197 try (GZIPOutputStream os = new GZIPOutputStream(baos)) { 198 os.write(body); 199 } catch (final IOException ioe) { 200 throw new LoggingException("Unable to compress message", ioe); 201 } 202 super.setBody(baos.toByteArray()); 203 } else { 204 super.setBody(body); 205 } 206 } 207 208 /** 209 * Get the Frequently Qualified Class Name. 210 * @return the FQCN String. 211 */ 212 @Override 213 public String getLoggerFqcn() { 214 return event.getLoggerFqcn(); 215 } 216 217 /** 218 * Returns the logging Level. 219 * @return the Level. 220 */ 221 @Override 222 public Level getLevel() { 223 return event.getLevel(); 224 } 225 226 /** 227 * Returns the logger name. 228 * @return the logger name. 229 */ 230 @Override 231 public String getLoggerName() { 232 return event.getLoggerName(); 233 } 234 235 /** 236 * Returns the StackTraceElement for the caller of the logging API. 237 * @return the StackTraceElement of the caller. 238 */ 239 @Override 240 public StackTraceElement getSource() { 241 return event.getSource(); 242 } 243 244 /** 245 * Returns the Message. 246 * @return the Message. 247 */ 248 @Override 249 public Message getMessage() { 250 return event.getMessage(); 251 } 252 253 /** 254 * Returns the Marker. 255 * @return the Marker. 256 */ 257 @Override 258 public Marker getMarker() { 259 return event.getMarker(); 260 } 261 262 /** 263 * Returns the ID of the Thread. 264 * @return the ID of the Thread. 265 */ 266 @Override 267 public long getThreadId() { 268 return event.getThreadId(); 269 } 270 271 /** 272 * Returns the priority of the Thread. 273 * @return the priority of the Thread. 274 */ 275 @Override 276 public int getThreadPriority() { 277 return event.getThreadPriority(); 278 } 279 280 /** 281 * Returns the name of the Thread. 282 * @return the name of the Thread. 283 */ 284 @Override 285 public String getThreadName() { 286 return event.getThreadName(); 287 } 288 289 /** 290 * Returns the event timestamp. 291 * @return the event timestamp. 292 */ 293 @Override 294 public long getTimeMillis() { 295 return event.getTimeMillis(); 296 } 297 298 /** 299 * {@inheritDoc} 300 * @since 2.11 301 */ 302 @Override 303 public Instant getInstant() { 304 return event.getInstant(); 305 } 306 307 /** 308 * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created, 309 * or a dummy value if it is known that this value will not be used downstream. 310 * @return the event nanosecond timestamp. 311 */ 312 @Override 313 public long getNanoTime() { 314 return event.getNanoTime(); 315 } 316 317 /** 318 * Returns the Throwable associated with the event, if any. 319 * @return the Throwable. 320 */ 321 @Override 322 public Throwable getThrown() { 323 return event.getThrown(); 324 } 325 326 /** 327 * Returns the Throwable associated with the event, if any. 328 * @return the Throwable. 329 */ 330 @Override 331 public ThrowableProxy getThrownProxy() { 332 return event.getThrownProxy(); 333 } 334 335 /** 336 * Returns a copy of the context Map. 337 * @return a copy of the context Map. 338 */ 339 @Override 340 public Map<String, String> getContextMap() { 341 return contextMap; 342 } 343 344 /** 345 * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with. 346 * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with. 347 */ 348 @Override 349 public ReadOnlyStringMap getContextData() { 350 return event.getContextData(); 351 } 352 353 /** 354 * Returns a copy of the context stack. 355 * @return a copy of the context stack. 356 */ 357 @Override 358 public ThreadContext.ContextStack getContextStack() { 359 return event.getContextStack(); 360 } 361 362 @Override 363 public boolean isIncludeLocation() { 364 return event.isIncludeLocation(); 365 } 366 367 @Override 368 public void setIncludeLocation(final boolean includeLocation) { 369 event.setIncludeLocation(includeLocation); 370 } 371 372 @Override 373 public boolean isEndOfBatch() { 374 return event.isEndOfBatch(); 375 } 376 377 @Override 378 public void setEndOfBatch(final boolean endOfBatch) { 379 event.setEndOfBatch(endOfBatch); 380 } 381 382}