Coverage Summary for Class: IndexingService (org.kitodo.production.services.index)

| Class | Method, % | Line, % |
|---|---|---|
| IndexingService | 28,2% (11/39) | 35,8% (81/226) |
| IndexingService$1 | 0% (0/2) | 0% (0/4) |
| IndexingService$2 | 100% (1/1) | 100% (1/1) |
| Total | 28,6% (12/42) | 35,5% (82/231) |

/*
* (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
*
* This file is part of the Kitodo project.
*
* It is licensed under GNU General Public License version 3 or later.
*
* For the full copyright and license information, please read the
* GPL3-License.txt file that was distributed with this source code.
*/
package org.kitodo.production.services.index;
import static java.lang.Math.toIntExact;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import javax.faces.push.PushContext;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.kitodo.config.ConfigCore;
import org.kitodo.config.ConfigMain;
import org.kitodo.config.enums.ParameterCore;
import org.kitodo.data.database.exceptions.DAOException;
import org.kitodo.data.elasticsearch.KitodoRestClient;
import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;
import org.kitodo.data.elasticsearch.index.IndexRestClient;
import org.kitodo.data.exceptions.DataException;
import org.kitodo.production.enums.IndexStates;
import org.kitodo.production.enums.ObjectType;
import org.kitodo.production.helper.Helper;
import org.kitodo.production.helper.IndexWorker;
import org.kitodo.production.helper.IndexWorkerStatus;
import org.kitodo.production.services.ServiceManager;
import org.kitodo.production.services.data.base.SearchService;
public class IndexingService {
/** Logger of this class; also handed out by {@link #getLogger()}. */
private static final Logger logger = LogManager.getLogger(IndexingService.class);
/** Lazily created singleton instance, see {@link #getInstance()}. */
private static volatile IndexingService instance = null;
/** Object types handled by this service, as returned by {@code ObjectType.getIndexableObjectTypes()}. */
private static final List<ObjectType> objectTypes = ObjectType.getIndexableObjectTypes();
/** Search service registered for each indexable object type. */
private final Map<ObjectType, SearchService> searchServices = new EnumMap<>(ObjectType.class);
/** Last recorded indexing state of each indexable object type. */
private final Map<ObjectType, IndexStates> objectIndexingStates = new EnumMap<>(ObjectType.class);
/** Number of database rows per object type, refreshed by {@link #countDatabaseObjects()}. */
private final Map<ObjectType, Integer> countDatabaseObjects = new EnumMap<>(ObjectType.class);
// messages for web socket communication
private static final String INDEXING_STARTED_MESSAGE = "indexing_started";
static final String INDEXING_FINISHED_MESSAGE = "indexing_finished";
public static final String DELETION_STARTED_MESSAGE = "deletion_started";
private static final String DELETION_FINISHED_MESSAGE = "deletion_finished";
private static final String DELETION_FAILED_MESSAGE = "deletion_failed";
public static final String MAPPING_STARTED_MESSAGE = "mapping_started";
private static final String MAPPING_FINISHED_MESSAGE = "mapping_finished";
public static final String MAPPING_FAILED_MESSAGE = "mapping_failed";
/** Sleep time in milliseconds between two status checks in {@code waitWhileIndexing}. */
static final int PAUSE = 1000;
/** Status object handed to the index workers of the most recently started indexing run. */
private IndexWorkerStatus indexWorkerStatus = null;
/** Thread created by {@code startIndexingThread} to manage an indexing run, if any. */
private IndexManagmentThread indexAllThread = null;
/** Flag set through {@link #setIndexingAll(boolean)}; while true, {@link #indexingInProgress()} reports true. */
private boolean indexingAll = false;
/** Object type currently being indexed, or {@code ObjectType.NONE} if none. */
private ObjectType currentIndexState = ObjectType.NONE;
/** Outcome of the most recent index operation (indexing, mapping creation or index deletion). */
private IndexStates currentState = IndexStates.NO_STATE;
/** REST client instance obtained from {@code IndexRestClient.getInstance()}. */
private static final IndexRestClient indexRestClient = IndexRestClient.getInstance();
/**
 * Provide the single shared IndexingService, creating it on first use.
 *
 * <p>Creation is guarded by a lock on the class, so concurrent first calls
 * still end up with one instance.</p>
 *
 * @return unique instance of IndexingService
 */
public static IndexingService getInstance() {
    IndexingService existing = instance;
    if (Objects.nonNull(existing)) {
        return existing;
    }
    synchronized (IndexingService.class) {
        // re-check under the lock, another thread may have been faster
        if (Objects.isNull(instance)) {
            instance = new IndexingService();
        }
        return instance;
    }
}
/**
 * Private constructor, use {@link #getInstance()} to obtain the service.
 *
 * <p>Registers a search service and the initial {@code NO_STATE} for every
 * indexable object type, configures the index base name of the REST client
 * (parameter {@code elasticsearch.index}, default {@code kitodo}) and counts
 * the database objects once. A {@code DAOException} during counting is
 * reported through {@code Helper.setErrorMessage} and does not abort the
 * construction.</p>
 */
private IndexingService() {
    for (ObjectType indexableType : objectTypes) {
        SearchService service = getService(indexableType);
        searchServices.put(indexableType, service);
        objectIndexingStates.put(indexableType, IndexStates.NO_STATE);
    }
    String indexBase = ConfigMain.getParameter("elasticsearch.index", "kitodo");
    indexRestClient.setIndexBase(indexBase);
    try {
        countDatabaseObjects();
    } catch (DAOException e) {
        Helper.setErrorMessage(e.getLocalizedMessage(), logger, e);
    }
}
/**
 * Look up the search service responsible for the given object type.
 *
 * <p>A service that is already registered is returned directly. Otherwise the
 * matching service is requested from the {@code ServiceManager}, stored for
 * later calls and returned.</p>
 *
 * @param objectType
 *            object type whose search service is wanted
 * @return the search service, or {@code null} for an object type without a
 *         matching service
 */
private SearchService getService(ObjectType objectType) {
    SearchService registered = searchServices.get(objectType);
    if (Objects.nonNull(registered)) {
        return registered;
    }
    SearchService resolved;
    switch (objectType) {
        case BATCH:
            resolved = ServiceManager.getBatchService();
            break;
        case DOCKET:
            resolved = ServiceManager.getDocketService();
            break;
        case PROCESS:
            resolved = ServiceManager.getProcessService();
            break;
        case PROJECT:
            resolved = ServiceManager.getProjectService();
            break;
        case RULESET:
            resolved = ServiceManager.getRulesetService();
            break;
        case TASK:
            resolved = ServiceManager.getTaskService();
            break;
        case TEMPLATE:
            resolved = ServiceManager.getTemplateService();
            break;
        case WORKFLOW:
            resolved = ServiceManager.getWorkflowService();
            break;
        case FILTER:
            resolved = ServiceManager.getFilterService();
            break;
        default:
            // nothing is registered for types that cannot be indexed
            return null;
    }
    searchServices.put(objectType, resolved);
    return resolved;
}
/**
 * Return the total number of all objects that can be indexed.
 *
 * <p>The value is the sum of the cached database counts of all indexable
 * object types. The sum is built as {@code long}, so it cannot overflow the
 * {@code int} range of the single counts. Object types without a cached
 * count (for example because counting failed with a database error) are
 * treated as zero.</p>
 *
 * @return long number of all items that can be written to the index
 */
public long getTotalCount() {
    long totalCount = 0;
    for (ObjectType objectType : objectTypes) {
        Integer count = countDatabaseObjects.get(objectType);
        // the map may be incomplete when counting was aborted by a DAOException
        if (Objects.nonNull(count)) {
            totalCount += count;
        }
    }
    return totalCount;
}
/**
 * Refresh the cached database object counts by delegating to
 * {@link #countDatabaseObjects()}.
 *
 * @throws DAOException
 *             if counting the database objects fails
 */
private void updateCounts() throws DAOException {
    this.countDatabaseObjects();
}
/**
 * Get the cached number of database objects per object type.
 *
 * <p>The internal map itself is returned, not a copy.</p>
 *
 * @return map from object type to number of database objects
 */
public Map<ObjectType, Integer> getCountDatabaseObjects() {
    return this.countDatabaseObjects;
}
/**
 * Check whether database and index disagree about the number of objects.
 *
 * <p>The database counts are refreshed first; afterwards the total number of
 * database objects is compared with the total number of indexed objects.</p>
 *
 * @return true if both totals differ, otherwise false
 * @throws DAOException
 *             if counting the database objects fails
 * @throws DataException
 *             if counting the indexed objects fails
 */
public boolean isIndexCorrupted() throws DAOException, DataException {
    updateCounts();
    long inDatabase = getTotalCount();
    long inIndex = getAllIndexed();
    return inDatabase != inIndex;
}
/**
 * Return the number of indexed objects summed over all indexable object
 * types.
 *
 * @return long number of all currently indexed objects
 * @throws DataException
 *             if counting the indexed objects of a type fails
 * @throws ArithmeticException
 *             if the sum does not fit in a {@code long}
 */
public long getAllIndexed() throws DataException {
    long sum = 0;
    for (ObjectType indexableType : objectTypes) {
        long indexedOfType = getNumberOfIndexedObjects(indexableType);
        sum = Math.addExact(sum, indexedOfType);
    }
    return sum;
}
/**
 * Return the number of indexed objects for the given ObjectType, as counted
 * by the search service registered for that type.
 *
 * @param objectType
 *            ObjectType for which the number of indexed objects is returned
 *
 * @return number of indexed objects
 * @throws DataException
 *             if the search service fails to count
 */
public long getNumberOfIndexedObjects(ObjectType objectType) throws DataException {
    SearchService service = searchServices.get(objectType);
    return service.count();
}
/**
 * Count the database objects of every indexable object type and cache the
 * results. Executed on application start and afterwards on button click.
 *
 * @throws DAOException
 *             if counting the database rows of a type fails
 */
public void countDatabaseObjects() throws DAOException {
    for (ObjectType indexableType : objectTypes) {
        int rows = getNumberOfDatabaseObjects(indexableType);
        countDatabaseObjects.put(indexableType, rows);
    }
}
/**
 * Manage indexing for a given object type.
 *
 * <p>This method is executed in the `IndexManagementThread`.</p>
 *
 * <p>Nothing is done if no database objects are known for the type. When all
 * objects are indexed, documents that exist in the index but no longer in the
 * database are removed first, reading the indexed IDs in pages of the
 * configured index limit. Afterwards the indexing worker threads are started
 * and awaited.</p>
 *
 * @param type
 *            type objects that get indexed
 * @param pushContext
 *            the UI channel that is notified about indexing progress
 * @param indexAllObjects
 *            whether all objects are indexed (true) or only the remaining
 *            ones (false)
 * @return the status object of the indexing run, or {@code null} if there was
 *         nothing to index
 * @throws DataException
 *             if accessing the index fails
 * @throws CustomResponseException
 *             if the index server answers with an error
 * @throws DAOException
 *             if accessing the database fails
 */
public IndexWorkerStatus runIndexing(ObjectType type, PushContext pushContext, boolean indexAllObjects)
        throws DataException, CustomResponseException, DAOException {
    SearchService searchService = searchServices.get(type);
    int indexLimit = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_INDEXLIMIT);
    Integer databaseCount = countDatabaseObjects.get(type);
    // a missing count is handled like an empty table instead of failing on unboxing
    if (Objects.isNull(databaseCount) || databaseCount <= 0) {
        return null;
    }
    if (indexAllObjects) {
        if (indexLimit > 0) {
            // only check for loose index data when indexing full objects
            long amountInIndex = searchService.count();
            long offset = 0L;
            // remove documents in index that are no longer available in database
            // by iterating over all indexed documents in elastic search
            try {
                while (offset < amountInIndex) {
                    // TODO: actually iterate over all elastic search documents
                    searchService.removeLooseIndexData(searchService.findAllIDs(offset, indexLimit));
                    offset += indexLimit;
                }
            } catch (RuntimeException e) {
                // this is an elastic search exception, but elastic search is not available here;
                // the exception is logged as well so that other runtime failures are not hidden
                logger.info("Cannot check documents beyond elastic search max_result_window, continuing ...", e);
            }
        } else {
            // a page size below one would never advance the offset and loop forever
            logger.warn("Skipping removal of loose index data, index limit {} is not positive", indexLimit);
        }
    }
    return spawnIndexingThreads(type, pushContext, indexAllObjects);
}
/**
 * Return the number of objects in the database for the given ObjectType.
 *
 * @param objectType
 *            ObjectType for which the number of database objects is returned
 * @return number of database objects, or 0 if no search service is
 *         registered for the type
 * @throws DAOException
 *             if counting the database rows fails
 * @throws ArithmeticException
 *             if the row count does not fit in an {@code int}
 */
