public class

ThriftColumnFamilyQueryImpl

extends Object
implements ColumnFamilyQuery<K, C>
/*******************************************************************************
 * Copyright 2011 Netflix
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package com.netflix.astyanax.thrift;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CounterSuperColumn;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.cassandra.utils.Pair;
import org.mortbay.log.Log;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.CassandraOperationType;
import com.netflix.astyanax.KeyspaceTracerFactory;
import com.netflix.astyanax.RowCallback;
import com.netflix.astyanax.RowCopier;
import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.TokenRange;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.OperationResultImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.query.AllRowsQuery;
import com.netflix.astyanax.query.ColumnCountQuery;
import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.query.ColumnQuery;
import com.netflix.astyanax.query.CqlQuery;
import com.netflix.astyanax.query.IndexQuery;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.query.RowSliceQuery;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.shallows.EmptyColumnList;
import com.netflix.astyanax.shallows.EmptyRowsImpl;
import com.netflix.astyanax.thrift.model.*;
import com.netflix.astyanax.util.TokenGenerator;

/**
 * Implementation of all column family queries using the thrift API.
 * 
 * @author elandau
 * 
 * @param <K>
 * @param <C>
 */
public class ThriftColumnFamilyQueryImpl<K, C> implements ColumnFamilyQuery<K, C> {
    private final ConnectionPool<Cassandra.Client> connectionPool;
    private final ColumnFamily<K, C> columnFamily;
    private final KeyspaceTracerFactory tracerFactory;
    private final Keyspace keyspace;
    private ConsistencyLevel consistencyLevel;
    private static final RandomPartitioner partitioner = new RandomPartitioner();
    private final ExecutorService executor;
    private Host pinnedHost;
    private RetryPolicy retry;

    public ThriftColumnFamilyQueryImpl(ExecutorService executor, KeyspaceTracerFactory tracerFactory,
            Keyspace keyspace, ConnectionPool<Cassandra.Client> cp, ColumnFamily<K, C> columnFamily,
            ConsistencyLevel consistencyLevel, RetryPolicy retry) {
        this.keyspace = keyspace;
        this.connectionPool = cp;
        this.consistencyLevel = consistencyLevel;
        this.columnFamily = columnFamily;
        this.tracerFactory = tracerFactory;
        this.executor = executor;
        this.retry = retry;
    }

