001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.kafka;
019
020import java.io.Serializable;
021import java.util.Objects;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TimeoutException;
025import java.util.stream.Stream;
026
027import org.apache.logging.log4j.core.AbstractLifeCycle;
028import org.apache.logging.log4j.core.Appender;
029import org.apache.logging.log4j.core.Filter;
030import org.apache.logging.log4j.core.Layout;
031import org.apache.logging.log4j.core.LogEvent;
032import org.apache.logging.log4j.core.appender.AbstractAppender;
033import org.apache.logging.log4j.core.config.Configuration;
034import org.apache.logging.log4j.core.config.Node;
035import org.apache.logging.log4j.core.config.Property;
036import org.apache.logging.log4j.core.config.plugins.Plugin;
037import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
038import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
039import org.apache.logging.log4j.core.layout.SerializedLayout;
040
041/**
042 * Sends log events to an Apache Kafka topic.
043 */
044@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
045public final class KafkaAppender extends AbstractAppender {
046
047    /**
048         * Builds KafkaAppender instances.
049         * 
050         * @param <B> The type to build
051         */
052        public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
053                        implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
054
055                @PluginAttribute("retryCount")
056                private String retryCount;
057
058                @PluginAttribute("topic")
059                private String topic;
060
061                @PluginAttribute("key")
062                private String key;
063
064                @PluginAttribute(value = "syncSend", defaultBoolean = true)
065                private boolean syncSend;
066
067                @SuppressWarnings("resource")
068                @Override
069                public KafkaAppender build() {
070                        final Layout<? extends Serializable> layout = getLayout();
071                        if (layout == null) {
072                                AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
073                                return null;
074                        }
075                        final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), getName(),
076                                        topic, syncSend, getPropertyArray(), key);
077                        return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
078                                        getPropertyArray(), getRetryCount());
079                }
080
081                public Integer getRetryCount() {
082                        Integer intRetryCount = null;
083                        try {
084                                intRetryCount = Integer.valueOf(retryCount);
085                        } catch (NumberFormatException e) {
086
087                        }
088                        return intRetryCount;
089
090                }
091
092                public String getTopic() {
093                        return topic;
094                }
095
096                public boolean isSyncSend() {
097                        return syncSend;
098                }
099
100                public B setKey(final String key) {
101                        this.key = key;
102                        return asBuilder();
103                }
104
105                public B setSyncSend(final boolean syncSend) {
106                        this.syncSend = syncSend;
107                        return asBuilder();
108                }
109
110                public B setTopic(final String topic) {
111                        this.topic = topic;
112                        return asBuilder();
113                }
114
115        }
116
117    private static final String[] KAFKA_CLIENT_PACKAGES = new String[] { "org.apache.kafka.common", "org.apache.kafka.clients" };
118
119        @Deprecated
120        public static KafkaAppender createAppender(final Layout<? extends Serializable> layout, final Filter filter,
121                        final String name, final boolean ignoreExceptions, final String topic, final Property[] properties,
122            final Configuration configuration,
123            final String key) {
124
125                if (layout == null) {
126                        AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
127                        return null;
128                }
129                final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, true,
130                                properties, key);
131                return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, null);
132        }
133
134        /**
135         * Tests if the given log event is from a Kafka Producer implementation.
136         *
137         * @param event The event to test.
138         * @return true to avoid recursion and skip logging, false to log.
139         */
140        private static boolean isRecursive(final LogEvent event) {
141            return Stream.of(KAFKA_CLIENT_PACKAGES).anyMatch(prefix -> event.getLoggerName().startsWith(prefix));
142    }
143
144        /**
145         * Creates a builder for a KafkaAppender.
146         * 
147         * @return a builder for a KafkaAppender.
148         */
149        @PluginBuilderFactory
150        public static <B extends Builder<B>> B newBuilder() {
151                return new Builder<B>().asBuilder();
152        }
153
154        private final Integer retryCount;
155
156        private final KafkaManager manager;
157
158        private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
159                        final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties,
160                        final Integer retryCount) {
161                super(name, filter, layout, ignoreExceptions, properties);
162                this.manager = Objects.requireNonNull(manager, "manager");
163                this.retryCount = retryCount;
164        }
165
166        @Override
167        public void append(final LogEvent event) {
168                if (event.getLoggerName() != null && isRecursive(event)) {
169                        LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
170                } else {
171                        try {
172                                tryAppend(event);
173                        } catch (final Exception e) {
174
175                                if (this.retryCount != null) {
176                                        int currentRetryAttempt = 0;
177                                        while (currentRetryAttempt < this.retryCount) {
178                                                currentRetryAttempt++;
179                                                try {
180                                                        tryAppend(event);
181                                                        break;
182                                                } catch (Exception e1) {
183
184                                                }
185                                        }
186                                }
187                                error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
188                        }
189                }
190        }
191
192        @Override
193        public void start() {
194                super.start();
195                manager.startup();
196        }
197
198        @Override
199        public boolean stop(final long timeout, final TimeUnit timeUnit) {
200                setStopping();
201                boolean stopped = super.stop(timeout, timeUnit, false);
202                stopped &= manager.stop(timeout, timeUnit);
203                setStopped();
204                return stopped;
205        }
206
207        @Override
208        public String toString() {
209                return "KafkaAppender{" + "name=" + getName() + ", state=" + getState() + ", topic=" + manager.getTopic() + '}';
210        }
211
212        private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
213                final Layout<? extends Serializable> layout = getLayout();
214                byte[] data;
215                if (layout instanceof SerializedLayout) {
216                        final byte[] header = layout.getHeader();
217                        final byte[] body = layout.toByteArray(event);
218                        data = new byte[header.length + body.length];
219                        System.arraycopy(header, 0, data, 0, header.length);
220                        System.arraycopy(body, 0, data, header.length, body.length);
221                } else {
222                        data = layout.toByteArray(event);
223                }
224                manager.send(data);
225        }
226}