private int getNumberOfDatabaseObjects(ObjectType objectType) throws DAOException {
    SearchService service = searchServices.get(objectType);
    if (Objects.isNull(service)) {
        return 0;
    }
    return toIntExact(service.countDatabaseRows());
}
/**
 * Create a fixed size thread pool whose threads are daemon threads.
 *
 * <p>Threads are created by the default thread factory and switched to
 * daemon mode before they are handed to the pool.</p>
 *
 * @param threads
 *            number of threads in the pool
 * @return the executor service backed by daemon threads
 */
private ExecutorService createDeamonizedExecutorService(int threads) {
    ThreadFactory daemonFactory = task -> {
        Thread worker = Executors.defaultThreadFactory().newThread(task);
        worker.setDaemon(true);
        return worker;
    };
    return Executors.newFixedThreadPool(threads, daemonFactory);
}
/**
 * Start the configured number of indexing workers for one object type and
 * block until {@code waitWhileIndexing} reports that they are finished.
 *
 * <p>Before the workers are started, the type is marked as being indexed and
 * a fresh status object is created for the number of batches needed to cover
 * all database objects of the type. The worker pool is shut down in any
 * case.</p>
 *
 * @param type the object type to be indexed
 * @param pollingChannel the UI polling channel for triggering updates
 * @param indexAllObjects whether all or only remaining objects are indexed
 * @return the status object shared with the workers
 * @throws DAOException if counting the database objects of the type fails
 */
