Coverage Summary for Class: IndexRestClient (org.kitodo.data.elasticsearch.index)

Class Class, % Method, % Line, %
IndexRestClient 100% (1/1) 100% (9/9) 83,6% (51/61)


 /*
  * (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
  *
  * This file is part of the Kitodo project.
  *
  * It is licensed under GNU General Public License version 3 or later.
  *
  * For the full copyright and license information, please read the
  * GPL3-License.txt file that was distributed with this source code.
  */
 
 package org.kitodo.data.elasticsearch.index;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
 import javax.ws.rs.HttpMethod;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.nio.entity.NStringEntity;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.kitodo.data.elasticsearch.KitodoRestClient;
 import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;
 import org.kitodo.data.exceptions.DataException;
 
 /**
  * Implementation of ElasticSearch REST Client for index package.
  */
 public class IndexRestClient extends KitodoRestClient {
 
     /**
      * IndexRestClient singleton.
      */
     private static volatile IndexRestClient instance = null;
     private final Object lock = new Object();
 
     private IndexRestClient() {
     }
 
     /**
      * Return singleton variable of type IndexRestClient.
      *
      * @return unique instance of IndexRestClient
      */
     public static IndexRestClient getInstance() {
         IndexRestClient localReference = instance;
         if (Objects.isNull(localReference)) {
             synchronized (IndexRestClient.class) {
                 localReference = instance;
                 if (Objects.isNull(localReference)) {
                     localReference = new IndexRestClient();
                     localReference.initiateClient();
                     instance = localReference;
                 }
             }
         }
         return localReference;
     }
 
     /**
      * Add document to the index. This method will be used for add or update of
      * single document.
      *
      * @param type
      *            for which request is performed
      * @param entity
      *            with document which is going to be indexed
      * @param id
      *            of document - equal to the id from table in database
      * @param forceRefresh
      *            force index refresh - if true, time of execution is longer but
      *            object is right after that available for display
      */
     public void addDocument(String type, Map<String, Object> entity, Integer id, boolean forceRefresh)
             throws IOException, CustomResponseException {
         IndexRequest indexRequest = new IndexRequest(this.indexBase + "_" + type).source(entity);
         indexRequest.id(String.valueOf(id));
         if (forceRefresh) {
             indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         }
 
         IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
         processStatusCode(indexResponse.status());
     }
 
     /**
      * Add list of documents to the index. This method will be used for add whole
      * table to the index. It performs asynchronous request.
      *
      * @param type
      *            for which request is performed
      * @param documentsToIndex
      *            list of json documents to the index
      */
     void addTypeSync(String type, Map<Integer, Map<String, Object>> documentsToIndex) throws CustomResponseException {
         BulkRequest bulkRequest = prepareBulkRequest(type, documentsToIndex);
 
         try {
             BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
             if (bulkResponse.hasFailures()) {
                 throw new CustomResponseException(bulkResponse.buildFailureMessage());
             }
         } catch (IOException e) {
             throw new CustomResponseException(e);
         }
     }
 
     /**
      * Add list of documents to the index. This method will be used for add whole
      * table to the index. It performs asynchronous request.
      *
      * @param type
      *            for which request is performed
      * @param documentsToIndex
      *            list of json documents to the index
      */
     void addTypeAsync(String type, Map<Integer, Map<String, Object>> documentsToIndex) {
         BulkRequest bulkRequest = prepareBulkRequest(type, documentsToIndex);
 
         ResponseListener responseListener = new ResponseListener(type, documentsToIndex.size());
         highLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, responseListener);
 
         synchronized (lock) {
             while (Objects.isNull(responseListener.getBulkResponse())) {
                 try {
                     lock.wait(1000);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     break;
                 }
             }
         }
     }
 
     /**
      * Delete document from type specific index.
      *
      * @param type
      *            for which request is performed
      * @param id
      *            of the document
      * @param forceRefresh
      *            force index refresh - if true, time of execution is longer but
      *            object is right after that available for display
      */
     void deleteDocument(String type, Integer id, boolean forceRefresh) throws CustomResponseException, DataException {
         DeleteRequest deleteRequest = new DeleteRequest(this.indexBase + "_" + type);
         deleteRequest.id(String.valueOf(id));
         if (forceRefresh) {
             deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         }
 
         try {
             highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
         } catch (ResponseException e) {
             handleResponseException(e);
         }  catch (IOException e) {
             throw new DataException(e);
         }
     }
 
     /**
      * Enable sorting by text field.
      *
      * @param field
      *            as String
      * @param mappingType
      *            as String
      */
     public void enableSortingByTextField(String field, String mappingType) throws IOException, CustomResponseException {
         String query = "{\n \"properties\": {\n\"" + field + "\": {\n" + "      \"type\": \"text\",\n"
                 + "      \"fielddata\": true,\n" + "      \"fields\": {\n" + "        \"raw\": {\n"
                 + "          \"type\":  \"text\",\n" + "          \"index\": false}\n" + "    }\n" + "  }}}";
         HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
         Request request = new Request(HttpMethod.PUT,
                 "/" + this.getIndexBase() + "_" + mappingType + "/_mappings");
         request.setEntity(entity);
         Response indexResponse = client.performRequest(request);
         processStatusCode(indexResponse.getStatusLine());
     }
 
     private BulkRequest prepareBulkRequest(String type, Map<Integer, Map<String, Object>> documentsToIndex) {
         BulkRequest bulkRequest = new BulkRequest();
 
         for (Map.Entry<Integer, Map<String, Object>> entry : documentsToIndex.entrySet()) {
             IndexRequest indexRequest = new IndexRequest(this.indexBase + "_" + type);
             indexRequest.id(String.valueOf(entry.getKey()));
             bulkRequest.add(indexRequest.source(entry.getValue()));
         }
 
         return bulkRequest;
     }
 }