001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.cassandra; 018 019import java.io.Serializable; 020import java.net.InetSocketAddress; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import com.datastax.driver.core.BatchStatement; 026import com.datastax.driver.core.BoundStatement; 027import com.datastax.driver.core.Cluster; 028import com.datastax.driver.core.PreparedStatement; 029import com.datastax.driver.core.Session; 030import org.apache.logging.log4j.core.LogEvent; 031import org.apache.logging.log4j.core.appender.ManagerFactory; 032import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager; 033import org.apache.logging.log4j.core.appender.db.ColumnMapping; 034import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter; 035import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters; 036import org.apache.logging.log4j.core.net.SocketAddress; 037import org.apache.logging.log4j.spi.ThreadContextMap; 038import org.apache.logging.log4j.spi.ThreadContextStack; 039import org.apache.logging.log4j.util.ReadOnlyStringMap; 040import org.apache.logging.log4j.util.Strings; 041 042/** 043 * Manager for a Cassandra appender instance. 044 */ 045public class CassandraManager extends AbstractDatabaseManager { 046 047 private static final int DEFAULT_PORT = 9042; 048 049 private final Cluster cluster; 050 private final String keyspace; 051 private final String insertQueryTemplate; 052 private final List<ColumnMapping> columnMappings; 053 private final BatchStatement batchStatement; 054 // re-usable argument binding array 055 private final Object[] values; 056 057 private Session session; 058 private PreparedStatement preparedStatement; 059 060 private CassandraManager(final String name, final int bufferSize, final Cluster cluster, 061 final String keyspace, final String insertQueryTemplate, 062 final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) { 063 super(name, bufferSize); 064 this.cluster = cluster; 065 this.keyspace = keyspace; 066 this.insertQueryTemplate = insertQueryTemplate; 067 this.columnMappings = columnMappings; 068 this.batchStatement = batchStatement; 069 this.values = new Object[columnMappings.size()]; 070 } 071 072 @Override 073 protected void startupInternal() throws Exception { 074 session = cluster.connect(keyspace); 075 preparedStatement = session.prepare(insertQueryTemplate); 076 } 077 078 @Override 079 protected boolean shutdownInternal() throws Exception { 080 session.close(); 081 cluster.close(); 082 return true; 083 } 084 085 @Override 086 protected void connectAndStart() { 087 // a Session automatically manages connections for us 088 } 089 090 @Override 091 protected void writeInternal(final LogEvent event, final Serializable serializable) { 092 for (int i = 0; i < columnMappings.size(); i++) { 093 final ColumnMapping columnMapping = columnMappings.get(i); 094 if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType()) 095 || ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) { 096 values[i] = event.getContextData().toMap(); 097 } else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) { 098 values[i] = event.getContextStack().asList(); 099 } else if (Date.class.isAssignableFrom(columnMapping.getType())) { 100 values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class)); 101 } else { 102 values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event), 103 columnMapping.getType(), null); 104 } 105 } 106 final BoundStatement boundStatement = preparedStatement.bind(values); 107 if (batchStatement == null) { 108 session.execute(boundStatement); 109 } else { 110 batchStatement.add(boundStatement); 111 } 112 } 113 114 @Override 115 protected boolean commitAndClose() { 116 if (batchStatement != null) { 117 session.execute(batchStatement); 118 } 119 return true; 120 } 121 122 public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints, 123 final ColumnMapping[] columns, final boolean useTls, 124 final String clusterName, final String keyspace, final String table, 125 final String username, final String password, 126 final boolean useClockForTimestampGenerator, final int bufferSize, 127 final boolean batched, final BatchStatement.Type batchType) { 128 return getManager(name, 129 new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password, 130 useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE); 131 } 132 133 private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> { 134 135 private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory(); 136 137 @Override 138 public CassandraManager createManager(final String name, final FactoryData data) { 139 final Cluster.Builder builder = Cluster.builder() 140 .addContactPointsWithPorts(data.contactPoints) 141 .withClusterName(data.clusterName); 142 if (data.useTls) { 143 builder.withSSL(); 144 } 145 if (Strings.isNotBlank(data.username)) { 146 builder.withCredentials(data.username, data.password); 147 } 148 if (data.useClockForTimestampGenerator) { 149 builder.withTimestampGenerator(new ClockTimestampGenerator()); 150 } 151 final Cluster cluster = builder.build(); 152 153 final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" ("); 154 for (final ColumnMapping column : data.columns) { 155 sb.append(column.getName()).append(','); 156 } 157 sb.setCharAt(sb.length() - 1, ')'); 158 sb.append(" VALUES ("); 159 final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length); 160 for (final ColumnMapping column : data.columns) { 161 if (Strings.isNotEmpty(column.getLiteralValue())) { 162 sb.append(column.getLiteralValue()); 163 } else { 164 sb.append('?'); 165 columnMappings.add(column); 166 } 167 sb.append(','); 168 } 169 sb.setCharAt(sb.length() - 1, ')'); 170 final String insertQueryTemplate = sb.toString(); 171 LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate); 172 return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate, 173 columnMappings, data.batched ? new BatchStatement(data.batchType) : null); 174 } 175 } 176 177 private static class FactoryData extends AbstractFactoryData { 178 private final InetSocketAddress[] contactPoints; 179 private final ColumnMapping[] columns; 180 private final boolean useTls; 181 private final String clusterName; 182 private final String keyspace; 183 private final String table; 184 private final String username; 185 private final String password; 186 private final boolean useClockForTimestampGenerator; 187 private final boolean batched; 188 private final BatchStatement.Type batchType; 189 190 private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls, 191 final String clusterName, final String keyspace, final String table, final String username, 192 final String password, final boolean useClockForTimestampGenerator, final int bufferSize, 193 final boolean batched, final BatchStatement.Type batchType) { 194 super(bufferSize, null); 195 this.contactPoints = convertAndAddDefaultPorts(contactPoints); 196 this.columns = columns; 197 this.useTls = useTls; 198 this.clusterName = clusterName; 199 this.keyspace = keyspace; 200 this.table = table; 201 this.username = username; 202 this.password = password; 203 this.useClockForTimestampGenerator = useClockForTimestampGenerator; 204 this.batched = batched; 205 this.batchType = batchType; 206 } 207 208 private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) { 209 final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length]; 210 for (int i = 0; i < inetSocketAddresses.length; i++) { 211 final SocketAddress socketAddress = socketAddresses[i]; 212 inetSocketAddresses[i] = socketAddress.getPort() == 0 213 ? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT) 214 : socketAddress.getSocketAddress(); 215 } 216 return inetSocketAddresses; 217 } 218 } 219}