001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.File;
024    import java.nio.charset.Charset;
025    import java.util.HashMap;
026    import java.util.Map;
027    import java.util.concurrent.Callable;
028    import java.util.concurrent.ExecutorService;
029    import java.util.concurrent.Executors;
030    import java.util.concurrent.Future;
031    import java.util.concurrent.ThreadFactory;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicInteger;
034    import java.util.concurrent.atomic.AtomicLong;
035    
036    import javax.crypto.Cipher;
037    import javax.crypto.SecretKey;
038    
039    import org.apache.flume.Event;
040    import org.apache.flume.event.SimpleEvent;
041    import org.apache.logging.log4j.LoggingException;
042    import org.apache.logging.log4j.core.appender.ManagerFactory;
043    import org.apache.logging.log4j.core.config.Property;
044    import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
045    import org.apache.logging.log4j.core.config.plugins.util.PluginType;
046    import org.apache.logging.log4j.core.util.FileUtils;
047    import org.apache.logging.log4j.core.util.SecretKeyProvider;
048    import org.apache.logging.log4j.util.Strings;
049    
050    import com.sleepycat.je.Cursor;
051    import com.sleepycat.je.CursorConfig;
052    import com.sleepycat.je.Database;
053    import com.sleepycat.je.DatabaseConfig;
054    import com.sleepycat.je.DatabaseEntry;
055    import com.sleepycat.je.Environment;
056    import com.sleepycat.je.EnvironmentConfig;
057    import com.sleepycat.je.LockConflictException;
058    import com.sleepycat.je.LockMode;
059    import com.sleepycat.je.OperationStatus;
060    import com.sleepycat.je.StatsConfig;
061    import com.sleepycat.je.Transaction;
062    
063    /**
064     * Manager that persists data to Berkeley DB before passing it on to Flume.
065     */
066    public class FlumePersistentManager extends FlumeAvroManager {
067    
068        /** Attribute name for the key provider. */
069        public static final String KEY_PROVIDER = "keyProvider";
070    
071        private static final Charset UTF8 = Charset.forName("UTF-8");
072    
073        private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
074    
075        private static final int SHUTDOWN_WAIT = 60;
076    
077        private static final int MILLIS_PER_SECOND = 1000;
078    
079        private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
080    
081        private static BDBManagerFactory factory = new BDBManagerFactory();
082    
083        private final Database database;
084    
085        private final Environment environment;
086    
087        private final WriterThread worker;
088    
089        private final Gate gate = new Gate();
090    
091        private final SecretKey secretKey;
092    
093        private final int delayMillis;
094    
095        private final int lockTimeoutRetryCount;
096    
097        private final ExecutorService threadPool;
098    
099        private final AtomicLong dbCount = new AtomicLong();
100    
101        /**
102         * Constructor
103         * @param name The unique name of this manager.
104         * @param shortName Original name for the Manager.
105         * @param agents An array of Agents.
106         * @param batchSize The number of events to include in a batch.
107         * @param retries The number of times to retry connecting before giving up.
108         * @param connectionTimeout The amount of time to wait for a connection to be established.
109         * @param requestTimeout The amount of time to wair for a response to a request.
110         * @param delay The amount of time to wait between retries.
111         * @param database The database to write to.
112         * @param environment The database environment.
113         * @param secretKey The SecretKey to use for encryption.
114         * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
115         */
116        protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117                                         final int batchSize, final int retries, final int connectionTimeout,
118                                         final int requestTimeout, final int delay, final Database database,
119                                         final Environment environment, final SecretKey secretKey,
120                                         final int lockTimeoutRetryCount) {
121            super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
122            this.delayMillis = delay;
123            this.database = database;
124            this.environment = environment;
125            dbCount.set(database.count());
126            this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
127                lockTimeoutRetryCount);
128            this.worker.start();
129            this.secretKey = secretKey;
130            this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
131            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
132        }
133    
134    
135        /**
136         * Returns a FlumeAvroManager.
137         * @param name The name of the manager.
138         * @param agents The agents to use.
139         * @param properties Properties to pass to the Manager.
140         * @param batchSize The number of events to include in a batch.
141         * @param retries The number of times to retry connecting before giving up.
142         * @param connectionTimeout The amount of time to wait to establish a connection.
143         * @param requestTimeout The amount of time to wait for a response to a request.
144         * @param delayMillis Amount of time to delay before delivering a batch.
145         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
146         * @param dataDir The location of the Berkeley database.
147         * @return A FlumeAvroManager.
148         */
149        public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150                                                        final Property[] properties, int batchSize, final int retries,
151                                                        final int connectionTimeout, final int requestTimeout,
152                                                        final int delayMillis, final int lockTimeoutRetryCount,
153                                                        final String dataDir) {
154            if (agents == null || agents.length == 0) {
155                throw new IllegalArgumentException("At least one agent is required");
156            }
157    
158            if (batchSize <= 0) {
159                batchSize = 1;
160            }
161            final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162    
163            final StringBuilder sb = new StringBuilder("FlumePersistent[");
164            boolean first = true;
165            for (final Agent agent : agents) {
166                if (!first) {
167                    sb.append(',');
168                }
169                sb.append(agent.getHost()).append(':').append(agent.getPort());
170                first = false;
171            }
172            sb.append(']');
173            sb.append(' ').append(dataDirectory);
174            return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175                connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
176        }
177    
178        @Override
179        public void send(final Event event)  {
180            if (worker.isShutdown()) {
181                throw new LoggingException("Unable to record event");
182            }
183    
184            final Map<String, String> headers = event.getHeaders();
185            final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186            try {
187                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188                final DataOutputStream daos = new DataOutputStream(baos);
189                daos.writeInt(event.getBody().length);
190                daos.write(event.getBody(), 0, event.getBody().length);
191                daos.writeInt(event.getHeaders().size());
192                for (final Map.Entry<String, String> entry : headers.entrySet()) {
193                    daos.writeUTF(entry.getKey());
194                    daos.writeUTF(entry.getValue());
195                }
196                byte[] eventData = baos.toByteArray();
197                if (secretKey != null) {
198                    final Cipher cipher = Cipher.getInstance("AES");
199                    cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200                    eventData = cipher.doFinal(eventData);
201                }
202                final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203                    gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204                boolean interrupted = false;
205                int count = 0;
206                do {
207                    try {
208                        future.get();
209                    } catch (final InterruptedException ie) {
210                        interrupted = true;
211                        ++count;
212                    }
213                } while (interrupted && count <= 1);
214    
215            } catch (final Exception ex) {
216                throw new LoggingException("Exception occurred writing log event", ex);
217            }
218        }
219    
220        @Override
221        protected void releaseSub() {
222            LOGGER.debug("Shutting down FlumePersistentManager");
223            worker.shutdown();
224            try {
225                worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226            } catch (final InterruptedException ie) {
227                // Ignore the exception and shutdown.
228            }
229            threadPool.shutdown();
230            try {
231                threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232            } catch (final InterruptedException ie) {
233                LOGGER.warn("PersistentManager Thread pool failed to shut down");
234            }
235            try {
236                worker.join();
237            } catch (final InterruptedException ex) {
238                LOGGER.debug("Interrupted while waiting for worker to complete");
239            }
240            try {
241                LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
242                database.close();
243            } catch (final Exception ex) {
244                LOGGER.warn("Failed to close database", ex);
245            }
246            try {
247                environment.cleanLog();
248                environment.close();
249            } catch (final Exception ex) {
250                LOGGER.warn("Failed to close environment", ex);
251            }
252            super.releaseSub();
253        }
254    
255        private void doSend(final SimpleEvent event) {
256            LOGGER.debug("Sending event to Flume");
257            super.send(event);
258        }
259    
260        /**
261         * Thread for writing to Berkeley DB to avoid having interrupts close the database.
262         */
263        private static class BDBWriter implements Callable<Integer> {
264            private final byte[] eventData;
265            private final byte[] keyData;
266            private final Environment environment;
267            private final Database database;
268            private final Gate gate;
269            private final AtomicLong dbCount;
270            private final long batchSize;
271            private final int lockTimeoutRetryCount;
272    
273            public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274                             final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275                             final int lockTimeoutRetryCount) {
276                this.keyData = keyData;
277                this.eventData = eventData;
278                this.environment = environment;
279                this.database = database;
280                this.gate = gate;
281                this.dbCount = dbCount;
282                this.batchSize = batchSize;
283                this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284            }
285    
286            @Override
287            public Integer call() throws Exception {
288                final DatabaseEntry key = new DatabaseEntry(keyData);
289                final DatabaseEntry data = new DatabaseEntry(eventData);
290                Exception exception = null;
291                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292                    Transaction txn = null;
293                    try {
294                        txn = environment.beginTransaction(null, null);
295                        try {
296                            database.put(txn, key, data);
297                            txn.commit();
298                            txn = null;
299                            if (dbCount.incrementAndGet() >= batchSize) {
300                                gate.open();
301                            }
302                            exception = null;
303                            break;
304                        } catch (final LockConflictException lce) {
305                            exception = lce;
306                            // Fall through and retry.
307                        } catch (final Exception ex) {
308                            if (txn != null) {
309                                txn.abort();
310                            }
311                            throw ex;
312                        } finally {
313                            if (txn != null) {
314                                txn.abort();
315                                txn = null;
316                            }
317                        }
318                    } catch (final LockConflictException lce) {
319                        exception = lce;
320                        if (txn != null) {
321                            try {
322                                txn.abort();
323                                txn = null;
324                            } catch (final Exception ex) {
325                                LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
326                            }
327                        }
328    
329                    }
330                    try {
331                        Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332                    } catch (final InterruptedException ie) {
333                        // Ignore the error
334                    }
335                }
336                if (exception != null) {
337                    throw exception;
338                }
339                return eventData.length;
340            }
341        }
342    
343        /**
344         * Factory data.
345         */
346        private static class FactoryData {
347            private final String name;
348            private final Agent[] agents;
349            private final int batchSize;
350            private final String dataDir;
351            private final int retries;
352            private final int connectionTimeout;
353            private final int requestTimeout;
354            private final int delayMillis;
355            private final int lockTimeoutRetryCount;
356            private final Property[] properties;
357    
358            /**
359             * Constructor.
360             * @param name The name of the Appender.
361             * @param agents The agents.
362             * @param batchSize The number of events to include in a batch.
363             * @param dataDir The directory for data.
364             */
365            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366                               final int connectionTimeout, final int requestTimeout, final int delayMillis,
367                               final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368                this.name = name;
369                this.agents = agents;
370                this.batchSize = batchSize;
371                this.dataDir = dataDir;
372                this.retries = retries;
373                this.connectionTimeout = connectionTimeout;
374                this.requestTimeout = requestTimeout;
375                this.delayMillis = delayMillis;
376                this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377                this.properties = properties;
378            }
379        }
380    
381        /**
382         * Avro Manager Factory.
383         */
384        private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385    
386            /**
387             * Create the FlumeKratiManager.
388             * @param name The name of the entity to manage.
389             * @param data The data required to create the entity.
390             * @return The FlumeKratiManager.
391             */
392            @Override
393            public FlumePersistentManager createManager(final String name, final FactoryData data) {
394                SecretKey secretKey = null;
395                Database database = null;
396                Environment environment = null;
397    
398                final Map<String, String> properties = new HashMap<String, String>();
399                if (data.properties != null) {
400                    for (final Property property : data.properties) {
401                        properties.put(property.getName(), property.getValue());
402                    }
403                }
404    
405                try {
406                    final File dir = new File(data.dataDir);
407                    FileUtils.mkdir(dir, true);
408                    final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
409                    dbEnvConfig.setTransactional(true);
410                    dbEnvConfig.setAllowCreate(true);
411                    dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
412                    environment = new Environment(dir, dbEnvConfig);
413                    final DatabaseConfig dbConfig = new DatabaseConfig();
414                    dbConfig.setTransactional(true);
415                    dbConfig.setAllowCreate(true);
416                    database = environment.openDatabase(null, name, dbConfig);
417                } catch (final Exception ex) {
418                    LOGGER.error("Could not create FlumePersistentManager", ex);
419                    // For consistency, close database as well as environment even though it should never happen since the
420                    // database is that last thing in the block above, but this does guard against a future line being
421                    // inserted at the end that would bomb (like some debug logging).
422                    if (database != null) {
423                        database.close();
424                        database = null;
425                    }
426                    if (environment != null) {
427                        environment.close();
428                        environment = null;
429                    }
430                    return null;
431                }
432    
433                try {
434                    String key = null;
435                    for (final Map.Entry<String, String> entry : properties.entrySet()) {
436                        if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
437                            key = entry.getValue();
438                            break;
439                        }
440                    }
441                    if (key != null) {
442                        final PluginManager manager = new PluginManager("KeyProvider");
443                        manager.collectPlugins();
444                        final Map<String, PluginType<?>> plugins = manager.getPlugins();
445                        if (plugins != null) {
446                            boolean found = false;
447                            for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
448                                if (entry.getKey().equalsIgnoreCase(key)) {
449                                    found = true;
450                                    final Class<?> cl = entry.getValue().getPluginClass();
451                                    try {
452                                        final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
453                                        secretKey = provider.getSecretKey();
454                                        LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
455                                    } catch (final Exception ex) {
456                                        LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
457                                            cl.getName());
458                                    }
459                                    break;
460                                }
461                            }
462                            if (!found) {
463                                LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464                            }
465                        } else {
466                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
467                        }
468                    }
469                } catch (final Exception ex) {
470                    LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
471                }
472                return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
473                    data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
474                    data.lockTimeoutRetryCount);
475            }
476        }
477    
478        /**
479         * Thread that sends data to Flume and pulls it from Berkeley DB.
480         */
481        private static class WriterThread extends Thread  {
482            private volatile boolean shutdown = false;
483            private final Database database;
484            private final Environment environment;
485            private final FlumePersistentManager manager;
486            private final Gate gate;
487            private final SecretKey secretKey;
488            private final int batchSize;
489            private final AtomicLong dbCounter;
490            private final int lockTimeoutRetryCount;
491    
492            public WriterThread(final Database database, final Environment environment,
493                                final FlumePersistentManager manager, final Gate gate, final int batchsize,
494                                final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
495                this.database = database;
496                this.environment = environment;
497                this.manager = manager;
498                this.gate = gate;
499                this.batchSize = batchsize;
500                this.secretKey = secretKey;
501                this.setDaemon(true);
502                this.dbCounter = dbCount;
503                this.lockTimeoutRetryCount = lockTimeoutRetryCount;
504            }
505    
506            public void shutdown() {
507                LOGGER.debug("Writer thread shutting down");
508                this.shutdown = true;
509                gate.open();
510            }
511    
512            public boolean isShutdown() {
513                return shutdown;
514            }
515    
516            @Override
517            public void run() {
518                LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.delayMillis);
519                long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis;
520                while (!shutdown) {
521                    final long nowMillis = System.currentTimeMillis();
522                    final long dbCount = database.count();
523                    dbCounter.set(dbCount);
524                    if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
525                        nextBatchMillis = nowMillis + manager.delayMillis;
526                        try {
527                            boolean errors = false;
528                            final DatabaseEntry key = new DatabaseEntry();
529                            final DatabaseEntry data = new DatabaseEntry();
530    
531                            gate.close();
532                            OperationStatus status;
533                            if (batchSize > 1) {
534                                try {
535                                    errors = sendBatch(key, data);
536                                } catch (final Exception ex) {
537                                    break;
538                                }
539                            } else {
540                                Exception exception = null;
541                                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
542                                    exception = null;
543                                    Transaction txn = null;
544                                    Cursor cursor = null;
545                                    try {
546                                        txn = environment.beginTransaction(null, null);
547                                        cursor = database.openCursor(txn, null);
548                                        try {
549                                            status = cursor.getFirst(key, data, LockMode.RMW);
550                                            while (status == OperationStatus.SUCCESS) {
551                                                final SimpleEvent event = createEvent(data);
552                                                if (event != null) {
553                                                    try {
554                                                        manager.doSend(event);
555                                                    } catch (final Exception ioe) {
556                                                        errors = true;
557                                                        LOGGER.error("Error sending event", ioe);
558                                                        break;
559                                                    }
560                                                    try {
561                                                        cursor.delete();
562                                                    } catch (final Exception ex) {
563                                                        LOGGER.error("Unable to delete event", ex);
564                                                    }
565                                                }
566                                                status = cursor.getNext(key, data, LockMode.RMW);
567                                            }
568                                            if (cursor != null) {
569                                                cursor.close();
570                                                cursor = null;
571                                            }
572                                            txn.commit();
573                                            txn = null;
574                                            dbCounter.decrementAndGet();
575                                            exception = null;
576                                            break;
577                                        } catch (final LockConflictException lce) {
578                                            exception = lce;
579                                            // Fall through and retry.
580                                        } catch (final Exception ex) {
581                                            LOGGER.error("Error reading or writing to database", ex);
582                                            shutdown = true;
583                                            break;
584                                        } finally {
585                                            if (cursor != null) {
586                                                cursor.close();
587                                                cursor = null;
588                                            }
589                                            if (txn != null) {
590                                                txn.abort();
591                                                txn = null;
592                                            }
593                                        }
594                                    } catch (final LockConflictException lce) {
595                                        exception = lce;
596                                        if (cursor != null) {
597                                            try {
598                                                cursor.close();
599                                                cursor = null;
600                                            } catch (final Exception ex) {
601                                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
602                                            }
603                                        }
604                                        if (txn != null) {
605                                            try {
606                                                txn.abort();
607                                                txn = null;
608                                            } catch (final Exception ex) {
609                                                LOGGER.trace("Ignored exception aborting tx during lock conflict.");
610                                            }
611                                        }
612                                    }
613                                    try {
614                                        Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
615                                    } catch (final InterruptedException ie) {
616                                        // Ignore the error
617                                    }
618                                }
619                                if (exception != null) {
620                                    LOGGER.error("Unable to read or update data base", exception);
621                                }
622                            }
623                            if (errors) {
624                                Thread.sleep(manager.delayMillis);
625                                continue;
626                            }
627                        } catch (final Exception ex) {
628                            LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
629                        }
630                    } else {
631                        if (nextBatchMillis <= nowMillis) {
632                            nextBatchMillis = nowMillis + manager.delayMillis;
633                        }
634                        try {
635                            final long interval = nextBatchMillis - nowMillis;
636                            gate.waitForOpen(interval);
637                        } catch (final InterruptedException ie) {
638                            LOGGER.warn("WriterThread interrupted, continuing");
639                        } catch (final Exception ex) {
640                            LOGGER.error("WriterThread encountered an exception waiting for work", ex);
641                            break;
642                        }
643                    }
644                }
645    
646                if (batchSize > 1 && database.count() > 0) {
647                    final DatabaseEntry key = new DatabaseEntry();
648                    final DatabaseEntry data = new DatabaseEntry();
649                    try {
650                        sendBatch(key, data);
651                    } catch (final Exception ex) {
652                        LOGGER.warn("Unable to write final batch");
653                    }
654                }
655                LOGGER.trace("WriterThread exiting");
656            }
657    
658            private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
659                boolean errors = false;
660                OperationStatus status;
661                Cursor cursor = null;
662                try {
663                    final BatchEvent batch = new BatchEvent();
664                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
665                            try {
666                                    cursor = database.openCursor(null, CursorConfig.DEFAULT);
667                                    status = cursor.getFirst(key, data, null);
668    
669                                    for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
670                                            final SimpleEvent event = createEvent(data);
671                                            if (event != null) {
672                                                    batch.addEvent(event);
673                                            }
674                                            status = cursor.getNext(key, data, null);
675                                    }
676                                    break;
677                            } catch (final LockConflictException lce) {
678                                    if (cursor != null) {
679                                            try {
680                                    cursor.close();
681                                    cursor = null;
682                                } catch (final Exception ex) {
683                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
684                                }
685                            }
686                        }
687                    }
688    
689                    try {
690                        manager.send(batch);
691                    } catch (final Exception ioe) {
692                        LOGGER.error("Error sending events", ioe);
693                        errors = true;
694                    }
695                    if (!errors) {
696                            if (cursor != null) {
697                                cursor.close();
698                                cursor = null;
699                            }
700                        Transaction txn = null;
701                        Exception exception = null;
702                        for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
703                            try {
704                                txn = environment.beginTransaction(null, null);
705                                try {
706                                    for (final Event event : batch.getEvents()) {
707                                        try {
708                                            final Map<String, String> headers = event.getHeaders();
709                                            key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
710                                            database.delete(txn, key);
711                                        } catch (final Exception ex) {
712                                            LOGGER.error("Error deleting key from database", ex);
713                                        }
714                                    }
715                                    txn.commit();
716                                    long count = dbCounter.get();
717                                    while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
718                                        count = dbCounter.get();
719                                    }
720                                    exception = null;
721                                    break;
722                                } catch (final LockConflictException lce) {
723                                    exception = lce;
724                                    if (cursor != null) {
725                                        try {
726                                            cursor.close();
727                                            cursor = null;
728                                        } catch (final Exception ex) {
729                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
730                                        }
731                                    }
732                                    if (txn != null) {
733                                        try {
734                                            txn.abort();
735                                            txn = null;
736                                        } catch (final Exception ex) {
737                                            LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
738                                        }
739                                    }
740                                } catch (final Exception ex) {
741                                    LOGGER.error("Unable to commit transaction", ex);
742                                    if (txn != null) {
743                                        txn.abort();
744                                    }
745                                }
746                            } catch (final LockConflictException lce) {
747                                exception = lce;
748                                if (cursor != null) {
749                                    try {
750                                        cursor.close();
751                                        cursor = null;
752                                    } catch (final Exception ex) {
753                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
754                                    }
755                                }
756                                if (txn != null) {
757                                    try {
758                                        txn.abort();
759                                        txn = null;
760                                    } catch (final Exception ex) {
761                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
762                                    }
763                                }
764                            } finally {
765                                if (cursor != null) {
766                                    cursor.close();
767                                    cursor = null;
768                                }
769                                if (txn != null) {
770                                    txn.abort();
771                                    txn = null;
772                                }
773                            }
774                            try {
775                                Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
776                            } catch (final InterruptedException ie) {
777                                // Ignore the error
778                            }
779                        }
780                        if (exception != null) {
781                            LOGGER.error("Unable to delete events from data base", exception);
782                        }
783                    }
784                } catch (final Exception ex) {
785                    LOGGER.error("Error reading database", ex);
786                    shutdown = true;
787                    throw ex;
788                } finally {
789                    if (cursor != null) {
790                        cursor.close();
791                    }
792                }
793    
794                return errors;
795            }
796    
797            private SimpleEvent createEvent(final DatabaseEntry data) {
798                final SimpleEvent event = new SimpleEvent();
799                try {
800                    byte[] eventData = data.getData();
801                    if (secretKey != null) {
802                        final Cipher cipher = Cipher.getInstance("AES");
803                        cipher.init(Cipher.DECRYPT_MODE, secretKey);
804                        eventData = cipher.doFinal(eventData);
805                    }
806                    final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
807                    final DataInputStream dais = new DataInputStream(bais);
808                    int length = dais.readInt();
809                    final byte[] bytes = new byte[length];
810                    dais.read(bytes, 0, length);
811                    event.setBody(bytes);
812                    length = dais.readInt();
813                    final Map<String, String> map = new HashMap<String, String>(length);
814                    for (int i = 0; i < length; ++i) {
815                        final String headerKey = dais.readUTF();
816                        final String value = dais.readUTF();
817                        map.put(headerKey, value);
818                    }
819                    event.setHeaders(map);
820                    return event;
821                } catch (final Exception ex) {
822                    LOGGER.error("Error retrieving event", ex);
823                    return null;
824                }
825            }
826    
827        }
828    
829        /**
830         * Factory that creates Daemon threads that can be properly shut down.
831         */
832        private static class DaemonThreadFactory implements ThreadFactory {
833            private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
834            private final ThreadGroup group;
835            private final AtomicInteger threadNumber = new AtomicInteger(1);
836            private final String namePrefix;
837    
838            public DaemonThreadFactory() {
839                final SecurityManager securityManager = System.getSecurityManager();
840                group = securityManager != null ? securityManager.getThreadGroup() :
841                    Thread.currentThread().getThreadGroup();
842                namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
843            }
844    
845            @Override
846            public Thread newThread(final Runnable r) {
847                final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
848                thread.setDaemon(true);
849                if (thread.getPriority() != Thread.NORM_PRIORITY) {
850                    thread.setPriority(Thread.NORM_PRIORITY);
851                }
852                return thread;
853            }
854        }
855    
856        /**
857         * An internal class.
858         */
859        private static class Gate {
860    
861            private boolean isOpen = false;
862    
863            public boolean isOpen() {
864                return isOpen;
865            }
866    
867            public synchronized void open() {
868                isOpen = true;
869                notifyAll();
870            }
871    
872            public synchronized void close() {
873                isOpen = false;
874            }
875    
876            public synchronized void waitForOpen(final long timeout) throws InterruptedException {
877                wait(timeout);
878            }
879        }
880    }