001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.logging.log4j.core.async;
018    
019    import java.util.Map;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Executors;
022    
023    import org.apache.logging.log4j.Level;
024    import org.apache.logging.log4j.Marker;
025    import org.apache.logging.log4j.ThreadContext;
026    import org.apache.logging.log4j.core.Logger;
027    import org.apache.logging.log4j.core.LoggerContext;
028    import org.apache.logging.log4j.core.config.Property;
029    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
030    import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
031    import org.apache.logging.log4j.core.util.Clock;
032    import org.apache.logging.log4j.core.util.ClockFactory;
033    import org.apache.logging.log4j.core.util.Integers;
034    import org.apache.logging.log4j.core.util.Loader;
035    import org.apache.logging.log4j.message.Message;
036    import org.apache.logging.log4j.message.MessageFactory;
037    import org.apache.logging.log4j.message.TimestampMessage;
038    import org.apache.logging.log4j.status.StatusLogger;
039    import org.apache.logging.log4j.util.PropertiesUtil;
040    
041    import com.lmax.disruptor.BlockingWaitStrategy;
042    import com.lmax.disruptor.ExceptionHandler;
043    import com.lmax.disruptor.RingBuffer;
044    import com.lmax.disruptor.SleepingWaitStrategy;
045    import com.lmax.disruptor.WaitStrategy;
046    import com.lmax.disruptor.YieldingWaitStrategy;
047    import com.lmax.disruptor.dsl.Disruptor;
048    import com.lmax.disruptor.dsl.ProducerType;
049    
050    /**
051     * AsyncLogger is a logger designed for high throughput and low latency logging.
052     * It does not perform any I/O in the calling (application) thread, but instead
053     * hands off the work to another thread as soon as possible. The actual logging
054     * is performed in the background thread. It uses the LMAX Disruptor library for
055     * inter-thread communication. (<a
056     * href="http://lmax-exchange.github.com/disruptor/"
057     * >http://lmax-exchange.github.com/disruptor/</a>)
058     * <p>
059     * To use AsyncLogger, specify the System property
060     * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
061     * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
062     * will be AsyncLoggers.
063     * <p>
064     * Note that for performance reasons, this logger does not include source
065     * location by default. You need to specify {@code includeLocation="true"} in
066     * the configuration or any %class, %location or %line conversion patterns in
067     * your log4j.xml configuration will produce either a "?" character or no output
068     * at all.
069     * <p>
070     * For best performance, use AsyncLogger with the RandomAccessFileAppender or
071     * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
072     * have built-in support for the batching mechanism used by the Disruptor
073     * library, and they will flush to disk at the end of each batch. This means
074     * that even with immediateFlush=false, there will never be any items left in
075     * the buffer; all log events will all be written to disk in a very efficient
076     * manner.
077     */
078    public class AsyncLogger extends Logger {
079        private static final long serialVersionUID = 1L;
080        private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
081        private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
082        private static final int RINGBUFFER_MIN_SIZE = 128;
083        private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
084        private static final StatusLogger LOGGER = StatusLogger.getLogger();
085        private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
086        private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
087    
088        static enum ThreadNameStrategy { // LOG4J2-467
089            CACHED {
090                @Override
091                public String getThreadName(final Info info) {
092                    return info.cachedThreadName;
093                }
094            },
095            UNCACHED {
096                @Override
097                public String getThreadName(final Info info) {
098                    return Thread.currentThread().getName();
099                }
100            };
101            abstract String getThreadName(Info info);
102    
103            static ThreadNameStrategy create() {
104                final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
105                try {
106                    return ThreadNameStrategy.valueOf(name);
107                } catch (final Exception ex) {
108                    LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
109                    return CACHED;
110                }
111            }
112        }
113        private static volatile Disruptor<RingBufferLogEvent> disruptor;
114        private static final Clock clock = ClockFactory.getClock();
115    
116        private static final ExecutorService executor = Executors
117                .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
118    
119        static {
120            initInfoForExecutorThread();
121            LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
122            final int ringBufferSize = calculateRingBufferSize();
123    
124            final WaitStrategy waitStrategy = createWaitStrategy();
125            disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
126                    ProducerType.MULTI, waitStrategy);
127            disruptor.handleExceptionsWith(getExceptionHandler());
128            disruptor.handleEventsWith(new RingBufferLogEventHandler());
129    
130            LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
131                    .getBufferSize());
132            disruptor.start();
133        }
134    
135        private static int calculateRingBufferSize() {
136            int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
137            final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize",
138                    String.valueOf(ringBufferSize));
139            try {
140                int size = Integer.parseInt(userPreferredRBSize);
141                if (size < RINGBUFFER_MIN_SIZE) {
142                    size = RINGBUFFER_MIN_SIZE;
143                    LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
144                            RINGBUFFER_MIN_SIZE);
145                }
146                ringBufferSize = size;
147            } catch (final Exception ex) {
148                LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
149            }
150            return Integers.ceilingNextPowerOfTwo(ringBufferSize);
151        }
152    
153        /**
154         * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
155         * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
156         * All other Info objects will have this attribute set to {@code false}.
157         * This allows us to detect Logger.log() calls initiated from the appender thread,
158         * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
159         */
160        private static void initInfoForExecutorThread() {
161            executor.submit(new Runnable(){
162                @Override
163                public void run() {
164                    final boolean isAppenderThread = true;
165                    final Info info = new Info(new RingBufferLogEventTranslator(), //
166                            Thread.currentThread().getName(), isAppenderThread);
167                    threadlocalInfo.set(info);
168                }
169            });
170        }
171    
172        private static WaitStrategy createWaitStrategy() {
173            final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
174            LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
175            if ("Sleep".equals(strategy)) {
176                return new SleepingWaitStrategy();
177            } else if ("Yield".equals(strategy)) {
178                return new YieldingWaitStrategy();
179            } else if ("Block".equals(strategy)) {
180                return new BlockingWaitStrategy();
181            }
182            LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
183            return new BlockingWaitStrategy();
184        }
185    
186        private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
187            final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
188            if (cls == null) {
189                LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
190                return null;
191            }
192            try {
193                final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
194                LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
195                return result;
196            } catch (final Exception ignored) {
197                LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
198                return null;
199            }
200        }
201    
202        /**
203         * Constructs an {@code AsyncLogger} with the specified context, name and
204         * message factory.
205         *
206         * @param context context of this logger
207         * @param name name of this logger
208         * @param messageFactory message factory of this logger
209         */
210        public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
211            super(context, name, messageFactory);
212        }
213    
214        /**
215         * Tuple with the event translator and thread name for a thread.
216         */
217        static class Info {
218            private final RingBufferLogEventTranslator translator;
219            private final String cachedThreadName;
220            private final boolean isAppenderThread;
221            public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
222                this.translator = translator;
223                this.cachedThreadName = threadName;
224                this.isAppenderThread = appenderThread;
225            }
226        }
227    
228        @Override
229        public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
230            // TODO refactor to reduce size to <= 35 bytecodes to allow JVM to inline it
231            Info info = threadlocalInfo.get();
232            if (info == null) {
233                info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
234                threadlocalInfo.set(info);
235            }
236            
237            final Disruptor<RingBufferLogEvent> temp = disruptor;
238            if (temp == null) { // LOG4J2-639
239                LOGGER.fatal("Ignoring log event after log4j was shut down");
240                return;
241            }
242    
243            // LOG4J2-471: prevent deadlock when RingBuffer is full and object
244            // being logged calls Logger.log() from its toString() method
245            if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
246                // bypass RingBuffer and invoke Appender directly
247                config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
248                return;
249            }
250            message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
251            final boolean includeLocation = config.loggerConfig.isIncludeLocation();
252            info.translator.setValues(this, getName(), marker, fqcn, level, message, //
253                    // don't construct ThrowableProxy until required
254                    thrown, //
255    
256                    // config properties are taken care of in the EventHandler
257                    // thread in the #actualAsyncLog method
258    
259                    // needs shallow copy to be fast (LOG4J2-154)
260                    ThreadContext.getImmutableContext(), //
261    
262                    // needs shallow copy to be fast (LOG4J2-154)
263                    ThreadContext.getImmutableStack(), //
264    
265                    // Thread.currentThread().getName(), //
266                    // info.cachedThreadName, //
267                    THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467
268    
269                    // location: very expensive operation. LOG4J2-153:
270                    // Only include if "includeLocation=true" is specified,
271                    // exclude if not specified or if "false" was specified.
272                    includeLocation ? location(fqcn) : null,
273    
274                    // System.currentTimeMillis());
275                    // CoarseCachedClock: 20% faster than system clock, 16ms gaps
276                    // CachedClock: 10% faster than system clock, smaller gaps
277                    // LOG4J2-744 avoid calling clock altogether if message has the timestamp
278                    message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
279                            clock.currentTimeMillis());
280    
281            // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
282            try {
283                // Note: do NOT use the temp variable above!
284                // That could result in adding a log event to the disruptor after it was shut down,
285                // which could cause the publishEvent method to hang and never return.
286                disruptor.publishEvent(info.translator);
287            } catch (final NullPointerException npe) {
288                LOGGER.fatal("Ignoring log event after log4j was shut down.");
289            }
290        }
291    
292        private static StackTraceElement location(final String fqcnOfLogger) {
293            return Log4jLogEvent.calcLocation(fqcnOfLogger);
294        }
295    
296        /**
297         * This method is called by the EventHandler that processes the
298         * RingBufferLogEvent in a separate thread.
299         *
300         * @param event the event to log
301         */
302        public void actualAsyncLog(final RingBufferLogEvent event) {
303            final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
304            event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
305            config.logEvent(event);
306        }
307    
308        public static void stop() {
309            final Disruptor<RingBufferLogEvent> temp = disruptor;
310    
311            // Must guarantee that publishing to the RingBuffer has stopped
312            // before we call disruptor.shutdown()
313            disruptor = null; // client code fails with NPE if log after stop = OK
314            if (temp == null) {
315                return; // stop() has already been called
316            }
317    
318            // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
319            // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
320            // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
321            for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
322                try {
323                    Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
324                } catch (final InterruptedException e) { // ignored
325                }
326            }
327            temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
328            executor.shutdown(); // finally, kill the processor thread
329            threadlocalInfo.remove(); // LOG4J2-323
330        }
331    
332        /**
333         * Returns {@code true} if the specified disruptor still has unprocessed events.
334         */
335        private static boolean hasBacklog(final Disruptor<?> disruptor) {
336            final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
337            return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
338        }
339    
340        /**
341         * Creates and returns a new {@code RingBufferAdmin} that instruments the
342         * ringbuffer of the {@code AsyncLogger}.
343         *
344         * @param contextName name of the global {@code AsyncLoggerContext}
345         */
346        public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
347            return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
348        }
349    }