    // Single ROW query
    @Override
    public RowQuery<K, C> getKey(final K rowKey) {
        return new AbstractRowQueryImpl<K, C>(columnFamily.getColumnSerializer()) {
            private boolean firstPage = true;

            @Override
            public ColumnQuery<C> getColumn(final C column) {
                return new ColumnQuery<C>() {
                    @Override
                    public OperationResult<Column<C>> execute() throws ConnectionException {
                        return connectionPool.executeWithFailover(new AbstractKeyspaceOperationImpl<Column<C>>(
                                tracerFactory.newTracer(CassandraOperationType.GET_COLUMN, columnFamily), pinnedHost,
                                keyspace.getKeyspaceName()) {
                            @Override
                            public Column<C> internalExecute(Client client) throws Exception {
                                ColumnOrSuperColumn cosc = client.get(
                                        columnFamily.getKeySerializer().toByteBuffer(rowKey),
                                        new org.apache.cassandra.thrift.ColumnPath().setColumn_family(
                                                columnFamily.getName()).setColumn(
                                                columnFamily.getColumnSerializer().toByteBuffer(column)),
                                        ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));
                                if (cosc.isSetColumn()) {
                                    org.apache.cassandra.thrift.Column c = cosc.getColumn();
                                    return new ThriftColumnImpl<C>(columnFamily.getColumnSerializer().fromBytes(
                                            c.getName()), c);
                                }
                                else if (cosc.isSetSuper_column()) {
                                    // TODO: Super columns
                                    // should be deprecated
                                    SuperColumn sc = cosc.getSuper_column();
                                    return new ThriftSuperColumnImpl<C>(columnFamily.getColumnSerializer().fromBytes(
                                            sc.getName()), sc);
                                }
                                else if (cosc.isSetCounter_column()) {
                                    org.apache.cassandra.thrift.CounterColumn c = cosc.getCounter_column();
                                    return new ThriftCounterColumnImpl<C>(columnFamily.getColumnSerializer().fromBytes(
                                            c.getName()), c);
                                }
                                else if (cosc.isSetCounter_super_column()) {
                                    // TODO: Super columns
                                    // should be deprecated
                                    CounterSuperColumn sc = cosc.getCounter_super_column();
                                    return new ThriftCounterSuperColumnImpl<C>(columnFamily.getColumnSerializer()
                                            .fromBytes(sc.getName()), sc);
                                }
                                else {
                                    throw new RuntimeException("Unknown column type in response");
                                }
                            }

                            @Override
                            public BigInteger getToken() {
                                return partitioner.getToken(columnFamily.getKeySerializer().toByteBuffer(rowKey)).token;
                            }
                        }, retry);
                    }

                    @Override
                    public Future<OperationResult<Column<C>>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Column<C>>>() {
                            @Override
                            public OperationResult<Column<C>> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }

            @Override
            public OperationResult<ColumnList<C>> execute() throws ConnectionException {
                return connectionPool.executeWithFailover(
                        new AbstractKeyspaceOperationImpl<ColumnList<C>>(tracerFactory.newTracer(
                                CassandraOperationType.GET_ROW, columnFamily), pinnedHost, keyspace.getKeyspaceName()) {

                            @Override
                            public ColumnList<C> execute(Client client) throws ConnectionException {
                                if (isPaginating && paginateNoMore) {
                                    return new EmptyColumnList<C>();
                                }

                                return super.execute(client);
                            }

                            @Override
                            public ColumnList<C> internalExecute(Client client) throws Exception {
                                List<ColumnOrSuperColumn> columnList = client.get_slice(columnFamily.getKeySerializer()
                                        .toByteBuffer(rowKey), new ColumnParent().setColumn_family(columnFamily
                                        .getName()), predicate, ThriftConverter
                                        .ToThriftConsistencyLevel(consistencyLevel));

                                // Special handling for pagination
                                if (isPaginating && predicate.isSetSlice_range()) {
                                    // Did we reach the end of the query.
                                    if (columnList.size() != predicate.getSlice_range().getCount()) {
                                        paginateNoMore = true;
                                    }

                                    // If this is the first page then adjust the
                                    // count so we fetch one extra column
                                    // that will later be dropped
                                    if (firstPage) {
                                        firstPage = false;
                                        predicate.getSlice_range().setCount(predicate.getSlice_range().getCount() + 1);
                                    }
                                    else {
                                        if (!columnList.isEmpty())
                                            columnList.remove(0);
                                    }

                                    // Set the start column for the next page to
                                    // the last column of this page.
                                    // We will discard this column later.
                                    if (!columnList.isEmpty()) {
                                        ColumnOrSuperColumn last = Iterables.getLast(columnList);
                                        if (last.isSetColumn()) {
                                            predicate.getSlice_range().setStart(last.getColumn().getName());
                                        }
                                    }
                                }
                                ColumnList<C> result = new ThriftColumnOrSuperColumnListImpl<C>(columnList,
                                        columnFamily.getColumnSerializer());
                                return result;
                            }

                            @Override
                            public BigInteger getToken() {
                                return partitioner.getToken(columnFamily.getKeySerializer().toByteBuffer(rowKey)).token;
                            }
                        }, retry);
            }

            @Override
            public ColumnCountQuery getCount() {
                return new ColumnCountQuery() {
                    @Override
                    public OperationResult<Integer> execute() throws ConnectionException {
                        return connectionPool.executeWithFailover(new AbstractKeyspaceOperationImpl<Integer>(
                                tracerFactory.newTracer(CassandraOperationType.GET_COLUMN_COUNT, columnFamily),
                                pinnedHost, keyspace.getKeyspaceName()) {
                            @Override
                            public Integer internalExecute(Client client) throws Exception {
                                return client.get_count(columnFamily.getKeySerializer().toByteBuffer(rowKey),
                                        new ColumnParent().setColumn_family(columnFamily.getName()), predicate,
                                        ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));
                            }

                            @Override
                            public BigInteger getToken() {
                                return partitioner.getToken(columnFamily.getKeySerializer().toByteBuffer(rowKey)).token;
                            }
                        }, retry);
                    }

                    @Override
                    public Future<OperationResult<Integer>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Integer>>() {
                            @Override
                            public OperationResult<Integer> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }

            @Override
            public Future<OperationResult<ColumnList<C>>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<ColumnList<C>>>() {
                    @Override
                    public OperationResult<ColumnList<C>> call() throws Exception {
                        return execute();
                    }
                });
            }

            @Override
            public RowCopier<K, C> copyTo(final ColumnFamily<K, C> otherColumnFamily, final K otherRowKey) {
                return new RowCopier<K, C>() {
                    @Override
                    public OperationResult<Void> execute() throws ConnectionException {
                        return connectionPool.executeWithFailover(
                                new AbstractKeyspaceOperationImpl<Void>(tracerFactory.newTracer(
                                        CassandraOperationType.COPY_TO, columnFamily), pinnedHost, keyspace
                                        .getKeyspaceName()) {
                                    @Override
                                    public Void internalExecute(Client client) throws Exception {
                                        List<ColumnOrSuperColumn> columnList = client.get_slice(columnFamily
                                                .getKeySerializer().toByteBuffer(rowKey), new ColumnParent()
                                                .setColumn_family(columnFamily.getName()), predicate, ThriftConverter
                                                .ToThriftConsistencyLevel(consistencyLevel));

                                        // Create mutation list from columns in
                                        // the response
                                        List<Mutation> mutationList = new ArrayList<Mutation>();
                                        for (ColumnOrSuperColumn sosc : columnList) {
                                            Mutation mutation = new Mutation();
                                            mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(sosc
                                                    .getColumn()));
                                            mutationList.add(mutation);
                                        }

                                        // Create mutation map
                                        Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
                                        HashMap<String, List<Mutation>> cfmap = new HashMap<String, List<Mutation>>();
                                        cfmap.put(otherColumnFamily.getName(), mutationList);
                                        mutationMap.put(columnFamily.getKeySerializer().toByteBuffer(otherRowKey),
                                                cfmap);

                                        // Execute the mutation
                                        client.batch_mutate(mutationMap,
                                                ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));
                                        return null;
                                    }
                                }, retry);
                    }

                    @Override
                    public Future<OperationResult<Void>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Void>>() {
                            @Override
                            public OperationResult<Void> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }
        };
    }

