Coverage Summary for Class: TaskSitter (org.kitodo.production.helper.tasks)

Class Method, % Line, %
TaskSitter 62,5% (5/8) 69,5% (41/59)
TaskSitter$1 100% (1/1) 100% (2/2)
Total 66,7% (6/9) 70,5% (43/61)


 /*
  * (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
  *
  * This file is part of the Kitodo project.
  *
  * It is licensed under GNU General Public License version 3 or later.
  *
  * For the full copyright and license information, please read the
  * GPL3-License.txt file that was distributed with this source code.
  */
 
 package org.kitodo.production.helper.tasks;
 
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.TemporalUnit;
 import java.util.ConcurrentModificationException;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import javax.servlet.ServletContextEvent;
 import javax.servlet.ServletContextListener;
 import javax.servlet.annotation.WebListener;
 
 import org.kitodo.config.ConfigCore;
 import org.kitodo.config.enums.ParameterCore;
 import org.kitodo.production.helper.tasks.EmptyTask.Behaviour;
 
 /**
  * The class TaskSitter takes care of the tasks in the task manager. While the
  * application is working, a scheduler on the TaskManager will call the run()
  * method of the TaskSitter every some seconds to delete threads that have died,
  * replace threads that are to be restarted by new copies of themselves (a
  * Thread can never be started twice) and finally starts some new threads if
  * there aren’t too many working any more. Several limits are configurable for
  * the {@link #run()} method.
  *
  * <p>
  * On shutdown of the servlet container, the TaskSitter will try to shut down
  * all threads that are still running. Because the TaskManager is singleton (its
  * constructor is private) a caring class is needed which will be available for
  * instantiation to the servlet container.
  */
 @WebListener
 public class TaskSitter implements Runnable, ServletContextListener {
     /**
      * The field autoRunLimit holds the number of threads which at most are
      * allowed to be started automatically. It is by default initialised by the
      * number of available processors of the runtime and set to 0 while the
      * feature is disabled.
      */
     private static int autoRunLimit;
 
     static {
         setAutoRunningThreads(true);
     }
 
     /**
      * When the servlet is unloaded, i.e. on container shutdown, the TaskManager
      * shall be shut down gracefully.
      *
      * @see javax.servlet.ServletContextListener#contextDestroyed(javax.servlet.ServletContextEvent)
      */
     @Override
     public void contextDestroyed(ServletContextEvent arg) {
         TaskManager.shutdownNow();
     }
 
     /**
      * Currently, there is nothing to do when the servlet is loading.
      *
      * @see javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
      */
     @Override
     public void contextInitialized(ServletContextEvent argument) {
         // nothing is done here
     }
 
     /**
      * Returns whether the TaskManager’s
      * autorun mode is on or not.
      *
      * @return whether the TaskManager is auto-running threads or not
      */
     public static boolean isAutoRunningThreads() {
         return autoRunLimit > 0;
     }
 
     /**
      * Examines the task list, deletes threads that have
      * died, replaces threads that are to be restarted by new copies of
      * themselves and finally starts new threads up to the given limit.
      *
      * <p>
      * Several limits are configurable: There are both limits in number and in
      * time for successfully finished or erroneous threads which can be set in
      * the configuration. There are internal default values for these settings
      * too, which will be applied in case of missing configuration entries.
      * Since zombie processes will still occupy all their resources and aren’t
      * available for garbage collection, these values have been chosen rather
      * restrictive. For the limit for auto starting threads, see
      * {@link #setAutoRunningThreads(boolean)}.
      *
      * <p>
      * If the task list is empty, the method will exit without further delay,
      * otherwise it will initialise its variables and read the configuration.
      * Reading the configuration is done again in each iteration so
      * configuration changes will propagate here.
      *
      * <p>
      * Then the function iterates along the task list and takes care for each
      * task. To be able to modify the list in passing, we need a
      * {@link java.util.ListIterator} here.
      *
      * <p>
      * Running tasks reduce the clearance to run new tasks. (However, the
      * clearance must not become negative.) New tasks will be added to the
      * launch list, except if they have already been marked for removal, of
      * course. If a task has terminated, it is handled as specified by its
      * behavior variable: All tasks that are marked DELETE_IMMEDIATELY will
      * instantly be disposed of; otherwise, they will be kept as long as
      * configured and only be removed if their dead body has become older. Tasks
      * marked PREPARE_FOR_RESTART will be replaced (because a
      * {@link java.lang.Thread} cannot be started a second time) by a copy of
      * them.
      *
      * <p>
      * If a ConcurrentModificationException arises during list examination, the
      * method will behave like a polite servant and retire silently until the
      * lordship has scarpered. This is not a pity because it will be started
      * every some seconds.
      *
      * <p>
      * After having finished iterating, the method will reduce the absolute
      * number of expired threads as configured. (Since new threads will be added
      * to the bottom of the list and we therefore want to remove older ones
      * top-down we cannot do this before we know their count, thus we cannot do
      * this while iterating.) Last, new threads will be started up to the
      * remaining available clearance.
      *
      * @see java.lang.Runnable#run()
      */
     @Override
     public void run() {
         TaskManager taskManager = TaskManager.singleton();
         if (taskManager.taskList.isEmpty()) {
             return;
         }
 
         LinkedList<EmptyTask> launchableThreads = new LinkedList<>();
         LinkedList<EmptyTask> finishedThreads = new LinkedList<>();
         LinkedList<EmptyTask> failedThreads = new LinkedList<>();
         int availableClearance = autoRunLimit;
 
         int successfulMaxCount = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.TASK_MANAGER_KEEP_SUCCESSFUL);
         int failedMaxCount = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.TASK_MANAGER_KEEP_FAILED);
         Duration successfulMaxAge = ConfigCore.getDurationParameter(ParameterCore.TASK_MANAGER_KEEP_SUCCESSFUL_MINS,
                 ChronoUnit.MINUTES);
         Duration failedMaxAge = ConfigCore.getDurationParameter(ParameterCore.TASK_MANAGER_KEEP_FAILED_MINS,
                 ChronoUnit.MINUTES);
 
         ListIterator<EmptyTask> position = taskManager.taskList.listIterator();
         EmptyTask task;
         try {
             while (position.hasNext()) {
                 availableClearance = handleTaskModification(launchableThreads, finishedThreads, failedThreads,
                         availableClearance, successfulMaxAge, failedMaxAge, position);
             }
         } catch (ConcurrentModificationException e) {
             return;
         }
 
         while (finishedThreads.size() > successfulMaxCount && (task = finishedThreads.pollFirst()) != null) {
             taskManager.taskList.remove(task);
         }
 
         while (failedThreads.size() > failedMaxCount && (task = failedThreads.pollFirst()) != null) {
             taskManager.taskList.remove(task);
         }
 
         while (launchableThreads.size() > availableClearance) {
             launchableThreads.removeLast();
         }
         while ((task = launchableThreads.pollFirst()) != null) {
             task.start();
         }
     }
 
     private int handleTaskModification(LinkedList<EmptyTask> launchableThreads, LinkedList<EmptyTask> finishedThreads,
                                        LinkedList<EmptyTask> failedThreads, int availableClearance,
                                        Duration successfulMaxAge, Duration failedMaxAge,
                                        ListIterator<EmptyTask> position) {
         EmptyTask task;
         task = position.next();
         switch (task.getTaskState()) {
             case WORKING:
             case STOPPING:
                 availableClearance = Math.max(availableClearance - 1, 0);
                 break;
             case NEW:
                 if (Behaviour.DELETE_IMMEDIATELY.equals(task.getBehaviour())) {
                     position.remove();
                 } else {
                     launchableThreads.addLast(task);
                 }
                 break;
             default: // cases STOPPED, FINISHED, CRASHED
                 switch (task.getBehaviour()) {
                     case DELETE_IMMEDIATELY:
                         position.remove();
                         break;
                     case PREPARE_FOR_RESTART:
                         EmptyTask replacement = task.replace();
                         if (Objects.nonNull(replacement)) {
                             position.set(replacement);
                             launchableThreads.addLast(replacement);
                         }
                         break;
                     default: // case KEEP_FOR_A_WHILE
                         boolean taskFinishedSuccessfully = Objects.isNull(task.getException());
                         Duration durationDead = task.getDurationDead();
                         if (Objects.isNull(durationDead)) {
                             task.setTimeOfDeath();
                         } else if (durationDead.compareTo(taskFinishedSuccessfully ? successfulMaxAge : failedMaxAge) > 0) {
                             position.remove();
                             break;
                         }
                         if (taskFinishedSuccessfully) {
                             finishedThreads.add(task);
                         } else {
                             failedThreads.add(task);
                         }
                         break;
                 }
         }
         return availableClearance;
     }
 
     /**
      * Turns the feature to auto-run tasks
      * on or off. To enable, it will set the limit of auto running threads to
      * the number of available cores of the runtime or to the value set in the
      * global configuration file, if any. To disable auto-running it will set
      * the number to 0.
      *
      * @param on
      *            whether the TaskManager shall auto-run threads
      */
     public static void setAutoRunningThreads(boolean on) {
         if (on) {
             int cores = Runtime.getRuntime().availableProcessors();
             autoRunLimit = ConfigCore.getIntParameter(ParameterCore.TASK_MANAGER_AUTORUN_LIMIT, cores);
         } else {
             autoRunLimit = 0;
         }
     }
 }