Thu. Mar 28th, 2024
JobManager to manage your jobs

So often I see developers turn to open source to implement job management.  These are often far more advanced than what many people really need in their Java applications.  Sure there are times when it is needed and called for, however many times that is not the case.  Today I will show you a JobManager that will satisfy many needs.  First off let’s look at the basic requirements.

  1. It should use a ThreadPool
  2. Manage Jobs that need to run as soon as possible.
  3. Schedule Jobs to run at a later time.
  4. Manage services that should be enabled to run full time.

Really this is more about managing threads than managing jobs.

First let’s look at the JobManager class.

package name.mymiller.extensions.job;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import name.mymiller.extensions.lang.singleton.SingletonInterface;
import name.mymiller.extensions.log.LogManager;

/**
 * Manager to handle processing of Jobs
 *
 * @author jmiller
 */
public class JobManager implements SingletonInterface<JobManager> {
	/**
	 * Configuration Node
	 */
	private static final String JOBMANAGER = "/jobmanager";

	/**
	 * Job Manager Global Instance
	 */
	private static JobManager global_instance = null;

	/**
	 * @return Global Instance of the Job Manager
	 */
	public static JobManager getInstance() {
		if (JobManager.global_instance == null) {
			JobManager.global_instance = new JobManager(1, 2);
		}
		return JobManager.global_instance;
	}

	public static JobManager getInstance(int reserveProcessors, int processorMultiplier) {
		if (JobManager.global_instance == null) {
			JobManager.global_instance = new JobManager(reserveProcessors, processorMultiplier);
		}
		return JobManager.global_instance;
	}

	/**
	 * Thread pool to perform the processing
	 */
	protected ExecutorService pool = null;

	/**
	 * Thread pool for scheduled jobs
	 */
	protected ScheduledExecutorService scheduledPool = null;

	/**
	 * Map of named threads created.
	 */
	private HashMap<String, Thread> namedThreads = null;

	/**
	 * Constructor protected to limit instantiation.
	 */
	protected JobManager(int reserveProcessors, int processorMultiplier) {
		this.namedThreads = new HashMap<>();
		int processors = Runtime.getRuntime().availableProcessors();

		if (reserveProcessors >= processors) {
			processors = processorMultiplier;
			LogManager.getLogger(this.getClass()).info("Job Manager Processors: " + processors);
			LogManager.getLogger(this.getClass())
					.info("Job Manager Reserved Processors: " + (Runtime.getRuntime().availableProcessors() - 1));
		} else {
			processors = (processors - reserveProcessors) * processorMultiplier;
			LogManager.getLogger(this.getClass()).info("Job Manager Processors: " + processors);
			LogManager.getLogger(this.getClass()).info("Job Manager Reserved Processors: " + reserveProcessors);
		}

		this.pool = Executors.newFixedThreadPool(processors);

		this.scheduledPool = Executors.newScheduledThreadPool(1);
	}

