001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.io.Serializable;
020    import java.util.Locale;
021    
022    import org.apache.logging.log4j.core.Filter;
023    import org.apache.logging.log4j.core.Layout;
024    import org.apache.logging.log4j.core.LogEvent;
025    import org.apache.logging.log4j.core.appender.AbstractAppender;
026    import org.apache.logging.log4j.core.config.Property;
027    import org.apache.logging.log4j.core.config.plugins.Plugin;
028    import org.apache.logging.log4j.core.config.plugins.PluginAliases;
029    import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
030    import org.apache.logging.log4j.core.config.plugins.PluginElement;
031    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
032    import org.apache.logging.log4j.core.layout.Rfc5424Layout;
033    import org.apache.logging.log4j.core.net.Facility;
034    import org.apache.logging.log4j.core.util.Booleans;
035    import org.apache.logging.log4j.core.util.Integers;
036    
037    /**
038     * An Appender that uses the Avro protocol to route events to Flume.
039     */
040    @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
041    public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
042    
043        private static final long serialVersionUID = 1L;
044        private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
045        private static final int DEFAULT_MAX_DELAY = 60000;
046    
047        private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
048    
049        private final AbstractFlumeManager manager;
050    
051        private final String mdcIncludes;
052        private final String mdcExcludes;
053        private final String mdcRequired;
054    
055        private final String eventPrefix;
056    
057        private final String mdcPrefix;
058    
059        private final boolean compressBody;
060    
061        private final FlumeEventFactory factory;
062    
063        /**
064         * Which Manager will be used by the appender instance.
065         */
066        private enum ManagerType {
067            AVRO, EMBEDDED, PERSISTENT;
068    
069            public static ManagerType getType(final String type) {
070                return valueOf(type.toUpperCase(Locale.US));
071            }
072        }
073    
074        private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
075                              final boolean ignoreExceptions, final String includes, final String excludes,
076                              final String required, final String mdcPrefix, final String eventPrefix,
077                              final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
078            super(name, filter, layout, ignoreExceptions);
079            this.manager = manager;
080            this.mdcIncludes = includes;
081            this.mdcExcludes = excludes;
082            this.mdcRequired = required;
083            this.eventPrefix = eventPrefix;
084            this.mdcPrefix = mdcPrefix;
085            this.compressBody = compress;
086            this.factory = factory == null ? this : factory;
087        }
088    
089        /**
090         * Publish the event.
091         * @param event The LogEvent.
092         */
093        @Override
094        public void append(final LogEvent event) {
095            final String name = event.getLoggerName();
096            if (name != null) {
097                for (final String pkg : EXCLUDED_PACKAGES) {
098                    if (name.startsWith(pkg)) {
099                        return;
100                    }
101                }
102            }
103            final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
104                eventPrefix, compressBody);
105            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
106            manager.send(flumeEvent);
107        }
108    
109        @Override
110        public void stop() {
111            super.stop();
112            manager.release();
113        }
114    
115        /**
116         * Create a Flume event.
117         * @param event The Log4j LogEvent.
118         * @param includes comma separated list of mdc elements to include.
119         * @param excludes comma separated list of mdc elements to exclude.
120         * @param required comma separated list of mdc elements that must be present with a value.
121         * @param mdcPrefix The prefix to add to MDC key names.
122         * @param eventPrefix The prefix to add to event fields.
123         * @param compress If true the body will be compressed.
124         * @return A Flume Event.
125         */
126        @Override
127        public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
128                                      final String required, final String mdcPrefix, final String eventPrefix,
129                                      final boolean compress) {
130            return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
131                eventPrefix, compressBody);
132        }
133    
134        /**
135         * Create a Flume Avro Appender.
136         * @param agents An array of Agents.
137         * @param properties Properties to pass to the embedded agent.
138         * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
139         * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
140         * @param type Avro (default), Embedded, or Persistent.
141         * @param dataDir The directory where the Flume FileChannel should write its data.
142         * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is
143         *                          1000.
144         * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
145         * @param agentRetries The number of times to retry an agent before failing to the next agent.
146         * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch.
147         * @param name The name of the Appender.
148         * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
149         *               they are propagated to the caller.
150         * @param excludes A comma separated list of MDC elements to exclude.
151         * @param includes A comma separated list of MDC elements to include.
152         * @param required A comma separated list of MDC elements that are required.
153         * @param mdcPrefix The prefix to add to MDC key names.
154         * @param eventPrefix The prefix to add to event key names.
155         * @param compressBody If true the event body will be compressed.
156         * @param batchSize Number of events to include in a batch. Defaults to 1.
157         * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
158         * @param factory The factory to use to create Flume events.
159         * @param layout The layout to format the event.
160         * @param filter A Filter to filter events.
161         *
162         * @return A Flume Avro Appender.
163         */
164        @PluginFactory
165        public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
166                                                   @PluginElement("Properties") final Property[] properties,
167                                                   @PluginAttribute("embedded") final String embedded,
168                                                   @PluginAttribute("type") final String type,
169                                                   @PluginAttribute("dataDir") final String dataDir,
170                                                   @PluginAliases("connectTimeout")
171                                                   @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
172                                                   @PluginAliases("requestTimeout")
173                                                   @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
174                                                   @PluginAttribute("agentRetries") final String agentRetries,
175                                                   @PluginAliases("maxDelay") // deprecated
176                                                   @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
177                                                   @PluginAttribute("name") final String name,
178                                                   @PluginAttribute("ignoreExceptions") final String ignore,
179                                                   @PluginAttribute("mdcExcludes") final String excludes,
180                                                   @PluginAttribute("mdcIncludes") final String includes,
181                                                   @PluginAttribute("mdcRequired") final String required,
182                                                   @PluginAttribute("mdcPrefix") final String mdcPrefix,
183                                                   @PluginAttribute("eventPrefix") final String eventPrefix,
184                                                   @PluginAttribute("compress") final String compressBody,
185                                                   @PluginAttribute("batchSize") final String batchSize,
186                                                   @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
187                                                   @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
188                                                   @PluginElement("Layout") Layout<? extends Serializable> layout,
189                                                   @PluginElement("Filter") final Filter filter) {
190    
191            final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
192                (agents == null || agents.length == 0) && properties != null && properties.length > 0;
193            final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
194            final boolean compress = Booleans.parseBoolean(compressBody, true);
195            ManagerType managerType;
196            if (type != null) {
197                if (embed && embedded != null) {
198                    try {
199                        managerType = ManagerType.getType(type);
200                        LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
201                    } catch (final Exception ex) {
202                        LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
203                            " is invalid.");
204                        managerType = ManagerType.EMBEDDED;
205                    }
206                } else {
207                    try {
208                        managerType = ManagerType.getType(type);
209                    } catch (final Exception ex) {
210                        LOGGER.warn("Type " + type + " is invalid.");
211                        managerType = ManagerType.EMBEDDED;
212                    }
213                }
214            }  else if (embed) {
215               managerType = ManagerType.EMBEDDED;
216            }  else {
217               managerType = ManagerType.AVRO;
218            }
219    
220            final int batchCount = Integers.parseInt(batchSize, 1);
221            final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
222            final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
223            final int retries = Integers.parseInt(agentRetries, 0);
224            final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
225            final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
226    
227            if (layout == null) {
228                final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
229                layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
230                        mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
231                        null);
232            }
233    
234            if (name == null) {
235                LOGGER.error("No name provided for Appender");
236                return null;
237            }
238    
239            AbstractFlumeManager manager;
240    
241            switch (managerType) {
242                case EMBEDDED:
243                    manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
244                    break;
245                case AVRO:
246                    if (agents == null || agents.length == 0) {
247                        LOGGER.debug("No agents provided, using defaults");
248                        agents = new Agent[] {Agent.createAgent(null, null)};
249                    }
250                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
251                    break;
252                case PERSISTENT:
253                    if (agents == null || agents.length == 0) {
254                        LOGGER.debug("No agents provided, using defaults");
255                        agents = new Agent[] {Agent.createAgent(null, null)};
256                    }
257                    manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
258                        connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
259                    break;
260                default:
261                    LOGGER.debug("No manager type specified. Defaulting to AVRO");
262                    if (agents == null || agents.length == 0) {
263                        LOGGER.debug("No agents provided, using defaults");
264                        agents = new Agent[] {Agent.createAgent(null, null)};
265                    }
266                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
267            }
268    
269            if (manager == null) {
270                return null;
271            }
272    
273            return new FlumeAppender(name, filter, layout,  ignoreExceptions, includes,
274                excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
275        }
276    }