Coverage Summary for Class: ActiveMQProcessor (org.kitodo.production.interfaces.activemq)
Class |
Class, %
|
Method, %
|
Line, %
|
ActiveMQProcessor |
0%
(0/1)
|
0%
(0/8)
|
0%
(0/38)
|
/*
* (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
*
* This file is part of the Kitodo project.
*
* It is licensed under GNU General Public License version 3 or later.
*
* For the full copyright and license information, please read the
* GPL3-License.txt file that was distributed with this source code.
*/
package org.kitodo.production.interfaces.activemq;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.kitodo.config.ConfigCore;
import org.kitodo.config.enums.ParameterCore;
import org.kitodo.data.database.beans.Client;
import org.kitodo.data.database.beans.User;
import org.kitodo.data.database.exceptions.DAOException;
import org.kitodo.production.enums.ReportLevel;
import org.kitodo.production.helper.Helper;
import org.kitodo.production.security.SecurityUserDetails;
import org.kitodo.production.services.ServiceManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
/**
* The class ActiveMQProcessor offers general services, such as making the
* incoming messages available as MapMessages and publishing the results. When I
* came clear that this code would be necessary for every processor, I thought
* an abstract class would be the right place for it. ActiveMQProcessor also
* provides a place to save the checker for the ActiveMQDirector, to be able to
* shut it down later.
*/
public abstract class ActiveMQProcessor implements MessageListener {
/**
* The name of the queue from which this processor is processing messages.
*/
private String queueName;
/**
* The message consumer object that actually receives the messages and
* executes {@link #process(MapMessageObjectReader)}.
*/
private MessageConsumer messageConsumer;
/**
* Must be implemented to let the service do what it should do.
*
* @param ticket
* an object providing access to the fields of the received map
* message
*/
protected abstract void process(MapMessageObjectReader ticket) throws DAOException, JMSException;
/**
* Instantiating the class ActiveMQProcessor always requires to pass the
* name of the queue it should be attached to. That means, your constructor
* should look like this:
*
* <pre>
* public MyServiceProcessor() {
* super(ConfigCore.getParameter("activeMQ.myService.queue", null));
* }
* </pre>
*
* <p>
* If the parameter is not set in kitodo_config.properties, it will return
* “null” and so prevents it from being set up in ActiveMQDirector.
*
* @param queueName
* the queue name, if configured, or “null” to prevent the
* processor from being connected.
*/
public ActiveMQProcessor(String queueName) {
this.queueName = queueName;
}
/**
* Provides a corset which checks the message being a MapMessage, performs a
* type safe type cast, and extracts the job’s ID to be able to send useful
* results to the results topic, which it does after the abstract method
* process() has finished.
*
* <p>
* Since this will be the same for all processors which use MapMessages, I
* extracted the portion into the abstract class.
*/
@Override
public void onMessage(Message arg) {
MapMessageObjectReader message;
String ticketID = null;
try {
// Basic check message
message = getMessageFromObjectReader(arg);
ticketID = message.getMandatoryString("id");
// turn on logging
turnOnLogging(ticketID);
// set default user
Optional<String> optionalLogin = ConfigCore.getOptionalString(ParameterCore.ACTIVE_MQ_USER);
SecurityContext securityContext = SecurityContextHolder.getContext();
if (optionalLogin.isPresent()) {
if (Objects.isNull(securityContext.getAuthentication())) {
setUserAuthentification(optionalLogin, securityContext);
} else {
optionalLogin = Optional.empty();
}
}
// process message
process(message);
if (optionalLogin.isPresent()) {
securityContext.setAuthentication(null);
}
// turn off logging again
Helper.setActiveMQReporting(null);
// if everything ‘s fine, report success
new WebServiceResult(queueName, ticketID, ReportLevel.SUCCESS).send();
} catch (Exception e) {
// report any errors
new WebServiceResult(queueName, ticketID, ReportLevel.FATAL, e.getMessage()).send();
}
}
private void setUserAuthentification(Optional<String> optionalLogin, SecurityContext securityContext) throws DAOException {
User user = ServiceManager.getUserService().getByLogin(optionalLogin.get());
SecurityUserDetails securityUserDetails = new SecurityUserDetails(user);
Authentication auth = new UsernamePasswordAuthenticationToken(securityUserDetails, null,
securityUserDetails.getAuthorities());
Client clientId = ServiceManager.getClientService().getById(user.getClients().get(0).getId());
securityUserDetails.setSessionClient(clientId);
securityContext.setAuthentication(auth);
}
private void turnOnLogging(String ticketID) {
Map<String, String> loggingConfig = new HashMap<>();
loggingConfig.put("queueName", queueName);
loggingConfig.put("id", ticketID);
Helper.setActiveMQReporting(loggingConfig);
}
private MapMessageObjectReader getMessageFromObjectReader(Message arg) {
MapMessageObjectReader message;
if (arg instanceof MapMessage) {
message = new MapMessageObjectReader((MapMessage) arg);
} else {
throw new IllegalArgumentException("Incompatible types.");
}
return message;
}
/**
* Returns the queue name. Maybe null if the processor is not active.
*
* @return the queue name
*/
public String getQueueName() {
return queueName;
}
/**
* Sets the message consumer to have it later for shutting down the service.
*
* @param messageConsumer
* the MessageConsumer object responsible for checking messages
*/
public void setMessageConsumer(MessageConsumer messageConsumer) {
this.messageConsumer = messageConsumer;
}
/**
* Returns the message consumer. Maybe null. Used for shutdown.
*
* @return the message consumer
*/
public MessageConsumer getMessageConsumer() {
return messageConsumer;
}
}