001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.Collection;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.LockSupport;
023
024import org.apache.logging.log4j.core.config.Node;
025import org.apache.logging.log4j.core.config.plugins.Plugin;
026import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
027import org.apache.logging.log4j.core.config.plugins.PluginFactory;
028import org.jctools.queues.MpscArrayQueue;
029
030/**
031 * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}.
032 *
033 * @since 2.7
034 */
035@Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE)
036public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
037
038    private final WaitStrategy waitStrategy;
039
040    private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) {
041        this.waitStrategy = waitStrategy;
042    }
043
044    @Override
045    public BlockingQueue<E> create(final int capacity) {
046        return new MpscBlockingQueue<>(capacity, waitStrategy);
047    }
048
049    @PluginFactory
050    public static <E> JCToolsBlockingQueueFactory<E> createFactory(
051        @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) {
052        return new JCToolsBlockingQueueFactory<>(waitStrategy);
053    }
054
055    /**
056     * BlockingQueue wrapper for JCTools multiple producer single consumer array queue.
057     */
058    private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> {
059
060        private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy;
061
062        MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) {
063            super(capacity);
064            this.waitStrategy = waitStrategy;
065        }
066
067        @Override
068        public int drainTo(final Collection<? super E> c) {
069            return drainTo(c, capacity());
070        }
071
072        @Override
073        public int drainTo(final Collection<? super E> c, final int maxElements) {
074            return drain(e -> c.add(e), maxElements);
075        }
076
077        @Override
078        public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
079            int idleCounter = 0;
080            final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
081            do {
082                if (offer(e)) {
083                    return true;
084                } else if (System.nanoTime() - timeoutNanos > 0) {
085                    return false;
086                }
087                idleCounter = waitStrategy.idle(idleCounter);
088            } while (!Thread.interrupted()); //clear interrupted flag
089            throw new InterruptedException();
090        }
091
092        @Override
093        public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
094            int idleCounter = 0;
095            final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
096            do {
097                final E result = poll();
098                if (result != null) {
099                    return result;
100                } else if (System.nanoTime() - timeoutNanos > 0) {
101                    return null;
102                }
103                idleCounter = waitStrategy.idle(idleCounter);
104            } while (!Thread.interrupted()); //clear interrupted flag
105            throw new InterruptedException();
106        }
107
108        @Override
109        public void put(final E e) throws InterruptedException {
110            int idleCounter = 0;
111            do {
112                if (offer(e)) {
113                    return;
114                }
115                idleCounter = waitStrategy.idle(idleCounter);
116            } while (!Thread.interrupted()); //clear interrupted flag
117            throw new InterruptedException();
118        }
119
120        @Override
121        public boolean offer(final E e) {
122            //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full.
123            return offerIfBelowThreshold(e, capacity() - 32);
124        }
125
126        @Override
127        public int remainingCapacity() {
128            return capacity() - size();
129        }
130
131        @Override
132        public E take() throws InterruptedException {
133            int idleCounter = 100;
134            do {
135                final E result = relaxedPoll();
136                if (result != null) {
137                    return result;
138                }
139                idleCounter = waitStrategy.idle(idleCounter);
140            } while (!Thread.interrupted()); //clear interrupted flag
141            throw new InterruptedException();
142        }
143    }
144
145    public enum WaitStrategy {
146        SPIN(idleCounter -> idleCounter + 1),
147        YIELD(idleCounter -> {
148            Thread.yield();
149            return idleCounter + 1;
150        }),
151        PARK(idleCounter -> {
152            LockSupport.parkNanos(1L);
153            return idleCounter + 1;
154        }),
155        PROGRESSIVE(idleCounter -> {
156            if (idleCounter > 200) {
157                LockSupport.parkNanos(1L);
158            } else if (idleCounter > 100) {
159                Thread.yield();
160            }
161            return idleCounter + 1;
162        });
163
164        private final Idle idle;
165
166        private int idle(final int idleCounter) {
167            return idle.idle(idleCounter);
168        }
169
170        WaitStrategy(final Idle idle) {
171            this.idle = idle;
172        }
173    }
174
175    private interface Idle {
176        int idle(int idleCounter);
177    }
178
179}