I have a question about a specific use case: I have an API that lets a client copy, delete, or move messages from one queue to another.
For copy and delete, I just use the SEMPv2 API. For move, we did not find a better way than consuming the message, forwarding it to the destination queue, and then acking the original.
Here is a simplified version of my moveMessages method:
public void moveMessages(String user, String pass, String queueName, List<String> replicationGroupMessageIds, String destinationQueue) {
    final JCSMPSession session = createJcsmpSession(user, pass);
    if (session == null) {
        throw new CustomException("moveMessage - Error during session creation.");
    }
    try {
        // We use the same session for all message moves
        session.connect();
        for (String rgmId : replicationGroupMessageIds) {
            var msg = getMessage(session, queueName, rgmId);
            if (msg != null) {
                forwardMessage(session, msg, destinationQueue);
                msg.ackMessage();
            }
        }
    } catch (JCSMPException e) {
        LOGGER.error("a relevant error message");
        throw new CustomException("moveMessage - Error during message forwarding. ", e);
    } finally {
        session.closeSession();
    }
    LOGGER.info("message successfully moved");
}
The method accepts a list of replicationGroupMessageId values, so several messages can be moved in one call.
The createJcsmpSession() method creates the session using the JCSMPFactory.
The getMessage() method retrieves the message by its replication group message ID using a queue browser, with a technique similar to the one described here: How to see information on each message in a queue
The forwardMessage() method looks like this:
private void forwardMessage(JCSMPSession session, BytesXMLMessage msg, String destinationQueueName) throws JCSMPException {
    XMLMessage copy = JCSMPFactory.onlyInstance().createMessage(msg);
    final Queue queue = JCSMPFactory.onlyInstance().createQueue(destinationQueueName);
    XMLMessageProducer producer = session.getMessageProducer(
        new JCSMPStreamingPublishCorrelatingEventHandler() {
            @Override
            public void responseReceivedEx(Object key) {
                // Only log the response; nothing else is done with it
                LOGGER.info("Producer received response for msg: '{}'", key);
            }
            @Override
            public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
                LOGGER.error("Producer received error for msg: '{}@{}'", key, timestamp, cause);
            }
        });
    try {
        if (copy != null) {
            copy.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(copy, queue);
            LOGGER.info("forwardMessage() - Message forwarded with success");
            copy.ackMessage();
        }
    } catch (JCSMPException jex) {
        LOGGER.error("Exception raised when forwarding message!");
        throw new CustomException("Error while forwarding message.", jex);
    }
}
This approach works well in our test environment, but one of our clients reports that the move sometimes “does not work”: after a move, the message is neither in the destination queue nor in the source queue (the message is lost). This happens on a queue holding only a few messages, when moving only one or two of them.
In the logs, the code above appears to succeed and we see “message successfully moved”. We suspect that the message was consumed immediately by the consumer listening on the destination queue.
In the Solace APM telemetry, we cannot see messages that are published directly to a queue, so we have no evidence of what actually happened.
Do you see anything wrong with the code above? Could creating a copy of the message cause problems?
I first tried forwarding the original message without creating a copy, but then the message could not be moved.
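One thing I notice is that moveMessages() acks the source message as soon as producer.send() returns, while the result of the publish only reaches the event handler later, where it is just logged. To make clear the ordering I think a safe move needs, here is a minimal sketch in plain Java. It deliberately uses no JCSMP types: ConfirmingPublisher, SourceMessage and move() are hypothetical stand-ins, and the CompletableFuture represents a confirmation that, in my real code, I assume would be completed from responseReceivedEx() and failed from handleErrorEx().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MoveOrderingSketch {

    /** Hypothetical stand-in for a producer whose confirmation arrives asynchronously. */
    interface ConfirmingPublisher {
        CompletableFuture<Void> publish(String payload);
    }

    /** Hypothetical stand-in for a message taken from the source queue. */
    static final class SourceMessage {
        final String payload;
        boolean acked = false;

        SourceMessage(String payload) {
            this.payload = payload;
        }

        void ack() {
            acked = true;
        }
    }

    /**
     * Publishes the payload and acks the source message only if the
     * confirmation arrives within timeoutMs. On rejection or timeout the
     * source message is left unacked, so it would stay in the source queue.
     */
    static boolean move(SourceMessage msg, ConfirmingPublisher publisher, long timeoutMs) {
        try {
            publisher.publish(msg.payload).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
        msg.ack();
        return true;
    }

    public static void main(String[] args) {
        // Publisher that is confirmed immediately
        ConfirmingPublisher accepting = payload -> CompletableFuture.completedFuture(null);
        // Publisher whose publish is rejected
        ConfirmingPublisher rejecting = payload -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("rejected"));
            return f;
        };

        SourceMessage first = new SourceMessage("order-1");
        SourceMessage second = new SourceMessage("order-2");
        System.out.println(move(first, accepting, 1000) + " acked=" + first.acked);   // true acked=true
        System.out.println(move(second, rejecting, 1000) + " acked=" + second.acked); // false acked=false
    }
}
```

If this ordering is the right idea, the rejected or timed-out case would leave the message in the source queue instead of losing it. I am not sure whether it explains what our client sees, so please correct me if the reasoning does not apply to JCSMP.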