001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.Collection; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.LockSupport; 023 024import org.apache.logging.log4j.core.config.Node; 025import org.apache.logging.log4j.core.config.plugins.Plugin; 026import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 027import org.apache.logging.log4j.core.config.plugins.PluginFactory; 028import org.jctools.queues.MpscArrayQueue; 029 030/** 031 * Factory for creating instances of BlockingQueues backed by JCTools {@link MpscArrayQueue}. 032 * 033 * @since 2.7 034 */ 035@Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE) 036public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> { 037 038 private final WaitStrategy waitStrategy; 039 040 private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) { 041 this.waitStrategy = waitStrategy; 042 } 043 044 @Override 045 public BlockingQueue<E> create(final int capacity) { 046 return new MpscBlockingQueue<>(capacity, waitStrategy); 047 } 048 049 @PluginFactory 050 public static <E> JCToolsBlockingQueueFactory<E> createFactory( 051 @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) { 052 return new JCToolsBlockingQueueFactory<>(waitStrategy); 053 } 054 055 /** 056 * BlockingQueue wrapper for JCTools multiple producer single consumer array queue. 057 */ 058 private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> { 059 060 private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy; 061 062 MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) { 063 super(capacity); 064 this.waitStrategy = waitStrategy; 065 } 066 067 @Override 068 public int drainTo(final Collection<? super E> c) { 069 return drainTo(c, capacity()); 070 } 071 072 @Override 073 public int drainTo(final Collection<? super E> c, final int maxElements) { 074 return drain(e -> c.add(e), maxElements); 075 } 076 077 @Override 078 public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { 079 int idleCounter = 0; 080 final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); 081 do { 082 if (offer(e)) { 083 return true; 084 } else if (System.nanoTime() - timeoutNanos > 0) { 085 return false; 086 } 087 idleCounter = waitStrategy.idle(idleCounter); 088 } while (!Thread.interrupted()); //clear interrupted flag 089 throw new InterruptedException(); 090 } 091 092 @Override 093 public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { 094 int idleCounter = 0; 095 final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); 096 do { 097 final E result = poll(); 098 if (result != null) { 099 return result; 100 } else if (System.nanoTime() - timeoutNanos > 0) { 101 return null; 102 } 103 idleCounter = waitStrategy.idle(idleCounter); 104 } while (!Thread.interrupted()); //clear interrupted flag 105 throw new InterruptedException(); 106 } 107 108 @Override 109 public void put(final E e) throws InterruptedException { 110 int idleCounter = 0; 111 do { 112 if (offer(e)) { 113 return; 114 } 115 idleCounter = waitStrategy.idle(idleCounter); 116 } while (!Thread.interrupted()); //clear interrupted flag 117 throw new InterruptedException(); 118 } 119 120 @Override 121 public boolean offer(final E e) { 122 //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full. 123 return offerIfBelowThreshold(e, capacity() - 32); 124 } 125 126 @Override 127 public int remainingCapacity() { 128 return capacity() - size(); 129 } 130 131 @Override 132 public E take() throws InterruptedException { 133 int idleCounter = 100; 134 do { 135 final E result = relaxedPoll(); 136 if (result != null) { 137 return result; 138 } 139 idleCounter = waitStrategy.idle(idleCounter); 140 } while (!Thread.interrupted()); //clear interrupted flag 141 throw new InterruptedException(); 142 } 143 } 144 145 public enum WaitStrategy { 146 SPIN(idleCounter -> idleCounter + 1), 147 YIELD(idleCounter -> { 148 Thread.yield(); 149 return idleCounter + 1; 150 }), 151 PARK(idleCounter -> { 152 LockSupport.parkNanos(1L); 153 return idleCounter + 1; 154 }), 155 PROGRESSIVE(idleCounter -> { 156 if (idleCounter > 200) { 157 LockSupport.parkNanos(1L); 158 } else if (idleCounter > 100) { 159 Thread.yield(); 160 } 161 return idleCounter + 1; 162 }); 163 164 private final Idle idle; 165 166 private int idle(final int idleCounter) { 167 return idle.idle(idleCounter); 168 } 169 170 WaitStrategy(final Idle idle) { 171 this.idle = idle; 172 } 173 } 174 175 private interface Idle { 176 int idle(int idleCounter); 177 } 178 179}