001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.kafka; 019 020import java.io.Serializable; 021import java.util.Objects; 022import java.util.concurrent.ExecutionException; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.TimeoutException; 025import java.util.stream.Stream; 026 027import org.apache.logging.log4j.core.AbstractLifeCycle; 028import org.apache.logging.log4j.core.Appender; 029import org.apache.logging.log4j.core.Filter; 030import org.apache.logging.log4j.core.Layout; 031import org.apache.logging.log4j.core.LogEvent; 032import org.apache.logging.log4j.core.appender.AbstractAppender; 033import org.apache.logging.log4j.core.config.Configuration; 034import org.apache.logging.log4j.core.config.Node; 035import org.apache.logging.log4j.core.config.Property; 036import org.apache.logging.log4j.core.config.plugins.Plugin; 037import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 038import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; 039import org.apache.logging.log4j.core.layout.SerializedLayout; 040 041/** 042 * Sends log events to an Apache Kafka topic. 043 */ 044@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) 045public final class KafkaAppender extends AbstractAppender { 046 047 /** 048 * Builds KafkaAppender instances. 049 * 050 * @param <B> The type to build 051 */ 052 public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B> 053 implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> { 054 055 @PluginAttribute("retryCount") 056 private String retryCount; 057 058 @PluginAttribute("topic") 059 private String topic; 060 061 @PluginAttribute("key") 062 private String key; 063 064 @PluginAttribute(value = "syncSend", defaultBoolean = true) 065 private boolean syncSend; 066 067 @SuppressWarnings("resource") 068 @Override 069 public KafkaAppender build() { 070 final Layout<? extends Serializable> layout = getLayout(); 071 if (layout == null) { 072 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender"); 073 return null; 074 } 075 final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), getName(), 076 topic, syncSend, getPropertyArray(), key); 077 return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager, 078 getPropertyArray(), getRetryCount()); 079 } 080 081 public Integer getRetryCount() { 082 Integer intRetryCount = null; 083 try { 084 intRetryCount = Integer.valueOf(retryCount); 085 } catch (NumberFormatException e) { 086 087 } 088 return intRetryCount; 089 090 } 091 092 public String getTopic() { 093 return topic; 094 } 095 096 public boolean isSyncSend() { 097 return syncSend; 098 } 099 100 public B setKey(final String key) { 101 this.key = key; 102 return asBuilder(); 103 } 104 105 public B setSyncSend(final boolean syncSend) { 106 this.syncSend = syncSend; 107 return asBuilder(); 108 } 109 110 public B setTopic(final String topic) { 111 this.topic = topic; 112 return asBuilder(); 113 } 114 115 } 116 117 private static final String[] KAFKA_CLIENT_PACKAGES = new String[] { "org.apache.kafka.common", "org.apache.kafka.clients" }; 118 119 @Deprecated 120 public static KafkaAppender createAppender(final Layout<? extends Serializable> layout, final Filter filter, 121 final String name, final boolean ignoreExceptions, final String topic, final Property[] properties, 122 final Configuration configuration, 123 final String key) { 124 125 if (layout == null) { 126 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender"); 127 return null; 128 } 129 final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, true, 130 properties, key); 131 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, null); 132 } 133 134 /** 135 * Tests if the given log event is from a Kafka Producer implementation. 136 * 137 * @param event The event to test. 138 * @return true to avoid recursion and skip logging, false to log. 139 */ 140 private static boolean isRecursive(final LogEvent event) { 141 return Stream.of(KAFKA_CLIENT_PACKAGES).anyMatch(prefix -> event.getLoggerName().startsWith(prefix)); 142 } 143 144 /** 145 * Creates a builder for a KafkaAppender. 146 * 147 * @return a builder for a KafkaAppender. 148 */ 149 @PluginBuilderFactory 150 public static <B extends Builder<B>> B newBuilder() { 151 return new Builder<B>().asBuilder(); 152 } 153 154 private final Integer retryCount; 155 156 private final KafkaManager manager; 157 158 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, 159 final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties, 160 final Integer retryCount) { 161 super(name, filter, layout, ignoreExceptions, properties); 162 this.manager = Objects.requireNonNull(manager, "manager"); 163 this.retryCount = retryCount; 164 } 165 166 @Override 167 public void append(final LogEvent event) { 168 if (event.getLoggerName() != null && isRecursive(event)) { 169 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); 170 } else { 171 try { 172 tryAppend(event); 173 } catch (final Exception e) { 174 175 if (this.retryCount != null) { 176 int currentRetryAttempt = 0; 177 while (currentRetryAttempt < this.retryCount) { 178 currentRetryAttempt++; 179 try { 180 tryAppend(event); 181 break; 182 } catch (Exception e1) { 183 184 } 185 } 186 } 187 error("Unable to write to Kafka in appender [" + getName() + "]", event, e); 188 } 189 } 190 } 191 192 @Override 193 public void start() { 194 super.start(); 195 manager.startup(); 196 } 197 198 @Override 199 public boolean stop(final long timeout, final TimeUnit timeUnit) { 200 setStopping(); 201 boolean stopped = super.stop(timeout, timeUnit, false); 202 stopped &= manager.stop(timeout, timeUnit); 203 setStopped(); 204 return stopped; 205 } 206 207 @Override 208 public String toString() { 209 return "KafkaAppender{" + "name=" + getName() + ", state=" + getState() + ", topic=" + manager.getTopic() + '}'; 210 } 211 212 private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException { 213 final Layout<? extends Serializable> layout = getLayout(); 214 byte[] data; 215 if (layout instanceof SerializedLayout) { 216 final byte[] header = layout.getHeader(); 217 final byte[] body = layout.toByteArray(event); 218 data = new byte[header.length + body.length]; 219 System.arraycopy(header, 0, data, 0, header.length); 220 System.arraycopy(body, 0, data, header.length, body.length); 221 } else { 222 data = layout.toByteArray(event); 223 } 224 manager.send(data); 225 } 226}