public class

DedicatedMultiRowUniquenessConstraint

extends Object
implements UniquenessConstraint
package com.netflix.astyanax.recipes.uniqueness;

import java.util.List;

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.locks.BusyLockException;
import com.netflix.astyanax.recipes.locks.StaleLockException;

/**
 * Multi-row uniqueness constraint where all involved column families are dedicated
 * only to uniquness constraint.
 * @author elandau
 *
 * @param <C>
 */
public class DedicatedMultiRowUniquenessConstraint<C> implements UniquenessConstraint {

    private final Keyspace keyspace;

    private Integer ttl = null;
    private ConsistencyLevel consistencyLevel = ConsistencyLevel.CL_LOCAL_QUORUM;
    private final C uniqueColumnName;
    
    private class Row<K> {
        private final ColumnFamily<K, C> columnFamily;
        private final K row;
        
        Row(ColumnFamily<K, C> columnFamily, K row) {
            super();
            this.columnFamily = columnFamily;
            this.row = row;
        }
        
        void fillMutation(MutationBatch m, Integer ttl) {
            m.withRow(columnFamily, row).putEmptyColumn(uniqueColumnName, ttl);
        }
        
        void fillReleaseMutation(MutationBatch m) {
            m.withRow(columnFamily, row).deleteColumn(uniqueColumnName);
        }
        
        void verifyLock() throws Exception {
            // Phase 2: Read back all columns. There should be only 1
            ColumnList<C> result = keyspace.prepareQuery(columnFamily).setConsistencyLevel(consistencyLevel)
                    .getKey(row).execute().getResult();

            if (result.size() != 1) {
                throw new NotUniqueException(row.toString());
            }
        }
        
        Column<C> getUniqueColumn() throws ConnectionException {
            ColumnList<C> columns = keyspace.prepareQuery(columnFamily).getKey(row).execute().getResult();
            Column<C> foundColumn = null;
            for (Column<C> column : columns) {
                if (column.getTtl() == 0) {
                    if (foundColumn != null)
                        throw new RuntimeException("Row has duplicate quite columns");
                    foundColumn = column;
                }
            }
            if (foundColumn == null) {
                throw new NotFoundException("Unique column not found");
            }
            return foundColumn;
        }
    }
    
    private final List<Row<?>> locks = Lists.newArrayList();

    public DedicatedMultiRowUniquenessConstraint(Keyspace keyspace, Supplier<C> uniqueColumnSupplier) {
        this.keyspace = keyspace;
        this.uniqueColumnName = uniqueColumnSupplier.get();
    }

    public DedicatedMultiRowUniquenessConstraint(Keyspace keyspace, C uniqueColumnName) {
        this.keyspace = keyspace;
        this.uniqueColumnName = uniqueColumnName;
    }

    /**
     * TTL to use for the uniquness operation. This is the TTL for the columns
     * to expire in the event of a client crash before the uniqueness can be
     * committed
     * 
     * @param ttl
     * @return
     */
    public DedicatedMultiRowUniquenessConstraint<C> withTtl(Integer ttl) {
        this.ttl = ttl;
        return this;
    }

    /**
     * Consistency level used
     * 
     * @param consistencyLevel
     * @return
     */
    public DedicatedMultiRowUniquenessConstraint<C> withConsistencyLevel(ConsistencyLevel consistencyLevel) {
        this.consistencyLevel = consistencyLevel;
        return this;
    }

    /**
     * Add a row to the set of rows being tested for uniqueness
     * 
     * @param columnFamily
     * @param rowKey
     * @return
     */
    public <K> DedicatedMultiRowUniquenessConstraint<C> withRow(ColumnFamily<K, C> columnFamily, K rowKey) {
        locks.add(new Row<K>(columnFamily, rowKey));
        return this;
    }
    
    public C getLockColumn() {
        return uniqueColumnName;
    }

    @Override
    public void acquire() throws NotUniqueException, Exception {
        acquireAndMutate(null);
    }
    
    @Override
    public void acquireAndMutate(MutationBatch other) throws NotUniqueException, Exception {
        // Insert lock check column for all rows in a single batch mutation
        try {
            MutationBatch m = keyspace.prepareMutationBatch().setConsistencyLevel(consistencyLevel);
            for (Row<?> lock : locks) {
                lock.fillMutation(m, ttl);
            }
            m.execute();

            // Check each lock in order
            for (Row<?> lock : locks) {
                lock.verifyLock();
            }

            // Commit the unique columns
            for (Row<?> lock : locks) {
                lock.fillMutation(m, null);
            }
            
            if (other != null) {
                m.mergeShallow(other);
            }
            m.execute();
        }
        catch (BusyLockException e) {
            release();
            throw new NotUniqueException(e);
        }
        catch (StaleLockException e) {
            release();
            throw new NotUniqueException(e);
        }
        catch (Exception e) {
            release();
            throw e;
        }
    }

    @Override
    public void release() throws Exception {
        MutationBatch m = keyspace.prepareMutationBatch();
        for (Row<?> lock : locks) {
            lock.fillReleaseMutation(m);
        }
        m.execute();
    }
    
    /**
     * @return
     * @throws Exception
     */
    public Column<C> getUniqueColumn() throws Exception {
        if (locks.size() == 0) 
            throw new IllegalStateException("Missing call to withRow to add rows to the uniqueness constraint");
        
        // Get the unique row from all columns
        List<Column<C>> columns = Lists.newArrayList();
        for (Row<?> row : locks) {
            columns.add(row.getUniqueColumn());
        }
        
        // Check that all rows have the same unique column otherwise they are not part of 
        // the same unique group
        Column<C> foundColumn = columns.get(0);
        for (int i = 1; i < columns.size(); i++) {
            Column<C> nextColumn = columns.get(i);
            if (!nextColumn.getRawName().equals(foundColumn.getRawName())) {
                throw new NotUniqueException("The provided rows are not part of the same uniquness constraint");
            }
            
            if (foundColumn.hasValue() != nextColumn.hasValue()) {
                throw new NotUniqueException("The provided rows are not part of the same uniquness constraint");
            }
            
            if (foundColumn.hasValue() && !nextColumn.getByteBufferValue().equals((foundColumn.getByteBufferValue()))) {
                throw new NotUniqueException("The provided rows are not part of the same uniquness constraint");
            }
        }
        
        return foundColumn;
    }
}