Thu. Mar 28th, 2024

Java streams introduced a new way for developers to program. Take a dataset, build a stream to filter it and map it, and then collect it in its new form. This gave developers a powerful tool for development and data processing. I can’t even begin to say how many times I have used this. It is an efficient and organized way to process your data.

However, I had a question: what if I had only one element? What if my data came in chunks? Building and tearing down a stream over and over for a single data element didn’t make sense to me. Why couldn’t I use the same stream over and over?
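To make the limitation concrete, here is a minimal sketch showing that a standard Java stream is single-use: a second terminal operation on the same stream is rejected with an IllegalStateException. The class and method names are only for illustration.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuseDemo {

	/** Returns true if a second terminal operation on the same stream is rejected. */
	static boolean secondUseFails() {
		Stream<String> stream = Stream.of("a", "b", "c").map(String::toUpperCase);
		stream.collect(Collectors.toList()); // first terminal operation consumes the stream
		try {
			stream.collect(Collectors.toList()); // second terminal operation on the same stream
			return false;
		} catch (IllegalStateException e) {
			// The stream has already been operated upon, so it cannot be reused.
			return true;
		}
	}

	public static void main(String[] args) {
		System.out.println(secondUseFails());
	}
}
```

So every new chunk of data needs a freshly built stream, which is the overhead a persistent pipeline is meant to avoid.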

I set out to create something more persistent. What exactly did it need to do?

  • Process data similar to a stream.
  • Expand on the methods.
  • Run in parallel or single-thread.
  • Connect multiple entities together.

Thus Pipelines was born. I looked around and saw that there were existing libraries similar to this, but they seemed a lot bulkier than I wanted. I prefer lean code. Let’s get into the code and see what I have come up with.

Pipe Interface

The first thing I had to do was determine how the pieces were going to fit together. I created PipeInterface<S,T>. This provides a simple interface that can be used to define each segment of a pipeline.

package name.mymiller.extensions.utils.pipelines;

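import java.util.Collections;
import java.util.List;

/**
 * Interface for a Pipe object to implement for compatibility with the Pipeline.
 *
 * @author jmiller
 *
 */
public interface PipeInterface<S, T> {

	/**
	 * Returns an empty list by default so that callers such as
	 * Pipeline.getFutures() can safely pass the result to addAll().
	 *
	 * @return List of any futures from sub-pipelines
	 */
	public default List<PipeFuture<?>> getFutures() {
		return Collections.emptyList();
	}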

	/**
	 * Method to process the object pushed to the pipe.
	 *
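	 * @param data
	 *            Object being pushed to the pipe
	 * @param futures
	 *            List of futures associated with this data as possible terminate points.
	 * @param pipelineName
	 *            String containing the name of this pipeline.
	 * @param isParallel
	 *            True if the data is being processed in parallel.
	 * @return Object after processing has occurred, or null to stop processing this data.
	 * @throws Throwable
	 *             if processing fails; the pipeline passes it to its exception handler.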
	 */
	public abstract T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) throws Throwable;

}

We will use this interface to define a new pipe section for our pipeline. Because the pipeline is designed to send data to other pipelines and to run in parallel, we pass in a List of PipeFuture objects that can be marked as complete with the data attached.
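As an illustration, a custom pipe might look like the sketch below. To keep it compilable on its own, it declares trimmed-down stand-ins for PipeFuture and PipeInterface; TrimPipe is a hypothetical example and not part of the project. It relies on the convention used by the pipeline's processing loop, where a null result stops further processing of that element.

```java
import java.util.List;

// Trimmed-down stand-ins so this sketch compiles by itself. In the project
// these would be the PipeFuture and PipeInterface types from this article.
class PipeFuture<V> {
}

interface PipeInterface<S, T> {
	T process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) throws Throwable;
}

/** Hypothetical custom pipe: trims a String and drops it if nothing is left. */
class TrimPipe implements PipeInterface<String, String> {

	@Override
	public String process(String data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
		String trimmed = data.trim();
		// Returning null tells the pipeline to stop processing this element.
		return trimmed.isEmpty() ? null : trimmed;
	}
}
```

A pipe like this would be attached with the connect() method shown later in the Pipeline class.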

Pipe Future

When we are running in parallel, it is necessary to use a Future to indicate whether the data has completed processing. Our pipeline design allows data to be processed on a single thread or in parallel, so we use a Future to report the result of parallel processing.

/**
 *
 */
package name.mymiller.extensions.utils.pipelines;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Future representing the final data for a pipeline.
 *
 * @author jmiller
 * @param <V>
 *            Type of final data
 *
 */
public class PipeFuture<V> implements Future<V> {

	/**
	 * Object used for synchronization between threads
	 */
	private Object syncObject = null;

	/**
	 * The final result from processing
	 */
	private V futureObject = null;

	/**
	 * String identifier for the pipeline. Will match the name of the pipeline
	 */
	private String identifier = null;

	private UUID pipelineId = null;

	/**
	 * Boolean indicating if the future is complete.
	 */
	private AtomicBoolean done = null;

	/**
	 * Constructor that takes the identifier for the future.
	 *
	 * @param identifier
	 *            String indicating the pipeline this is a future to.
	 * @param pipelineId
	 *            UUID of the pipeline this is a future to.
	 */
	public PipeFuture(String identifier, UUID pipelineId) {
		this.syncObject = new Object();
		this.done = new AtomicBoolean(false);
		this.identifier = identifier;
		this.pipelineId = pipelineId;
	}

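	@Override
	public boolean cancel(boolean mayInterruptIfRunning) {
		// Cancellation is not supported.
		return false;
	}

	@Override
	public V get() throws InterruptedException, ExecutionException {
		synchronized (this.syncObject) {
			// Block until a result has been published.
			while (!this.done.get()) {
				this.syncObject.wait();
			}
			return this.futureObject;
		}
	}

	@Override
	public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
		final long deadline = System.nanoTime() + unit.toNanos(timeout);
		synchronized (this.syncObject) {
			// wait() must be called while holding the monitor of syncObject.
			while (!this.done.get()) {
				final long remaining = deadline - System.nanoTime();
				if (remaining <= 0) {
					throw new TimeoutException("Timed out waiting for pipeline " + this.identifier);
				}
				TimeUnit.NANOSECONDS.timedWait(this.syncObject, remaining);
			}
			return this.futureObject;
		}
	}

	/**
	 * @return the identifier
	 */
	public String getIdentifier() {
		return this.identifier;
	}

	/**
	 * @return the pipelineId
	 */
	public UUID getPipelineId() {
		return this.pipelineId;
	}

	@Override
	public boolean isCancelled() {
		return false;
	}

	@Override
	public boolean isDone() {
		return this.done.get();
	}

	/**
	 * @param done
	 *            the done to set
	 */
	public void setDone(boolean done) {
		synchronized (this.syncObject) {
			this.done.set(done);
			this.syncObject.notifyAll();
		}
	}

	/**
	 * Publishes the result, marks the future as done, and wakes any waiting
	 * threads.
	 *
	 * @param futureObject
	 *            the futureObject to set
	 */
	public void setFutureObject(V futureObject) {
		// notifyAll() must be called while holding the monitor of syncObject.
		synchronized (this.syncObject) {
			this.futureObject = futureObject;
			this.done.set(true);
			this.syncObject.notifyAll();
		}
	}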

}

Here you can see that PipeFuture implements the Future interface. We have added fields that identify which pipeline this PipeFuture represents.
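For comparison, the same hand-off between a worker thread and a waiting caller can be sketched with the JDK's own CompletableFuture. This is a swapped-in technique, not how PipeFuture is implemented, and FutureHandOffDemo is a hypothetical name; it only illustrates the "publish on one thread, block on another" behavior a Future is expected to provide.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureHandOffDemo {

	/** Runs the work on another thread and waits for the published result. */
	static String processInBackground(String input) {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		CompletableFuture<String> future = new CompletableFuture<>();
		try {
			// Worker thread: do the processing, then publish the result.
			executor.execute(() -> future.complete(input.toUpperCase()));
			// Caller thread: block (with a timeout) until the result is published.
			return future.get(5, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException | TimeoutException e) {
			throw new IllegalStateException("Processing did not complete", e);
		} finally {
			executor.shutdown();
		}
	}

	public static void main(String[] args) {
		System.out.println(processInBackground("data"));
	}
}
```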

Pipeline

Here is the meat and potatoes of our Pipeline: a single class with built-in pipes for the main functionality. A simple call to Pipeline.<K>start() names your Pipeline, and you’re off to building the processing. Then a call to Pipeline::process() or Pipeline::processParallel() initiates the data processing.

package name.mymiller.extensions.utils.pipelines;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;

/**
 * Pipeline object to manage your processing pipeline. Pipe segments are added
 * in the order they are connected to the Pipeline. They are called in
 * sequential order. Add or create a Collector at the end to collect a result if
 * needed.
 *
 * @author jmiller
 *
 */
public class Pipeline<S, T> {

	/**
	 * A pipe that will take in an object of type S and perform an action on the
	 * data. This can transform the data to a new type, or update data on the pipe.
	 *
	 * @author jmiller
	 * @param <S>
	 *            The type the Pipe will accept.
	 * @param <T>
	 *            The type the Pipe will return.
	 *
	 */
	private class ActionPipe<S, T> implements PipeInterface<S, T> {

		/**
		 * Action Functional interface
		 */
		private Function<? super S, ? extends T> action;

		/**
		 * Constructor to provide the functional interface the pipe will apply.
		 *
		 * @param action
		 *            Functional interface to act on.
		 */
		public ActionPipe(Function<? super S, ? extends T> action) {
			this.action = action;
		}

		/*
		 * (non-Javadoc)
		 *
		 * @see
		 * name.mymiller.extensions.utils.pipelines.PipeInterface#process(java.lang.
		 * Object)
		 */
		@Override
		public T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			return this.action.apply(data);
		}
	}

	private class DefaultExceptionHandler implements ExceptionHandlerInterface {

		@Override
		public boolean process(Throwable throwable, PipeInterface<?, ?> pipe, Object data, List<PipeFuture<?>> futures,
				String pipelineName, boolean isParallel) {
			System.out.println("Pipe: " + pipe.getClass().getName());
			// String concatenation handles null data without throwing.
			System.out.println("Data: " + data);
			System.out.println("Pipeline: " + pipelineName);
			System.out.println("Parallel: " + isParallel);
			System.out.println("Exception: " + throwable.getMessage());
			System.out.println("Stacktrace: ");
			throwable.printStackTrace(System.out);

			return false;
		}

	}

	/**
	 * Pipe segment that allows only data blocks with a distinct key to pass,
	 * blocking duplicates from being further processed. When onDistinct is false
	 * the behavior is inverted and only duplicates pass.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class DistinctByPipe<S> implements PipeInterface<S, S> {

		/**
		 * Map of keys extracted from data blocks previously processed
		 */
		private Map<Object, Boolean> seen;

		private boolean onDistinct = true;

		private Function<? super S, Object> keyExtractor;

		/**
		 * Constructor setting up the map of seen keys; passes distinct data only.
		 */
		public DistinctByPipe(Function<? super S, Object> keyExtractor) {
			this.seen = new ConcurrentHashMap<>();
			this.keyExtractor = keyExtractor;
		}

		/**
		 * Constructor setting up the map of seen keys.
		 *
		 * @param onDistinct
		 *            true to pass distinct data, false to pass duplicates only.
		 */
		public DistinctByPipe(Function<? super S, Object> keyExtractor, boolean onDistinct) {
			this.seen = new ConcurrentHashMap<>();
			this.keyExtractor = keyExtractor;
			this.onDistinct = onDistinct;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			if (this.seen.putIfAbsent(this.keyExtractor.apply(data), Boolean.TRUE) == null) {
				if (this.onDistinct) {
					return data;
				} else {
					return null;
				}
			}
			if (this.onDistinct) {
				return null;
			} else {
				return data;
			}
		}
	}

	/**
	 * Pipe segment that allows only distinct data blocks to pass, blocking
	 * duplicates from being further processed. When onDistinct is false the
	 * behavior is inverted and only duplicates pass.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class DistinctPipe<S> implements PipeInterface<S, S> {

		/**
		 * Set containing data blocks previously processed
		 */
		private Set<S> distinctList;

		private boolean onDistinct = true;

		/**
		 * Default constructor setting up the distinctList
		 */
		public DistinctPipe() {
			this.distinctList = Collections.synchronizedSet(new HashSet<>());
		}

		/**
		 * Constructor setting up the distinctList.
		 *
		 * @param onDistinct
		 *            true to pass distinct data, false to pass duplicates only.
		 */
		public DistinctPipe(boolean onDistinct) {
			this.distinctList = Collections.synchronizedSet(new HashSet<>());
			this.onDistinct = onDistinct;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
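			// add() returns true only for the first occurrence; doing the check and
			// the insert in one call avoids a race when running in parallel.
			if (this.distinctList.add(data)) {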
				if (this.onDistinct) {
					return data;
				} else {
					return null;
				}
			}
			if (this.onDistinct) {
				return null;
			} else {
				return data;
			}

		}
	}

	/**
	 * Pipe segment allowing for filtering data. Allowing only certain data from
	 * continuing processing.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class FilterPipe<S> implements PipeInterface<S, S> {

		/**
		 * Predicate functional interface providing the filtering algorithm.
		 */
		private Predicate<? super S> predicate;

		/**
		 * Constructor to setup the FilterPipe
		 *
		 * @param predicate
		 *            Predicate to be used for filtering data.
		 */
		public FilterPipe(Predicate<? super S> predicate) {
			this.predicate = predicate;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			if (this.predicate.test(data)) {
				return data;
			}
			return null;
		}

	}

	/**
	 * Fork Pipe takes a list of Pipelines and will send the data to each pipeline.
	 * If processing is occurring in parallel, this pipe will run in parallel as
	 * well.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class ForkPipe<S> implements PipeInterface<S, S> {

		/**
		 * List of Pipelines data is to be forked into.
		 */
		private List<Pipeline<S, ?>> pipelines = null;

		/**
		 * Constructor for accepting a list of pipelines to fork data into.
		 *
		 * @param pipelines
		 *            List of Pipelines.
		 */
		public ForkPipe(List<Pipeline<S, ?>> pipelines) {
			this.pipelines = pipelines;
		}

		/*
		 * (non-Javadoc)
		 *
		 * @see name.mymiller.extensions.utils.pipelines.PipeInterface#getFutures()
		 */
		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Override
		public List<PipeFuture<?>> getFutures() {
			ArrayList<PipeFuture<?>> pipeFutures = new ArrayList<>();

			for (Pipeline pipeline : this.pipelines) {
				pipeFutures.addAll(pipeline.getFutures());
			}
			return pipeFutures;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			for (Pipeline<S, ?> pipeline : this.pipelines) {
				if (isParallel) {
					pipeline.internalParallel(data, futures);
				} else {
					pipeline.process(data);
				}
			}
			return null;
		}

	}

	/**
	 * Pipe that places a Max filter on the data. Comparator is used to determine if
	 * the data exceeds the max value.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class MaxPipe<S> implements PipeInterface<S, S> {

		/**
		 * Max value to allow through the pipe
		 */
		private S max;

		/**
		 * Comparator to use to compare the data against the max value.
		 */
		private Comparator<? super S> comparator;

		/**
		 * Constructor accepting the max value to allow and the comparator.
		 *
		 * @param max
		 *            Max value of type S to allow.
		 * @param comparator
		 *            Comparator to compare type S.
		 */
		public MaxPipe(S max, Comparator<? super S> comparator) {
			this.max = max;
			this.comparator = comparator;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			if (this.comparator.compare(data, this.max) <= 0) {
				return data;
			}
			return null;
		}

	}

	/**
	 * Pipe that places a Min filter on the data. Comparator is used to determine if
	 * the data falls below the min value.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class MinPipe<S> implements PipeInterface<S, S> {

		/**
		 * Min value to allow through the pipe.
		 */
		private S min;

		/**
		 * Comparator to use to compare the data against the min value.
		 */
		private Comparator<? super S> comparator;

		/**
		 * Constructor accepting the min value to allow and the comparator.
		 *
		 * @param min
		 *            Min value of type S to allow.
		 * @param comparator
		 *            Comparator to compare type S.
		 */
		public MinPipe(S min, Comparator<? super S> comparator) {
			this.min = min;
			this.comparator = comparator;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			// Pass data that is greater than or equal to min; anything less is blocked.
			if (this.comparator.compare(data, this.min) >= 0) {
				return data;
			}
			return null;
		}

	}

	/**
	 * Pipe that allows you to see the data at this point in the processing. An
	 * action is called, allowing the data to be passed to a Consumer. The data is
	 * then passed along in the pipe unchanged.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class PeekPipe<S> implements PipeInterface<S, S> {

		/**
		 * Consumer interface to allow 'peeking' at the data.
		 */
		private Consumer<? super S> action = null;

		/**
		 * Constructor for creating a peek at the data.
		 *
		 * @param action
		 *            Consumer interface to peek at the data.
		 */
		public PeekPipe(Consumer<? super S> action) {
			this.action = action;
		}

		@Override
		public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			this.action.accept(data);
			return data;
		}
	}

	/**
	 * Runnable to perform processing in Parallel
	 *
	 * @author jmiller
	 *
	 */
	private class PipeRun implements Runnable {
		/**
		 * List of pipes at the time the processing was started that needs to be
		 * executed.
		 */
		private List<PipeInterface<?, ?>> pipes;

		/**
		 * Original data source
		 */
		private S source;

		/**
		 * List of futures that will need to be responded to for complete processing.
		 */
		private List<PipeFuture<?>> futures;

		/**
		 * Name given to this pipeline.
		 */
		private String pipelineName;

		/**
		 * Exception Handler to use for processing
		 */
		private ExceptionHandlerInterface exceptionHandler = null;

		/**
		 * Constructor for creating a parallel processing run on the pipeline.
		 *
		 * @param pipes
		 *            List of pipes to process
		 * @param source
		 *            Original data to process
		 * @param futures
		 *            List of futures that need to be completed.
		 * @param pipelineName
		 *            Name of this pipeline
		 * @param exceptionHandler
		 *            Exception handler to call when a pipe throws
		 */
		protected PipeRun(List<PipeInterface<?, ?>> pipes, S source, List<PipeFuture<?>> futures, String pipelineName,
				ExceptionHandlerInterface exceptionHandler) {
			this.pipes = pipes;
			this.source = source;
			this.futures = futures;
			this.pipelineName = pipelineName;
			this.exceptionHandler = exceptionHandler;
		}

		@SuppressWarnings({ "unchecked", "rawtypes" })
		@Override
		public void run() {
			Object source = this.source;
			Object target = null;
			for (PipeInterface pipe : this.pipes) {
				boolean processAgain = true;
				int processAttempt = 0;

				while (processAgain) {
					try {
						processAttempt++;
						target = pipe.process(source, this.futures, this.pipelineName, true);
						processAgain = false;
					} catch (Throwable throwable) {
						// Without a handler there is nothing to ask for a retry, so give up
						// on this data instead of looping forever.
						boolean response = (this.exceptionHandler != null) && this.exceptionHandler.process(throwable,
								pipe, source, this.futures, this.pipelineName, true);
						if (response && (processAttempt == 1)) {
							processAgain = true;
						} else {
							processAgain = false;
							target = null;
						}
					}
				}
				if (target == null) {
					break;
				}
				source = target;
			}

			for (PipeFuture future : this.futures) {
				if (future.getIdentifier().equals(this.pipelineName)) {
					future.setFutureObject(target);
				}
			}
		}
	}

	/**
	 * Pipe Segment allowing for a copy of the data to flow to an additional
	 * pipeline if the predicate is matched.
	 *
	 * @author jmiller
	 *
	 * @param <S>
	 *            Source Type
	 */
	private class SwitchPipe<S> implements PipeInterface<S, S> {

		/**
		 * Predicate functional interface providing the switching condition.
		 */
		private Predicate<? super S> predicate;

		/**
		 * Alternate Pipeline for data to flow.
		 */
		private Pipeline<S, ?> pipeline;

		/**
		 * Constructor to set up the SwitchPipe
		 *
		 * @param predicate
		 *            Predicate used to decide whether data flows to the alternate pipeline.
		 * @param pipeline
		 *            Alternate pipeline that receives matching data.
		 */
		public SwitchPipe(Predicate<? super S> predicate, Pipeline<S, ?> pipeline) {
			this.predicate = predicate;
			this.pipeline = pipeline;
		}

		/*
		 * (non-Javadoc)
		 *
		 * @see name.mymiller.extensions.utils.pipelines.PipeInterface#getFutures()
		 */
		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Override
		public List<PipeFuture<?>> getFutures() {
			return this.pipeline.getFutures();
		}

		@Override
		public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
				throws Throwable {
			if (this.predicate.test(data)) {
				if (isParallel) {
					this.pipeline.internalParallel(data, futures);
				} else {
					this.pipeline.process(data);
				}
			} else if (futures != null) {
				List<PipeFuture<?>> futureList = this.pipeline.getFutures();
				futures.parallelStream().forEach(future -> {
					PipeFuture<?> match = futureList.stream()
							.filter(pipe -> pipe.getPipelineId().equals(future.getPipelineId())).findFirst()
							.orElse(null);

					if (match != null) {
						future.setFutureObject(null);
						future.setDone(true);
					}
				});
			}
			return data;
		}

	}

	/**
	 * Static method to create a pipeline
	 *
	 * @param name
	 *            Name to give the pipeline
	 * @return Pipeline ready to receive pipe segments
	 */
	public static <K> Pipeline<K, K> start(String name) {
		Pipeline<K, K> chain = new Pipeline<K, K>(name);
		chain.pipes = new ArrayList<>();
		return chain;
	}

	private DefaultExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();

	/**
	 * Executor to use for parallel processing
	 */
	private ExecutorService executorService = null;

	/**
	 * List of pipes to be processed.
	 */
	private List<PipeInterface<?, ?>> pipes;

	/**
	 * Name of this pipeline.
	 */
	private String pipelineName;

	private UUID pipelineId;

	/**
	 * Exception Handler to use for processing
	 */
	private ExceptionHandlerInterface exceptionHandler = this.defaultExceptionHandler;

	/**
	 * Copy constructor used to create a duplicate pipeline.
	 *
	 * @param pipeline
	 *            Pipeline to duplicate.
	 */
	private Pipeline(Pipeline<?, ?> pipeline) {
		super();
		this.executorService = pipeline.executorService;
		this.pipes = new ArrayList<PipeInterface<?, ?>>(pipeline.pipes);
		this.pipelineName = pipeline.pipelineName;
		this.pipelineId = pipeline.pipelineId;
		// Carry the exception handler over so it survives adding more pipes.
		this.exceptionHandler = pipeline.exceptionHandler;
	}

	/**
	 * Constructor that creates an empty pipeline with the specified name.
	 *
	 * @param pipelineName
	 */
	private Pipeline(String pipelineName) {
		super();
		this.pipelineName = pipelineName;
		this.pipelineId = UUID.randomUUID();
		this.executorService = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
				ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
	}

	/**
	 * Creates an Action Segment for the pipeline with an action functional
	 * interface.
	 *
	 * @param action
	 *            Action Functional Interface that will be used.
	 * @return Pipeline with the additional segment added.
	 */
	@SuppressWarnings("unchecked")
	public <V> Pipeline<S, V> action(Function<? super T, ? extends V> action) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, V>) this.connectFirstPipe(new ActionPipe<T, V>(action));
		}
		return this.connectInternalPipe(new ActionPipe<T, V>(action));
	}

	/**
	 * Create a Collector pipe segment by passing in the collector interface
	 *
	 * @param collector
	 *            Collector functional interface that will be used.
	 * @return Pipeline with the additional segment added.
	 */
	@SuppressWarnings("unchecked")
	public <A, V> Pipeline<S, T> collect(Collector<T, A, V> collector) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new CollectorPipe<T, A, V>(collector));
		}
		return this.connectInternalPipe(new CollectorPipe<T, A, V>(collector));
	}

	/**
	 * Create a Collector pipe segment by passing in the collector interface
	 *
	 * @param collector
	 *            CollectorInterface pipe segment that will be used.
	 * @return Pipeline with the additional segment added.
	 */
	@SuppressWarnings("unchecked")
	public <V> Pipeline<S, T> collect(CollectorInterface<T, V> collector) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(collector);
		}
		return this.connectInternalPipe(collector);
	}

	/**
	 * Method to add your own Pipe to the pipeline. Implement your pipe using the
	 * PipeInterface
	 *
	 * @param pipe
	 *            Pipe to add to the pipeline
	 * @return Pipeline with the additional segment added.
	 */
	@SuppressWarnings("unchecked")
	public <V> Pipeline<S, V> connect(PipeInterface<T, V> pipe) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, V>) this.connectFirstPipe(pipe);
		}
		return this.connectInternalPipe(pipe);
	}

	/**
	 * Internal method to connect the first pipe to the pipeline.
	 *
	 * @param pipe
	 *            Pipe to add to the pipeline
	 * @return Pipeline with the segment added.
	 */
	private <K, L> Pipeline<K, L> connectFirstPipe(PipeInterface<K, L> pipe) {
		Pipeline<K, L> pipeline = new Pipeline<K, L>(this);
		pipeline.pipes.add(pipe);
		return pipeline;
	}

	/**
	 * Internal call to add a pipe to a pipeline with existing pipes.
	 *
	 * @param pipe
	 *            Pipe to add to the pipeline
	 * @return Pipeline with the segment added.
	 */
	private <V> Pipeline<S, V> connectInternalPipe(PipeInterface<T, V> pipe) {
		Pipeline<S, V> pipeline = new Pipeline<S, V>(this);
		pipeline.pipes.add(pipe);
		return pipeline;
	}

	/**
	 * Add a pipe for limiting the data to be distinct. Identical data objects will
	 * be blocked if the two data objects would match with .equals().
	 *
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> distinct() {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new DistinctPipe<T>());
		}
		return this.connectInternalPipe(new DistinctPipe<T>());
	}

	/**
	 * Add a pipe for limiting the data to be distinct based on a property.
	 * Identical key objects will be blocked if the two data keys would match with
	 * .equals().
	 *
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> distinctBy(Function<? super T, Object> keyExtractor) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new DistinctByPipe<T>(keyExtractor));
		}
		return this.connectInternalPipe(new DistinctByPipe<T>(keyExtractor));
	}

	/**
	 * Add a pipe for limiting the data to duplicates only. Second instances of
	 * identical data objects will be allowed if the two data objects would match
	 * with .equals().
	 *
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> duplicates() {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new DistinctPipe<T>(false));
		}
		return this.connectInternalPipe(new DistinctPipe<T>(false));
	}

	/**
	 * Add a pipe for limiting the data to be duplicates only based on a property.
	 * Second instances of identical key objects will be allowed if the two data
	 * keys would match with .equals().
	 *
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> duplicatesBy(Function<? super T, Object> keyExtractor) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new DistinctByPipe<T>(keyExtractor, false));
		}
		return this.connectInternalPipe(new DistinctByPipe<T>(keyExtractor, false));
	}

	/**
	 * Pipeline to add a filter based on a Predicate passed in.
	 *
	 * @param predicate
	 *            Predicate providing the filtering logic.
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> filter(Predicate<? super T> predicate) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new FilterPipe<T>(predicate));
		}
		return this.connectInternalPipe(new FilterPipe<T>(predicate));
	}

	/**
	 * Pipe Fork, to split the data passing in, to multiple additional pipelines.
	 *
	 * @param pipelines
	 *            List of Pipelines that this pipe segment forks into
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> fork(List<Pipeline<T, ?>> pipelines) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new ForkPipe<T>(pipelines));
		}
		return this.connectInternalPipe(new ForkPipe<T>(pipelines));
	}

	/**
	 * @return the exceptionHandler
	 */
	protected synchronized ExceptionHandlerInterface getExceptionHandler() {
		return this.exceptionHandler;
	}

	/**
	 *
	 * @return List of Futures for the next processing
	 */
	public List<PipeFuture<?>> getFutures() {
		ArrayList<PipeFuture<?>> pipeFutures = new ArrayList<>();

		pipeFutures.add(new PipeFuture<T>(this.pipelineName, this.pipelineId));
		for (PipeInterface<?, ?> pipe : this.pipes) {
			pipeFutures.addAll(pipe.getFutures());
		}
		return pipeFutures;

	}

	/**
	 * @return the pipelineId
	 */
	public UUID getPipelineId() {
		return this.pipelineId;
	}

	/**
	 * @return the pipelineName
	 */
	public String getPipelineName() {
		return this.pipelineName;
	}

	/**
	 * Internal method for running the code in Parallel
	 *
	 * @param source
	 *            Source data being passed
	 * @param futures
	 *            List of Futures that need to be marked as complete when their
	 *            associated pipeline completes.
	 */
	public void internalParallel(S source, List<PipeFuture<?>> futures) {
		this.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName, this.exceptionHandler));
	}

	/**
	 * Max segment limiting data to a max, anything greater than max, is blocked
	 * from passing.
	 *
	 * @param max
	 *            Max of the data being processed.
	 * @param comparator
	 *            Comparator to compare two instances of the data.
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> max(T max, Comparator<? super T> comparator) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new MaxPipe<T>(max, comparator));
		}
		return this.connectInternalPipe(new MaxPipe<T>(max, comparator));
	}

	/**
	 * Min segment limiting data to a min, anything less than min, is blocked from
	 * passing.
	 *
	 * @param min
	 *            Min of the data being processed.
	 * @param comparator
	 *            Comparator to compare two instances of the data.
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> min(T min, Comparator<? super T> comparator) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new MinPipe<T>(min, comparator));
		}
		return this.connectInternalPipe(new MinPipe<T>(min, comparator));
	}

	/**
	 * Method to add peek functionality to allow the data to be seen by other
	 * code outside of the pipeline
	 *
	 * @param consumer
	 *            Consumer that shall peek at the data.
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> peek(Consumer<? super T> consumer) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new PeekPipe<T>(consumer));
		}
		return this.connectInternalPipe(new PeekPipe<T>(consumer));
	}

	/**
	 * Method to process an instance of data on the current thread.
	 *
	 * @param s
	 *            S data type that shall start the data processing
	 * @return Completed value after processing.
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public T process(S s) {
		Object source = s;
		Object target = null;
		for (PipeInterface pipe : this.pipes) {
			boolean processAgain = true;
			int processAttempt = 0;

			while (processAgain) {
				try {
					processAttempt++;
					target = pipe.process(source, null, this.pipelineName, false);
					processAgain = false;
				} catch (Throwable throwable) {
					// Without a handler there is nothing to ask for a retry, so give up
					// on this data instead of looping forever.
					boolean response = (this.exceptionHandler != null) && this.exceptionHandler.process(throwable,
							pipe, source, null, this.getPipelineName(), false);
					if (response && (processAttempt == 1)) {
						processAgain = true;
					} else {
						processAgain = false;
						target = null;
					}
				}
			}
			if (target == null) {
				break;
			}
			source = target;
		}
		return (T) target;
	}

	/**
	 * Method to process an instance of data in parallel
	 *
	 * @param source
	 *            S data type that shall start the data processing
	 * @return List of PipeFutures that represent all possible outcomes of the data.
	 */
	public List<PipeFuture<?>> processParallel(S source) {
		List<PipeFuture<?>> futures = this.getFutures();

		this.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName, this.exceptionHandler));

		return futures;
	}

	/**
	 * @param exceptionHandler
	 *            the exceptionHandler to set
	 */
	protected synchronized void setExceptionHandler(ExceptionHandlerInterface exceptionHandler) {
		this.exceptionHandler = exceptionHandler;
		if (this.exceptionHandler == null) {
			this.exceptionHandler = this.defaultExceptionHandler;
		}
	}

	/**
	 * Allow a data object to flow to another pipeline if the predicate matches.
	 *
	 * @param predicate
	 *            Predicate to determine if the data is allowed to flow on the
	 *            alternate pipeline
	 * @param pipeline
	 *            Alternate pipeline to send the data.
	 * @return Pipeline with the segment added.
	 */
	@SuppressWarnings("unchecked")
	public Pipeline<S, T> switchIf(Predicate<? super T> predicate, Pipeline<T, ?> pipeline) {
		if (this.pipes.isEmpty()) {
			return (Pipeline<S, T>) this.connectFirstPipe(new SwitchPipe<T>(predicate, pipeline));
		}
		return this.connectInternalPipe(new SwitchPipe<T>(predicate, pipeline));
	}

	/**
	 * Method to wait for ALL PipeFutures in the list to complete
	 *
	 * @param futures
	 *            List of PipeFutures to wait on.
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	public void waitForAll(List<PipeFuture<?>> futures) throws InterruptedException, ExecutionException {

		for (PipeFuture<?> future : futures) {
			future.get();
		}
	}

	/**
	 * Method to watch for a PipeFuture to complete. Once one does, it is returned.
	 * Processing will yield after each cycle of checking futures.
	 *
	 * @param futures
	 *            List of futures to watch.
	 * @return PipeFuture that has completed.
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	public PipeFuture<?> waitForOne(List<PipeFuture<?>> futures) throws InterruptedException, ExecutionException {
		while (true) {
			for (PipeFuture<?> future : futures) {
				if (future.isDone()) {
					return future;
				}
			}
			Thread.yield();
		}
	}
}
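Stripped of futures, retries, and the built-in pipe classes, the build-and-process pattern used by Pipeline can be sketched as follows. MiniPipeline and its methods are hypothetical stand-ins written for this illustration, not part of the library: each builder call returns a new pipeline with one more stage, and process() pushes a single element through every stage, stopping early when a stage returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical, stripped-down pipeline: stages run in order, null drops the element. */
class MiniPipeline<S, T> {

	private final List<Function<Object, Object>> stages;

	private MiniPipeline(List<Function<Object, Object>> stages) {
		this.stages = stages;
	}

	static <K> MiniPipeline<K, K> start() {
		return new MiniPipeline<>(new ArrayList<>());
	}

	/** Adds a transforming stage and returns a new pipeline; this one is left untouched. */
	@SuppressWarnings("unchecked")
	<V> MiniPipeline<S, V> action(Function<? super T, ? extends V> action) {
		List<Function<Object, Object>> copy = new ArrayList<>(this.stages);
		copy.add(data -> action.apply((T) data));
		return new MiniPipeline<>(copy);
	}

	/** Adds a filtering stage that returns null for rejected elements. */
	MiniPipeline<S, T> filter(Predicate<? super T> predicate) {
		return this.<T>action(data -> predicate.test(data) ? data : null);
	}

	/** Pushes one element through every stage, stopping early on null. */
	@SuppressWarnings("unchecked")
	T process(S source) {
		Object current = source;
		for (Function<Object, Object> stage : this.stages) {
			current = stage.apply(current);
			if (current == null) {
				break;
			}
		}
		return (T) current;
	}

	public static void main(String[] args) {
		// Build once, then reuse the same pipeline for each element as it arrives.
		MiniPipeline<String, Integer> lengths = MiniPipeline.<String>start()
				.filter(s -> !s.isEmpty())
				.action(String::length);
		System.out.println(lengths.process("pipes"));
		System.out.println(lengths.process(""));
	}
}
```

Unlike a stream, the same pipeline instance can be fed one element at a time for as long as it is needed.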

Let’s take a look at the pipes that are included:

  • action() performs an action with the data, such as updating a database, or transforms the data into an entirely new object.
  • collect() takes a Collector/CollectorInterface for collecting elements. CollectorInterface provides ways to pull data as needed.
  • connect() allows a custom pipe to be attached to the pipeline. Just implement PipeInterface to create your own.
  • distinct() allows only distinct objects to pass this pipe. Duplicate objects will not be processed further.
  • distinctBy() takes a Function that determines which property of an object to judge distinctness by.
  • duplicates() allows only duplicates of objects to pass. The first instance of an object is blocked, while subsequent duplicates are allowed to pass.
  • duplicatesBy() takes a Function that determines which property of an object to judge duplicates by.
  • filter() filters out objects based on the Predicate.
  • fork() sends the data into each of the Pipelines provided to this pipe.
  • max() blocks any objects that exceed the given object, based on the provided Comparator.
  • min() blocks any objects that are below the given object, based on the provided Comparator.
  • peek() allows a Consumer to be called with the data without affecting the data.
  • switchIf() sends the data to a different Pipeline if the Predicate indicates it should.
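The pass/block rules for distinct(), duplicates(), max(), and min() described in the list above can be expressed as plain predicates. The sketch below is only an illustration of those rules under that reading; PipeSemanticsDemo and its method names are hypothetical and do not exist in the library.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

class PipeSemanticsDemo {

	/** Passes an element only the first time it is seen (the distinct() rule). */
	static <T> Predicate<T> distinct() {
		Set<T> seen = ConcurrentHashMap.newKeySet();
		return seen::add; // add() returns false for repeats
	}

	/** Passes an element only when it has been seen before (the duplicates() rule). */
	static <T> Predicate<T> duplicates() {
		Set<T> seen = ConcurrentHashMap.newKeySet();
		return value -> !seen.add(value);
	}

	/** Passes elements that do not exceed max (the max() rule). */
	static <T> Predicate<T> atMost(T max, Comparator<? super T> comparator) {
		return value -> comparator.compare(value, max) <= 0;
	}

	/** Passes elements that are not below min (the min() rule). */
	static <T> Predicate<T> atLeast(T min, Comparator<? super T> comparator) {
		return value -> comparator.compare(value, min) >= 0;
	}
}
```

Each predicate keeps its own state, which mirrors how a pipe instance remembers what it has already seen across calls.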

Please see the Project Page for additional information. Future articles and enhancements will be shared. Once I am satisfied with testing, I will release this as a library.

By Jeffery Miller

