package com.netflix.astyanax.util; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.WriteAheadEntry; import com.netflix.astyanax.WriteAheadLog; import com.netflix.astyanax.connectionpool.OperationResult; import com.netflix.astyanax.connectionpool.exceptions.NoAvailableHostsException; import com.netflix.astyanax.connectionpool.exceptions.WalException; import com.netflix.astyanax.impl.NoOpWriteAheadLog; public class WriteAheadMutationBatchExecutor { private ListeningExecutorService executor; private WriteAheadLog wal = new NoOpWriteAheadLog(); private Predicate<Exception> retryablePredicate = Predicates.alwaysFalse(); private final Keyspace keyspace; private long waitOnNoHosts = 1000; public WriteAheadMutationBatchExecutor(Keyspace keyspace, int nThreads) { this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(nThreads, new ThreadFactoryBuilder().setDaemon(true).build())); this.keyspace = keyspace; } public WriteAheadMutationBatchExecutor(Keyspace keyspace, ExecutorService executor) { this.executor = MoreExecutors.listeningDecorator(executor); this.keyspace = keyspace; } public WriteAheadMutationBatchExecutor usingWriteAheadLog(WriteAheadLog wal) { this.wal = wal; return this; } public WriteAheadMutationBatchExecutor usingRetryablePredicate(Predicate<Exception> predicate) { this.retryablePredicate = predicate; return this; } /** * Replay records from the WAL */ public List<ListenableFuture<OperationResult<Void>>> replayWal(int count) { List<ListenableFuture<OperationResult<Void>>> futures = Lists.newArrayList(); WriteAheadEntry walEntry; while (null != (walEntry = wal.readNextEntry()) && count-- > 0) { MutationBatch m = keyspace.prepareMutationBatch(); try { walEntry.readMutation(m); futures.add(executeWalEntry(walEntry, m)); } catch (WalException e) { wal.removeEntry(walEntry); } } return futures; } /** * Write a mutation to the wal and execute it */ public ListenableFuture<OperationResult<Void>> execute(final MutationBatch m) throws WalException { final WriteAheadEntry walEntry = wal.createEntry(); walEntry.writeMutation(m); return executeWalEntry(walEntry, m); } private ListenableFuture<OperationResult<Void>> executeWalEntry(final WriteAheadEntry walEntry, final MutationBatch m) { return executor.submit(new Callable<OperationResult<Void>>() { public OperationResult<Void> call() throws Exception { try { OperationResult<Void> result = m.execute(); wal.removeEntry(walEntry); return result; } catch (Exception e) { if (e instanceof NoAvailableHostsException) { Thread.sleep(waitOnNoHosts); } if (retryablePredicate.apply(e)) executor.submit(this); else wal.removeEntry(walEntry); throw e; } } }); } public void shutdown() { executor.shutdown(); } }