001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.util.Properties;
020    
021    import org.apache.flume.Event;
022    import org.apache.flume.api.RpcClient;
023    import org.apache.flume.api.RpcClientFactory;
024    import org.apache.logging.log4j.core.appender.AppenderLoggingException;
025    import org.apache.logging.log4j.core.appender.ManagerFactory;
026    
027    /**
028     * Manager for FlumeAvroAppenders.
029     */
030    public class FlumeAvroManager extends AbstractFlumeManager {
031    
032        private static final int MAX_RECONNECTS = 3;
033        private static final int MINIMUM_TIMEOUT = 1000;
034    
035        private static AvroManagerFactory factory = new AvroManagerFactory();
036    
037        private final Agent[] agents;
038    
039        private final int batchSize;
040    
041        private final int retries;
042    
043        private final int connectTimeoutMillis;
044    
045        private final int requestTimeoutMillis;
046    
047        private final int current = 0;
048    
049        private RpcClient rpcClient = null;
050    
051        /**
052         * Constructor
053         * @param name The unique name of this manager.
054         * @param agents An array of Agents.
055         * @param batchSize The number of events to include in a batch.
056         * @param retries The number of times to retry connecting before giving up.
057         * @param connectTimeout The connection timeout in ms.
058         * @param requestTimeout The request timeout in ms.
059         *
060         */
061        protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
062                                   final int retries, final int connectTimeout, final int requestTimeout) {
063            super(name);
064            this.agents = agents;
065            this.batchSize = batchSize;
066            this.retries = retries;
067            this.connectTimeoutMillis = connectTimeout;
068            this.requestTimeoutMillis = requestTimeout;
069            this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
070        }
071    
072        /**
073         * Returns a FlumeAvroManager.
074         * @param name The name of the manager.
075         * @param agents The agents to use.
076         * @param batchSize The number of events to include in a batch.
077         * @param retries The number of times to retry connecting before giving up.
078         * @param connectTimeoutMillis The connection timeout in ms.
079         * @param requestTimeoutMillis The request timeout in ms.
080         * @return A FlumeAvroManager.
081         */
082        public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
083                                                  final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
084            if (agents == null || agents.length == 0) {
085                throw new IllegalArgumentException("At least one agent is required");
086            }
087    
088            if (batchSize <= 0) {
089                batchSize = 1;
090            }
091    
092            final StringBuilder sb = new StringBuilder("FlumeAvro[");
093            boolean first = true;
094            for (final Agent agent : agents) {
095                if (!first) {
096                    sb.append(',');
097                }
098                sb.append(agent.getHost()).append(':').append(agent.getPort());
099                first = false;
100            }
101            sb.append(']');
102            return getManager(sb.toString(), factory,
103                    new FactoryData(name, agents, batchSize, retries, connectTimeoutMillis, requestTimeoutMillis));
104        }
105    
106        /**
107         * Returns the agents.
108         * @return The agent array.
109         */
110        public Agent[] getAgents() {
111            return agents;
112        }
113    
114        /**
115         * Returns the index of the current agent.
116         * @return The index for the current agent.
117         */
118        public int getCurrent() {
119            return current;
120        }
121    
122        public int getRetries() {
123            return retries;
124        }
125    
126        public int getConnectTimeoutMillis() {
127            return connectTimeoutMillis;
128        }
129    
130        public int getRequestTimeoutMillis() {
131            return requestTimeoutMillis;
132        }
133    
134        public int getBatchSize() {
135            return batchSize;
136        }
137    
138        public synchronized void send(final BatchEvent events) {
139            if (rpcClient == null) {
140                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
141            }
142    
143            if (rpcClient != null) {
144                try {
145                    LOGGER.trace("Sending batch of {} events", events.getEvents().size());
146                    rpcClient.appendBatch(events.getEvents());
147                } catch (final Exception ex) {
148                    rpcClient.close();
149                    rpcClient = null;
150                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
151                        agents[current].getPort();
152                    LOGGER.warn(msg, ex);
153                    throw new AppenderLoggingException("No Flume agents are available");
154                }
155            }  else {
156                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
157                    agents[current].getPort();
158                LOGGER.warn(msg);
159                throw new AppenderLoggingException("No Flume agents are available");
160            }
161        }
162    
163        @Override
164        public synchronized void send(final Event event)  {
165            if (rpcClient == null) {
166                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
167            }
168    
169            if (rpcClient != null) {
170                try {
171                    rpcClient.append(event);
172                } catch (final Exception ex) {
173                    rpcClient.close();
174                    rpcClient = null;
175                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
176                        agents[current].getPort();
177                    LOGGER.warn(msg, ex);
178                    throw new AppenderLoggingException("No Flume agents are available");
179                }
180            } else {
181                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
182                    agents[current].getPort();
183                LOGGER.warn(msg);
184                throw new AppenderLoggingException("No Flume agents are available");
185            }
186        }
187    
188        /**
189         * There is a very good chance that this will always return the first agent even if it isn't available.
190         * @param agents The list of agents to choose from
191         * @return The FlumeEventAvroServer.
192         */
193        private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
194            try {
195                final Properties props = new Properties();
196    
197                props.put("client.type", "default_failover");
198    
199                int count = 1;
200                final StringBuilder sb = new StringBuilder();
201                for (final Agent agent : agents) {
202                    if (sb.length() > 0) {
203                        sb.append(' ');
204                    }
205                    final String hostName = "host" + count++;
206                    props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
207                    sb.append(hostName);
208                }
209                props.put("hosts", sb.toString());
210                if (batchSize > 0) {
211                    props.put("batch-size", Integer.toString(batchSize));
212                }
213                if (retries > 1) {
214                    if (retries > MAX_RECONNECTS) {
215                        retries = MAX_RECONNECTS;
216                    }
217                    props.put("max-attempts", Integer.toString(retries * agents.length));
218                }
219                if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
220                    props.put("request-timeout", Integer.toString(requestTimeoutMillis));
221                }
222                if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
223                    props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
224                }
225                return RpcClientFactory.getInstance(props);
226            } catch (final Exception ex) {
227                LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
228                return null;
229            }
230        }
231    
232        @Override
233        protected void releaseSub() {
234            if (rpcClient != null) {
235                try {
236                    rpcClient.close();
237                } catch (final Exception ex) {
238                    LOGGER.error("Attempt to close RPC client failed", ex);
239                }
240            }
241            rpcClient = null;
242        }
243    
244        /**
245         * Factory data.
246         */
247        private static class FactoryData {
248            private final String name;
249            private final Agent[] agents;
250            private final int batchSize;
251            private final int retries;
252            private final int conntectTimeoutMillis;
253            private final int requestTimeoutMillis;
254    
255            /**
256             * Constructor.
257             * @param name The name of the Appender.
258             * @param agents The agents.
259             * @param batchSize The number of events to include in a batch.
260             */
261            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
262                               final int connectTimeoutMillis, final int requestTimeoutMillis) {
263                this.name = name;
264                this.agents = agents;
265                this.batchSize = batchSize;
266                this.retries = retries;
267                this.conntectTimeoutMillis = connectTimeoutMillis;
268                this.requestTimeoutMillis = requestTimeoutMillis;
269            }
270        }
271    
272        /**
273         * Avro Manager Factory.
274         */
275        private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
276    
277            /**
278             * Create the FlumeAvroManager.
279             * @param name The name of the entity to manage.
280             * @param data The data required to create the entity.
281             * @return The FlumeAvroManager.
282             */
283            @Override
284            public FlumeAvroManager createManager(final String name, final FactoryData data) {
285                try {
286    
287                    return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
288                        data.conntectTimeoutMillis, data.requestTimeoutMillis);
289                } catch (final Exception ex) {
290                    LOGGER.error("Could not create FlumeAvroManager", ex);
291                }
292                return null;
293            }
294        }
295    
296    }