Coverage Summary for Class: ActiveMQDirector (org.kitodo.production.interfaces.activemq)
Class |
Class, %
|
Method, %
|
Line, %
|
ActiveMQDirector |
0%
(0/1)
|
0%
(0/12)
|
0%
(0/83)
|
/*
* (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
*
* This file is part of the Kitodo project.
*
* It is licensed under GNU General Public License version 3 or later.
*
* For the full copyright and license information, please read the
* GPL3-License.txt file that was distributed with this source code.
*/
package org.kitodo.production.interfaces.activemq;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.kitodo.config.ConfigCore;
import org.kitodo.config.enums.ParameterCore;
/**
* The class ActiveMQDirector is the head of all Active MQ processors. It
* implements the ServletContextListener interface and is called automatically
* upon server startup. Its job is to connect to the Active MQ server and
* register the listeners configured.
*
* <p>
* The Active MQ services are intended to be run in case that
* {@code activeMQ.hostURL} is configured in the kitodo_config.properties file.
* To disable the service, the entry there should be commented out.
*
* <p>
* The class ActiveMQDirector also provides a basic ExceptionListener
* implementation as required for the connection.
*/
@WebListener
public class ActiveMQDirector implements Runnable, ServletContextListener {
private static final Logger logger = LogManager.getLogger(ActiveMQDirector.class);
// When implementing new services, add them to this list
private static Collection<? extends ActiveMQProcessor> services;
static {
services = Arrays.asList(new FinalizeStepProcessor());
}
private static Connection connection = null;
private static Session session = null;
private static MessageProducer resultsTopic;
/**
* The method is called by the web container on startup
* and is used to start up the active MQ connection. All processors from
* {@link #services} are registered.
*/
@Override
public void contextInitialized(ServletContextEvent initialisation) {
if (ConfigCore.getOptionalString(ParameterCore.ACTIVE_MQ_HOST_URL).isPresent()) {
Thread connectAsynchronously = new Thread(new ActiveMQDirector());
connectAsynchronously.setName(ActiveMQDirector.class.getSimpleName());
connectAsynchronously.setDaemon(true);
connectAsynchronously.start();
}
}
@Override
public void run() {
String activeMQHost = ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_HOST_URL);
logger.info("Connecting to Active MQ server: {}", activeMQHost);
session = connectToServer(activeMQHost);
if (Objects.nonNull(session)) {
registerListeners(services);
Optional<String> activeMQResultsTopic = ConfigCore.getOptionalString(ParameterCore.ACTIVE_MQ_RESULTS_TOPIC);
activeMQResultsTopic.ifPresent(topic -> resultsTopic = setUpReportChannel(topic));
}
logger.info("Connection to Active MQ server established.");
}
private Connection getConnectionFromActiveMQSslFactory(String server) throws Exception {
logger.trace("Using the ActiveMQSslConnectionFactory to establish a connection to \"" + server + "\"");
ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(server);
factory.setKeyStore(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_KEYSTORE));
factory.setKeyStorePassword(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_KEYSTORE_PASSWORD));
factory.setTrustStore(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_TRUSTSTORE));
factory.setTrustStorePassword(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_TRUSTSTORE_PASSWORD));
if (ConfigCore.getBooleanParameter(ParameterCore.ACTIVE_MQ_USE_AUTH, false)) {
logger.trace("Using authentication on connection \"" + server + "\"");
factory.setUserName(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_AUTH_USERNAME));
factory.setPassword(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_AUTH_PASSWORD));
}
return factory.createConnection();
}
private Connection getConnectionFromActiveMQFactory(String server) throws JMSException {
logger.trace("Using the ActiveMQConnectionFactory to establish a connection to \"" + server + "\"");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(server);
if (ConfigCore.getBooleanParameter(ParameterCore.ACTIVE_MQ_USE_AUTH, false)) {
logger.trace("Using authentication on connection \"" + server + "\"");
factory.setUserName(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_AUTH_USERNAME));
factory.setPassword(ConfigCore.getParameter(ParameterCore.ACTIVE_MQ_AUTH_PASSWORD));
}
return factory.createConnection();
}
/**
* Sets up a connection to an active MQ server. The connection object is
* global because it is needed later to shut down the connection.
*
* @param server
* should be “tcp://{host}:{port}” or “vm://localhost” in case
* that the server is run inside the same virtual machine
* @return the session object or “null” upon error
*/
private Session connectToServer(String server) {
try {
if (ConfigCore.getBooleanParameter(ParameterCore.ACTIVE_MQ_USE_SSL, false)) {
connection = getConnectionFromActiveMQSslFactory(server);
} else {
connection = getConnectionFromActiveMQFactory(server);
}
connection.start();
connection.setExceptionListener(exception -> logger.error(exception));
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (Exception e) {
logger.fatal("Error connecting to ActiveMQ server, giving up.", e);
}
return null;
}
/**
* This method registers the listeners with the active MQ server.
*
* <p>
* If a queue name was configured for a service, a MessageConsumer is set up
* to listen on that queue and, in case of incoming messages, make the
* service process the message. The message checker is saved inside the
* service to be able to shut it down later.
*/
private void registerListeners(Collection<? extends ActiveMQProcessor> processors) {
for (ActiveMQProcessor processor : processors) {
if (Objects.nonNull(processor.getQueueName())) {
MessageConsumer messageConsumer;
try {
Destination queue = session.createQueue(processor.getQueueName());
messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(processor);
processor.setMessageConsumer(messageConsumer);
} catch (JMSException | RuntimeException e) {
logger.fatal("Error setting up monitoring for \"{}\": Giving up.", processor.getQueueName(), e);
}
}
}
}
/**
* This sets up a connection to the topic the results shall be written to.
* The delivery mode is set so “persistent” to allow consumers not online
* with the server in the moment of message submission to read the messages
* later. The log messages are set to be kept on the server for 7 days. This
* value can be overridden from the kitodo_config.properties parameter
* “activeMQ.results.timeToLive”. The time to live must be specified in
* milliseconds; 0 disables the oblivion. (See also:
* http://docs.oracle.com/javaee/6/api/javax/jms/MessageProducer.html#setTimeToLive%28long%29
* )
*
* @param topic
* name of the active MQ topic
* @return a MessageProducer object ready for writing or “null” on error
*/
private MessageProducer setUpReportChannel(String topic) {
MessageProducer reportChannel;
try {
Destination channel = session.createTopic(topic);
reportChannel = session.createProducer(channel);
reportChannel.setDeliveryMode(DeliveryMode.PERSISTENT);
reportChannel.setTimeToLive(ConfigCore.getLongParameterOrDefaultValue(ParameterCore.ACTIVE_MQ_RESULTS_TTL));
return reportChannel;
} catch (JMSException | RuntimeException e) {
logger.fatal("Error setting up report channel \"{}\": Giving up.", topic, e);
}
return null;
}
/**
* Any class that wants to create new Active MQ Messages needs read access
* to the session, since Active MQ messages don’t have a constructor.
*
* @return the session object
*/
public static Session getSession() {
return session;
}
/**
* Instances of WebServiceResult can be sent by calling their send() method.
* Therefore, they need read access on their topic.
*
* @return the resultsTopic object
*/
public static MessageProducer getResultsTopic() {
return resultsTopic;
}
/**
* The method contextDestroyed is called by the web container on shutdown.
* It shuts down all listeners, the session and last, the connection.
*/
@Override
public void contextDestroyed(ServletContextEvent destruction) {
// Shut down all message consumers on any queues
for (ActiveMQProcessor service : services) {
MessageConsumer messageConsumer = service.getMessageConsumer();
if (Objects.nonNull(messageConsumer)) {
try {
messageConsumer.close();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
}
// quit session
try {
if (Objects.nonNull(session)) {
session.close();
}
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
// shut down connection
try {
if (Objects.nonNull(connection)) {
connection.close();
}
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
}