	/**
	 * Blocks until all tasks have completed execution after a shutdown request, or
	 * the timeout occurs, or the current thread is interrupted, whichever happens
	 * first.
	 *
	 * @param timeout
	 *            the maximum time to wait
	 * @param unit
	 *            the time unit of the timeout argument
	 * @return true if this executor terminated and false if the timeout elapsed
	 *         before termination
	 * @throws InterruptedException
	 *             if interrupted while waiting
	 * @see ExecutorService#awaitTermination(long, TimeUnit)
	 */
	public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
		return this.pool.awaitTermination(timeout, unit);
	}

	/**
	 * Create a new thread to execute the name.mymiller.extensions.job.
	 *
	 * @param name
	 *            Name of the thread to execute the name.mymiller.extensions.job
	 * @param service
	 *            Job to execute.
	 */
	public void createService(final String name, final AbstractService service) {
		final Thread thread = new Thread(service, name);

		this.namedThreads.put(name, thread);

		thread.start();
	}

	/**
	 * Executes the name.mymiller.extensions.job asynchronously.
	 *
	 * @param job
	 *            Job to process
	 */
	public void executeJob(final Job job) {
		this.pool.execute(job);
	}

	/**
	 * Returns true if this executor has been shut down.
	 *
	 * @return true if this executor has been shut down
	 * @see ExecutorService#isShutdown()
	 */
	public boolean isShutdown() {
		return this.pool.isShutdown();
	}

	/**
	 * Returns true if all tasks have completed following shut down. Note that
	 * isTerminated is never true unless either shutdown or shutdownNow was called
	 * first.
	 *
	 * @return true if all tasks have completed following shut down
	 * @see ExecutorService#isTerminated()
	 */
	public boolean isTerminated() {
		return this.pool.isTerminated();
	}

	/**
	 * Creates and executes a one-shot name.mymiller.extensions.job that becomes
	 * enabled after the given delay.
	 *
	 * @param job
	 *            the task to execute
	 * @param delay
	 *            the time from now to delay execution
	 * @param unit
	 *            the time unit of the delay parameter
	 * @return a ScheduledFuture that can be used to extract result or cancel
	 * @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
	 */
	public ScheduledFuture<?> schedule(Job job, long delay, TimeUnit unit) {
		return this.scheduledPool.schedule(job, delay, unit);
	}

	/**
	 * Creates and executes a periodic name.mymiller.extensions.job that becomes
	 * enabled first after the given initial delay, and subsequently with the given
	 * period; that is executions will commence after initialDelay then
	 * initialDelay+period, then initialDelay + 2 * period, and so on. If any
	 * execution of the name.mymiller.extensions.job encounters an exception,
	 * subsequent executions are suppressed. Otherwise, the task will only terminate
	 * via cancellation or termination of the executor. If any execution of this
	 * task takes longer than its period, then subsequent executions may start late,
	 * but will not concurrently execute.
	 *
	 * @param job
	 *            the task to execute
	 * @param initialDelay
	 *            the time to delay first execution
	 * @param period
	 *            the period between successive executions
	 * @param unit
	 *            the time unit of the initialDelay and period parameters
	 * @return a ScheduledFuture representing pending completion of the task, and
	 *         whose get() method will throw an exception upon cancellation
	 * @see ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long,
	 *      TimeUnit)
	 */
	public ScheduledFuture<?> scheduleAtFixedRate(Job job, long initialDelay, long period, TimeUnit unit) {
		return this.scheduledPool.scheduleAtFixedRate(job, initialDelay, period, unit);
	}

	public <T> Future<T> scheduleWithFixedDelay(Callable<T> callable, long delay, TimeUnit unit) {
		return this.scheduledPool.schedule(callable, delay, unit);
	}

	/**
	 * Creates and executes a periodic name.mymiller.extensions.job that becomes
	 * enabled first after the given initial delay, and subsequently with the given
	 * delay between the termination of one execution and the commencement of the
	 * next. If any execution of the name.mymiller.extensions.job encounters an
	 * exception, subsequent executions are suppressed. Otherwise, the task will
	 * only terminate via cancellation or termination of the executor.
	 *
	 * @param job
	 *            the task to execute
	 * @param initialDelay
	 *            the time to delay first execution
	 * @param delay
	 *            the delay between the termination of one execution and the
	 *            commencement of the next
	 * @param unit
	 *            the time unit of the initialDelay and delay parameters
	 * @return a ScheduledFuture representing pending completion of the task, and
	 *         whose get() method will throw an exception upon cancellation
	 * @see ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long,
	 *      TimeUnit)
	 */
	public ScheduledFuture<?> scheduleWithFixedDelay(Job job, long initialDelay, long delay, TimeUnit unit) {
		return this.scheduledPool.scheduleWithFixedDelay(job, initialDelay, delay, unit);
	}

	/**
	 * Stops all execution once all jobs complete.
	 */
	public void shutdown() {
		this.pool.shutdown();
		this.scheduledPool.shutdown();
	}

	/**
	 * Attempts to stop all actively executing tasks, halts the processing of
	 * waiting tasks, and returns a list of the tasks that were awaiting execution.
	 * This method does not wait for actively executing tasks to terminate. Use
	 * awaitTermination to do that.
	 * <p>
	 * There are no guarantees beyond best-effort attempts to stop processing
	 * actively executing tasks. For example, typical implementations will cancel
	 * via Thread.interrupt(), so any task that fails to respond to interrupts may
	 * never terminate.
	 *
	 * @return list of tasks that never commenced execution
	 * @see ExecutorService#shutdownNow()
	 */
	public List<Runnable> shutdownNow() {
		this.scheduledPool.shutdownNow();
		return this.pool.shutdownNow();
	}

	/**
	 * Submits a callable to to the pool for processing
	 * 
	 * @param callable
	 *            Executable unit to process
	 * @return Future for the callable
	 */
	public <T> Future<T> submit(Callable<T> callable) {
		return this.pool.submit(callable);
	}

	/**
	 * Submits a name.mymiller.extensions.job for execution
	 *
	 * @param job
	 *            Job to process
	 * @return Future representing the completion of the
	 *         name.mymiller.extensions.job.
	 */
	public Future<?> submit(final Job job) {
		return this.pool.submit(job);
	}

	/**
	 * Sumits a Runnable to the processing pool
	 * 
	 * @param runnable
	 *            Runnable to process
	 * @return Future for the Runnable.
	 */
	public Future<?> submit(Runnable runnable) {
		return this.pool.submit(runnable);
	}
}

As you can see here we have a singleton, that uses the JobManager.getInstance() to retrieve the global instance.  You can adjust the SCHEDULED_THREADS, PROCESSOR_RESERVE, and PROCESSOR_MULTIPLIER based on your system.  In fact you can make them easily configurable if you wish.

We have the following methods for running Jobs:

  • submit(Job job)
  • executeJob(Job job)

