001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import java.io.ByteArrayOutputStream; 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.HashMap; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.zip.GZIPOutputStream; 026 027 import org.apache.flume.event.SimpleEvent; 028 import org.apache.logging.log4j.Level; 029 import org.apache.logging.log4j.LoggingException; 030 import org.apache.logging.log4j.Marker; 031 import org.apache.logging.log4j.ThreadContext; 032 import org.apache.logging.log4j.core.LogEvent; 033 import org.apache.logging.log4j.core.impl.ThrowableProxy; 034 import org.apache.logging.log4j.core.util.Patterns; 035 import org.apache.logging.log4j.core.util.UuidUtil; 036 import org.apache.logging.log4j.message.MapMessage; 037 import org.apache.logging.log4j.message.Message; 038 import org.apache.logging.log4j.message.StructuredDataId; 039 import org.apache.logging.log4j.message.StructuredDataMessage; 040 import org.apache.logging.log4j.util.Strings; 041 042 /** 043 * Class that is both a Flume and Log4j Event. 044 */ 045 public class FlumeEvent extends SimpleEvent implements LogEvent { 046 047 static final String GUID = "guId"; 048 /** 049 * Generated serial version ID. 050 */ 051 private static final long serialVersionUID = -8988674608627854140L; 052 053 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY; 054 055 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY; 056 057 private static final String EVENT_TYPE = "eventType"; 058 059 private static final String EVENT_ID = "eventId"; 060 061 private static final String TIMESTAMP = "timeStamp"; 062 063 private final LogEvent event; 064 065 private final Map<String, String> contextMap = new HashMap<String, String>(); 066 067 private final boolean compress; 068 069 /** 070 * Construct the FlumeEvent. 071 * @param event The Log4j LogEvent. 072 * @param includes A comma separated list of MDC elements to include. 073 * @param excludes A comma separated list of MDC elements to exclude. 074 * @param required A comma separated list of MDC elements that are required to be defined. 075 * @param mdcPrefix The value to prefix to MDC keys. 076 * @param eventPrefix The value to prefix to event keys. 077 * @param compress If true the event body should be compressed. 078 */ 079 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 080 String mdcPrefix, String eventPrefix, final boolean compress) { 081 this.event = event; 082 this.compress = compress; 083 final Map<String, String> headers = getHeaders(); 084 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis())); 085 if (mdcPrefix == null) { 086 mdcPrefix = DEFAULT_MDC_PREFIX; 087 } 088 if (eventPrefix == null) { 089 eventPrefix = DEFAULT_EVENT_PREFIX; 090 } 091 final Map<String, String> mdc = event.getContextMap(); 092 if (includes != null) { 093 final String[] array = includes.split(Patterns.COMMA_SEPARATOR); 094 if (array.length > 0) { 095 for (String str : array) { 096 str = str.trim(); 097 if (mdc.containsKey(str)) { 098 contextMap.put(str, mdc.get(str)); 099 } 100 } 101 } 102 } else if (excludes != null) { 103 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR); 104 if (array.length > 0) { 105 final List<String> list = new ArrayList<String>(array.length); 106 for (final String value : array) { 107 list.add(value.trim()); 108 } 109 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 110 if (!list.contains(entry.getKey())) { 111 contextMap.put(entry.getKey(), entry.getValue()); 112 } 113 } 114 } 115 } else { 116 contextMap.putAll(mdc); 117 } 118 119 if (required != null) { 120 final String[] array = required.split(Patterns.COMMA_SEPARATOR); 121 if (array.length > 0) { 122 for (String str : array) { 123 str = str.trim(); 124 if (!mdc.containsKey(str)) { 125 throw new LoggingException("Required key " + str + " is missing from the MDC"); 126 } 127 } 128 } 129 } 130 final String guid = UuidUtil.getTimeBasedUuid().toString(); 131 final Message message = event.getMessage(); 132 if (message instanceof MapMessage) { 133 // Add the guid to the Map so that it can be included in the Layout. 134 ((MapMessage) message).put(GUID, guid); 135 if (message instanceof StructuredDataMessage) { 136 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 137 } 138 addMapData(eventPrefix, headers, (MapMessage) message); 139 } else { 140 headers.put(GUID, guid); 141 } 142 143 addContextData(mdcPrefix, headers, contextMap); 144 } 145 146 protected void addStructuredData(final String prefix, final Map<String, String> fields, 147 final StructuredDataMessage msg) { 148 fields.put(prefix + EVENT_TYPE, msg.getType()); 149 final StructuredDataId id = msg.getId(); 150 fields.put(prefix + EVENT_ID, id.getName()); 151 } 152 153 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) { 154 final Map<String, String> data = msg.getData(); 155 for (final Map.Entry<String, String> entry : data.entrySet()) { 156 fields.put(prefix + entry.getKey(), entry.getValue()); 157 } 158 } 159 160 protected void addContextData(final String prefix, final Map<String, String> fields, 161 final Map<String, String> context) { 162 final Map<String, String> map = new HashMap<String, String>(); 163 for (final Map.Entry<String, String> entry : context.entrySet()) { 164 if (entry.getKey() != null && entry.getValue() != null) { 165 fields.put(prefix + entry.getKey(), entry.getValue()); 166 map.put(prefix + entry.getKey(), entry.getValue()); 167 } 168 } 169 context.clear(); 170 context.putAll(map); 171 } 172 173 /** 174 * Set the body in the event. 175 * @param body The body to add to the event. 176 */ 177 @Override 178 public void setBody(final byte[] body) { 179 if (body == null || body.length == 0) { 180 super.setBody(new byte[0]); 181 return; 182 } 183 if (compress) { 184 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 185 try { 186 final GZIPOutputStream os = new GZIPOutputStream(baos); 187 os.write(body); 188 os.close(); 189 } catch (final IOException ioe) { 190 throw new LoggingException("Unable to compress message", ioe); 191 } 192 super.setBody(baos.toByteArray()); 193 } else { 194 super.setBody(body); 195 } 196 } 197 198 /** 199 * Get the Frequently Qualified Class Name. 200 * @return the FQCN String. 201 */ 202 @Override 203 public String getLoggerFqcn() { 204 return event.getLoggerFqcn(); 205 } 206 207 /** 208 * Returns the logging Level. 209 * @return the Level. 210 */ 211 @Override 212 public Level getLevel() { 213 return event.getLevel(); 214 } 215 216 /** 217 * Returns the logger name. 218 * @return the logger name. 219 */ 220 @Override 221 public String getLoggerName() { 222 return event.getLoggerName(); 223 } 224 225 /** 226 * Returns the StackTraceElement for the caller of the logging API. 227 * @return the StackTraceElement of the caller. 228 */ 229 @Override 230 public StackTraceElement getSource() { 231 return event.getSource(); 232 } 233 234 /** 235 * Returns the Message. 236 * @return the Message. 237 */ 238 @Override 239 public Message getMessage() { 240 return event.getMessage(); 241 } 242 243 /** 244 * Returns the Marker. 245 * @return the Marker. 246 */ 247 @Override 248 public Marker getMarker() { 249 return event.getMarker(); 250 } 251 252 /** 253 * Returns the name of the Thread. 254 * @return the name of the Thread. 255 */ 256 @Override 257 public String getThreadName() { 258 return event.getThreadName(); 259 } 260 261 /** 262 * Returns the event timestamp. 263 * @return the event timestamp. 264 */ 265 @Override 266 public long getTimeMillis() { 267 return event.getTimeMillis(); 268 } 269 270 /** 271 * Returns the Throwable associated with the event, if any. 272 * @return the Throwable. 273 */ 274 @Override 275 public Throwable getThrown() { 276 return event.getThrown(); 277 } 278 279 /** 280 * Returns the Throwable associated with the event, if any. 281 * @return the Throwable. 282 */ 283 @Override 284 public ThrowableProxy getThrownProxy() { 285 return event.getThrownProxy(); 286 } 287 288 /** 289 * Returns a copy of the context Map. 290 * @return a copy of the context Map. 291 */ 292 @Override 293 public Map<String, String> getContextMap() { 294 return contextMap; 295 } 296 297 /** 298 * Returns a copy of the context stack. 299 * @return a copy of the context stack. 300 */ 301 @Override 302 public ThreadContext.ContextStack getContextStack() { 303 return event.getContextStack(); 304 } 305 306 @Override 307 public boolean isIncludeLocation() { 308 return event.isIncludeLocation(); 309 } 310 311 @Override 312 public void setIncludeLocation(final boolean includeLocation) { 313 event.setIncludeLocation(includeLocation); 314 } 315 316 @Override 317 public boolean isEndOfBatch() { 318 return event.isEndOfBatch(); 319 } 320 321 @Override 322 public void setEndOfBatch(final boolean endOfBatch) { 323 event.setEndOfBatch(endOfBatch); 324 } 325 }