001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.nio.Buffer;
023import java.nio.ByteBuffer;
024import java.util.Objects;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LoggerContext;
029import org.apache.logging.log4j.core.layout.ByteBufferDestination;
030import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
031import org.apache.logging.log4j.core.util.Constants;
032
033/**
034 * Manages an OutputStream so that it can be shared by multiple Appenders and will
035 * allow appenders to reconfigure without requiring a new stream.
036 */
037public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
038    protected final Layout<?> layout;
039    protected ByteBuffer byteBuffer;
040    private volatile OutputStream outputStream;
041    private boolean skipFooter;
042
043    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
044            final boolean writeHeader) {
045        this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
046    }
047
048    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
049            final boolean writeHeader, final int bufferSize) {
050        this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
051    }
052
053    /**
054     * @since 2.6
055     * @deprecated
056     */
057    @Deprecated
058    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
059            final boolean writeHeader, final ByteBuffer byteBuffer) {
060        super(null, streamName);
061        this.outputStream = os;
062        this.layout = layout;
063        if (writeHeader) {
064            writeHeader(os);
065        }
066        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
067    }
068
069    /**
070     * @since 2.7
071     */
072    protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
073            final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
074            final ByteBuffer byteBuffer) {
075        super(loggerContext, streamName);
076        if (createOnDemand && os != null) {
077            LOGGER.error(
078                    "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
079                    streamName);
080        }
081        this.layout = layout;
082        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
083        this.outputStream = os;
084        if (writeHeader) {
085            writeHeader(os);
086        }
087    }
088
089    /**
090     * Creates a Manager.
091     *
092     * @param name The name of the stream to manage.
093     * @param data The data to pass to the Manager.
094     * @param factory The factory to use to create the Manager.
095     * @param <T> The type of the OutputStreamManager.
096     * @return An OutputStreamManager.
097     */
098    public static <T> OutputStreamManager getManager(final String name, final T data,
099                                                 final ManagerFactory<? extends OutputStreamManager, T> factory) {
100        return AbstractManager.getManager(name, factory, data);
101    }
102
103    @SuppressWarnings("unused")
104    protected OutputStream createOutputStream() throws IOException {
105        throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
106    }
107
108    /**
109     * Indicate whether the footer should be skipped or not.
110     * @param skipFooter true if the footer should be skipped.
111     */
112    public void skipFooter(final boolean skipFooter) {
113        this.skipFooter = skipFooter;
114    }
115
116    /**
117     * Default hook to write footer during close.
118     */
119    @Override
120    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
121        writeFooter();
122        return closeOutputStream();
123    }
124
125    protected void writeHeader(OutputStream os) {
126        if (layout != null && os != null) {
127            final byte[] header = layout.getHeader();
128            if (header != null) {
129                try {
130                    os.write(header, 0, header.length);
131                } catch (final IOException e) {
132                    logError("Unable to write header", e);
133                }
134            }
135        }
136    }
137
138    /**
139     * Writes the footer.
140     */
141    protected void writeFooter() {
142        if (layout == null || skipFooter) {
143            return;
144        }
145        final byte[] footer = layout.getFooter();
146        if (footer != null) {
147            write(footer);
148        }
149    }
150
151    /**
152     * Returns the status of the stream.
153     * @return true if the stream is open, false if it is not.
154     */
155    public boolean isOpen() {
156        return getCount() > 0;
157    }
158
159    public boolean hasOutputStream() {
160        return outputStream != null;
161    }
162
163    protected OutputStream getOutputStream() throws IOException {
164        if (outputStream == null) {
165            outputStream = createOutputStream();
166        }
167        return outputStream;
168    }
169
170    protected void setOutputStream(final OutputStream os) {
171        this.outputStream = os;
172    }
173
174    /**
175     * Some output streams synchronize writes while others do not.
176     * @param bytes The serialized Log event.
177     * @throws AppenderLoggingException if an error occurs.
178     */
179    protected void write(final byte[] bytes)  {
180        write(bytes, 0, bytes.length, false);
181    }
182
183    /**
184     * Some output streams synchronize writes while others do not.
185     * @param bytes The serialized Log event.
186     * @param immediateFlush If true, flushes after writing.
187     * @throws AppenderLoggingException if an error occurs.
188     */
189    protected void write(final byte[] bytes, final boolean immediateFlush)  {
190        write(bytes, 0, bytes.length, immediateFlush);
191    }
192
193    @Override
194    public void writeBytes(final byte[] data, final int offset, final int length) {
195        write(data, offset, length, false);
196    }
197
198    /**
199     * Some output streams synchronize writes while others do not. Synchronizing here insures that
200     * log events won't be intertwined.
201     * @param bytes The serialized Log event.
202     * @param offset The offset into the byte array.
203     * @param length The number of bytes to write.
204     * @throws AppenderLoggingException if an error occurs.
205     */
206    protected void write(final byte[] bytes, final int offset, final int length) {
207        writeBytes(bytes, offset, length);
208    }
209
210    /**
211     * Some output streams synchronize writes while others do not. Synchronizing here insures that
212     * log events won't be intertwined.
213     * @param bytes The serialized Log event.
214     * @param offset The offset into the byte array.
215     * @param length The number of bytes to write.
216     * @param immediateFlush flushes immediately after writing.
217     * @throws AppenderLoggingException if an error occurs.
218     */
219    protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
220        if (immediateFlush && byteBuffer.position() == 0) {
221            writeToDestination(bytes, offset, length);
222            flushDestination();
223            return;
224        }
225        if (length >= byteBuffer.capacity()) {
226            // if request length exceeds buffer capacity, flush the buffer and write the data directly
227            flush();
228            writeToDestination(bytes, offset, length);
229        } else {
230            if (length > byteBuffer.remaining()) {
231                flush();
232            }
233            byteBuffer.put(bytes, offset, length);
234        }
235        if (immediateFlush) {
236            flush();
237        }
238    }
239
240    /**
241     * Writes the specified section of the specified byte array to the stream.
242     *
243     * @param bytes the array containing data
244     * @param offset from where to write
245     * @param length how many bytes to write
246     * @since 2.6
247     */
248    protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
249        try {
250            getOutputStream().write(bytes, offset, length);
251        } catch (final IOException ex) {
252            throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
253        }
254    }
255
256    /**
257     * Calls {@code flush()} on the underlying output stream.
258     * @since 2.6
259     */
260    protected synchronized void flushDestination() {
261        final OutputStream stream = outputStream; // access volatile field only once per method
262        if (stream != null) {
263            try {
264                stream.flush();
265            } catch (final IOException ex) {
266                throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
267            }
268        }
269    }
270
271    /**
272     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
273     * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
274     * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
275     *
276     * @see #flushDestination()
277     * @since 2.6
278     */
279    protected synchronized void flushBuffer(final ByteBuffer buf) {
280        ((Buffer) buf).flip();
281        try {
282            if (buf.remaining() > 0) {
283                writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
284            }
285        } finally {
286            buf.clear();
287        }
288    }
289
290    /**
291     * Flushes any buffers.
292     */
293    public synchronized void flush() {
294        flushBuffer(byteBuffer);
295        flushDestination();
296    }
297
298    protected synchronized boolean closeOutputStream() {
299        flush();
300        final OutputStream stream = outputStream; // access volatile field only once per method
301        if (stream == null || stream == System.out || stream == System.err) {
302            return true;
303        }
304        try {
305            stream.close();
306            LOGGER.debug("OutputStream closed");
307        } catch (final IOException ex) {
308            logError("Unable to close stream", ex);
309            return false;
310        }
311        return true;
312    }
313
314    /**
315     * Returns this {@code ByteBufferDestination}'s buffer.
316     * @return the buffer
317     * @since 2.6
318     */
319    @Override
320    public ByteBuffer getByteBuffer() {
321        return byteBuffer;
322    }
323
324    /**
325     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
326     * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
327     * <p>
328     * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
329     * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
330     * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
331     * buffer.
332     * </p><p>
333     * To just flush the buffered contents to the underlying stream, call
334     * {@link #flushBuffer(ByteBuffer)} directly instead.
335     * </p>
336     *
337     * @param buf the buffer whose contents to write the destination
338     * @return the specified buffer
339     * @since 2.6
340     */
341    @Override
342    public ByteBuffer drain(final ByteBuffer buf) {
343        flushBuffer(buf);
344        return buf;
345    }
346
347    @Override
348    public void writeBytes(final ByteBuffer data) {
349        if (data.remaining() == 0) {
350          return;
351        }
352        synchronized (this) {
353          ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
354        }
355    }
356}