001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.ByteArrayInputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.File; 024import java.nio.charset.Charset; 025import java.nio.charset.StandardCharsets; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.concurrent.Callable; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import javax.crypto.Cipher; 035import javax.crypto.SecretKey; 036 037import com.sleepycat.je.Cursor; 038import com.sleepycat.je.CursorConfig; 039import com.sleepycat.je.Database; 040import com.sleepycat.je.DatabaseConfig; 041import com.sleepycat.je.DatabaseEntry; 042import com.sleepycat.je.Environment; 043import com.sleepycat.je.EnvironmentConfig; 044import com.sleepycat.je.LockConflictException; 045import com.sleepycat.je.LockMode; 046import com.sleepycat.je.OperationStatus; 047import com.sleepycat.je.StatsConfig; 048import com.sleepycat.je.Transaction; 049import org.apache.flume.Event; 050import org.apache.flume.event.SimpleEvent; 051import org.apache.logging.log4j.LoggingException; 052import org.apache.logging.log4j.core.appender.ManagerFactory; 053import org.apache.logging.log4j.core.config.Property; 054import org.apache.logging.log4j.core.config.plugins.util.PluginManager; 055import org.apache.logging.log4j.core.config.plugins.util.PluginType; 056import org.apache.logging.log4j.core.util.ExecutorServices; 057import org.apache.logging.log4j.core.util.FileUtils; 058import org.apache.logging.log4j.core.util.Log4jThread; 059import org.apache.logging.log4j.core.util.Log4jThreadFactory; 060import org.apache.logging.log4j.core.util.SecretKeyProvider; 061import org.apache.logging.log4j.util.Strings; 062 063/** 064 * Manager that persists data to Berkeley DB before passing it on to Flume. 065 */ 066public class FlumePersistentManager extends FlumeAvroManager { 067 068 /** Attribute name for the key provider. */ 069 public static final String KEY_PROVIDER = "keyProvider"; 070 071 private static final Charset UTF8 = StandardCharsets.UTF_8; 072 073 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; 074 075 private static final long SHUTDOWN_WAIT_MILLIS = 60000; 076 077 private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500; 078 079 private static BDBManagerFactory factory = new BDBManagerFactory(); 080 081 private final Database database; 082 083 private final Environment environment; 084 085 private final WriterThread worker; 086 087 private final Gate gate = new Gate(); 088 089 private final SecretKey secretKey; 090 091 private final int lockTimeoutRetryCount; 092 093 private final ExecutorService threadPool; 094 095 private final AtomicLong dbCount = new AtomicLong(); 096 097 /** 098 * Constructor 099 * @param name The unique name of this manager. 100 * @param shortName Original name for the Manager. 101 * @param agents An array of Agents. 102 * @param batchSize The number of events to include in a batch. 103 * @param retries The number of times to retry connecting before giving up. 104 * @param connectionTimeout The amount of time to wait for a connection to be established. 105 * @param requestTimeout The amount of time to wair for a response to a request. 106 * @param delay The amount of time to wait between retries. 107 * @param database The database to write to. 108 * @param environment The database environment. 109 * @param secretKey The SecretKey to use for encryption. 110 * @param lockTimeoutRetryCount The number of times to retry a lock timeout. 111 */ 112 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, 113 final int batchSize, final int retries, final int connectionTimeout, 114 final int requestTimeout, final int delay, final Database database, 115 final Environment environment, final SecretKey secretKey, 116 final int lockTimeoutRetryCount) { 117 super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout); 118 this.database = database; 119 this.environment = environment; 120 dbCount.set(database.count()); 121 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount, 122 lockTimeoutRetryCount); 123 this.worker.start(); 124 this.secretKey = secretKey; 125 this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume")); 126 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 127 } 128 129 130 /** 131 * Returns a FlumeAvroManager. 132 * @param name The name of the manager. 133 * @param agents The agents to use. 134 * @param properties Properties to pass to the Manager. 135 * @param batchSize The number of events to include in a batch. 136 * @param retries The number of times to retry connecting before giving up. 137 * @param connectionTimeout The amount of time to wait to establish a connection. 138 * @param requestTimeout The amount of time to wait for a response to a request. 139 * @param delayMillis Amount of time to delay before delivering a batch. 140 * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. 141 * @param dataDir The location of the Berkeley database. 142 * @return A FlumeAvroManager. 143 */ 144 public static FlumePersistentManager getManager(final String name, final Agent[] agents, 145 final Property[] properties, int batchSize, final int retries, 146 final int connectionTimeout, final int requestTimeout, 147 final int delayMillis, final int lockTimeoutRetryCount, 148 final String dataDir) { 149 if (agents == null || agents.length == 0) { 150 throw new IllegalArgumentException("At least one agent is required"); 151 } 152 153 if (batchSize <= 0) { 154 batchSize = 1; 155 } 156 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir; 157 158 final StringBuilder sb = new StringBuilder("FlumePersistent["); 159 boolean first = true; 160 for (final Agent agent : agents) { 161 if (!first) { 162 sb.append(','); 163 } 164 sb.append(agent.getHost()).append(':').append(agent.getPort()); 165 first = false; 166 } 167 sb.append(']'); 168 sb.append(' ').append(dataDirectory); 169 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, 170 connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties)); 171 } 172 173 @Override 174 public void send(final Event event) { 175 if (worker.isShutdown()) { 176 throw new LoggingException("Unable to record event"); 177 } 178 179 final Map<String, String> headers = event.getHeaders(); 180 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); 181 try { 182 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 183 final DataOutputStream daos = new DataOutputStream(baos); 184 daos.writeInt(event.getBody().length); 185 daos.write(event.getBody(), 0, event.getBody().length); 186 daos.writeInt(event.getHeaders().size()); 187 for (final Map.Entry<String, String> entry : headers.entrySet()) { 188 daos.writeUTF(entry.getKey()); 189 daos.writeUTF(entry.getValue()); 190 } 191 byte[] eventData = baos.toByteArray(); 192 if (secretKey != null) { 193 final Cipher cipher = Cipher.getInstance("AES"); 194 cipher.init(Cipher.ENCRYPT_MODE, secretKey); 195 eventData = cipher.doFinal(eventData); 196 } 197 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, 198 gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); 199 try { 200 future.get(); 201 } catch (final InterruptedException ie) { 202 // preserve interruption status 203 Thread.currentThread().interrupt(); 204 } 205 } catch (final Exception ex) { 206 throw new LoggingException("Exception occurred writing log event", ex); 207 } 208 } 209 210 @Override 211 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 212 boolean closed = true; 213 LOGGER.debug("Shutting down FlumePersistentManager"); 214 worker.shutdown(); 215 final long requestedTimeoutMillis = timeUnit.toMillis(timeout); 216 final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS; 217 try { 218 worker.join(shutdownWaitMillis); 219 } catch (final InterruptedException ie) { 220 // Ignore the exception and shutdown. 221 } 222 ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString()); 223 try { 224 worker.join(); 225 } catch (final InterruptedException ex) { 226 logDebug("interrupted while waiting for worker to complete", ex); 227 } 228 try { 229 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); 230 database.close(); 231 } catch (final Exception ex) { 232 logWarn("Failed to close database", ex); 233 closed = false; 234 } 235 try { 236 environment.cleanLog(); 237 environment.close(); 238 } catch (final Exception ex) { 239 logWarn("Failed to close environment", ex); 240 closed = false; 241 } 242 return closed && super.releaseSub(timeout, timeUnit); 243 } 244 245 private void doSend(final SimpleEvent event) { 246 LOGGER.debug("Sending event to Flume"); 247 super.send(event); 248 } 249 250 /** 251 * Thread for writing to Berkeley DB to avoid having interrupts close the database. 252 */ 253 private static class BDBWriter implements Callable<Integer> { 254 private final byte[] eventData; 255 private final byte[] keyData; 256 private final Environment environment; 257 private final Database database; 258 private final Gate gate; 259 private final AtomicLong dbCount; 260 private final long batchSize; 261 private final int lockTimeoutRetryCount; 262 263 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, 264 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize, 265 final int lockTimeoutRetryCount) { 266 this.keyData = keyData; 267 this.eventData = eventData; 268 this.environment = environment; 269 this.database = database; 270 this.gate = gate; 271 this.dbCount = dbCount; 272 this.batchSize = batchSize; 273 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 274 } 275 276 @Override 277 public Integer call() throws Exception { 278 final DatabaseEntry key = new DatabaseEntry(keyData); 279 final DatabaseEntry data = new DatabaseEntry(eventData); 280 Exception exception = null; 281 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 282 Transaction txn = null; 283 try { 284 txn = environment.beginTransaction(null, null); 285 try { 286 database.put(txn, key, data); 287 txn.commit(); 288 txn = null; 289 if (dbCount.incrementAndGet() >= batchSize) { 290 gate.open(); 291 } 292 exception = null; 293 break; 294 } catch (final LockConflictException lce) { 295 exception = lce; 296 // Fall through and retry. 297 } catch (final Exception ex) { 298 if (txn != null) { 299 txn.abort(); 300 } 301 throw ex; 302 } finally { 303 if (txn != null) { 304 txn.abort(); 305 txn = null; 306 } 307 } 308 } catch (final LockConflictException lce) { 309 exception = lce; 310 if (txn != null) { 311 try { 312 txn.abort(); 313 txn = null; 314 } catch (final Exception ex) { 315 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict."); 316 } 317 } 318 319 } 320 try { 321 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); 322 } catch (final InterruptedException ie) { 323 // Ignore the error 324 } 325 } 326 if (exception != null) { 327 throw exception; 328 } 329 return eventData.length; 330 } 331 } 332 333 /** 334 * Factory data. 335 */ 336 private static class FactoryData { 337 private final String name; 338 private final Agent[] agents; 339 private final int batchSize; 340 private final String dataDir; 341 private final int retries; 342 private final int connectionTimeout; 343 private final int requestTimeout; 344 private final int delayMillis; 345 private final int lockTimeoutRetryCount; 346 private final Property[] properties; 347 348 /** 349 * Constructor. 350 * @param name The name of the Appender. 351 * @param agents The agents. 352 * @param batchSize The number of events to include in a batch. 353 * @param dataDir The directory for data. 354 */ 355 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, 356 final int connectionTimeout, final int requestTimeout, final int delayMillis, 357 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { 358 this.name = name; 359 this.agents = agents; 360 this.batchSize = batchSize; 361 this.dataDir = dataDir; 362 this.retries = retries; 363 this.connectionTimeout = connectionTimeout; 364 this.requestTimeout = requestTimeout; 365 this.delayMillis = delayMillis; 366 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 367 this.properties = properties; 368 } 369 } 370 371 /** 372 * Avro Manager Factory. 373 */ 374 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> { 375 376 /** 377 * Create the FlumeKratiManager. 378 * @param name The name of the entity to manage. 379 * @param data The data required to create the entity. 380 * @return The FlumeKratiManager. 381 */ 382 @Override 383 public FlumePersistentManager createManager(final String name, final FactoryData data) { 384 SecretKey secretKey = null; 385 Database database = null; 386 Environment environment = null; 387 388 final Map<String, String> properties = new HashMap<>(); 389 if (data.properties != null) { 390 for (final Property property : data.properties) { 391 properties.put(property.getName(), property.getValue()); 392 } 393 } 394 395 try { 396 final File dir = new File(data.dataDir); 397 FileUtils.mkdir(dir, true); 398 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); 399 dbEnvConfig.setTransactional(true); 400 dbEnvConfig.setAllowCreate(true); 401 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); 402 environment = new Environment(dir, dbEnvConfig); 403 final DatabaseConfig dbConfig = new DatabaseConfig(); 404 dbConfig.setTransactional(true); 405 dbConfig.setAllowCreate(true); 406 database = environment.openDatabase(null, name, dbConfig); 407 } catch (final Exception ex) { 408 LOGGER.error("Could not create FlumePersistentManager", ex); 409 // For consistency, close database as well as environment even though it should never happen since the 410 // database is that last thing in the block above, but this does guard against a future line being 411 // inserted at the end that would bomb (like some debug logging). 412 if (database != null) { 413 database.close(); 414 database = null; 415 } 416 if (environment != null) { 417 environment.close(); 418 environment = null; 419 } 420 return null; 421 } 422 423 try { 424 String key = null; 425 for (final Map.Entry<String, String> entry : properties.entrySet()) { 426 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { 427 key = entry.getValue(); 428 break; 429 } 430 } 431 if (key != null) { 432 final PluginManager manager = new PluginManager("KeyProvider"); 433 manager.collectPlugins(); 434 final Map<String, PluginType<?>> plugins = manager.getPlugins(); 435 if (plugins != null) { 436 boolean found = false; 437 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) { 438 if (entry.getKey().equalsIgnoreCase(key)) { 439 found = true; 440 final Class<?> cl = entry.getValue().getPluginClass(); 441 try { 442 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance(); 443 secretKey = provider.getSecretKey(); 444 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); 445 } catch (final Exception ex) { 446 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", 447 cl.getName()); 448 } 449 break; 450 } 451 } 452 if (!found) { 453 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); 454 } 455 } else { 456 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); 457 } 458 } 459 } catch (final Exception ex) { 460 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); 461 } 462 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, 463 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey, 464 data.lockTimeoutRetryCount); 465 } 466 } 467 468 /** 469 * Thread that sends data to Flume and pulls it from Berkeley DB. 470 */ 471 private static class WriterThread extends Log4jThread { 472 private volatile boolean shutdown; 473 private final Database database; 474 private final Environment environment; 475 private final FlumePersistentManager manager; 476 private final Gate gate; 477 private final SecretKey secretKey; 478 private final int batchSize; 479 private final AtomicLong dbCounter; 480 private final int lockTimeoutRetryCount; 481 482 public WriterThread(final Database database, final Environment environment, 483 final FlumePersistentManager manager, final Gate gate, final int batchsize, 484 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) { 485 super("FlumePersistentManager-Writer"); 486 this.database = database; 487 this.environment = environment; 488 this.manager = manager; 489 this.gate = gate; 490 this.batchSize = batchsize; 491 this.secretKey = secretKey; 492 this.setDaemon(true); 493 this.dbCounter = dbCount; 494 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 495 } 496 497 public void shutdown() { 498 LOGGER.debug("Writer thread shutting down"); 499 this.shutdown = true; 500 gate.open(); 501 } 502 503 public boolean isShutdown() { 504 return shutdown; 505 } 506 507 @Override 508 public void run() { 509 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis()); 510 long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis(); 511 while (!shutdown) { 512 final long nowMillis = System.currentTimeMillis(); 513 final long dbCount = database.count(); 514 dbCounter.set(dbCount); 515 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) { 516 nextBatchMillis = nowMillis + manager.getDelayMillis(); 517 try { 518 boolean errors = false; 519 final DatabaseEntry key = new DatabaseEntry(); 520 final DatabaseEntry data = new DatabaseEntry(); 521 522 gate.close(); 523 OperationStatus status; 524 if (batchSize > 1) { 525 try { 526 errors = sendBatch(key, data); 527 } catch (final Exception ex) { 528 break; 529 } 530 } else { 531 Exception exception = null; 532 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 533 exception = null; 534 Transaction txn = null; 535 Cursor cursor = null; 536 try { 537 txn = environment.beginTransaction(null, null); 538 cursor = database.openCursor(txn, null); 539 try { 540 status = cursor.getFirst(key, data, LockMode.RMW); 541 while (status == OperationStatus.SUCCESS) { 542 final SimpleEvent event = createEvent(data); 543 if (event != null) { 544 try { 545 manager.doSend(event); 546 } catch (final Exception ioe) { 547 errors = true; 548 LOGGER.error("Error sending event", ioe); 549 break; 550 } 551 try { 552 cursor.delete(); 553 } catch (final Exception ex) { 554 LOGGER.error("Unable to delete event", ex); 555 } 556 } 557 status = cursor.getNext(key, data, LockMode.RMW); 558 } 559 if (cursor != null) { 560 cursor.close(); 561 cursor = null; 562 } 563 txn.commit(); 564 txn = null; 565 dbCounter.decrementAndGet(); 566 exception = null; 567 break; 568 } catch (final LockConflictException lce) { 569 exception = lce; 570 // Fall through and retry. 571 } catch (final Exception ex) { 572 LOGGER.error("Error reading or writing to database", ex); 573 shutdown = true; 574 break; 575 } finally { 576 if (cursor != null) { 577 cursor.close(); 578 cursor = null; 579 } 580 if (txn != null) { 581 txn.abort(); 582 txn = null; 583 } 584 } 585 } catch (final LockConflictException lce) { 586 exception = lce; 587 if (cursor != null) { 588 try { 589 cursor.close(); 590 cursor = null; 591 } catch (final Exception ex) { 592 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 593 } 594 } 595 if (txn != null) { 596 try { 597 txn.abort(); 598 txn = null; 599 } catch (final Exception ex) { 600 LOGGER.trace("Ignored exception aborting tx during lock conflict."); 601 } 602 } 603 } 604 try { 605 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); 606 } catch (final InterruptedException ie) { 607 // Ignore the error 608 } 609 } 610 if (exception != null) { 611 LOGGER.error("Unable to read or update data base", exception); 612 } 613 } 614 if (errors) { 615 Thread.sleep(manager.getDelayMillis()); 616 continue; 617 } 618 } catch (final Exception ex) { 619 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); 620 } 621 } else { 622 if (nextBatchMillis <= nowMillis) { 623 nextBatchMillis = nowMillis + manager.getDelayMillis(); 624 } 625 try { 626 final long interval = nextBatchMillis - nowMillis; 627 gate.waitForOpen(interval); 628 } catch (final InterruptedException ie) { 629 LOGGER.warn("WriterThread interrupted, continuing"); 630 } catch (final Exception ex) { 631 LOGGER.error("WriterThread encountered an exception waiting for work", ex); 632 break; 633 } 634 } 635 } 636 637 if (batchSize > 1 && database.count() > 0) { 638 final DatabaseEntry key = new DatabaseEntry(); 639 final DatabaseEntry data = new DatabaseEntry(); 640 try { 641 sendBatch(key, data); 642 } catch (final Exception ex) { 643 LOGGER.warn("Unable to write final batch"); 644 } 645 } 646 LOGGER.trace("WriterThread exiting"); 647 } 648 649 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception { 650 boolean errors = false; 651 OperationStatus status; 652 Cursor cursor = null; 653 try { 654 final BatchEvent batch = new BatchEvent(); 655 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 656 try { 657 cursor = database.openCursor(null, CursorConfig.DEFAULT); 658 status = cursor.getFirst(key, data, null); 659 660 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { 661 final SimpleEvent event = createEvent(data); 662 if (event != null) { 663 batch.addEvent(event); 664 } 665 status = cursor.getNext(key, data, null); 666 } 667 break; 668 } catch (final LockConflictException lce) { 669 if (cursor != null) { 670 try { 671 cursor.close(); 672 cursor = null; 673 } catch (final Exception ex) { 674 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 675 } 676 } 677 } 678 } 679 680 try { 681 manager.send(batch); 682 } catch (final Exception ioe) { 683 LOGGER.error("Error sending events", ioe); 684 errors = true; 685 } 686 if (!errors) { 687 if (cursor != null) { 688 cursor.close(); 689 cursor = null; 690 } 691 Transaction txn = null; 692 Exception exception = null; 693 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 694 try { 695 txn = environment.beginTransaction(null, null); 696 try { 697 for (final Event event : batch.getEvents()) { 698 try { 699 final Map<String, String> headers = event.getHeaders(); 700 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8)); 701 database.delete(txn, key); 702 } catch (final Exception ex) { 703 LOGGER.error("Error deleting key from database", ex); 704 } 705 } 706 txn.commit(); 707 long count = dbCounter.get(); 708 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) { 709 count = dbCounter.get(); 710 } 711 exception = null; 712 break; 713 } catch (final LockConflictException lce) { 714 exception = lce; 715 if (cursor != null) { 716 try { 717 cursor.close(); 718 cursor = null; 719 } catch (final Exception ex) { 720 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 721 } 722 } 723 if (txn != null) { 724 try { 725 txn.abort(); 726 txn = null; 727 } catch (final Exception ex) { 728 LOGGER.trace("Ignored exception aborting transaction during lock conflict."); 729 } 730 } 731 } catch (final Exception ex) { 732 LOGGER.error("Unable to commit transaction", ex); 733 if (txn != null) { 734 txn.abort(); 735 } 736 } 737 } catch (final LockConflictException lce) { 738 exception = lce; 739 if (cursor != null) { 740 try { 741 cursor.close(); 742 cursor = null; 743 } catch (final Exception ex) { 744 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 745 } 746 } 747 if (txn != null) { 748 try { 749 txn.abort(); 750 txn = null; 751 } catch (final Exception ex) { 752 LOGGER.trace("Ignored exception aborting transaction during lock conflict."); 753 } 754 } 755 } finally { 756 if (cursor != null) { 757 cursor.close(); 758 cursor = null; 759 } 760 if (txn != null) { 761 txn.abort(); 762 txn = null; 763 } 764 } 765 try { 766 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); 767 } catch (final InterruptedException ie) { 768 // Ignore the error 769 } 770 } 771 if (exception != null) { 772 LOGGER.error("Unable to delete events from data base", exception); 773 } 774 } 775 } catch (final Exception ex) { 776 LOGGER.error("Error reading database", ex); 777 shutdown = true; 778 throw ex; 779 } finally { 780 if (cursor != null) { 781 cursor.close(); 782 } 783 } 784 785 return errors; 786 } 787 788 private SimpleEvent createEvent(final DatabaseEntry data) { 789 final SimpleEvent event = new SimpleEvent(); 790 try { 791 byte[] eventData = data.getData(); 792 if (secretKey != null) { 793 final Cipher cipher = Cipher.getInstance("AES"); 794 cipher.init(Cipher.DECRYPT_MODE, secretKey); 795 eventData = cipher.doFinal(eventData); 796 } 797 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData); 798 final DataInputStream dais = new DataInputStream(bais); 799 int length = dais.readInt(); 800 final byte[] bytes = new byte[length]; 801 dais.read(bytes, 0, length); 802 event.setBody(bytes); 803 length = dais.readInt(); 804 final Map<String, String> map = new HashMap<>(length); 805 for (int i = 0; i < length; ++i) { 806 final String headerKey = dais.readUTF(); 807 final String value = dais.readUTF(); 808 map.put(headerKey, value); 809 } 810 event.setHeaders(map); 811 return event; 812 } catch (final Exception ex) { 813 LOGGER.error("Error retrieving event", ex); 814 return null; 815 } 816 } 817 818 } 819 820 /** 821 * An internal class. 822 */ 823 private static class Gate { 824 825 private boolean isOpen = false; 826 827 public boolean isOpen() { 828 return isOpen; 829 } 830 831 public synchronized void open() { 832 isOpen = true; 833 notifyAll(); 834 } 835 836 public synchronized void close() { 837 isOpen = false; 838 } 839 840 public synchronized void waitForOpen(final long timeout) throws InterruptedException { 841 wait(timeout); 842 } 843 } 844}