001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.util.HashMap;
020    import java.util.Locale;
021    import java.util.Map;
022    
023    import org.apache.flume.Event;
024    import org.apache.flume.EventDeliveryException;
025    import org.apache.flume.agent.embedded.EmbeddedAgent;
026    import org.apache.logging.log4j.LoggingException;
027    import org.apache.logging.log4j.core.appender.ManagerFactory;
028    import org.apache.logging.log4j.core.config.ConfigurationException;
029    import org.apache.logging.log4j.core.config.Property;
030    import org.apache.logging.log4j.core.util.NameUtil;
031    import org.apache.logging.log4j.util.PropertiesUtil;
032    import org.apache.logging.log4j.util.Strings;
033    
034    /**
035     *
036     */
037    public class FlumeEmbeddedManager extends AbstractFlumeManager {
038    
039        private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
040    
041        private static final String IN_MEMORY = "InMemory";
042    
043        private static FlumeManagerFactory factory = new FlumeManagerFactory();
044    
045        private final EmbeddedAgent agent;
046    
047        private final String shortName;
048    
049    
050        /**
051         * Constructor
052         * @param name The unique name of this manager.
053         * @param shortName The short version of the agent name.
054         * @param agent The embedded agent.
055         */
056        protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
057            super(name);
058            this.agent = agent;
059            this.shortName = shortName;
060        }
061    
062        /**
063         * Returns a FlumeEmbeddedManager.
064         * @param name The name of the manager.
065         * @param agents The agents to use.
066         * @param properties Properties for the embedded manager.
067         * @param batchSize The number of events to include in a batch.
068         * @param dataDir The directory where the Flume FileChannel should write to.
069         * @return A FlumeAvroManager.
070         */
071        public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
072                                                      int batchSize, final String dataDir) {
073    
074            if (batchSize <= 0) {
075                batchSize = 1;
076            }
077    
078            if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
079                throw new IllegalArgumentException("Either an Agent or properties are required");
080            } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
081                throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
082            }
083    
084            final StringBuilder sb = new StringBuilder();
085            boolean first = true;
086    
087            if (agents != null && agents.length > 0) {
088                sb.append(name).append('[');
089                for (final Agent agent : agents) {
090                    if (!first) {
091                        sb.append('_');
092                    }
093                    sb.append(agent.getHost()).append('-').append(agent.getPort());
094                    first = false;
095                }
096                sb.append(']');
097            } else {
098                String sep = Strings.EMPTY;
099                sb.append(name).append('-');
100                final StringBuilder props = new StringBuilder();
101                for (final Property prop : properties) {
102                    props.append(sep);
103                    props.append(prop.getName()).append('=').append(prop.getValue());
104                    sep = "_";
105                }
106                sb.append(NameUtil.md5(props.toString()));
107            }
108            return getManager(sb.toString(), factory,
109                    new FactoryData(name, agents, properties, batchSize, dataDir));
110        }
111    
112        @Override
113        public void send(final Event event) {
114            try {
115                agent.put(event);
116            } catch (final EventDeliveryException ex) {
117                throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
118            }
119        }
120    
121        @Override
122        protected void releaseSub() {
123            agent.stop();
124        }
125    
126        /**
127         * Factory data.
128         */
129        private static class FactoryData {
130            private final Agent[] agents;
131            private final Property[] properties;
132            private final int batchSize;
133            private final String dataDir;
134            private final String name;
135    
136            /**
137             * Constructor.
138             * @param name The name of the Appender.
139             * @param agents The agents.
140             * @param properties The Flume configuration properties.
141             * @param batchSize The number of events to include in a batch.
142             * @param dataDir The directory where Flume should write to.
143             */
144            public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
145                               final String dataDir) {
146                this.name = name;
147                this.agents = agents;
148                this.batchSize = batchSize;
149                this.properties = properties;
150                this.dataDir = dataDir;
151            }
152        }
153    
154        /**
155         * Avro Manager Factory.
156         */
157        private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
158    
159            /**
160             * Create the FlumeAvroManager.
161             * @param name The name of the entity to manage.
162             * @param data The data required to create the entity.
163             * @return The FlumeAvroManager.
164             */
165            @Override
166            public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
167                try {
168                    final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
169                        data.batchSize, data.dataDir);
170                    final EmbeddedAgent agent = new EmbeddedAgent(name);
171                    agent.configure(props);
172                    agent.start();
173                    LOGGER.debug("Created Agent " + name);
174                    return new FlumeEmbeddedManager(name, data.name, agent);
175                } catch (final Exception ex) {
176                    LOGGER.error("Could not create FlumeEmbeddedManager", ex);
177                }
178                return null;
179            }
180    
181            private Map<String, String> createProperties(final String name, final Agent[] agents,
182                                                         final Property[] properties, final int batchSize, String dataDir) {
183                final Map<String, String> props = new HashMap<String, String>();
184    
185                if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
186                    LOGGER.error("No Flume configuration provided");
187                    throw new ConfigurationException("No Flume configuration provided");
188                }
189    
190                if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
191                    LOGGER.error("Agents and Flume configuration cannot both be specified");
192                    throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
193                }
194    
195                if (agents != null && agents.length > 0) {
196    
197                    if (dataDir != null && dataDir.length() > 0) {
198                        if (dataDir.equals(IN_MEMORY)) {
199                            props.put("channel.type", "memory");
200                        } else {
201                            props.put("channel.type", "file");
202    
203                            if (!dataDir.endsWith(FILE_SEP)) {
204                                dataDir = dataDir + FILE_SEP;
205                            }
206    
207                            props.put("channel.checkpointDir", dataDir + "checkpoint");
208                            props.put("channel.dataDirs", dataDir + "data");
209                        }
210    
211                    } else {
212                        props.put("channel.type", "file");
213                    }
214    
215                    final StringBuilder sb = new StringBuilder();
216                    String leading = Strings.EMPTY;
217                    final int priority = agents.length;
218                    for (int i = 0; i < priority; ++i) {
219                        sb.append(leading).append("agent").append(i);
220                        leading = " ";
221                        final String prefix = "agent" + i;
222                        props.put(prefix + ".type", "avro");
223                        props.put(prefix + ".hostname", agents[i].getHost());
224                        props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
225                        props.put(prefix + ".batch-size", Integer.toString(batchSize));
226                        props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
227                    }
228                    props.put("sinks", sb.toString());
229                    props.put("processor.type", "failover");
230                } else {
231                    String[] sinks = null;
232    
233                    for (final Property property : properties) {
234                        final String key = property.getName();
235    
236                        if (Strings.isEmpty(key)) {
237                            final String msg = "A property name must be provided";
238                            LOGGER.error(msg);
239                            throw new ConfigurationException(msg);
240                        }
241    
242                        final String upperKey = key.toUpperCase(Locale.ENGLISH);
243    
244                        if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
245                            final String msg =
246                                "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
247                            LOGGER.error(msg);
248                            throw new ConfigurationException(msg);
249                        }
250    
251                        final String value = property.getValue();
252                        if (Strings.isEmpty(value)) {
253                            final String msg = "A value for property " + key + " must be provided";
254                            LOGGER.error(msg);
255                            throw new ConfigurationException(msg);
256                        }
257    
258                        if (upperKey.equals("SINKS")) {
259                            sinks = value.trim().split(" ");
260                        }
261    
262                        props.put(key, value);
263                    }
264    
265                    if (sinks == null || sinks.length == 0) {
266                        final String msg = "At least one Sink must be specified";
267                        LOGGER.error(msg);
268                        throw new ConfigurationException(msg);
269                    }
270                }
271                return props;
272            }
273    
274        }
275    
276    }