Coverage Summary for Class: IndexWorker (org.kitodo.production.helper)

Class Class, % Method, % Line, %
IndexWorker 0% (0/1) 0% (0/4) 0% (0/34)


 /*
  * (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
  *
  * This file is part of the Kitodo project.
  *
  * It is licensed under GNU General Public License version 3 or later.
  *
  * For the full copyright and license information, please read the
  * GPL3-License.txt file that was distributed with this source code.
  */
 
 package org.kitodo.production.helper;
 
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.kitodo.config.ConfigCore;
 import org.kitodo.config.enums.ParameterCore;
 import org.kitodo.data.database.exceptions.DAOException;
 import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;
 import org.kitodo.production.enums.ObjectType;
 import org.kitodo.production.services.data.base.SearchService;
 
 public class IndexWorker implements Runnable {
 
     private static final Logger logger = LogManager.getLogger(IndexWorker.class);
     
     private final boolean indexAllObjects;
     private final ObjectType objectType;
     private final SearchService searchService;
     private final IndexWorkerStatus indexWorkerStatus;
 
     /**
      * Constructor initializing an IndexWorker object with the given SearchService
      * and list of objects that will be indexed.
      *
      * @param searchService
      *            SearchService instance used for indexing
      */
     public IndexWorker(SearchService searchService, ObjectType objectType, IndexWorkerStatus indexWorkerStatus, boolean indexAllObjects) {
         this.searchService = searchService;
         this.indexWorkerStatus = indexWorkerStatus;
         this.indexAllObjects = indexAllObjects;
         this.objectType = objectType;
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public void run() {
         int maxAttempts = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_ATTEMPTS);
         int batchSize = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_BATCH);
         int timeBetweenAttempts = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_TIME_BETWEEN_ATTEMPTS);
         int maxBatch = indexWorkerStatus.getMaxBatch();
 
         int nextBatch = indexWorkerStatus.getAndIncrementNextBatch();
         while (!indexWorkerStatus.hasFailed() && !indexWorkerStatus.isCanceled() && nextBatch < maxBatch) {
             // nextBatch is a valid batch that needs to be processed
 
             int attempt = 1;
             while (attempt < maxAttempts) {
                 try {
                     int offset = nextBatch * batchSize;
                     logger.info("index " + objectType.toString() + " with offset " + offset + " and attempt " 
                         + attempt + "/" + maxAttempts);
 
                     if (indexAllObjects) {
                         indexObjects(searchService.getAll(offset, batchSize));
                     } else {
                         indexObjects(searchService.getAllNotIndexed(offset, batchSize));
                     }
 
                     break;
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                     attempt += 1;
                     try {
                         Thread.sleep(timeBetweenAttempts);
                     } catch (InterruptedException e2) {
                         logger.trace("Index worker sleep is interrupted while waiting for next indexing attempt");
                     }
                 }
             }
 
             if (attempt >= maxAttempts) {
                 logger.error("stop indexing after maximum amount of attempts");
                 this.indexWorkerStatus.markAsFailed();
             } else {
                 // find next batch that can be indexed
                 nextBatch = indexWorkerStatus.getAndIncrementNextBatch();
             }
         }
     }
 
     @SuppressWarnings("unchecked")
     private void indexObjects(List<Object> objectsToIndex) throws CustomResponseException, DAOException, IOException {
         this.searchService.addAllObjectsToIndex(objectsToIndex);
     }
 }