001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.cassandra;
018
019import java.io.Serializable;
020import java.net.InetSocketAddress;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import com.datastax.driver.core.BatchStatement;
026import com.datastax.driver.core.BoundStatement;
027import com.datastax.driver.core.Cluster;
028import com.datastax.driver.core.PreparedStatement;
029import com.datastax.driver.core.Session;
030import org.apache.logging.log4j.core.LogEvent;
031import org.apache.logging.log4j.core.appender.ManagerFactory;
032import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
033import org.apache.logging.log4j.core.appender.db.ColumnMapping;
034import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter;
035import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters;
036import org.apache.logging.log4j.core.net.SocketAddress;
037import org.apache.logging.log4j.spi.ThreadContextMap;
038import org.apache.logging.log4j.spi.ThreadContextStack;
039import org.apache.logging.log4j.util.ReadOnlyStringMap;
040import org.apache.logging.log4j.util.Strings;
041
042/**
043 * Manager for a Cassandra appender instance.
044 */
045public class CassandraManager extends AbstractDatabaseManager {
046
047    private static final int DEFAULT_PORT = 9042;
048
049    private final Cluster cluster;
050    private final String keyspace;
051    private final String insertQueryTemplate;
052    private final List<ColumnMapping> columnMappings;
053    private final BatchStatement batchStatement;
054    // re-usable argument binding array
055    private final Object[] values;
056
057    private Session session;
058    private PreparedStatement preparedStatement;
059
060    private CassandraManager(final String name, final int bufferSize, final Cluster cluster,
061                             final String keyspace, final String insertQueryTemplate,
062                             final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) {
063        super(name, bufferSize);
064        this.cluster = cluster;
065        this.keyspace = keyspace;
066        this.insertQueryTemplate = insertQueryTemplate;
067        this.columnMappings = columnMappings;
068        this.batchStatement = batchStatement;
069        this.values = new Object[columnMappings.size()];
070    }
071
072    @Override
073    protected void startupInternal() throws Exception {
074        session = cluster.connect(keyspace);
075        preparedStatement = session.prepare(insertQueryTemplate);
076    }
077
078    @Override
079    protected boolean shutdownInternal() throws Exception {
080        session.close();
081        cluster.close();
082        return true;
083    }
084
085    @Override
086    protected void connectAndStart() {
087        // a Session automatically manages connections for us
088    }
089
090    @Override
091    protected void writeInternal(final LogEvent event, final Serializable serializable) {
092        for (int i = 0; i < columnMappings.size(); i++) {
093            final ColumnMapping columnMapping = columnMappings.get(i);
094            if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType())
095                || ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) {
096                values[i] = event.getContextData().toMap();
097            } else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) {
098                values[i] = event.getContextStack().asList();
099            } else if (Date.class.isAssignableFrom(columnMapping.getType())) {
100                values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class));
101            } else {
102                values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event),
103                    columnMapping.getType(), null);
104            }
105        }
106        final BoundStatement boundStatement = preparedStatement.bind(values);
107        if (batchStatement == null) {
108            session.execute(boundStatement);
109        } else {
110            batchStatement.add(boundStatement);
111        }
112    }
113
114    @Override
115    protected boolean commitAndClose() {
116        if (batchStatement != null) {
117            session.execute(batchStatement);
118        }
119        return true;
120    }
121
122    public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints,
123                                              final ColumnMapping[] columns, final boolean useTls,
124                                              final String clusterName, final String keyspace, final String table,
125                                              final String username, final String password,
126                                              final boolean useClockForTimestampGenerator, final int bufferSize,
127                                              final boolean batched, final BatchStatement.Type batchType) {
128        return getManager(name,
129            new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password,
130                useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE);
131    }
132
133    private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> {
134
135        private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory();
136
137        @Override
138        public CassandraManager createManager(final String name, final FactoryData data) {
139            final Cluster.Builder builder = Cluster.builder()
140                .addContactPointsWithPorts(data.contactPoints)
141                .withClusterName(data.clusterName);
142            if (data.useTls) {
143                builder.withSSL();
144            }
145            if (Strings.isNotBlank(data.username)) {
146                builder.withCredentials(data.username, data.password);
147            }
148            if (data.useClockForTimestampGenerator) {
149                builder.withTimestampGenerator(new ClockTimestampGenerator());
150            }
151            final Cluster cluster = builder.build();
152
153            final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" (");
154            for (final ColumnMapping column : data.columns) {
155                sb.append(column.getName()).append(',');
156            }
157            sb.setCharAt(sb.length() - 1, ')');
158            sb.append(" VALUES (");
159            final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length);
160            for (final ColumnMapping column : data.columns) {
161                if (Strings.isNotEmpty(column.getLiteralValue())) {
162                    sb.append(column.getLiteralValue());
163                } else {
164                    sb.append('?');
165                    columnMappings.add(column);
166                }
167                sb.append(',');
168            }
169            sb.setCharAt(sb.length() - 1, ')');
170            final String insertQueryTemplate = sb.toString();
171            LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate);
172            return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate,
173                columnMappings, data.batched ? new BatchStatement(data.batchType) : null);
174        }
175    }
176
177    private static class FactoryData extends AbstractFactoryData {
178        private final InetSocketAddress[] contactPoints;
179        private final ColumnMapping[] columns;
180        private final boolean useTls;
181        private final String clusterName;
182        private final String keyspace;
183        private final String table;
184        private final String username;
185        private final String password;
186        private final boolean useClockForTimestampGenerator;
187        private final boolean batched;
188        private final BatchStatement.Type batchType;
189
190        private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
191                            final String clusterName, final String keyspace, final String table, final String username,
192                            final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
193                            final boolean batched, final BatchStatement.Type batchType) {
194            super(bufferSize, null);
195            this.contactPoints = convertAndAddDefaultPorts(contactPoints);
196            this.columns = columns;
197            this.useTls = useTls;
198            this.clusterName = clusterName;
199            this.keyspace = keyspace;
200            this.table = table;
201            this.username = username;
202            this.password = password;
203            this.useClockForTimestampGenerator = useClockForTimestampGenerator;
204            this.batched = batched;
205            this.batchType = batchType;
206        }
207
208        private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) {
209            final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length];
210            for (int i = 0; i < inetSocketAddresses.length; i++) {
211                final SocketAddress socketAddress = socketAddresses[i];
212                inetSocketAddresses[i] = socketAddress.getPort() == 0
213                    ? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT)
214                    : socketAddress.getSocketAddress();
215            }
216            return inetSocketAddresses;
217        }
218    }
219}