    @Override
    public RowSliceQuery<K, C> getKeyRange(final K startKey, final K endKey, final String startToken,
            final String endToken, final int count) {
        return new AbstractRowSliceQueryImpl<K, C>(columnFamily.getColumnSerializer()) {
            @Override
            public OperationResult<Rows<K, C>> execute() throws ConnectionException {
                return connectionPool.executeWithFailover(
                        new AbstractKeyspaceOperationImpl<Rows<K, C>>(tracerFactory.newTracer(
                                CassandraOperationType.GET_ROWS_RANGE, columnFamily), pinnedHost, keyspace
                                .getKeyspaceName()) {
                            @Override
                            public Rows<K, C> internalExecute(Client client) throws Exception {
                                // This is a sorted list
                                // Same call for standard and super columns via
                                // the ColumnParent
                                KeyRange range = new KeyRange();
                                if (startKey != null)
                                    range.setStart_key(columnFamily.getKeySerializer().toByteBuffer(startKey));
                                if (endKey != null)
                                    range.setEnd_key(columnFamily.getKeySerializer().toByteBuffer(endKey));
                                range.setCount(count).setStart_token(startToken).setEnd_token(endToken);

                                List<org.apache.cassandra.thrift.KeySlice> keySlices = client.get_range_slices(
                                        new ColumnParent().setColumn_family(columnFamily.getName()), predicate, range,
                                        ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));

                                if (keySlices == null || keySlices.isEmpty()) {
                                    return new EmptyRowsImpl<K, C>();
                                }
                                else {
                                    return new ThriftRowsSliceImpl<K, C>(keySlices, columnFamily.getKeySerializer(),
                                            columnFamily.getColumnSerializer());
                                }
                            }

                            @Override
                            public BigInteger getToken() {
                                if (startKey != null)
                                    return partitioner.getToken(columnFamily.getKeySerializer().toByteBuffer(startKey)).token;
                                return null;
                            }
                        }, retry);
            }

            @Override
            public Future<OperationResult<Rows<K, C>>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<Rows<K, C>>>() {
                    @Override
                    public OperationResult<Rows<K, C>> call() throws Exception {
                        return execute();
                    }
                });
            }
        };
    }
    
    @Override
    public RowSliceQuery<K, C> getKeySlice(final Iterable<K> keys) {
        return new AbstractRowSliceQueryImpl<K, C>(columnFamily.getColumnSerializer()) {
            @Override
            public OperationResult<Rows<K, C>> execute() throws ConnectionException {
                return connectionPool.executeWithFailover(
                        new AbstractKeyspaceOperationImpl<Rows<K, C>>(tracerFactory.newTracer(
                                CassandraOperationType.GET_ROWS_SLICE, columnFamily), pinnedHost, keyspace
                                .getKeyspaceName()) {
                            @Override
                            public Rows<K, C> internalExecute(Client client) throws Exception {
                                Map<ByteBuffer, List<ColumnOrSuperColumn>> cfmap = client.multiget_slice(columnFamily
                                        .getKeySerializer().toBytesList(keys), new ColumnParent()
                                        .setColumn_family(columnFamily.getName()), predicate, ThriftConverter
                                        .ToThriftConsistencyLevel(consistencyLevel));
                                if (cfmap == null || cfmap.isEmpty()) {
                                    return new EmptyRowsImpl<K, C>();
                                }
                                else {
                                    return new ThriftRowsListImpl<K, C>(cfmap, columnFamily.getKeySerializer(),
                                            columnFamily.getColumnSerializer());
                                }
                            }

                            @Override
                            public BigInteger getToken() {
                                // / return
                                // partitioner.getToken(columnFamily.getKeySerializer().toByteBuffer(keys.iterator().next())).token;
                                return null;
                            }
                        }, retry);
            }

            @Override
            public Future<OperationResult<Rows<K, C>>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<Rows<K, C>>>() {
                    @Override
                    public OperationResult<Rows<K, C>> call() throws Exception {
                        return execute();
                    }
                });
            }
        };
    }

    @Override
    public RowSliceQuery<K, C> getKeySlice(final K keys[]) {
        return getKeySlice(Arrays.asList(keys));
    }
    
    @Override
    public RowSliceQuery<K, C> getKeySlice(final Collection<K> keys) {
        return new AbstractRowSliceQueryImpl<K, C>(columnFamily.getColumnSerializer()) {
            @Override
            public OperationResult<Rows<K, C>> execute() throws ConnectionException {
                return connectionPool.executeWithFailover(
                        new AbstractKeyspaceOperationImpl<Rows<K, C>>(tracerFactory.newTracer(
                                CassandraOperationType.GET_ROWS_SLICE, columnFamily), pinnedHost, keyspace
                                .getKeyspaceName()) {
                            @Override
                            public Rows<K, C> internalExecute(Client client) throws Exception {
                                Map<ByteBuffer, List<ColumnOrSuperColumn>> cfmap = client.multiget_slice(columnFamily
                                        .getKeySerializer().toBytesList(keys), new ColumnParent()
                                        .setColumn_family(columnFamily.getName()), predicate, ThriftConverter
                                        .ToThriftConsistencyLevel(consistencyLevel));
                                if (cfmap == null || cfmap.isEmpty()) {
                                    return new EmptyRowsImpl<K, C>();
                                }
                                else {
                                    return new ThriftRowsListImpl<K, C>(cfmap, columnFamily.getKeySerializer(),
                                            columnFamily.getColumnSerializer());
                                }
                            }

                            @Override
                            public BigInteger getToken() {
                                // / return
                                // partitioner.getToken(columnFamily.getKeySerializer().toByteBuffer(keys.iterator().next())).token;
                                return null;
                            }
                        }, retry);
            }

            @Override
            public Future<OperationResult<Rows<K, C>>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<Rows<K, C>>>() {
                    @Override
                    public OperationResult<Rows<K, C>> call() throws Exception {
                        return execute();
                    }
                });
            }
        };
    }

    @Override
    public ColumnFamilyQuery<K, C> setConsistencyLevel(ConsistencyLevel consistencyLevel) {
        this.consistencyLevel = consistencyLevel;
        return this;
    }

    @Override
    public IndexQuery<K, C> searchWithIndex() {
        return new AbstractIndexQueryImpl<K, C>(columnFamily) {
            @Override
            public OperationResult<Rows<K, C>> execute() throws ConnectionException {
                return connectionPool.executeWithFailover(
                        new AbstractKeyspaceOperationImpl<Rows<K, C>>(tracerFactory.newTracer(
                                CassandraOperationType.GET_ROWS_BY_INDEX, columnFamily), pinnedHost, keyspace
                                .getKeyspaceName()) {
                            @Override
                            public Rows<K, C> execute(Client client) throws ConnectionException {
                                if (isPaginating && paginateNoMore) {
                                    return new EmptyRowsImpl<K, C>();
                                }

                                return super.execute(client);
                            }

                            @Override
                            public Rows<K, C> internalExecute(Client client) throws Exception {
                                List<org.apache.cassandra.thrift.KeySlice> cfmap;
                                cfmap = client.get_indexed_slices(
                                        new ColumnParent().setColumn_family(columnFamily.getName()), indexClause,
                                        predicate, ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));

                                if (cfmap == null) {
                                    return new EmptyRowsImpl<K, C>();
                                }
                                else {
                                    if (isPaginating) {
                                        if (!firstPage) {
                                            cfmap.remove(0);
                                        }

                                        try {
                                            if (!cfmap.isEmpty()) {
                                                setNextStartKey(ByteBuffer.wrap(Iterables.getLast(cfmap).getKey()));
                                            }
                                            else {
                                                paginateNoMore = true;
                                            }
                                        }
                                        catch (ArithmeticException e) {
                                            paginateNoMore = true;
                                        }
                                    }
                                    return new ThriftRowsSliceImpl<K, C>(cfmap, columnFamily.getKeySerializer(),
                                            columnFamily.getColumnSerializer());
                                }
                            }
                        }, retry);
            }

            @Override
            public Future<OperationResult<Rows<K, C>>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<Rows<K, C>>>() {
                    @Override
                    public OperationResult<Rows<K, C>> call() throws Exception {
                        return execute();
                    }
                });
            }
        };
    }

    @Override
    public CqlQuery<K, C> withCql(final String cql) {
        return new CqlQuery<K, C>() {
            private boolean useCompression = false;

            @Override
            public OperationResult<CqlResult<K, C>> execute() throws ConnectionException {
                return connectionPool.executeWithFailover(
                        new AbstractKeyspaceOperationImpl<CqlResult<K, C>>(tracerFactory.newTracer(
                                CassandraOperationType.CQL, columnFamily), pinnedHost, keyspace.getKeyspaceName()) {
                            @Override
                            public CqlResult<K, C> internalExecute(Client client) throws Exception {
                                org.apache.cassandra.thrift.CqlResult res = client.execute_cql_query(StringSerializer
                                        .get().toByteBuffer(cql), useCompression ? Compression.GZIP : Compression.NONE);
                                switch (res.getType()) {
                                case ROWS:
                                    return new ThriftCqlResultImpl<K, C>(new ThriftCqlRowsImpl<K, C>(res.getRows(),
                                            columnFamily.getKeySerializer(), columnFamily.getColumnSerializer()));
                                case INT:
                                    return new ThriftCqlResultImpl<K, C>(res.getNum());
                                default:
                                    return null;
                                }
                            }
                        }, retry);
            }

            @Override
            public Future<OperationResult<CqlResult<K, C>>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<CqlResult<K, C>>>() {
                    @Override
                    public OperationResult<CqlResult<K, C>> call() throws Exception {
                        return execute();
                    }
                });
            }

            @Override
            public CqlQuery<K, C> useCompression() {
                useCompression = true;
                return this;
            }
        };
    }

    @Override
    public AllRowsQuery<K, C> getAllRows() {
        return new AbstractThriftAllRowsQueryImpl<K, C>(columnFamily) {
            private AbstractThriftAllRowsQueryImpl<K, C> getThisQuery() {
                return this;
            }

            protected List<org.apache.cassandra.thrift.KeySlice> getNextBlock(final KeyRange range) {
                while (true) {
                    try {
                        return connectionPool.executeWithFailover(
                                new AbstractKeyspaceOperationImpl<List<org.apache.cassandra.thrift.KeySlice>>(
                                        tracerFactory.newTracer(CassandraOperationType.GET_ROWS_RANGE, columnFamily),
                                        pinnedHost, keyspace.getKeyspaceName()) {
                                    @Override
                                    public List<org.apache.cassandra.thrift.KeySlice> internalExecute(Client client)
                                            throws Exception {
                                        return client.get_range_slices(
                                                new ColumnParent().setColumn_family(columnFamily.getName()), predicate,
                                                range, ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));
                                    }

                                    @Override
                                    public BigInteger getToken() {
                                        if (range.getStart_key() != null)
                                            return partitioner.getToken(range.start_key).token;
                                        return null;
                                    }
                                }, retry).getResult();
                    }
                    catch (ConnectionException e) {
                        // Let exception callback handle this exception. If it
                        // returns false then
                        // we return an empty result which the iterator's
                        // hasNext() to return false.
                        // If no exception handler is provided then simply
                        // return an empty set as if the
                        // there is no more data
                        if (this.getExceptionCallback() == null) {
                            throw new RuntimeException(e);
                        }
                        else {
                            if (!this.getExceptionCallback().onException(e)) {
                                return new ArrayList<org.apache.cassandra.thrift.KeySlice>();
                            }
                        }
                    }
                }
            }

            @Override
            public OperationResult<Rows<K, C>> execute() throws ConnectionException {
                return new OperationResultImpl<Rows<K, C>>(Host.NO_HOST, new ThriftAllRowsImpl<K, C>(getThisQuery(),
                        columnFamily), 0);
            }

            @Override
            public Future<OperationResult<Rows<K, C>>> executeAsync() throws ConnectionException {
                throw new UnsupportedOperationException("executeAsync not supported here.  Use execute()");
            }

            @Override
            public void executeWithCallback(final RowCallback<K, C> callback) throws ConnectionException {
                final RandomPartitioner partitioner = new RandomPartitioner();

                final AtomicReference<ConnectionException> error = new AtomicReference<ConnectionException>();

                List<Pair<String, String>> ranges = Lists.newArrayList();
                if (this.getConcurrencyLevel() != null) {
                    int nThreads = this.getConcurrencyLevel();
                    for (int i = 0; i < nThreads; i++) {
                        BigIntegerToken start =  new BigIntegerToken(TokenGenerator.initialToken(nThreads, i,   TokenGenerator.MINIMUM, TokenGenerator.MAXIMUM));
                        BigIntegerToken end   =  new BigIntegerToken(TokenGenerator.initialToken(nThreads, i+1, TokenGenerator.MINIMUM, TokenGenerator.MAXIMUM));
                        ranges.add(Pair.create(start.toString(), end.toString()));
                    }
                }
                else {
                    ranges = Lists.transform(keyspace.describeRing(true), new Function<TokenRange, Pair<String, String>> () {
                        @Override
                        public Pair<String, String> apply(TokenRange input) {
                            return Pair.create(input.getStartToken(), input.getEndToken());
                        }
                    });
                }
                final CountDownLatch doneSignal = new CountDownLatch(ranges.size());
                
                for (final Pair<String, String> token : ranges) {
                    executor.submit(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            // Prepare the range of tokens for this token range
                            final KeyRange range = new KeyRange().setCount(getBlockSize())
                                    .setStart_token(token.left).setEnd_token(token.right);

                            try {
                                // Loop until we get all the rows for this
                                // token range or we get an exception
                                while (error.get() == null) {
                                    try {
                                        // Get the next block
                                        List<KeySlice> ks = connectionPool.executeWithFailover(
                                                new AbstractKeyspaceOperationImpl<List<KeySlice>>(tracerFactory
                                                        .newTracer(CassandraOperationType.GET_ROWS_RANGE,
                                                                columnFamily), pinnedHost, keyspace
                                                        .getKeyspaceName()) {
                                                    @Override
                                                    public List<KeySlice> internalExecute(Client client)
                                                            throws Exception {
                                                        return client.get_range_slices(new ColumnParent()
                                                                .setColumn_family(columnFamily.getName()),
                                                                predicate, range, ThriftConverter
                                                                        .ToThriftConsistencyLevel(consistencyLevel));
                                                    }

                                                    @Override
                                                    public BigInteger getToken() {
                                                        if (range.getStart_key() != null)
                                                            return partitioner.getToken(ByteBuffer.wrap(range
                                                                    .getStart_key())).token;
                                                        return null;
                                                    }
                                                }, retry.duplicate()).getResult();

                                        // Notify the callback
                                        if (!ks.isEmpty()) {
                                            Rows<K, C> rows = new ThriftRowsSliceImpl<K, C>(ks, columnFamily
                                                    .getKeySerializer(), columnFamily.getColumnSerializer());
                                            callback.success(rows);
                                            if (rows.size() == getBlockSize()) {
                                                Row<K, C> lastRow = rows.getRowByIndex(rows.size() - 1);

                                                // Determine the start token
                                                // for the next page
                                                String token = partitioner.getToken(lastRow.getRawKey()).toString();
                                                if (getRepeatLastToken()) {
                                                    // Start token is
                                                    // non-inclusive
                                                    BigInteger intToken = new BigInteger(token)
                                                            .subtract(new BigInteger("1"));
                                                    range.setStart_token(intToken.toString());
                                                }
                                                else {
                                                    range.setStart_token(token);
                                                }
                                            }
                                            else {
                                                return null;
                                            }
                                        }
                                        else {
                                            return null;
                                        }
                                    }
                                    catch (Exception e) {
                                        ConnectionException ce = ThriftConverter.ToConnectionPoolException(e);
                                        if (!callback.failure(ce))   
                                            error.set(ce);
                                    }
                                }
                            }
                            finally {
                                doneSignal.countDown();
                            }
                            return null;
                        }
                    });
                }
                // Block until all threads finish
                try {
                    doneSignal.await();
                }
                catch (InterruptedException e) {
                    Log.debug("Execution interrupted on get all rows for keyspace " + keyspace.getKeyspaceName());
                }

                if (error.get() != null) {
                    throw error.get();
                }
            }
        };
    }

    @Override
    public ColumnFamilyQuery<K, C> pinToHost(Host host) {
        this.pinnedHost = host;
        return this;
    }

    @Override
    public ColumnFamilyQuery<K, C> withRetryPolicy(RetryPolicy retry) {
        this.retry = retry;
        return this;
    }
}