private IndexWorkerStatus spawnIndexingThreads(ObjectType type, PushContext pollingChannel, boolean indexAllObjects)
        throws DAOException {
    // declare that indexing for type has started
    currentIndexState = type;
    currentState = IndexStates.INDEXING_STARTED;
    objectIndexingStates.put(type, IndexStates.INDEXING_STARTED);

    int threads = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_THREADS);
    int objectsInDatabase = getNumberOfDatabaseObjects(type);
    int batchSize = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_BATCH);
    // number of batches, rounded up so that a partial last batch is counted
    int batchCount = (int) Math.ceil(objectsInDatabase / (double) batchSize);

    // create new thread-safe indexing status
    indexWorkerStatus = new IndexWorkerStatus(batchCount);

    logger.info("start " + threads + " threads for indexing " + type.toString());

    ExecutorService workerPool = createDeamonizedExecutorService(threads);
    try {
        List<Future<?>> pending = new LinkedList<>();
        for (int started = 0; started < threads; started++) {
            pending.add(workerPool.submit(
                new IndexWorker(getService(type), type, indexWorkerStatus, indexAllObjects)));
        }
        waitWhileIndexing(type, pending, pollingChannel);
    } finally {
        workerPool.shutdown();
    }
    return indexWorkerStatus;
}
/**
 * Wait and check whether index worker threads have finished or failed.
 *
 * <p>The worker state is checked every {@link #PAUSE} milliseconds. The
 * method returns as soon as the run has failed or was canceled (remaining
 * workers are canceled then), or all workers are done.</p>
 *
 * <p>If the waiting thread is interrupted, its interrupt status is restored
 * and waiting stops: workers that are still running are canceled and the run
 * is recorded as failed.</p>
 *
 * @param type the object type currently being indexed
 * @param futures a list of futures allowing to check the status of the worker threads
 * @param pollingChannel the UI polling channel
 */
