/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.core.config.plugins.util.PluginType;
import org.apache.logging.log4j.core.util.ExecutorServices;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.core.util.Log4jThreadFactory;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.util.Strings;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    private static final long SHUTDOWN_WAIT_MILLIS = 60000;

    private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final Environment environment;

    private final WriterThread worker;

    private final Gate gate = new Gate();

    private final SecretKey secretKey;

    private final int lockTimeoutRetryCount;

    private final ExecutorService threadPool;

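    // Tracks the approximate number of events currently persisted in Berkeley DB: seeded from
    // database.count() in the constructor, incremented by BDBWriter after each put, and refreshed by
    // the WriterThread from database.count() as events are drained to Flume.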
    private final AtomicLong dbCount = new AtomicLong();

    /**
     * Constructor.
     * @param name The unique name of this manager.
     * @param shortName Original name for the Manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait for a connection to be established.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay The amount of time to delay before delivering a batch.
     * @param database The database to write to.
     * @param environment The database environment.
     * @param secretKey The SecretKey to use for encryption.
     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey,
                                     final int lockTimeoutRetryCount) {
        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
        this.database = database;
        this.environment = environment;
        dbCount.set(database.count());
        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
            lockTimeoutRetryCount);
        this.worker.start();
        this.secretKey = secretKey;
        this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    }
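    // Illustrative usage (the values below are hypothetical, not defaults): an appender obtains the
    // shared manager via getManager(...) rather than calling the protected constructor directly:
    //
    //   FlumePersistentManager manager = FlumePersistentManager.getManager("FlumeBDB", agents,
    //       properties, 100, 3, 5000, 5000, 60000, 5, "target/flumeData");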
    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the Manager.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait to establish a connection.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delayMillis Amount of time to delay before delivering a batch.
     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
     * @param dataDir The location of the Berkeley database.
     * @return A FlumePersistentManager.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                    final Property[] properties, int batchSize, final int retries,
                                                    final int connectionTimeout, final int requestTimeout,
                                                    final int delayMillis, final int lockTimeoutRetryCount,
                                                    final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        sb.append(' ').append(dataDirectory);
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
    }

    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

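        // Each event is persisted under its GUID header as the key. The value uses a simple
        // length-prefixed layout: body length (int), body bytes, header count (int), then each header
        // written as writeUTF(key) followed by writeUTF(value). WriterThread.createEvent() reads the
        // same layout back when draining the database.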
        final Map<String, String> headers = event.getHeaders();
        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        try {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(event.getBody().length);
            daos.write(event.getBody(), 0, event.getBody().length);
            daos.writeInt(event.getHeaders().size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
            try {
                future.get();
            } catch (final InterruptedException ie) {
                // preserve interruption status
                Thread.currentThread().interrupt();
            }
        } catch (final Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    @Override
    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
        boolean closed = true;
        LOGGER.debug("Shutting down FlumePersistentManager");
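        // Shutdown proceeds in stages: signal the writer thread and wait for it to drain, stop the
        // helper thread pool, wait for the writer to finish, then close the database and finally the
        // environment.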
        worker.shutdown();
        final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
        final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS;
        try {
            worker.join(shutdownWaitMillis);
        } catch (final InterruptedException ie) {
            // Ignore the exception and shut down.
        }
        ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
        try {
            worker.join();
        } catch (final InterruptedException ex) {
            logDebug("interrupted while waiting for worker to complete", ex);
        }
        try {
            LOGGER.debug("FlumePersistentManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            logWarn("Failed to close database", ex);
            closed = false;
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            logWarn("Failed to close environment", ex);
            closed = false;
        }
        return closed && super.releaseSub(timeout, timeUnit);
    }

    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Writes an event to Berkeley DB on the manager's thread pool so that an interrupt of the logging
     * thread cannot close the database mid-write.
     */
    private static class BDBWriter implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
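            // Retry the transactional put up to lockTimeoutRetryCount times. A LockConflictException
            // aborts the transaction, the thread sleeps for LOCK_TIMEOUT_SLEEP_MILLIS, and the put is
            // attempted again; any other exception is rethrown immediately.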
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        txn = null;
                        if (dbCount.incrementAndGet() >= batchSize) {
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } catch (final Exception ex) {
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                        throw ex;
                    } finally {
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (final LockConflictException lce) {
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }
                }
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delayMillis;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delayMillis Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delayMillis = delayMillis;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }

    /**
     * Factory that creates FlumePersistentManager instances.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;
            Database database = null;
            Environment environment = null;

            final Map<String, String> properties = new HashMap<>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // For consistency, close the database as well as the environment. The database is the
                // last thing created in the block above, so it should always be null here, but this
                // guards against a future statement (such as debug logging) being added after it and
                // then failing.
                if (database != null) {
                    database.close();
                    database = null;
                }
                if (environment != null) {
                    environment.close();
                    environment = null;
                }
                return null;
            }

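            // The optional "keyProvider" property names a Log4j plugin in the "KeyProvider" category
            // (matched case-insensitively below). The matched SecretKeyProvider supplies the AES key
            // used to encrypt event data before it is written to Berkeley DB. A provider might look
            // like the following sketch (the names are hypothetical, not part of this class):
            //
            //   @Plugin(name = "mySecretKeyProvider", category = "KeyProvider")
            //   public class MySecretKeyProvider implements SecretKeyProvider {
            //       @Override
            //       public SecretKey getSecretKey() {
            //           // loadKeyBytes() is a placeholder for however the key material is obtained.
            //           return new SecretKeySpec(loadKeyBytes(), "AES");
            //       }
            //   }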
            try {
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                        break;
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider");
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                final Class<?> cl = entry.getValue().getPluginClass();
                                try {
                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (final Exception ex) {
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName());
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
                data.lockTimeoutRetryCount);
        }
    }

    /**
     * Thread that pulls events from Berkeley DB and sends them to Flume.
     */
    private static class WriterThread extends Log4jThread {
        private volatile boolean shutdown;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        private final SecretKey secretKey;
        private final int batchSize;
        private final AtomicLong dbCounter;
        private final int lockTimeoutRetryCount;

        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            super("FlumePersistentManager-Writer");
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }

        public boolean isShutdown() {
            return shutdown;
        }

        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = {}, delayMillis = {}", batchSize,
                manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
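            // Main loop: when at least batchSize events are waiting, or at least one event is waiting
            // and the delay interval has elapsed, close the gate and drain events to Flume (as a batch,
            // or one at a time when batchSize == 1). Otherwise wait on the gate, which BDBWriter opens
            // once enough events have been queued, or until the next delay interval expires.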
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                final long dbCount = database.count();
                dbCounter.set(dbCount);
                if (dbCount >= batchSize || (dbCount > 0 && nextBatchMillis <= nowMillis)) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                break;
                            }
                        } else {
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update database", exception);
                            }
                        }
                        if (errors) {
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch", ex);
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

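        // sendBatch() works in two phases: a cursor first reads up to batchSize events into a
        // BatchEvent and sends it to Flume; then, if the send succeeded, the sent records are deleted
        // from Berkeley DB inside a transaction. Both phases retry on LockConflictException, up to
        // lockTimeoutRetryCount attempts.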
        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = null;
            try {
                final BatchEvent batch = new BatchEvent();
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    try {
                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
                        status = cursor.getFirst(key, data, null);

                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                            final SimpleEvent event = createEvent(data);
                            if (event != null) {
                                batch.addEvent(event);
                            }
                            status = cursor.getNext(key, data, null);
                        }
                        break;
                    } catch (final LockConflictException lce) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                    }
                }

                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    if (cursor != null) {
                        cursor.close();
                        cursor = null;
                    }
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, key);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                txn = null;
                                long count = dbCounter.get();
                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                    count = dbCounter.get();
                                }
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                    txn = null;
                                }
                            }
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from database", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }

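        // createEvent() is the inverse of FlumePersistentManager.send(): it decrypts the stored bytes
        // when a secret key is configured and then reads back the length-prefixed body and header
        // pairs, returning null if the record cannot be decoded.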
        private SimpleEvent createEvent(final DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                dais.read(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }

    /**
     * Simple open/close latch used to coordinate the WriterThread with incoming writes.
     */
    private static class Gate {

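        // Protocol: BDBWriter opens the gate once dbCount reaches batchSize; the WriterThread closes
        // it before draining and waits on it (with a timeout) when there is nothing to send. Spurious
        // wake-ups are harmless because the writer re-checks the database count on every iteration.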
        private boolean isOpen = false;

        public synchronized boolean isOpen() {
            return isOpen;
        }

        public synchronized void open() {
            isOpen = true;
            notifyAll();
        }

        public synchronized void close() {
            isOpen = false;
        }

        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
            wait(timeout);
        }
    }
}