001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import java.io.Serializable; 020 import java.util.Locale; 021 022 import org.apache.logging.log4j.core.Filter; 023 import org.apache.logging.log4j.core.Layout; 024 import org.apache.logging.log4j.core.LogEvent; 025 import org.apache.logging.log4j.core.appender.AbstractAppender; 026 import org.apache.logging.log4j.core.config.Property; 027 import org.apache.logging.log4j.core.config.plugins.Plugin; 028 import org.apache.logging.log4j.core.config.plugins.PluginAliases; 029 import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 030 import org.apache.logging.log4j.core.config.plugins.PluginElement; 031 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 032 import org.apache.logging.log4j.core.layout.Rfc5424Layout; 033 import org.apache.logging.log4j.core.net.Facility; 034 import org.apache.logging.log4j.core.util.Booleans; 035 import org.apache.logging.log4j.core.util.Integers; 036 037 /** 038 * An Appender that uses the Avro protocol to route events to Flume. 039 */ 040 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true) 041 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { 042 043 private static final long serialVersionUID = 1L; 044 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"}; 045 private static final int DEFAULT_MAX_DELAY = 60000; 046 047 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5; 048 049 private final AbstractFlumeManager manager; 050 051 private final String mdcIncludes; 052 private final String mdcExcludes; 053 private final String mdcRequired; 054 055 private final String eventPrefix; 056 057 private final String mdcPrefix; 058 059 private final boolean compressBody; 060 061 private final FlumeEventFactory factory; 062 063 /** 064 * Which Manager will be used by the appender instance. 065 */ 066 private enum ManagerType { 067 AVRO, EMBEDDED, PERSISTENT; 068 069 public static ManagerType getType(final String type) { 070 return valueOf(type.toUpperCase(Locale.US)); 071 } 072 } 073 074 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 075 final boolean ignoreExceptions, final String includes, final String excludes, 076 final String required, final String mdcPrefix, final String eventPrefix, 077 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) { 078 super(name, filter, layout, ignoreExceptions); 079 this.manager = manager; 080 this.mdcIncludes = includes; 081 this.mdcExcludes = excludes; 082 this.mdcRequired = required; 083 this.eventPrefix = eventPrefix; 084 this.mdcPrefix = mdcPrefix; 085 this.compressBody = compress; 086 this.factory = factory == null ? this : factory; 087 } 088 089 /** 090 * Publish the event. 091 * @param event The LogEvent. 092 */ 093 @Override 094 public void append(final LogEvent event) { 095 final String name = event.getLoggerName(); 096 if (name != null) { 097 for (final String pkg : EXCLUDED_PACKAGES) { 098 if (name.startsWith(pkg)) { 099 return; 100 } 101 } 102 } 103 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 104 eventPrefix, compressBody); 105 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 106 manager.send(flumeEvent); 107 } 108 109 @Override 110 public void stop() { 111 super.stop(); 112 manager.release(); 113 } 114 115 /** 116 * Create a Flume event. 117 * @param event The Log4j LogEvent. 118 * @param includes comma separated list of mdc elements to include. 119 * @param excludes comma separated list of mdc elements to exclude. 120 * @param required comma separated list of mdc elements that must be present with a value. 121 * @param mdcPrefix The prefix to add to MDC key names. 122 * @param eventPrefix The prefix to add to event fields. 123 * @param compress If true the body will be compressed. 124 * @return A Flume Event. 125 */ 126 @Override 127 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, 128 final String required, final String mdcPrefix, final String eventPrefix, 129 final boolean compress) { 130 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 131 eventPrefix, compressBody); 132 } 133 134 /** 135 * Create a Flume Avro Appender. 136 * @param agents An array of Agents. 137 * @param properties Properties to pass to the embedded agent. 138 * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. 139 * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i> 140 * @param type Avro (default), Embedded, or Persistent. 141 * @param dataDir The directory where the Flume FileChannel should write its data. 142 * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is 143 * 1000. 144 * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000. 145 * @param agentRetries The number of times to retry an agent before failing to the next agent. 146 * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch. 147 * @param name The name of the Appender. 148 * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise 149 * they are propagated to the caller. 150 * @param excludes A comma separated list of MDC elements to exclude. 151 * @param includes A comma separated list of MDC elements to include. 152 * @param required A comma separated list of MDC elements that are required. 153 * @param mdcPrefix The prefix to add to MDC key names. 154 * @param eventPrefix The prefix to add to event key names. 155 * @param compressBody If true the event body will be compressed. 156 * @param batchSize Number of events to include in a batch. Defaults to 1. 157 * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB. 158 * @param factory The factory to use to create Flume events. 159 * @param layout The layout to format the event. 160 * @param filter A Filter to filter events. 161 * 162 * @return A Flume Avro Appender. 163 */ 164 @PluginFactory 165 public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents, 166 @PluginElement("Properties") final Property[] properties, 167 @PluginAttribute("embedded") final String embedded, 168 @PluginAttribute("type") final String type, 169 @PluginAttribute("dataDir") final String dataDir, 170 @PluginAliases("connectTimeout") 171 @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis, 172 @PluginAliases("requestTimeout") 173 @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis, 174 @PluginAttribute("agentRetries") final String agentRetries, 175 @PluginAliases("maxDelay") // deprecated 176 @PluginAttribute("maxDelayMillis") final String maxDelayMillis, 177 @PluginAttribute("name") final String name, 178 @PluginAttribute("ignoreExceptions") final String ignore, 179 @PluginAttribute("mdcExcludes") final String excludes, 180 @PluginAttribute("mdcIncludes") final String includes, 181 @PluginAttribute("mdcRequired") final String required, 182 @PluginAttribute("mdcPrefix") final String mdcPrefix, 183 @PluginAttribute("eventPrefix") final String eventPrefix, 184 @PluginAttribute("compress") final String compressBody, 185 @PluginAttribute("batchSize") final String batchSize, 186 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries, 187 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory, 188 @PluginElement("Layout") Layout<? extends Serializable> layout, 189 @PluginElement("Filter") final Filter filter) { 190 191 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) : 192 (agents == null || agents.length == 0) && properties != null && properties.length > 0; 193 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true); 194 final boolean compress = Booleans.parseBoolean(compressBody, true); 195 ManagerType managerType; 196 if (type != null) { 197 if (embed && embedded != null) { 198 try { 199 managerType = ManagerType.getType(type); 200 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); 201 } catch (final Exception ex) { 202 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + 203 " is invalid."); 204 managerType = ManagerType.EMBEDDED; 205 } 206 } else { 207 try { 208 managerType = ManagerType.getType(type); 209 } catch (final Exception ex) { 210 LOGGER.warn("Type " + type + " is invalid."); 211 managerType = ManagerType.EMBEDDED; 212 } 213 } 214 } else if (embed) { 215 managerType = ManagerType.EMBEDDED; 216 } else { 217 managerType = ManagerType.AVRO; 218 } 219 220 final int batchCount = Integers.parseInt(batchSize, 1); 221 final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0); 222 final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0); 223 final int retries = Integers.parseInt(agentRetries, 0); 224 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); 225 final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY); 226 227 if (layout == null) { 228 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER; 229 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID, 230 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null, 231 null); 232 } 233 234 if (name == null) { 235 LOGGER.error("No name provided for Appender"); 236 return null; 237 } 238 239 AbstractFlumeManager manager; 240 241 switch (managerType) { 242 case EMBEDDED: 243 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 244 break; 245 case AVRO: 246 if (agents == null || agents.length == 0) { 247 LOGGER.debug("No agents provided, using defaults"); 248 agents = new Agent[] {Agent.createAgent(null, null)}; 249 } 250 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis); 251 break; 252 case PERSISTENT: 253 if (agents == null || agents.length == 0) { 254 LOGGER.debug("No agents provided, using defaults"); 255 agents = new Agent[] {Agent.createAgent(null, null)}; 256 } 257 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, 258 connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir); 259 break; 260 default: 261 LOGGER.debug("No manager type specified. Defaulting to AVRO"); 262 if (agents == null || agents.length == 0) { 263 LOGGER.debug("No agents provided, using defaults"); 264 agents = new Agent[] {Agent.createAgent(null, null)}; 265 } 266 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis); 267 } 268 269 if (manager == null) { 270 return null; 271 } 272 273 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes, 274 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager); 275 } 276 }