private void waitWhileIndexing(ObjectType type, List<Future<?>> futures, PushContext pollingChannel) {
    // send update that indexing has started (activating polling in user interface)
    pollingChannel.send(INDEXING_STARTED_MESSAGE + type);
    boolean interrupted = false;
    while (true) {
        // check whether all jobs are done
        boolean done = true;
        boolean failed = indexWorkerStatus.hasFailed() || indexWorkerStatus.isCanceled();
        for (Future<?> future : futures) {
            if (!future.isDone()) {
                done = false;
            }
            if (future.isCancelled()) {
                failed = true;
            }
        }

        // check for failure first (in case all threads have already stopped gracefully and are done);
        // an interrupted wait with unfinished workers is handled the same way
        if (failed || (interrupted && !done)) {
            logger.info("indexing of {} failed, cleaning up", type);
            currentIndexState = ObjectType.NONE;
            currentState = IndexStates.INDEXING_FAILED;
            objectIndexingStates.put(type, IndexStates.INDEXING_FAILED);
            // make sure to stop any remaining other worker threads
            for (Future<?> future : futures) {
                future.cancel(true);
            }
            break;
        }

        if (done) {
            // indexing has completed
            logger.info("indexing of {} finished successfully", type);
            currentIndexState = ObjectType.NONE;
            currentState = IndexStates.INDEXING_SUCCESSFUL;
            objectIndexingStates.put(type, IndexStates.INDEXING_SUCCESSFUL);
            break;
        }

        // wait a bit
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            logger.trace("Index management sleep interrupted while waiting for worker threads to finish indexing");
            // keep the interrupt visible to callers and leave the loop in the next iteration
            Thread.currentThread().interrupt();
            interrupted = true;
        }
    }
}
/**
 * Return the progress in percent of the currently running indexing process. If
 * the list of entries to be indexed is empty, this will return "0".
 *
 * <p>The percentage is computed with exact integer arithmetic and rounded
 * down, so 100 is only reported when at least as many objects are indexed as
 * exist in the database. If the given type is the one currently being indexed
 * and there is nothing to index or 100 percent are reached, the type is no
 * longer marked as being indexed, its state is updated and a finished message
 * is sent to the polling channel.</p>
 *
 * @param currentType
 *            the ObjectType for which the progress will be determined
 * @param pollingChannel
 *            the UI channel that receives the finished message
 * @return the progress of the current indexing process in percent
 * @throws DataException
 *             if counting the indexed objects fails
 */
