{"id":2473,"date":"2025-12-23T10:00:09","date_gmt":"2025-12-23T15:00:09","guid":{"rendered":"http:\/\/www.mymiller.name\/wordpress\/?p=2473"},"modified":"2025-12-23T10:00:09","modified_gmt":"2025-12-23T15:00:09","slug":"java-pipelines","status":"publish","type":"post","link":"https:\/\/www.mymiller.name\/wordpress\/pipelines\/java-pipelines\/","title":{"rendered":"Java Pipelines"},"content":{"rendered":"\n<p>Java streams introduced a new way to program for developers.  Have a dataset and build a stream to filter, it maps it, and then collect it in the new form.  This gave developers a powerful tool to use in development and data processing.  I can&#8217;t even begin to say how many times I have used this.  It is an efficient and organized way to process your data.<\/p>\n\n\n\n<p>However I had the question what if I had only 1 element? What if my data came in chunks? Building and tearing down a stream over, over and over for 1 data element didn&#8217;t seem to make sense to me.  Why couldn&#8217;t I use the same stream over and over?<\/p>\n\n\n\n<p>I set out to create something more persistent.  What exactly did it need to do?<\/p>\n\n\n\n<ul class=\"wp-block-list\"><li>Process data similar to a stream.<\/li><li>Expand on the methods.<\/li><li>Run in parallel or single-thread.<\/li><li>Connect multiple entities together.<\/li><\/ul>\n\n\n\n<p>Thus was born Pipelines.  I looked around and had seen there were existing libraries similar to this.  They seemed to be a lot bulkier than I wanted. I prefer <g class=\"gr_ gr_6 gr-alert gr_gramm gr_inline_cards gr_run_anim Grammar only-ins doubleReplace replaceWithoutSep\" id=\"6\" data-gr-id=\"6\">lean<\/g> code. Let&#8217;s get into the code and see what I have come up with.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Pipe Interface<\/h2>\n\n\n\n<p>The first thing we had to do was determine how the pieces were going to fit together.  I created the PipeInterface&lt;S,T&gt;.  This provides a simple interface that can be used<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>package name.mymiller.extensions.utils.pipelines;\n\nimport java.util.List;\n\n\/**\n * Interface for a Pipe object to implement for compatiablity with the Pipeline.\n *\n * @author jmiller\n *\n *\/\npublic interface PipeInterface&lt;S, T> {\n\n\t\/**\n\t *\n\t * @return List of any futures from sub-pipelines\n\t *\/\n\tpublic default List&lt;PipeFuture&lt;?>> getFutures() {\n\t\treturn null;\n\t}\n\n\t\/**\n\t * Method to process the object pushed to the pipe.\n\t *\n\t * @param data\n\t *            Object being pushed to the pipe\n\t * @param futures list of futures associated with this data as possible terminate points.\n\t * @param String containing the name of this pipeline.           \n\t * @return Object after processing has occurred.\n\t * @throws Throwable TODO\n\t *\/\n\tpublic abstract T process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel) throws Throwable;\n\n}\n<\/code><\/pre>\n\n\n\n<p>We will use this interface to define a new pipe section for our pipeline.  As we are designing our pipeline to be able to send data to other pipelines, and to run parallel.  We pass in a List of PipeFuture that could be marked as complete with the data attached.  <\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Pipe Future<\/h2>\n\n\n\n<p>When we are running in parallel it is necessary to use a Future to indicate if the data has completed processing.  As our pipeline design allows for data to processed single threaded, or in parallel we use Future to indicate the processing when in parallel.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>\/**\n *\n *\/\npackage name.mymiller.extensions.utils.pipelines;\n\nimport java.util.UUID;\nimport java.util.concurrent.ExecutionException;\nimport java.util.concurrent.Future;\nimport java.util.concurrent.TimeUnit;\nimport java.util.concurrent.TimeoutException;\nimport java.util.concurrent.atomic.AtomicBoolean;\n\n\/**\n * Future representing the final data for a pipeline.\n *\n * @author jmiller\n * @param &lt;V>\n *            Type of final data\n *\n *\/\npublic class PipeFuture&lt;V> implements Future&lt;V> {\n\n\t\/**\n\t * Object used for synchronization between threads\n\t *\/\n\tprivate Object syncObject = null;\n\n\t\/**\n\t * The final result from processing\n\t *\/\n\tprivate V futureObject = null;\n\n\t\/**\n\t * String identifier for the pipeline. Will match the name of the pipeline\n\t *\/\n\tprivate String identifier = null;\n\n\tprivate UUID pipelineId = null;\n\n\t\/**\n\t * Boolean indicating if the future is complete.\n\t *\/\n\tprivate AtomicBoolean done = null;\n\n\t\/**\n\t * Constructor that takes the identifier for the future.\n\t *\n\t * @param identifier\n\t *            String indicating the pipeline this ia future too.\n\t *\/\n\tpublic PipeFuture(String identifier, UUID pipelineId) {\n\t\tthis.syncObject = new Object();\n\t\tthis.done = new AtomicBoolean(false);\n\t\tthis.identifier = identifier;\n\t\tthis.pipelineId = pipelineId;\n\t}\n\n\t@Override\n\tpublic boolean cancel(boolean mayInterruptIfRunning) {\n\t\treturn false;\n\t}\n\n\t@Override\n\tpublic V get() throws InterruptedException, ExecutionException {\n\t\treturn this.futureObject;\n\t}\n\n\t@Override\n\tpublic V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {\n\t\tunit.timedWait(this.syncObject, timeout);\n\t\treturn this.futureObject;\n\t}\n\n\t\/**\n\t * @return the identifier\n\t *\/\n\tpublic String getIdentifier() {\n\t\treturn this.identifier;\n\t}\n\n\t\/**\n\t * @return the pipelineId\n\t *\/\n\tpublic UUID getPipelineId() {\n\t\treturn this.pipelineId;\n\t}\n\n\t@Override\n\tpublic boolean isCancelled() {\n\t\treturn false;\n\t}\n\n\t@Override\n\tpublic boolean isDone() {\n\t\treturn this.done.get();\n\t}\n\n\t\/**\n\t * @param done\n\t *            the done to set\n\t *\/\n\tpublic synchronized void setDone(boolean done) {\n\t\tthis.done.set(done);\n\t}\n\n\t\/**\n\t * @param futureObject\n\t *            the futureObject to set\n\t *\/\n\tpublic synchronized void setFutureObject(V futureObject) {\n\t\tthis.futureObject = futureObject;\n\t\tthis.syncObject.notifyAll();\n\t}\n\n}\n<\/code><\/pre>\n\n\n\n<p>Here you can see we have extended the Future.  We have added the field to indicate what pipeline this PipeFuture represents.  <\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Pipeline<\/h2>\n\n\n\n<p>Here is the meat and potatoes of our Pipeline.  A single class with built in pipes for the main functionality.  A simple call to Pipeline.&lt;&gt;start() to name your Pipeline and your off to building the processing.  Then a call to Pipeline::process()  or Pipeline::processParallel() to initiate the data processing. <\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>package name.mymiller.extensions.utils.pipelines;\n\nimport java.util.ArrayList;\nimport java.util.Collections;\nimport java.util.Comparator;\nimport java.util.HashSet;\nimport java.util.List;\nimport java.util.Map;\nimport java.util.Set;\nimport java.util.UUID;\nimport java.util.concurrent.ConcurrentHashMap;\nimport java.util.concurrent.ExecutionException;\nimport java.util.concurrent.ExecutorService;\nimport java.util.concurrent.ForkJoinPool;\nimport java.util.function.Consumer;\nimport java.util.function.Function;\nimport java.util.function.Predicate;\nimport java.util.stream.Collector;\n\n\/**\n * Pipeline object to manage your processing pipeline. Pipe segments are added\n * in the ordered they are connected to the Pipeline. They are called in\n * sequential order. Add or create a Collector at the end to collect a result if\n * needed.\n *\n * @author jmiller\n *\n *\/\npublic class Pipeline&lt;S, T> {\n\n\t\/**\n\t * A pipe that will take in an object of type T and perform an action on the\n\t * data. This can transform the data to a new type, or update data on the pipe.\n\t *\n\t * @author jmiller\n\t * @param &lt;T>\n\t *            The type the Pipe will accept,\n\t * @param &lt;R>\n\t *            The Type the Pipe will return.\n\t *\n\t *\/\n\tprivate class ActionPipe&lt;S, T> implements PipeInterface&lt;S, T> {\n\n\t\t\/**\n\t\t * Action Functional interface\n\t\t *\/\n\t\tprivate Function&lt;? super S, ? extends T> action;\n\n\t\t\/**\n\t\t * Constructor to provide the Class it will act on, and the functional interface\n\t\t *\n\t\t * @param type\n\t\t *            Class it will accept.\n\t\t * @param action\n\t\t *            Functional interface to act on.\n\t\t *\/\n\t\tpublic ActionPipe(Function&lt;? super S, ? extends T> action) {\n\t\t\tthis.action = action;\n\t\t}\n\n\t\t\/*\n\t\t * (non-Javadoc)\n\t\t *\n\t\t * @see\n\t\t * name.mymiller.extensions.utils.pipelines.PipeInterface#process(java.lang.\n\t\t * Object)\n\t\t *\/\n\t\t@Override\n\t\tpublic T process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\treturn this.action.apply(data);\n\t\t}\n\t}\n\n\tprivate class DefaultExceptionHandler implements ExceptionHandlerInterface {\n\n\t\t@Override\n\t\tpublic boolean process(Throwable throwable, PipeInterface&lt;?, ?> pipe, Object data, List&lt;PipeFuture&lt;?>> futures,\n\t\t\t\tString pipelineName, boolean isParallel) {\n\t\t\tSystem.out.println(\"Pipe: \" + pipe.getClass().getName());\n\t\t\tSystem.out.println(\"Date: \" + data.toString());\n\t\t\tSystem.out.println(\"Pipeline: \" + pipelineName);\n\t\t\tSystem.out.println(\"Parallel: \" + isParallel);\n\t\t\tSystem.out.println(\"Exception: \" + throwable.getMessage());\n\t\t\tSystem.out.println(\"Stacktrace: \");\n\t\t\tthrowable.printStackTrace(System.out);\n\n\t\t\treturn false;\n\t\t}\n\n\t}\n\n\t\/**\n\t * Pipe segment used to allow only distinct data blocks duplicate data blocks\n\t * from being further processed\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class DistinctByPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * List containing data blocks previously processed\n\t\t *\/\n\t\tprivate Map&lt;Object, Boolean> seen;\n\n\t\tprivate boolean onDistinct = true;\n\n\t\tprivate Function&lt;? super S, Object> keyExtractor;\n\n\t\t\/**\n\t\t * Default contructor setting up the distinctList\n\t\t *\/\n\t\tpublic DistinctByPipe(Function&lt;? super S, Object> keyExtractor) {\n\t\t\tthis.seen = new ConcurrentHashMap&lt;>();\n\t\t\tthis.keyExtractor = keyExtractor;\n\t\t}\n\n\t\t\/**\n\t\t * Default contructor setting up the distinctList\n\t\t *\/\n\t\tpublic DistinctByPipe(Function&lt;? super S, Object> keyExtractor, boolean onDistinct) {\n\t\t\tthis.seen = new ConcurrentHashMap&lt;>();\n\t\t\tthis.keyExtractor = keyExtractor;\n\t\t\tthis.onDistinct = onDistinct;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tif (this.seen.putIfAbsent(this.keyExtractor.apply(data), Boolean.TRUE) == null) {\n\t\t\t\tif (this.onDistinct) {\n\t\t\t\t\treturn data;\n\t\t\t\t} else {\n\t\t\t\t\treturn null;\n\t\t\t\t}\n\t\t\t}\n\t\t\tif (this.onDistinct) {\n\t\t\t\treturn null;\n\t\t\t} else {\n\t\t\t\treturn data;\n\t\t\t}\n\t\t}\n\t}\n\n\t\/**\n\t * Pipe segment used to allow only distinct data blocks duplicate data blocks\n\t * from being further processed\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class DistinctPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * List containing data blocks previously processed\n\t\t *\/\n\t\tprivate Set&lt;S> distinctList;\n\n\t\tprivate boolean onDistinct = true;\n\n\t\t\/**\n\t\t * Default contructor setting up the distinctList\n\t\t *\/\n\t\tpublic DistinctPipe() {\n\t\t\tthis.distinctList = Collections.synchronizedSet(new HashSet&lt;>());\n\t\t}\n\n\t\t\/**\n\t\t * Default contructor setting up the distinctList\n\t\t *\/\n\t\tpublic DistinctPipe(boolean onDistinct) {\n\t\t\tthis.distinctList = Collections.synchronizedSet(new HashSet&lt;>());\n\t\t\tthis.onDistinct = onDistinct;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tif (!this.distinctList.contains(data)) {\n\t\t\t\tthis.distinctList.add(data);\n\t\t\t\tif (this.onDistinct) {\n\t\t\t\t\treturn data;\n\t\t\t\t} else {\n\t\t\t\t\treturn null;\n\t\t\t\t}\n\t\t\t}\n\t\t\tif (this.onDistinct) {\n\t\t\t\treturn null;\n\t\t\t} else {\n\t\t\t\treturn data;\n\t\t\t}\n\n\t\t}\n\t}\n\n\t\/**\n\t * Pipe segment allowing for filtering data. Allowing only certain data from\n\t * continuing processing.\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class FilterPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * Predicate functional interface proving the filtering algorithm.\n\t\t *\/\n\t\tprivate Predicate&lt;? super S> predicate;\n\n\t\t\/**\n\t\t * Constructor to setup the FilterPipe\n\t\t *\n\t\t * @param predicate\n\t\t *            Predicate to be used for filtering data.\n\t\t *\/\n\t\tpublic FilterPipe(Predicate&lt;? super S> predicate) {\n\t\t\tthis.predicate = predicate;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tif (this.predicate.test(data)) {\n\t\t\t\treturn data;\n\t\t\t}\n\t\t\treturn null;\n\t\t}\n\n\t}\n\n\t\/**\n\t * Fork Pipe takes a list of Pipelines and will send the data to each pipeline.\n\t * If processing is occurring in parallel, this pipe will run in parallel as\n\t * well.\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class ForkPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * List of Pipelines data is to be forked into.\n\t\t *\/\n\t\tprivate List&lt;Pipeline&lt;S, ?>> pipelines = null;\n\n\t\t\/**\n\t\t * Constructor for accepting a list of pipelines to fork data into.\n\t\t *\n\t\t * @param pipelines\n\t\t *            List of Pipelines.\n\t\t *\/\n\t\tpublic ForkPipe(List&lt;Pipeline&lt;S, ?>> pipelines) {\n\t\t\tthis.pipelines = pipelines;\n\t\t}\n\n\t\t\/*\n\t\t * (non-Javadoc)\n\t\t *\n\t\t * @see name.mymiller.extensions.utils.pipelines.PipeInterface#getFutures()\n\t\t *\/\n\t\t@SuppressWarnings({ \"rawtypes\", \"unchecked\" })\n\t\t@Override\n\t\tpublic List&lt;PipeFuture&lt;?>> getFutures() {\n\t\t\tArrayList&lt;PipeFuture&lt;?>> pipeFutures = new ArrayList&lt;>();\n\n\t\t\tfor (Pipeline pipeline : this.pipelines) {\n\t\t\t\tpipeFutures.addAll(pipeline.getFutures());\n\t\t\t}\n\t\t\treturn pipeFutures;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tfor (Pipeline&lt;S, ?> pipeline : this.pipelines) {\n\t\t\t\tif (isParallel) {\n\t\t\t\t\tpipeline.internalParallel(data, futures);\n\t\t\t\t} else {\n\t\t\t\t\tpipeline.process(data);\n\t\t\t\t}\n\t\t\t}\n\t\t\treturn null;\n\t\t}\n\n\t}\n\n\t\/**\n\t * Pipe that places a Max filter on the data. Comparator is used to determine if\n\t * the data exceeds the max value.\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class MaxPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * Max value to allow through the pipe\n\t\t *\/\n\t\tprivate S max;\n\n\t\t\/**\n\t\t * Comparator to use to compare the data against the max value.\n\t\t *\/\n\t\tprivate Comparator&lt;? super S> comparator;\n\n\t\t\/**\n\t\t * Constructor accepting the max value to allow and the comparator.\n\t\t *\n\t\t * @param max\n\t\t *            Max value of type S to allow.\n\t\t * @param comparator\n\t\t *            Comparator to compare type S.\n\t\t *\/\n\t\tpublic MaxPipe(S max, Comparator&lt;? super S> comparator) {\n\t\t\tthis.max = max;\n\t\t\tthis.comparator = comparator;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tif (this.comparator.compare(data, this.max) &lt;= 0) {\n\t\t\t\treturn data;\n\t\t\t}\n\t\t\treturn null;\n\t\t}\n\n\t}\n\n\t\/**\n\t * Pipe that places a Min filter on the Data. Comaprator is used to determine if\n\t * the data exceeds the min value.\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class MinPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * Min value to allow thorugh the pipe.\n\t\t *\/\n\t\tprivate S min;\n\n\t\t\/**\n\t\t * Comparator to use to compare the data against the min value.\n\t\t *\/\n\t\tprivate Comparator&lt;? super S> comparator;\n\n\t\t\/**\n\t\t * Constructor accepting the min value to allow and the comparator.\n\t\t *\n\t\t * @param min\n\t\t *            Min value of type S to allow.\n\t\t * @param comparator\n\t\t *            Comparator to compare type S.\n\t\t *\/\n\t\tpublic MinPipe(S min, Comparator&lt;? super S> comparator) {\n\t\t\tthis.min = min;\n\t\t\tthis.comparator = comparator;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tif (this.comparator.compare(this.min, data) >= 0) {\n\t\t\t\treturn data;\n\t\t\t}\n\t\t\treturn null;\n\t\t}\n\n\t}\n\n\t\/**\n\t * Pipe that allows you to see the data at this point in the processing an\n\t * action is called allowing the data to be passed to a Consumer. The data is\n\t * passed along in the pipe.\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source TYpe\n\t *\/\n\tprivate class PeekPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * Consumer interface to allow 'peeking' at the data.\n\t\t *\/\n\t\tprivate Consumer&lt;? super S> action = null;\n\n\t\t\/**\n\t\t * Constructor for creating a peek at the data.\n\t\t *\n\t\t * @param action\n\t\t *            Consumer interface to peek at the data.\n\t\t *\/\n\t\tpublic PeekPipe(Consumer&lt;? super S> action) {\n\t\t\tthis.action = action;\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(final S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tthis.action.accept(data);\n\t\t\treturn data;\n\t\t}\n\t}\n\n\t\/**\n\t * Runnable to perform processing in Parallel\n\t *\n\t * @author jmiller\n\t *\n\t *\/\n\tprivate class PipeRun implements Runnable {\n\t\t\/**\n\t\t * List of pipes at the time the processing was started that needs to be\n\t\t * executed.\n\t\t *\/\n\t\tprivate List&lt;PipeInterface&lt;?, ?>> pipes;\n\n\t\t\/**\n\t\t * Original data source\n\t\t *\/\n\t\tprivate S source;\n\n\t\t\/**\n\t\t * List of futures that will need to be responded to for complete processing.\n\t\t *\/\n\t\tprivate List&lt;PipeFuture&lt;?>> futures;\n\n\t\t\/**\n\t\t * Name givent to this pipeline.\n\t\t *\/\n\t\tprivate String pipelineName;\n\n\t\t\/**\n\t\t * Exception Handler to use for processing\n\t\t *\/\n\t\tprivate ExceptionHandlerInterface exceptionHandler = null;\n\n\t\t\/**\n\t\t * Constructor for creating a paralell processing on the pipeline.\n\t\t *\n\t\t * @param pipes\n\t\t *            List of pipes to rpcess\n\t\t * @param source\n\t\t *            Original data to process\n\t\t * @param futures\n\t\t *            List of futures that need to be completed.\n\t\t * @param pipelineName\n\t\t *            Name of this pipeline\n\t\t *\/\n\t\tprotected PipeRun(List&lt;PipeInterface&lt;?, ?>> pipes, S source, List&lt;PipeFuture&lt;?>> futures, String pipelineName,\n\t\t\t\tExceptionHandlerInterface exceptionHandler) {\n\t\t\tthis.pipes = pipes;\n\t\t\tthis.source = source;\n\t\t\tthis.futures = futures;\n\t\t\tthis.pipelineName = pipelineName;\n\t\t\tthis.exceptionHandler = exceptionHandler;\n\t\t}\n\n\t\t@SuppressWarnings({ \"unchecked\", \"rawtypes\" })\n\t\t@Override\n\t\tpublic void run() {\n\t\t\tObject source = this.source;\n\t\t\tObject target = null;\n\t\t\tfor (PipeInterface pipe : this.pipes) {\n\t\t\t\tboolean processAgain = true;\n\t\t\t\tint processAttempt = 0;\n\n\t\t\t\twhile (processAgain) {\n\t\t\t\t\ttry {\n\t\t\t\t\t\tprocessAttempt++;\n\t\t\t\t\t\ttarget = pipe.process(source, this.futures, this.pipelineName, true);\n\t\t\t\t\t\tprocessAgain = false;\n\t\t\t\t\t} catch (Throwable throwable) {\n\t\t\t\t\t\tif (this.exceptionHandler != null) {\n\t\t\t\t\t\t\tboolean response = this.exceptionHandler.process(throwable, pipe, source, this.futures,\n\t\t\t\t\t\t\t\t\tthis.pipelineName, true);\n\t\t\t\t\t\t\tif (response &amp;&amp; (processAttempt == 1)) {\n\t\t\t\t\t\t\t\tprocessAgain = true;\n\t\t\t\t\t\t\t} else {\n\t\t\t\t\t\t\t\tprocessAgain = false;\n\t\t\t\t\t\t\t\ttarget = null;\n\t\t\t\t\t\t\t}\n\t\t\t\t\t\t}\n\t\t\t\t\t}\n\t\t\t\t}\n\t\t\t\tif (target == null) {\n\t\t\t\t\tbreak;\n\t\t\t\t}\n\t\t\t\tsource = target;\n\t\t\t}\n\n\t\t\tfor (PipeFuture future : this.futures) {\n\t\t\t\tif (future.getIdentifier().equals(this.pipelineName)) {\n\t\t\t\t\tfuture.setFutureObject(target);\n\t\t\t\t}\n\t\t\t}\n\t\t}\n\t}\n\n\t\/**\n\t * Pipe Segment allowing for a copy of the data to flow to an additional\n\t * pipeline if the predicate is matched.\n\t *\n\t * @author jmiller\n\t *\n\t * @param &lt;S>\n\t *            Source Type\n\t *\/\n\tprivate class SwitchPipe&lt;S> implements PipeInterface&lt;S, S> {\n\n\t\t\/**\n\t\t * Predicate functional interface proving the filtering algorithm.\n\t\t *\/\n\t\tprivate Predicate&lt;? super S> predicate;\n\n\t\t\/**\n\t\t * Alternate Pipeline for data to flow.\n\t\t *\/\n\t\tprivate Pipeline&lt;S, ?> pipeline;\n\n\t\t\/**\n\t\t * Constructor to setup the FilterPipe\n\t\t *\n\t\t * @param predicate\n\t\t *            Predicate to be used for filtering data.\n\t\t *\/\n\t\tpublic SwitchPipe(Predicate&lt;? super S> predicate, Pipeline&lt;S, ?> pipeline) {\n\t\t\tthis.predicate = predicate;\n\t\t\tthis.pipeline = pipeline;\n\t\t}\n\n\t\t\/*\n\t\t * (non-Javadoc)\n\t\t *\n\t\t * @see name.mymiller.extensions.utils.pipelines.PipeInterface#getFutures()\n\t\t *\/\n\t\t@SuppressWarnings({ \"rawtypes\", \"unchecked\" })\n\t\t@Override\n\t\tpublic List&lt;PipeFuture&lt;?>> getFutures() {\n\t\t\treturn this.pipeline.getFutures();\n\t\t}\n\n\t\t@Override\n\t\tpublic S process(S data, List&lt;PipeFuture&lt;?>> futures, String pipelineName, boolean isParallel)\n\t\t\t\tthrows Throwable {\n\t\t\tif (this.predicate.test(data)) {\n\t\t\t\tif (isParallel) {\n\t\t\t\t\tthis.pipeline.internalParallel(data, futures);\n\t\t\t\t} else {\n\t\t\t\t\tthis.pipeline.process(data);\n\t\t\t\t}\n\t\t\t} else if (futures != null) {\n\t\t\t\tList&lt;PipeFuture&lt;?>> futureList = this.pipeline.getFutures();\n\t\t\t\tfutures.parallelStream().forEach(future -> {\n\t\t\t\t\tPipeFuture&lt;?> match = futureList.stream()\n\t\t\t\t\t\t\t.filter(pipe -> pipe.getPipelineId().equals(future.getPipelineId())).findFirst()\n\t\t\t\t\t\t\t.orElse(null);\n\n\t\t\t\t\tif (match != null) {\n\t\t\t\t\t\tfuture.setFutureObject(null);\n\t\t\t\t\t\tfuture.setDone(true);\n\t\t\t\t\t}\n\t\t\t\t});\n\t\t\t}\n\t\t\treturn data;\n\t\t}\n\n\t}\n\n\t\/**\n\t * Static method to create a pipeline\n\t *\n\t * @param name\n\t *            Name to give the pipeline\n\t * @return Pipeline ready to received pipe segments\n\t *\/\n\tpublic static &lt;K> Pipeline&lt;K, K> start(String name) {\n\t\tPipeline&lt;K, K> chain = new Pipeline&lt;K, K>(name);\n\t\tchain.pipes = new ArrayList&lt;>();\n\t\treturn chain;\n\t}\n\n\tprivate DefaultExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();\n\n\t\/**\n\t * Executor to use for parallel processing\n\t *\/\n\tprivate ExecutorService executorService = null;\n\n\t\/**\n\t * List of pipes to be processed.\n\t *\/\n\tprivate List&lt;PipeInterface&lt;?, ?>> pipes;\n\n\t\/**\n\t * Name of this pipeline.\n\t *\/\n\tprivate String pipelineName;\n\n\tprivate UUID pipelineId;\n\n\t\/**\n\t * Exception Handler to use for processing\n\t *\/\n\tprivate ExceptionHandlerInterface exceptionHandler = this.defaultExceptionHandler;\n\n\t\/**\n\t * Copy constructor used to create a duplicate pipeline.\n\t *\n\t * @param pipeline\n\t *            Pipeline to duplciate.\n\t *\/\n\tprivate Pipeline(Pipeline&lt;?, ?> pipeline) {\n\t\tsuper();\n\t\tthis.executorService = pipeline.executorService;\n\t\tthis.pipes = new ArrayList&lt;PipeInterface&lt;?, ?>>(pipeline.pipes);\n\t\t;\n\t\tthis.pipelineName = pipeline.pipelineName;\n\t\tthis.pipelineId = pipeline.pipelineId;\n\t}\n\n\t\/**\n\t * Constructor that creates an empty pipeline with the specified name.\n\t *\n\t * @param pipelineName\n\t *\/\n\tprivate Pipeline(String pipelineName) {\n\t\tsuper();\n\t\tthis.pipelineName = pipelineName;\n\t\tthis.pipelineId = UUID.randomUUID();\n\t\tthis.executorService = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),\n\t\t\t\tForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);\n\t}\n\n\t\/**\n\t * Creates an Action Segment for the pipeline with an action functional\n\t * interface.\n\t *\n\t * @param action\n\t *            Action Functional Interface that will be used.\n\t * @return Pipeline with the additional segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic &lt;V> Pipeline&lt;S, V> action(Function&lt;? super T, ? extends V> action) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, V>) this.connectFirstPipe(new ActionPipe&lt;T, V>(action));\n\t\t}\n\t\treturn this.connectInternalPipe(new ActionPipe&lt;T, V>(action));\n\t}\n\n\t\/**\n\t * Create a Collector pipe segment by passing in the collector interface\n\t *\n\t * @param collector\n\t *            Collector Funcation interface that will be used.\n\t * @return Pipeline with the additional segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic &lt;A, V> Pipeline&lt;S, T> collect(Collector&lt;T, A, V> collector) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new CollectorPipe&lt;T, A, V>(collector));\n\t\t}\n\t\treturn this.connectInternalPipe(new CollectorPipe&lt;T, A, V>(collector));\n\t}\n\n\t\/**\n\t * Create a Collector pipe segment by passing in the collector interface\n\t *\n\t * @param Collector\n\t *            Pipe segment Collector Pipe that will be used.\n\t * @return Pipeline with the additional segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic &lt;V> Pipeline&lt;S, T> collect(CollectorInterface&lt;T, V> collector) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(collector);\n\t\t}\n\t\treturn this.connectInternalPipe(collector);\n\t}\n\n\t\/**\n\t * Method to add your own Pipe to the pipeline. Implement your pipe using the\n\t * PipeInterface\n\t *\n\t * @param pipe\n\t *            Pipe to add to the pipeline\n\t * @return Pipeline with the additional segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic &lt;V> Pipeline&lt;S, V> connect(PipeInterface&lt;T, V> pipe) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, V>) this.connectFirstPipe(pipe);\n\t\t}\n\t\treturn this.connectInternalPipe(pipe);\n\t}\n\n\t\/**\n\t * Internal method to connect the first pipe to the pipeline.\n\t *\n\t * @param pipe\n\t *            Pipe to add to the pipeline\n\t * @return Pipeline with the segment added.\n\t *\/\n\tprivate &lt;K, L> Pipeline&lt;K, L> connectFirstPipe(PipeInterface&lt;K, L> pipe) {\n\t\tPipeline&lt;K, L> pipeline = new Pipeline&lt;K, L>(this);\n\t\tpipeline.pipes.add(pipe);\n\t\treturn pipeline;\n\t}\n\n\t\/**\n\t * Internal call to added a pipe to a pipelien with existing pipes.\n\t *\n\t * @param pipe\n\t *            Pipe to add to the pipeline\n\t * @return Pipeline with the segment added.\n\t *\/\n\tprivate &lt;V> Pipeline&lt;S, V> connectInternalPipe(PipeInterface&lt;T, V> pipe) {\n\t\tPipeline&lt;S, V> pipeline = new Pipeline&lt;S, V>(this);\n\t\tpipeline.pipes.add(pipe);\n\t\treturn pipeline;\n\t}\n\n\t\/**\n\t * Add a pipe for limiting the data to be distinct. Identical data objects will\n\t * be blocked if the two data objects would match with .equals().\n\t *\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> distinct() {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new DistinctPipe&lt;T>());\n\t\t}\n\t\treturn this.connectInternalPipe(new DistinctPipe&lt;T>());\n\t}\n\n\t\/**\n\t * Add a pipe for limiting the data to be distinct based on a property.\n\t * Identical key objects will be blocked if the two data keys would match with\n\t * .equals().\n\t *\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> distinctBy(Function&lt;? super T, Object> keyExtractor) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new DistinctByPipe&lt;T>(keyExtractor));\n\t\t}\n\t\treturn this.connectInternalPipe(new DistinctByPipe&lt;T>(keyExtractor));\n\t}\n\n\t\/**\n\t * Add a pipe for limiting the data to duplicates only. Second instances of\n\t * identical data objects will be allowed if the two data objects would match\n\t * with .equals().\n\t *\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> duplicates() {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new DistinctPipe&lt;T>(false));\n\t\t}\n\t\treturn this.connectInternalPipe(new DistinctPipe&lt;T>(false));\n\t}\n\n\t\/**\n\t * Add a pipe for limiting the data to be duplicates only based on a property.\n\t * Second instances of identical key objects will be blocked if the two data\n\t * keys would match with .equals().\n\t *\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> duplicatesBy(Function&lt;? super T, Object> keyExtractor) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new DistinctByPipe&lt;T>(keyExtractor, false));\n\t\t}\n\t\treturn this.connectInternalPipe(new DistinctByPipe&lt;T>(keyExtractor, false));\n\t}\n\n\t\/**\n\t * Pipeline to add a filter based on a Predicate passed in.\n\t *\n\t * @param predicate\n\t *            Predicate providing the filtering logic.\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> filter(Predicate&lt;? super T> predicate) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new FilterPipe&lt;T>(predicate));\n\t\t}\n\t\treturn this.connectInternalPipe(new FilterPipe&lt;T>(predicate));\n\t}\n\n\t\/**\n\t * Pipe Fork, to split the data passing in, to multiple additional pipelines.\n\t *\n\t * @param pipelines\n\t *            List of Pipelines that this pipe segment forks into\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> fork(List&lt;Pipeline&lt;T, ?>> pipelines) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new ForkPipe&lt;T>(pipelines));\n\t\t}\n\t\treturn this.connectInternalPipe(new ForkPipe&lt;T>(pipelines));\n\t}\n\n\t\/**\n\t * @return the exceptionHandler\n\t *\/\n\tprotected synchronized ExceptionHandlerInterface getExceptionHandler() {\n\t\treturn this.exceptionHandler;\n\t}\n\n\t\/**\n\t *\n\t * @return List of Futures for the next processing\n\t *\/\n\tpublic List&lt;PipeFuture&lt;?>> getFutures() {\n\t\tArrayList&lt;PipeFuture&lt;?>> pipeFutures = new ArrayList&lt;>();\n\n\t\tpipeFutures.add(new PipeFuture&lt;T>(this.pipelineName, this.pipelineId));\n\t\tfor (PipeInterface&lt;?, ?> pipe : this.pipes) {\n\t\t\tpipeFutures.addAll(pipe.getFutures());\n\t\t}\n\t\treturn pipeFutures;\n\n\t}\n\n\t\/**\n\t * @return the pipelineId\n\t *\/\n\tpublic UUID getPipelineId() {\n\t\treturn this.pipelineId;\n\t}\n\n\t\/**\n\t * @return the pipelineName\n\t *\/\n\tpublic String getPipelineName() {\n\t\treturn this.pipelineName;\n\t}\n\n\t\/**\n\t * Internal method for running the code in Parallel\n\t *\n\t * @param source\n\t *            Source data being passed\n\t * @param futures\n\t *            List of Futures that need to me marked as complete when their\n\t *            associated pipeline completes.\n\t *\/\n\tpublic void internalParallel(S source, List&lt;PipeFuture&lt;?>> futures) {\n\t\tthis.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName, this.exceptionHandler));\n\t}\n\n\t\/**\n\t * Max segment limiting data to a max, anything greater than max, is blocked\n\t * from passing.\n\t *\n\t * @param max\n\t *            Max of the data being processed.\n\t * @param comparator\n\t *            Comparator to compare two instances of the data.\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> max(T max, Comparator&lt;? super T> comparator) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new MaxPipe&lt;T>(max, comparator));\n\t\t}\n\t\treturn this.connectInternalPipe(new MaxPipe&lt;T>(max, comparator));\n\t}\n\n\t\/**\n\t * Min segment limiting data to a min, anything less than min, is blocked from\n\t * passing.\n\t *\n\t * @param min\n\t *            Min of the data being processed.\n\t * @param comparator\n\t *            Comparator to compare two instances of the data.\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> min(T min, Comparator&lt;? super T> comparator) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new MinPipe&lt;T>(min, comparator));\n\t\t}\n\t\treturn this.connectInternalPipe(new MinPipe&lt;T>(min, comparator));\n\t}\n\n\t\/**\n\t * Method to add a peek funcationality to allow the data to be seen by other\n\t * code outside of the pipeline\n\t *\n\t * @param consumer\n\t *            Consumer that shall peek at the data.\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> peek(Consumer&lt;? super T> consumer) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new PeekPipe&lt;T>(consumer));\n\t\t}\n\t\treturn this.connectInternalPipe(new PeekPipe&lt;T>(consumer));\n\t}\n\n\t\/**\n\t * Method to process an instance of data on the current thread.\n\t *\n\t * @param s\n\t *            S data type that shall start the data processing\n\t * @return Completed value after processing.\n\t *\/\n\t@SuppressWarnings({ \"rawtypes\", \"unchecked\" })\n\tpublic T process(S s) {\n\t\tObject source = s;\n\t\tObject target = null;\n\t\tfor (PipeInterface pipe : this.pipes) {\n\t\t\tboolean processAgain = true;\n\t\t\tint processAttempt = 0;\n\n\t\t\twhile (processAgain) {\n\t\t\t\ttry {\n\t\t\t\t\tprocessAttempt++;\n\t\t\t\t\ttarget = pipe.process(source, null, this.pipelineName, false);\n\t\t\t\t\tprocessAgain = false;\n\t\t\t\t} catch (Throwable throwable) {\n\t\t\t\t\tif (this.exceptionHandler != null) {\n\t\t\t\t\t\tboolean response = this.exceptionHandler.process(throwable, pipe, source, null,\n\t\t\t\t\t\t\t\tthis.getPipelineName(), false);\n\t\t\t\t\t\tif (response &amp;&amp; (processAttempt == 1)) {\n\t\t\t\t\t\t\tprocessAgain = true;\n\t\t\t\t\t\t} else {\n\t\t\t\t\t\t\tprocessAgain = false;\n\t\t\t\t\t\t\ttarget = null;\n\t\t\t\t\t\t}\n\t\t\t\t\t}\n\t\t\t\t}\n\t\t\t}\n\t\t\tif (target == null) {\n\t\t\t\tbreak;\n\t\t\t}\n\t\t\tsource = target;\n\t\t}\n\t\treturn (T) target;\n\t}\n\n\t\/**\n\t * Method to process an instance of data in parallel\n\t *\n\t * @param source\n\t *            S data type that shall start the data processing\n\t * @return List of PipeFutures that represent all possible outcomes of the data.\n\t *\/\n\tpublic List&lt;PipeFuture&lt;?>> processParallel(S source) {\n\t\tList&lt;PipeFuture&lt;?>> futures = this.getFutures();\n\n\t\tthis.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName, this.exceptionHandler));\n\n\t\treturn futures;\n\t}\n\n\t\/**\n\t * @param exceptionHandler\n\t *            the exceptionHandler to set\n\t *\/\n\tprotected synchronized void setExceptionHandler(ExceptionHandlerInterface exceptionHandler) {\n\t\tthis.exceptionHandler = exceptionHandler;\n\t\tif (this.exceptionHandler == null) {\n\t\t\tthis.exceptionHandler = this.defaultExceptionHandler;\n\t\t}\n\t}\n\n\t\/**\n\t * Allow a data object to flow to another pipeline if the predicate matches.\n\t *\n\t * @param predicate\n\t *            Predicate to determine if the data is allowed to flow on the\n\t *            alternate pipeline\n\t * @param pipeline\n\t *            Alternate pipeline to send the data.\n\t * @return Pipeline with the segment added.\n\t *\/\n\t@SuppressWarnings(\"unchecked\")\n\tpublic Pipeline&lt;S, T> switchIf(Predicate&lt;? super T> predicate, Pipeline&lt;T, ?> pipeline) {\n\t\tif (this.pipes.isEmpty()) {\n\t\t\treturn (Pipeline&lt;S, T>) this.connectFirstPipe(new SwitchPipe&lt;T>(predicate, pipeline));\n\t\t}\n\t\treturn this.connectInternalPipe(new SwitchPipe&lt;T>(predicate, pipeline));\n\t}\n\n\t\/**\n\t * Method to wait for ALL PipeFutures in the list to complete\n\t *\n\t * @param futures\n\t *            List of PipeFutures to wait on.\n\t * @throws InterruptedException\n\t * @throws ExecutionException\n\t *\/\n\tpublic void waitForAll(List&lt;PipeFuture&lt;?>> futures) throws InterruptedException, ExecutionException {\n\n\t\tfor (PipeFuture&lt;?> future : futures) {\n\t\t\tfuture.get();\n\t\t}\n\t}\n\n\t\/**\n\t * Method to watch for a PipeFuture to complete once it does, it will return it.\n\t * Processing will yield after each cycle of checking futures.\n\t *\n\t * @param futures\n\t *            List of futures to watch.\n\t * @return PipeFuture that has completed.\n\t * @throws InterruptedException\n\t * @throws ExecutionException\n\t *\/\n\tpublic PipeFuture&lt;?> waitForOne(List&lt;PipeFuture&lt;?>> futures) throws InterruptedException, ExecutionException {\n\t\twhile (true) {\n\t\t\tfor (PipeFuture&lt;?> future : futures) {\n\t\t\t\tif (future.isDone()) {\n\t\t\t\t\treturn future;\n\t\t\t\t}\n\t\t\t}\n\t\t\tThread.yield();\n\t\t}\n\t}\n}\n<\/code><\/pre>\n\n\n\n<p>Let&#8217;s take a look at the pipes that are included<\/p>\n\n\n\n<ul class=\"wp-block-list\"><li>action() can perform an action with the data such as updating a database, or transpose the data into an entirely new object.<\/li><li>collect() provide a Collector\/CollectorInterface for collecting elements.  CollectorInterface provides ways to pull data as needed.<\/li><li>connect() allows for custom pipe to be attached into the pipeline.  Just implement a pipe to the PipelineInterface to create your own.<\/li><li>distinct() only allow distinct objects to pass this pipe.  Duplicate objects will not be processed further.<\/li><li>distinctBy() provide a Function() to determine what property of an object to judge distinct by.<\/li><li>duplicates() allow only duplicates of objects to pass.  First instance of an object will be blocked, while subsequent duplicates will be allowed to pass.<\/li><li>duplicatesBy() provide a Funciton() to determine what property of an object to judge duplciate by.<\/li><li>filter() filter out objects based on the Predicate().<\/li><li>fork() send the data into each of the Pipelines provided to this Pipe.<\/li><li>max() block any objects that exceed this object based on the provided Comparator().<\/li><li>min() block any objects that are below this object based on the provided Comparator().<\/li><li>peek() allow a consumer to be called with the data without effecting the data.<\/li><li>switchIf() switch the data to different Pipeline if the Predicate indicates it should.<\/li><\/ul>\n\n\n\n<p>Please see the <a href=\"http:\/\/www.mymiller.name\/wordpress\/portfolio\/java-pipeline\/\">Project Page<\/a> for additional information. Future articles and enhancements will be shared.  Once I am satisfied with testing I will release this a library.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Java streams introduced a new way to program for developers. Have a dataset and build a stream to filter, it maps it, and then collect it in the new form. This gave developers a powerful tool to use in development and data processing. I can&#8217;t even begin to say how many times I have used [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":2474,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_coblocks_attr":"","_coblocks_dimensions":"","_coblocks_responsive_height":"","_coblocks_accordion_ie_support":"","jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"_jetpack_memberships_contains_paywalled_content":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":"","jetpack_publicize_message":"","jetpack_publicize_feature_enabled":true,"jetpack_social_post_already_shared":true,"jetpack_social_options":{"image_generator_settings":{"template":"highway","default_image_id":0,"font":"","enabled":false},"version":2}},"categories":[453],"tags":[69],"series":[271],"class_list":["post-2473","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-pipelines","tag-java-2","series-pipelines"],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2019\/03\/machine-495376_640.jpg?fit=640%2C426&ssl=1","jetpack-related-posts":[{"id":3671,"url":"https:\/\/www.mymiller.name\/wordpress\/spring_ai\/spring-cloud-data-flow-orchestrating-machine-learning-pipelines\/","url_meta":{"origin":2473,"position":0},"title":"Spring Cloud Data Flow: Orchestrating Machine Learning Pipelines","author":"Jeffery Miller","date":"December 24, 2025","format":false,"excerpt":"In the dynamic world of machine learning, the journey from raw data to a deployed model involves a series of intricate steps. Spring Cloud Data Flow (SCDF) emerges as a powerful ally, offering a comprehensive platform to streamline and manage these complex data pipelines. In this guide, we\u2019ll delve into\u2026","rel":"","context":"In &quot;Spring AI&quot;","block_context":{"text":"Spring AI","link":"https:\/\/www.mymiller.name\/wordpress\/category\/spring_ai\/"},"img":{"alt_text":"","src":"https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/09\/ai-generated-8411275_1280-jpg.avif","width":350,"height":200,"srcset":"https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/09\/ai-generated-8411275_1280-jpg.avif 1x, https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/09\/ai-generated-8411275_1280-jpg.avif 1.5x, https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/09\/ai-generated-8411275_1280-jpg.avif 2x, https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/09\/ai-generated-8411275_1280-jpg.avif 3x"},"classes":[]},{"id":3951,"url":"https:\/\/www.mymiller.name\/wordpress\/java\/scaling-streams-mastering-virtual-threads-in-spring-boot-4-and-java-25\/","url_meta":{"origin":2473,"position":1},"title":"Scaling Streams: Mastering Virtual Threads in Spring Boot 4 and Java 25","author":"Jeffery Miller","date":"December 22, 2025","format":false,"excerpt":"As a software architect, I\u2019ve seen the industry shift from heavy platform threads to reactive streams, and finally to the \"best of both worlds\": Virtual Threads. With the recent release of Spring Boot 4.0 and Java 25 (LTS), Project Loom's innovations have officially become the bedrock of high-concurrency enterprise Java.\u2026","rel":"","context":"In &quot;JAVA&quot;","block_context":{"text":"JAVA","link":"https:\/\/www.mymiller.name\/wordpress\/category\/java\/"},"img":{"alt_text":"","src":"https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2025\/12\/Gemini_Generated_Image_wqijejwqijejwqij-scaled.avif","width":350,"height":200,"srcset":"https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2025\/12\/Gemini_Generated_Image_wqijejwqijejwqij-scaled.avif 1x, https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2025\/12\/Gemini_Generated_Image_wqijejwqijejwqij-scaled.avif 1.5x, https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2025\/12\/Gemini_Generated_Image_wqijejwqijejwqij-scaled.avif 2x, https:\/\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2025\/12\/Gemini_Generated_Image_wqijejwqijejwqij-scaled.avif 3x"},"classes":[]},{"id":3179,"url":"https:\/\/www.mymiller.name\/wordpress\/java_extra\/understanding-json-data-processing-with-java-exploring-the-jsonfieldprocessor-class\/","url_meta":{"origin":2473,"position":2},"title":"Understanding JSON Data Processing with Java: Exploring the JsonFieldProcessor Class","author":"Jeffery Miller","date":"January 15, 2026","format":false,"excerpt":"In today's digital era, data comes in various formats, with JSON (JavaScript Object Notation) being one of the most popular for representing structured data. Manipulating and processing JSON data efficiently is crucial for many software applications, from web development to data analysis. In this article, we'll delve into the workings\u2026","rel":"","context":"In &quot;Java Extras&quot;","block_context":{"text":"Java Extras","link":"https:\/\/www.mymiller.name\/wordpress\/category\/java_extra\/"},"img":{"alt_text":"","src":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/04\/data-7798787_640.png?fit=640%2C640&ssl=1&resize=350%2C200","width":350,"height":200,"srcset":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/04\/data-7798787_640.png?fit=640%2C640&ssl=1&resize=350%2C200 1x, https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2024\/04\/data-7798787_640.png?fit=640%2C640&ssl=1&resize=525%2C300 1.5x"},"classes":[]},{"id":2526,"url":"https:\/\/www.mymiller.name\/wordpress\/pipelines\/pipeline-switches\/","url_meta":{"origin":2473,"position":3},"title":"Pipeline Switches","author":"Jeffery Miller","date":"December 23, 2025","format":false,"excerpt":"One of the things that make pipelines attract is the ability to create switches. Now my question to you is what if it isn't the data your processing that should determine if a switch pushes the data down an alternate pipeline? We have several methods that perform this task filter()\u2026","rel":"","context":"In &quot;Pipelines&quot;","block_context":{"text":"Pipelines","link":"https:\/\/www.mymiller.name\/wordpress\/category\/pipelines\/"},"img":{"alt_text":"","src":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2019\/03\/railway-station-1270893_640.jpg?fit=640%2C384&ssl=1&resize=350%2C200","width":350,"height":200,"srcset":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2019\/03\/railway-station-1270893_640.jpg?fit=640%2C384&ssl=1&resize=350%2C200 1x, https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2019\/03\/railway-station-1270893_640.jpg?fit=640%2C384&ssl=1&resize=525%2C300 1.5x"},"classes":[]},{"id":3388,"url":"https:\/\/www.mymiller.name\/wordpress\/java_new_features\/vector-api-for-computations\/","url_meta":{"origin":2473,"position":4},"title":"Vector API for computations","author":"Jeffery Miller","date":"December 24, 2025","format":false,"excerpt":"Java 16 introduced a new feature called the Vector API, which provides a set of low-level vector operations for performing mathematical calculations on large sets of data. The Vector API is designed to take advantage of the hardware capabilities of modern CPUs, such as SIMD (Single Instruction Multiple Data) instructions,\u2026","rel":"","context":"In &quot;Java New Features&quot;","block_context":{"text":"Java New Features","link":"https:\/\/www.mymiller.name\/wordpress\/category\/java_new_features\/"},"img":{"alt_text":"","src":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2023\/06\/board-g12f4be736_640.jpg?fit=640%2C424&ssl=1&resize=350%2C200","width":350,"height":200,"srcset":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2023\/06\/board-g12f4be736_640.jpg?fit=640%2C424&ssl=1&resize=350%2C200 1x, https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2023\/06\/board-g12f4be736_640.jpg?fit=640%2C424&ssl=1&resize=525%2C300 1.5x"},"classes":[]},{"id":3353,"url":"https:\/\/www.mymiller.name\/wordpress\/lambda_stream\/java-teeing-collectors\/","url_meta":{"origin":2473,"position":5},"title":"Java Teeing Collectors","author":"Jeffery Miller","date":"December 24, 2025","format":false,"excerpt":"Java 12 introduced a new collector called the Teeing collector. This collector allows you to process elements with two different collectors simultaneously and combine the results into a single output. In this article, we'll take a closer look at the Teeing collector and how you can use it in your\u2026","rel":"","context":"In &quot;Lambda's and Streams&quot;","block_context":{"text":"Lambda's and Streams","link":"https:\/\/www.mymiller.name\/wordpress\/category\/lambda_stream\/"},"img":{"alt_text":"","src":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2023\/06\/lost-places-g4dcac2ba2_640.jpg?fit=640%2C427&ssl=1&resize=350%2C200","width":350,"height":200,"srcset":"https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2023\/06\/lost-places-g4dcac2ba2_640.jpg?fit=640%2C427&ssl=1&resize=350%2C200 1x, https:\/\/i0.wp.com\/www.mymiller.name\/wordpress\/wp-content\/uploads\/2023\/06\/lost-places-g4dcac2ba2_640.jpg?fit=640%2C427&ssl=1&resize=525%2C300 1.5x"},"classes":[]}],"jetpack_sharing_enabled":true,"jetpack_likes_enabled":true,"_links":{"self":[{"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/posts\/2473","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/comments?post=2473"}],"version-history":[{"count":4,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/posts\/2473\/revisions"}],"predecessor-version":[{"id":2483,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/posts\/2473\/revisions\/2483"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/media\/2474"}],"wp:attachment":[{"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/media?parent=2473"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/categories?post=2473"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/tags?post=2473"},{"taxonomy":"series","embeddable":true,"href":"https:\/\/www.mymiller.name\/wordpress\/wp-json\/wp\/v2\/series?post=2473"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}