001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import java.util.Properties; 020 021 import org.apache.flume.Event; 022 import org.apache.flume.api.RpcClient; 023 import org.apache.flume.api.RpcClientFactory; 024 import org.apache.logging.log4j.core.appender.AppenderLoggingException; 025 import org.apache.logging.log4j.core.appender.ManagerFactory; 026 027 /** 028 * Manager for FlumeAvroAppenders. 029 */ 030 public class FlumeAvroManager extends AbstractFlumeManager { 031 032 private static final int MAX_RECONNECTS = 3; 033 private static final int MINIMUM_TIMEOUT = 1000; 034 035 private static AvroManagerFactory factory = new AvroManagerFactory(); 036 037 private final Agent[] agents; 038 039 private final int batchSize; 040 041 private final int retries; 042 043 private final int connectTimeoutMillis; 044 045 private final int requestTimeoutMillis; 046 047 private final int current = 0; 048 049 private RpcClient rpcClient = null; 050 051 /** 052 * Constructor 053 * @param name The unique name of this manager. 054 * @param agents An array of Agents. 055 * @param batchSize The number of events to include in a batch. 056 * @param retries The number of times to retry connecting before giving up. 057 * @param connectTimeout The connection timeout in ms. 058 * @param requestTimeout The request timeout in ms. 059 * 060 */ 061 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize, 062 final int retries, final int connectTimeout, final int requestTimeout) { 063 super(name); 064 this.agents = agents; 065 this.batchSize = batchSize; 066 this.retries = retries; 067 this.connectTimeoutMillis = connectTimeout; 068 this.requestTimeoutMillis = requestTimeout; 069 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout); 070 } 071 072 /** 073 * Returns a FlumeAvroManager. 074 * @param name The name of the manager. 075 * @param agents The agents to use. 076 * @param batchSize The number of events to include in a batch. 077 * @param retries The number of times to retry connecting before giving up. 078 * @param connectTimeoutMillis The connection timeout in ms. 079 * @param requestTimeoutMillis The request timeout in ms. 080 * @return A FlumeAvroManager. 081 */ 082 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, 083 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 084 if (agents == null || agents.length == 0) { 085 throw new IllegalArgumentException("At least one agent is required"); 086 } 087 088 if (batchSize <= 0) { 089 batchSize = 1; 090 } 091 092 final StringBuilder sb = new StringBuilder("FlumeAvro["); 093 boolean first = true; 094 for (final Agent agent : agents) { 095 if (!first) { 096 sb.append(','); 097 } 098 sb.append(agent.getHost()).append(':').append(agent.getPort()); 099 first = false; 100 } 101 sb.append(']'); 102 return getManager(sb.toString(), factory, 103 new FactoryData(name, agents, batchSize, retries, connectTimeoutMillis, requestTimeoutMillis)); 104 } 105 106 /** 107 * Returns the agents. 108 * @return The agent array. 109 */ 110 public Agent[] getAgents() { 111 return agents; 112 } 113 114 /** 115 * Returns the index of the current agent. 116 * @return The index for the current agent. 117 */ 118 public int getCurrent() { 119 return current; 120 } 121 122 public int getRetries() { 123 return retries; 124 } 125 126 public int getConnectTimeoutMillis() { 127 return connectTimeoutMillis; 128 } 129 130 public int getRequestTimeoutMillis() { 131 return requestTimeoutMillis; 132 } 133 134 public int getBatchSize() { 135 return batchSize; 136 } 137 138 public synchronized void send(final BatchEvent events) { 139 if (rpcClient == null) { 140 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); 141 } 142 143 if (rpcClient != null) { 144 try { 145 LOGGER.trace("Sending batch of {} events", events.getEvents().size()); 146 rpcClient.appendBatch(events.getEvents()); 147 } catch (final Exception ex) { 148 rpcClient.close(); 149 rpcClient = null; 150 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 151 agents[current].getPort(); 152 LOGGER.warn(msg, ex); 153 throw new AppenderLoggingException("No Flume agents are available"); 154 } 155 } else { 156 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 157 agents[current].getPort(); 158 LOGGER.warn(msg); 159 throw new AppenderLoggingException("No Flume agents are available"); 160 } 161 } 162 163 @Override 164 public synchronized void send(final Event event) { 165 if (rpcClient == null) { 166 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); 167 } 168 169 if (rpcClient != null) { 170 try { 171 rpcClient.append(event); 172 } catch (final Exception ex) { 173 rpcClient.close(); 174 rpcClient = null; 175 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 176 agents[current].getPort(); 177 LOGGER.warn(msg, ex); 178 throw new AppenderLoggingException("No Flume agents are available"); 179 } 180 } else { 181 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' + 182 agents[current].getPort(); 183 LOGGER.warn(msg); 184 throw new AppenderLoggingException("No Flume agents are available"); 185 } 186 } 187 188 /** 189 * There is a very good chance that this will always return the first agent even if it isn't available. 190 * @param agents The list of agents to choose from 191 * @return The FlumeEventAvroServer. 192 */ 193 private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { 194 try { 195 final Properties props = new Properties(); 196 197 props.put("client.type", "default_failover"); 198 199 int count = 1; 200 final StringBuilder sb = new StringBuilder(); 201 for (final Agent agent : agents) { 202 if (sb.length() > 0) { 203 sb.append(' '); 204 } 205 final String hostName = "host" + count++; 206 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort()); 207 sb.append(hostName); 208 } 209 props.put("hosts", sb.toString()); 210 if (batchSize > 0) { 211 props.put("batch-size", Integer.toString(batchSize)); 212 } 213 if (retries > 1) { 214 if (retries > MAX_RECONNECTS) { 215 retries = MAX_RECONNECTS; 216 } 217 props.put("max-attempts", Integer.toString(retries * agents.length)); 218 } 219 if (requestTimeoutMillis >= MINIMUM_TIMEOUT) { 220 props.put("request-timeout", Integer.toString(requestTimeoutMillis)); 221 } 222 if (connectTimeoutMillis >= MINIMUM_TIMEOUT) { 223 props.put("connect-timeout", Integer.toString(connectTimeoutMillis)); 224 } 225 return RpcClientFactory.getInstance(props); 226 } catch (final Exception ex) { 227 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage()); 228 return null; 229 } 230 } 231 232 @Override 233 protected void releaseSub() { 234 if (rpcClient != null) { 235 try { 236 rpcClient.close(); 237 } catch (final Exception ex) { 238 LOGGER.error("Attempt to close RPC client failed", ex); 239 } 240 } 241 rpcClient = null; 242 } 243 244 /** 245 * Factory data. 246 */ 247 private static class FactoryData { 248 private final String name; 249 private final Agent[] agents; 250 private final int batchSize; 251 private final int retries; 252 private final int conntectTimeoutMillis; 253 private final int requestTimeoutMillis; 254 255 /** 256 * Constructor. 257 * @param name The name of the Appender. 258 * @param agents The agents. 259 * @param batchSize The number of events to include in a batch. 260 */ 261 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, 262 final int connectTimeoutMillis, final int requestTimeoutMillis) { 263 this.name = name; 264 this.agents = agents; 265 this.batchSize = batchSize; 266 this.retries = retries; 267 this.conntectTimeoutMillis = connectTimeoutMillis; 268 this.requestTimeoutMillis = requestTimeoutMillis; 269 } 270 } 271 272 /** 273 * Avro Manager Factory. 274 */ 275 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> { 276 277 /** 278 * Create the FlumeAvroManager. 279 * @param name The name of the entity to manage. 280 * @param data The data required to create the entity. 281 * @return The FlumeAvroManager. 282 */ 283 @Override 284 public FlumeAvroManager createManager(final String name, final FactoryData data) { 285 try { 286 287 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries, 288 data.conntectTimeoutMillis, data.requestTimeoutMillis); 289 } catch (final Exception ex) { 290 LOGGER.error("Could not create FlumeAvroManager", ex); 291 } 292 return null; 293 } 294 } 295 296 }