public int getProgress(ObjectType currentType, PushContext pollingChannel) throws DataException {
    Integer databaseCount = countDatabaseObjects.get(currentType);
    // a type without a cached count is treated as having nothing to index
    long numberOfObjects = 0;
    if (Objects.nonNull(databaseCount)) {
        numberOfObjects = databaseCount;
    }
    long nrOfIndexedObjects = getNumberOfIndexedObjects(currentType);
    // integer arithmetic avoids float rounding, which could report 100 too early for large counts
    int progress = numberOfObjects > 0 ? (int) (nrOfIndexedObjects * 100 / numberOfObjects) : 0;
    if (Objects.equals(currentIndexState, currentType) && (numberOfObjects == 0 || progress == 100)) {
        currentIndexState = ObjectType.NONE;
        if (numberOfObjects == 0) {
            objectIndexingStates.put(currentType, IndexStates.NO_STATE);
        } else {
            objectIndexingStates.put(currentType, IndexStates.INDEXING_SUCCESSFUL);
        }
        pollingChannel.send(INDEXING_FINISHED_MESSAGE + currentType + "!");
    }
    return progress;
}
/**
 * Create mapping which enables sorting and other aggregation functions.
 *
 * <p>For every mapping type the mapping definition is read and an index is
 * created with it. If no definition could be read, the index is created
 * without mapping. A created index with mapping is additionally verified
 * against the definition. Processing stops at the first mapping type that
 * fails.</p>
 *
 * @return the mapping finished message if all indexes were created, otherwise
 *         the mapping failed message
 * @throws IOException
 *             if communication with the index server fails
 * @throws CustomResponseException
 *             if the index server answers with an error
 */