In addition we have the following for scheduling Jobs to run:

  • schedule(Job job, long delay, TimeUnit unit)
  • scheduleAtFixedRate(Job job, long initialDelay, long period, TimeUnit unit)
  • scheduleWithFixedDelay(Job job, long initialDelay, long delay, TimeUnit unit)

We also need a way to run our services:

  • createService(final String name, final AbstractService service)

Now let’s take a look at the Job class.  We need to be able to tie multiple Jobs together.  So we will add a Pre & Post Job processing, we’ll make it possible to override methods for this.

/*******************************************************************************
 * Copyright 2018 MyMiller Consulting LLC.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License.  You may obtain a copy
 * of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 ******************************************************************************/
package name.mymiller.extensions.job;

import java.util.ArrayList;
import java.util.List;

/**
 * @author jmiller Abstract class to build Jobs for the Job Manager
 */
public abstract class Job implements Runnable {
    /**
     * List of jobs to be completed as a portion of this name.mymiller.extensions.job.
     */
    protected List<Job> subJobs = null;

    /**
     * Constructor allowing direct subclassing
     */
    protected Job() {
        this.subJobs = new ArrayList<>();
    }

    /**
     * Method to add a Sub Job to this name.mymiller.extensions.job
     *
     * @param job Job to add
     */
    protected void addSubJob(final Job job) {
        this.subJobs.add(job);
    }

    /**
     * Override this method to implement post-processing, after all sub-jobs
     * have completed.
     */
    protected void postProcess() {

    }

    /**
     * Overrite this method to implement any pre-run processing, before the Run
     * method is called
     */
    protected void preProcess() {

    }

    /**
     * Method to implement to perform the actions necessary for the Job.
     */
    protected abstract void process();

    /**
     * Method to remove a name.mymiller.extensions.job from the list
     *
     * @param job Job to remove
     */
    protected void removeSubJob(final Job job) {
        this.subJobs.remove(job);
    }

    @Override
    public void run() {

        this.preProcess();
        this.process();
        for (final Job job : this.subJobs) {
            JobManager.getInstance().submit(job);
        }
        this.postProcess();
    }
}

So you can override the preProcess() and postProcess() methods to perform any processing you wish to do before and after the main processing and processing of the sub-Jobs.  In addition you can add Jobs that are executed after this Job’s process is completed.

Now let’s look at a service.  It needs to be able to shutdown easily and execute on it’s own thread entirely.

/**
 * Copyright 2018 MyMiller Consulting LLC.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License.  You may obtain a copy
 * of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
/**
 *
 */
package name.mymiller.extensions.job;

import name.mymiller.extensions.log.LogManager;

/**
 * @author jmiller Basic service functions
 */
public abstract class AbstractService extends Job {

    /**
     * Boolean indicating if the Service should shutdown
     */
    private boolean shutdown = false;

    /**
     * Boolean indicating if the Service should restart after shutdown
     */
    private boolean restart = false;

    /**
     *
     * @return Indicating if the Service should restart have shutdown
     */
    public boolean isRestart() {
        return this.restart;
    }

    /**
     * Set whether the Service should Restart
     *
     * @param restart
     *            Boolean indicating if it should restart
     */
    public void setRestart(final boolean restart) {
        this.restart = restart;
    }

    /**
     *
     * @return Indicating if the Service should shutdown
     */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /**
     * Method to set if the service is shutdown
     *
     * @param shutdown
     *            boolean indicating if the service is shutdown.
     */
    protected void setShutdown(boolean shutdown) {
        this.shutdown = shutdown;
    }

    /**
     *
     * @return Indicating if the service is up or shutdown
     */
    public boolean notShutdown() {
        return !this.isShutdown();
    }

    @Override
    protected void process() {
        LogManager.getLogger(AbstractService.class).info(this.getClass().getSimpleName() + " Service Running");
        this.setShutdown(false);
        this.service();
        this.setShutdown(true);
        LogManager.getLogger(AbstractService.class).info(this.getClass().getSimpleName() + " Service Exiting");
    }

    /**
     * Method that performs the processing for a service
     */
    abstract protected void service();

    /**
     * Method to shutdown the HTTP Server
     *
     * @param delay
     *            ms. to wait before shutting down.
     */
    public void shutdown(int delay) {
        LogManager.getLogger(AbstractService.class).info(this.getClass().getSimpleName() + " Service Shutdown called");
        this.stop(delay);
        this.shutdown = true;
    }

    /**
     * Start.
     */
    abstract public void start();

    /**
     * Force a stop of the server
     *
     * @param delay
     *            the maximum time in seconds to wait until exchanges have
     *            finished.
     */
    abstract protected void stop(final int delay);

}

Implement the following methods on your Service.

  • start() – any steps necessary to start your Service.
  • stop(final int delay) – steps necessary to stop your service.
  • service() – main loop of your thread that needs to stay executing.

By Jeffery Miller

I am known for being able to quickly decipher difficult problems to assist development teams in producing a solution. I have been called upon to be the Team Lead for multiple large-scale projects. I have a keen interest in learning new technologies, always ready for a new challenge.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d