001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import java.util.HashMap; 020 import java.util.Locale; 021 import java.util.Map; 022 023 import org.apache.flume.Event; 024 import org.apache.flume.EventDeliveryException; 025 import org.apache.flume.agent.embedded.EmbeddedAgent; 026 import org.apache.logging.log4j.LoggingException; 027 import org.apache.logging.log4j.core.appender.ManagerFactory; 028 import org.apache.logging.log4j.core.config.ConfigurationException; 029 import org.apache.logging.log4j.core.config.Property; 030 import org.apache.logging.log4j.core.util.NameUtil; 031 import org.apache.logging.log4j.util.PropertiesUtil; 032 import org.apache.logging.log4j.util.Strings; 033 034 /** 035 * 036 */ 037 public class FlumeEmbeddedManager extends AbstractFlumeManager { 038 039 private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator"); 040 041 private static final String IN_MEMORY = "InMemory"; 042 043 private static FlumeManagerFactory factory = new FlumeManagerFactory(); 044 045 private final EmbeddedAgent agent; 046 047 private final String shortName; 048 049 050 /** 051 * Constructor 052 * @param name The unique name of this manager. 053 * @param shortName The short version of the agent name. 054 * @param agent The embedded agent. 055 */ 056 protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) { 057 super(name); 058 this.agent = agent; 059 this.shortName = shortName; 060 } 061 062 /** 063 * Returns a FlumeEmbeddedManager. 064 * @param name The name of the manager. 065 * @param agents The agents to use. 066 * @param properties Properties for the embedded manager. 067 * @param batchSize The number of events to include in a batch. 068 * @param dataDir The directory where the Flume FileChannel should write to. 069 * @return A FlumeAvroManager. 070 */ 071 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties, 072 int batchSize, final String dataDir) { 073 074 if (batchSize <= 0) { 075 batchSize = 1; 076 } 077 078 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 079 throw new IllegalArgumentException("Either an Agent or properties are required"); 080 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { 081 throw new IllegalArgumentException("Cannot configure both Agents and Properties."); 082 } 083 084 final StringBuilder sb = new StringBuilder(); 085 boolean first = true; 086 087 if (agents != null && agents.length > 0) { 088 sb.append(name).append('['); 089 for (final Agent agent : agents) { 090 if (!first) { 091 sb.append('_'); 092 } 093 sb.append(agent.getHost()).append('-').append(agent.getPort()); 094 first = false; 095 } 096 sb.append(']'); 097 } else { 098 String sep = Strings.EMPTY; 099 sb.append(name).append('-'); 100 final StringBuilder props = new StringBuilder(); 101 for (final Property prop : properties) { 102 props.append(sep); 103 props.append(prop.getName()).append('=').append(prop.getValue()); 104 sep = "_"; 105 } 106 sb.append(NameUtil.md5(props.toString())); 107 } 108 return getManager(sb.toString(), factory, 109 new FactoryData(name, agents, properties, batchSize, dataDir)); 110 } 111 112 @Override 113 public void send(final Event event) { 114 try { 115 agent.put(event); 116 } catch (final EventDeliveryException ex) { 117 throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex); 118 } 119 } 120 121 @Override 122 protected void releaseSub() { 123 agent.stop(); 124 } 125 126 /** 127 * Factory data. 128 */ 129 private static class FactoryData { 130 private final Agent[] agents; 131 private final Property[] properties; 132 private final int batchSize; 133 private final String dataDir; 134 private final String name; 135 136 /** 137 * Constructor. 138 * @param name The name of the Appender. 139 * @param agents The agents. 140 * @param properties The Flume configuration properties. 141 * @param batchSize The number of events to include in a batch. 142 * @param dataDir The directory where Flume should write to. 143 */ 144 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize, 145 final String dataDir) { 146 this.name = name; 147 this.agents = agents; 148 this.batchSize = batchSize; 149 this.properties = properties; 150 this.dataDir = dataDir; 151 } 152 } 153 154 /** 155 * Avro Manager Factory. 156 */ 157 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> { 158 159 /** 160 * Create the FlumeAvroManager. 161 * @param name The name of the entity to manage. 162 * @param data The data required to create the entity. 163 * @return The FlumeAvroManager. 164 */ 165 @Override 166 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) { 167 try { 168 final Map<String, String> props = createProperties(data.name, data.agents, data.properties, 169 data.batchSize, data.dataDir); 170 final EmbeddedAgent agent = new EmbeddedAgent(name); 171 agent.configure(props); 172 agent.start(); 173 LOGGER.debug("Created Agent " + name); 174 return new FlumeEmbeddedManager(name, data.name, agent); 175 } catch (final Exception ex) { 176 LOGGER.error("Could not create FlumeEmbeddedManager", ex); 177 } 178 return null; 179 } 180 181 private Map<String, String> createProperties(final String name, final Agent[] agents, 182 final Property[] properties, final int batchSize, String dataDir) { 183 final Map<String, String> props = new HashMap<String, String>(); 184 185 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 186 LOGGER.error("No Flume configuration provided"); 187 throw new ConfigurationException("No Flume configuration provided"); 188 } 189 190 if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { 191 LOGGER.error("Agents and Flume configuration cannot both be specified"); 192 throw new ConfigurationException("Agents and Flume configuration cannot both be specified"); 193 } 194 195 if (agents != null && agents.length > 0) { 196 197 if (dataDir != null && dataDir.length() > 0) { 198 if (dataDir.equals(IN_MEMORY)) { 199 props.put("channel.type", "memory"); 200 } else { 201 props.put("channel.type", "file"); 202 203 if (!dataDir.endsWith(FILE_SEP)) { 204 dataDir = dataDir + FILE_SEP; 205 } 206 207 props.put("channel.checkpointDir", dataDir + "checkpoint"); 208 props.put("channel.dataDirs", dataDir + "data"); 209 } 210 211 } else { 212 props.put("channel.type", "file"); 213 } 214 215 final StringBuilder sb = new StringBuilder(); 216 String leading = Strings.EMPTY; 217 final int priority = agents.length; 218 for (int i = 0; i < priority; ++i) { 219 sb.append(leading).append("agent").append(i); 220 leading = " "; 221 final String prefix = "agent" + i; 222 props.put(prefix + ".type", "avro"); 223 props.put(prefix + ".hostname", agents[i].getHost()); 224 props.put(prefix + ".port", Integer.toString(agents[i].getPort())); 225 props.put(prefix + ".batch-size", Integer.toString(batchSize)); 226 props.put("processor.priority." + prefix, Integer.toString(agents.length - i)); 227 } 228 props.put("sinks", sb.toString()); 229 props.put("processor.type", "failover"); 230 } else { 231 String[] sinks = null; 232 233 for (final Property property : properties) { 234 final String key = property.getName(); 235 236 if (Strings.isEmpty(key)) { 237 final String msg = "A property name must be provided"; 238 LOGGER.error(msg); 239 throw new ConfigurationException(msg); 240 } 241 242 final String upperKey = key.toUpperCase(Locale.ENGLISH); 243 244 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) { 245 final String msg = 246 "Specification of the agent name is not allowed in Flume Appender configuration: " + key; 247 LOGGER.error(msg); 248 throw new ConfigurationException(msg); 249 } 250 251 final String value = property.getValue(); 252 if (Strings.isEmpty(value)) { 253 final String msg = "A value for property " + key + " must be provided"; 254 LOGGER.error(msg); 255 throw new ConfigurationException(msg); 256 } 257 258 if (upperKey.equals("SINKS")) { 259 sinks = value.trim().split(" "); 260 } 261 262 props.put(key, value); 263 } 264 265 if (sinks == null || sinks.length == 0) { 266 final String msg = "At least one Sink must be specified"; 267 LOGGER.error(msg); 268 throw new ConfigurationException(msg); 269 } 270 } 271 return props; 272 } 273 274 } 275 276 }