public String createMapping() throws IOException, CustomResponseException {
    for (String mappingType : KitodoRestClient.MAPPING_TYPES) {
        String definition = readMapping(mappingType);
        boolean succeeded;
        if ("".equals(definition)) {
            succeeded = indexRestClient.createIndex(null, mappingType);
        } else {
            // the mapping is only validated when the index could be created
            succeeded = indexRestClient.createIndex(definition, mappingType)
                    && isMappingValid(definition, mappingType);
        }
        if (!succeeded) {
            currentState = IndexStates.CREATING_MAPPING_FAILED;
            return MAPPING_FAILED_MESSAGE;
        }
        currentState = IndexStates.CREATING_MAPPING_SUCCESSFUL;
    }
    return MAPPING_FINISHED_MESSAGE;
}
/**
 * Delete whole ElasticSearch index.
 *
 * <p>An {@code IOException} is reported through
 * {@code Helper.setErrorMessage} and turned into the failure message.</p>
 *
 * @return the deletion finished message on success, otherwise the deletion
 *         failed message
 */
public String deleteIndex() {
    String resultMessage;
    try {
        indexRestClient.deleteAllIndexes();
        currentState = IndexStates.DELETING_SUCCESSFUL;
        resultMessage = DELETION_FINISHED_MESSAGE;
    } catch (IOException e) {
        Helper.setErrorMessage(e.getLocalizedMessage(), logger, e);
        currentState = IndexStates.DELETING_FAILED;
        resultMessage = DELETION_FAILED_MESSAGE;
    }
    return resultMessage;
}
/**
 * Check whether the mapping stored in the index for the given mapping type
 * equals the given mapping definition.
 *
 * @param mapping
 *            expected mapping as JSON string
 * @param mappingType
 *            mapping type whose index is checked
 * @return true if both mappings are equal, otherwise false
 */
private boolean isMappingValid(String mapping, String mappingType) {
    boolean matchesIndex = isMappingEqualTo(mapping, mappingType);
    return matchesIndex;
}
/**
 * Return the server information gathered by the index REST client.
 *
 * @return String information about the server
 * @throws IOException
 *             if the REST client fails to retrieve the information
 */
public String getServerInformation() throws IOException {
    String serverInformation = indexRestClient.getServerInformation();
    return serverInformation;
}
/**
 * Tests and returns whether the ElasticSearch index has been created or not,
 * as reported by the REST client's {@code typeIndexesExist()}.
 *
 * @return whether the ElasticSearch index exists or not
 * @throws IOException
 *             if communication with the index server fails
 * @throws CustomResponseException
 *             if the index server answers with an error
 */
public boolean indexExists() throws IOException, CustomResponseException {
    boolean indexesPresent = indexRestClient.typeIndexesExist();
    return indexesPresent;
}
/**
 * Return the state recorded by the most recent index operation.
 *
 * <p>Within this class the state is set when indexing of an object type
 * starts, fails or succeeds, when creating the mapping fails or succeeds and
 * when deleting the index fails or succeeds. It is {@code NO_STATE} initially
 * and can also be replaced through {@link #setIndexState(IndexStates)}.</p>
 *
 * @return state of ES index
 */
public IndexStates getIndexState() {
return currentState;
}
/**
 * Replace the recorded state of the index.
 *
 * @param state
 *            the new state returned by {@link #getIndexState()}
 */
