001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.zip.GZIPOutputStream;
026    
027    import org.apache.flume.event.SimpleEvent;
028    import org.apache.logging.log4j.Level;
029    import org.apache.logging.log4j.LoggingException;
030    import org.apache.logging.log4j.Marker;
031    import org.apache.logging.log4j.ThreadContext;
032    import org.apache.logging.log4j.core.LogEvent;
033    import org.apache.logging.log4j.core.impl.ThrowableProxy;
034    import org.apache.logging.log4j.core.util.Patterns;
035    import org.apache.logging.log4j.core.util.UuidUtil;
036    import org.apache.logging.log4j.message.MapMessage;
037    import org.apache.logging.log4j.message.Message;
038    import org.apache.logging.log4j.message.StructuredDataId;
039    import org.apache.logging.log4j.message.StructuredDataMessage;
040    import org.apache.logging.log4j.util.Strings;
041    
042    /**
043     * Class that is both a Flume and Log4j Event.
044     */
045    public class FlumeEvent extends SimpleEvent implements LogEvent {
046    
047        static final String GUID = "guId";
048        /**
049         * Generated serial version ID.
050         */
051        private static final long serialVersionUID = -8988674608627854140L;
052    
053        private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
054    
055        private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
056    
057        private static final String EVENT_TYPE = "eventType";
058    
059        private static final String EVENT_ID = "eventId";
060    
061        private static final String TIMESTAMP = "timeStamp";
062    
063        private final LogEvent event;
064    
065        private final Map<String, String> contextMap = new HashMap<String, String>();
066    
067        private final boolean compress;
068    
069        /**
070         * Construct the FlumeEvent.
071         * @param event The Log4j LogEvent.
072         * @param includes A comma separated list of MDC elements to include.
073         * @param excludes A comma separated list of MDC elements to exclude.
074         * @param required A comma separated list of MDC elements that are required to be defined.
075         * @param mdcPrefix The value to prefix to MDC keys.
076         * @param eventPrefix The value to prefix to event keys.
077         * @param compress If true the event body should be compressed.
078         */
079        public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
080                          String mdcPrefix, String eventPrefix, final boolean compress) {
081            this.event = event;
082            this.compress = compress;
083            final Map<String, String> headers = getHeaders();
084            headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
085            if (mdcPrefix == null) {
086                mdcPrefix = DEFAULT_MDC_PREFIX;
087            }
088            if (eventPrefix == null) {
089                eventPrefix = DEFAULT_EVENT_PREFIX;
090            }
091            final Map<String, String> mdc = event.getContextMap();
092            if (includes != null) {
093                final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
094                if (array.length > 0) {
095                    for (String str : array) {
096                        str = str.trim();
097                        if (mdc.containsKey(str)) {
098                            contextMap.put(str, mdc.get(str));
099                        }
100                    }
101                }
102            } else if (excludes != null) {
103                final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
104                if (array.length > 0) {
105                    final List<String> list = new ArrayList<String>(array.length);
106                    for (final String value : array) {
107                        list.add(value.trim());
108                    }
109                    for (final Map.Entry<String, String> entry : mdc.entrySet()) {
110                        if (!list.contains(entry.getKey())) {
111                            contextMap.put(entry.getKey(), entry.getValue());
112                        }
113                    }
114                }
115            } else {
116                contextMap.putAll(mdc);
117            }
118    
119            if (required != null) {
120                final String[] array = required.split(Patterns.COMMA_SEPARATOR);
121                if (array.length > 0) {
122                    for (String str : array) {
123                        str = str.trim();
124                        if (!mdc.containsKey(str)) {
125                            throw new LoggingException("Required key " + str + " is missing from the MDC");
126                        }
127                    }
128                }
129            }
130            final String guid =  UuidUtil.getTimeBasedUuid().toString();
131            final Message message = event.getMessage();
132            if (message instanceof MapMessage) {
133                // Add the guid to the Map so that it can be included in the Layout.
134                ((MapMessage) message).put(GUID, guid);
135                if (message instanceof StructuredDataMessage) {
136                    addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
137                }
138                addMapData(eventPrefix, headers, (MapMessage) message);
139            } else {
140                headers.put(GUID, guid);
141            }
142    
143            addContextData(mdcPrefix, headers, contextMap);
144        }
145    
146        protected void addStructuredData(final String prefix, final Map<String, String> fields,
147                                         final StructuredDataMessage msg) {
148            fields.put(prefix + EVENT_TYPE, msg.getType());
149            final StructuredDataId id = msg.getId();
150            fields.put(prefix + EVENT_ID, id.getName());
151        }
152    
153        protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
154            final Map<String, String> data = msg.getData();
155            for (final Map.Entry<String, String> entry : data.entrySet()) {
156                fields.put(prefix + entry.getKey(), entry.getValue());
157            }
158        }
159    
160        protected void addContextData(final String prefix, final Map<String, String> fields,
161                                      final Map<String, String> context) {
162            final Map<String, String> map = new HashMap<String, String>();
163            for (final Map.Entry<String, String> entry : context.entrySet()) {
164                if (entry.getKey() != null && entry.getValue() != null) {
165                    fields.put(prefix + entry.getKey(), entry.getValue());
166                    map.put(prefix + entry.getKey(), entry.getValue());
167                }
168            }
169            context.clear();
170            context.putAll(map);
171        }
172    
173        /**
174         * Set the body in the event.
175         * @param body The body to add to the event.
176         */
177        @Override
178        public void setBody(final byte[] body) {
179            if (body == null || body.length == 0) {
180                super.setBody(new byte[0]);
181                return;
182            }
183            if (compress) {
184                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185                try {
186                    final GZIPOutputStream os = new GZIPOutputStream(baos);
187                    os.write(body);
188                    os.close();
189                } catch (final IOException ioe) {
190                    throw new LoggingException("Unable to compress message", ioe);
191                }
192                super.setBody(baos.toByteArray());
193            } else {
194                super.setBody(body);
195            }
196        }
197    
198        /**
199         * Get the Frequently Qualified Class Name.
200         * @return the FQCN String.
201         */
202        @Override
203        public String getLoggerFqcn() {
204            return event.getLoggerFqcn();
205        }
206    
207        /**
208         * Returns the logging Level.
209         * @return the Level.
210         */
211        @Override
212        public Level getLevel() {
213            return event.getLevel();
214        }
215    
216        /**
217         * Returns the logger name.
218         * @return the logger name.
219         */
220        @Override
221        public String getLoggerName() {
222            return event.getLoggerName();
223        }
224    
225        /**
226         * Returns the StackTraceElement for the caller of the logging API.
227         * @return the StackTraceElement of the caller.
228         */
229        @Override
230        public StackTraceElement getSource() {
231            return event.getSource();
232        }
233    
234        /**
235         * Returns the Message.
236         * @return the Message.
237         */
238        @Override
239        public Message getMessage() {
240            return event.getMessage();
241        }
242    
243        /**
244         * Returns the Marker.
245         * @return the Marker.
246         */
247        @Override
248        public Marker getMarker() {
249            return event.getMarker();
250        }
251    
252        /**
253         * Returns the name of the Thread.
254         * @return the name of the Thread.
255         */
256        @Override
257        public String getThreadName() {
258            return event.getThreadName();
259        }
260    
261        /**
262         * Returns the event timestamp.
263         * @return the event timestamp.
264         */
265        @Override
266        public long getTimeMillis() {
267            return event.getTimeMillis();
268        }
269    
270        /**
271         * Returns the Throwable associated with the event, if any.
272         * @return the Throwable.
273         */
274        @Override
275        public Throwable getThrown() {
276            return event.getThrown();
277        }
278    
279        /**
280         * Returns the Throwable associated with the event, if any.
281         * @return the Throwable.
282         */
283        @Override
284        public ThrowableProxy getThrownProxy() {
285            return event.getThrownProxy();
286        }
287    
288        /**
289         * Returns a copy of the context Map.
290         * @return a copy of the context Map.
291         */
292        @Override
293        public Map<String, String> getContextMap() {
294            return contextMap;
295        }
296    
297        /**
298         * Returns a copy of the context stack.
299         * @return a copy of the context stack.
300         */
301        @Override
302        public ThreadContext.ContextStack getContextStack() {
303            return event.getContextStack();
304        }
305    
306        @Override
307        public boolean isIncludeLocation() {
308            return event.isIncludeLocation();
309        }
310    
311        @Override
312        public void setIncludeLocation(final boolean includeLocation) {
313            event.setIncludeLocation(includeLocation);
314        }
315    
316        @Override
317        public boolean isEndOfBatch() {
318            return event.isEndOfBatch();
319        }
320    
321        @Override
322        public void setEndOfBatch(final boolean endOfBatch) {
323            event.setEndOfBatch(endOfBatch);
324        }
325    }