public void setIndexState(IndexStates state) {
    this.currentState = state;
}
/**
 * Return the recorded indexing state of the given objectType.
 *
 * @param objectType
 *            the objectType for which the IndexState should be returned
 *
 * @return indexing state of the given object type, or {@code null} if no
 *         state is recorded for it
 */
public IndexStates getObjectIndexState(ObjectType objectType) {
    return this.objectIndexingStates.get(objectType);
}
/**
 * Return the global indexing state derived from the states of all indexable
 * object types.
 *
 * <p>The object types are inspected in order. The first type found in
 * 'indexing failed' or 'no state' decides the result. If no type is in one
 * of these two states, 'indexing successful' is returned.</p>
 *
 * @return global indexing state
 */
public IndexStates getAllObjectsIndexingState() {
    for (ObjectType indexableType : objectTypes) {
        IndexStates typeState = objectIndexingStates.get(indexableType);
        if (Objects.equals(typeState, IndexStates.INDEXING_FAILED)) {
            return IndexStates.INDEXING_FAILED;
        }
        if (Objects.equals(typeState, IndexStates.NO_STATE)) {
            return IndexStates.NO_STATE;
        }
    }
    return IndexStates.INDEXING_SUCCESSFUL;
}
/**
 * Return whether any indexing process is currently in progress or not.
 *
 * <p>This is the case while the 'indexing all' flag is set or while an
 * object type is marked as currently being indexed.</p>
 *
 * @return boolean Value indicating whether any indexing process is currently in
 *         progress or not
 */
public boolean indexingInProgress() {
    boolean typeBeingIndexed = !Objects.equals(this.currentIndexState, ObjectType.NONE);
    return indexingAll || typeBeingIndexed;
}
/**
 * Check if current mapping is empty.
 *
 * <p>The mapping of every mapping type is compared with an empty mapping
 * definition. The check stops at the first mapping type that is empty.</p>
 *
 * @return true if the mapping of at least one mapping type is empty,
 *         otherwise false
 */
public boolean isMappingEmpty() {
    final String emptyMapping = "{\n\"mappings\": {\n\n }\n}";
    boolean emptyFound = false;
    for (String mappingType : KitodoRestClient.MAPPING_TYPES) {
        if (isMappingEqualTo(emptyMapping, mappingType)) {
            emptyFound = true;
            break;
        }
    }
    return emptyFound;
}
/**
 * Compare an expected mapping with the mapping currently stored in the index
 * of the given mapping type.
 *
 * <p>The current mapping is taken from the REST client's response under the
 * key {@code <index base>_<mapping type>}.</p>
 *
 * @param mapping
 *            expected mapping as JSON string
 * @param mappingType
 *            mapping type whose index mapping is fetched
 * @return true if both JSON objects are equal; false if they differ or the
 *         current mapping could not be fetched because of an
 *         {@code IOException}
 */
private boolean isMappingEqualTo(String mapping, String mappingType) {
    try (JsonReader expectedReader = Json.createReader(new StringReader(mapping));
            JsonReader currentReader = Json.createReader(
                new StringReader(indexRestClient.getMapping(mappingType)))) {
        JsonObject expected = expectedReader.readObject();
        JsonObject indexResponse = currentReader.readObject();
        String indexName = indexRestClient.getIndexBase() + "_" + mappingType;
        JsonObject current = indexResponse.getJsonObject(indexName);
        return expected.equals(current);
    } catch (IOException e) {
        return false;
    }
}
/**
 * Read the mapping definition of a mapping type from the class path resource
 * {@code elasticsearch_mappings/<mappingType>.json}.
 *
 * <p>The file content is parsed as a JSON object and returned in the string
 * form produced by that object. A missing resource or an
 * {@code IOException} is reported through {@code Helper.setErrorMessage}.</p>
 *
 * @param mappingType
 *            mapping type whose definition is read
 * @return the mapping as JSON string, or an empty string if the resource is
 *         missing or cannot be read
 */
private static String readMapping(String mappingType) {
    String resourceName = "elasticsearch_mappings/" + mappingType + ".json";
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    try (InputStream resource = contextLoader.getResourceAsStream(resourceName)) {
        if (Objects.isNull(resource)) {
            Helper.setErrorMessage("Mapping not found!");
            return "";
        }
        String rawMapping = IOUtils.toString(resource, StandardCharsets.UTF_8);
        try (JsonReader parser = Json.createReader(new StringReader(rawMapping))) {
            return parser.readObject().toString();
        }
    } catch (IOException e) {
        Helper.setErrorMessage(e.getLocalizedMessage(), logger, e);
        return "";
    }
}
/**
 * Create and start the thread that manages an indexing run.
 *
 * @param context
 *            the UI context handed to the management thread
 * @param indexAllObjects
 *            whether all objects (true) or only remaining objects (false)
 *            are indexed
 * @param objectType
 *            the object type handed to the management thread; the callers
 *            in this class pass {@code null} when indexing independent of
 *            object type
 * @throws IllegalStateException
 *             if a previously started management thread is still alive
 */
private void startIndexingThread(PushContext context, boolean indexAllObjects, ObjectType objectType) throws IllegalStateException {
    boolean alreadyRunning = Objects.nonNull(indexAllThread) && indexAllThread.isAlive();
    if (alreadyRunning) {
        throw new IllegalStateException("indexing already in progress, can not start again");
    }
    indexAllThread = new IndexManagmentThread(context, this, objectType, indexAllObjects);
    indexAllThread.setName("IndexManagementThread");
    indexAllThread.start();
}
/**
 * Start indexing of all objects of specific object type.
 *
 * @param context the UI context
 * @param objectType the object type to index
 * @throws IllegalStateException if indexing is already in progress
 */
public void startIndexing(PushContext context, ObjectType objectType) {
    final boolean indexAllObjects = true;
    startIndexingThread(context, indexAllObjects, objectType);
}
/**
 * Start indexing of remaining objects of specific object type.
 *
 * @param context the UI context
 * @param objectType the object type to index
 * @throws IllegalStateException if indexing is already in progress
 */
public void startIndexingRemaining(PushContext context, ObjectType objectType) {
    final boolean indexAllObjects = false;
    startIndexingThread(context, indexAllObjects, objectType);
}
/**
 * Start indexing of all database objects independent of object type.
 *
 * @param context the UI context
 * @throws IllegalStateException if indexing is already in progress
 */
public void startAllIndexing(PushContext context) {
    final boolean indexAllObjects = true;
    // no object type is given when indexing independent of object type
    final ObjectType noSpecificType = null;
    startIndexingThread(context, indexAllObjects, noSpecificType);
}
/**
 * Starts indexing all remaining database objects independent of object type.
 *
 * @param pushContext the UI context
 * @throws IllegalStateException if indexing is already in progress
 */
public void startAllIndexingRemaining(PushContext pushContext) {
    final boolean indexAllObjects = false;
    // no object type is given when indexing independent of object type
    final ObjectType noSpecificType = null;
    startIndexingThread(pushContext, indexAllObjects, noSpecificType);
}
/**
 * Set the flag that makes {@link #indexingInProgress()} report a running
 * indexing process regardless of the currently indexed object type.
 *
 * @param indexing
 *            new value of the flag
 */
void setIndexingAll(boolean indexing) {
    this.indexingAll = indexing;
}
/**
 * Record that no object type is currently being indexed.
 */
void resetCurrentIndexState() {
    this.currentIndexState = ObjectType.NONE;
}
/**
 * Cancels indexing upon user request.
 *
 * <p>The status object of the most recently started indexing run is marked
 * as canceled. Nothing happens if no run has been started yet.</p>
 */
public void cancelIndexing() {
    IndexWorkerStatus status = indexWorkerStatus;
    if (Objects.isNull(status)) {
        return;
    }
    status.markAsCanceled();
}
/**
 * Get the logger of the indexing service.
 *
 * @return value of logger
 */
public static Logger getLogger() {
    return IndexingService.logger;
}
}