Move message from error queue to original queue for reprocessing

I have a question regarding a specific use case we have: I have an API where a client can copy, delete of move messages from one queue to another.

For Copy and Deletion, I just use SEMPv2 API, but for the move, we did not find a better way than consuming the message, forward it to the destination queue and ack the original one.

Here is a simplified version of my moveMessages method:

public void moveMessages(String user, String pass, String queueName, List<String> replicationGroupMessageIds, String destinationQueue) {
	final JCSMPSession session = createJcsmpSession(user, pass);
	if (session == null) {
		throw new CustomException("moveMessage - Error during session creation.");
	}

	try {
		// We use the same session for all message move
		session.connect();
		for (String rgmId : replicationGroupMessageIds) {
			var msg = getMessage(session, queueName, rgmId);
			if (msg != null) {
				forwardMessage(session, msg, destinationQueue);
				msg.ackMessage();
			}
		}
	} catch (JCSMPException e) {
		LOGGER.error("an relevant error message");
		throw new CustomException("moveMessage - Error during message forwarding. ", e);
	} finally {
		session.closeSession();
	}
	LOGGER.info("message successfully moved");
}

I can pass a list of multiple replicationGroupMessageId to move.
The createJcsmpSession() method creates the session using the JCSMPFactory.
The getMessage() method will retrieve the message (by its replicationId) using the queue browser with similar technique here: How to see information on each message in a queue

The forwardMessage method is like this:

private void forwardMessage(JCSMPSession session, BytesXMLMessage msg, String destinationQueueName) throws JCSMPException {
	XMLMessage copy = JCSMPFactory.onlyInstance().createMessage(msg);
	final Queue queue = JCSMPFactory.onlyInstance().createQueue(destinationQueueName);
	XMLMessageProducer producer = session.getMessageProducer(
			new JCSMPStreamingPublishCorrelatingEventHandler() {
				@Override
				public void responseReceivedEx(Object key) {
					// Do nothing
					LOGGER.info("Producer received response for msg: '{}'", key);
				}

				@Override
				public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
					LOGGER.error("Producer received error for msg: '{}@{}'", key, timestamp, cause);
				}
			});

	try {
		if (copy != null) {
			copy.setDeliveryMode(DeliveryMode.PERSISTENT);
			producer.send(copy, queue);
			LOGGER.info("forwardMessage() - Message forwarded with success");
			copy.ackMessage();
		}
	} catch (JCSMPException jex) {
		LOGGER.error("Exception raised when forwarding message!");
		throw new CustomException("Error while forwarding message.", jex);
	}
}

This way of doing the move is working well in our test environment but one of our client is reporting that sometimes the move “does not work” and they reported that the message is not in the destination and no more in the source queue (message lost) after a move (from a queue with only few messages and moving only one or two messages)

In the logs we see that the code above behave successfully and we see “message successfully moved” and we suspect that the message has been directly consumed by the consumer listening the destination queue.

In the APM telemetry of Solace, we cannot see the message directly pushed in a queue, and then we do not find evidence of the issue.

Do you see something wrong with the above code? Is the fact that doing a copy can generates issue?
I tried first not to create a copy, but the message was impossible to move.

Hi there, I’m Solly AI, I generate responses to your questions using the power of the Solace Agent Mesh:

Please remember that AI technology can make mistakes so ensure that you read our Disclaimer.

AI Generated responses are a work in progress, your help in evaluating my response would be awesome! If my response makes sense, please mark it as a solution or give a :+1:. If my response doesn’t make sense for your topic, please react to it with a :shaking_face:.

The documentation available doesn’t directly address the specific scenario of using a queue browser with replicationGroupMessageId to implement a moveMessages method in JCSMP, so I can’t pinpoint the exact root cause. However, there are some relevant considerations from the JCSMP documentation that are worth highlighting:

Regarding message loss after publishing to the destination queue:

The documentation notes that when a Guaranteed message published in a transaction cannot be successfully spooled to a destination endpoint at commit time, it can be silently discarded if the endpoint’s reject-msg-to-sender-on-discard property is not enabled. By default, this property is enabled for queues, but it’s worth verifying. If the destination queue is full, shut down, or otherwise unable to accept the message, it could be discarded without an obvious error — meaning the message leaves the source (after ack) but never lands in the destination.

Regarding the “Do Not Cache XMLMessages” best practice:

The documentation explicitly warns:

“The application should not cache or reuse messages because the Solace JCSMP API may automatically recycle the messages.”

If you are browsing a message and then forwarding that same message object (or a reference to it) rather than constructing a fresh outbound message, the API may recycle it before or during the send, which could result in message loss or corruption.

Regarding the race condition concern:

A consumer on the destination queue consuming the message before your logic completes is normal, expected behavior — that wouldn’t cause message loss, just normal consumption. The more likely culprit is one of the above.

A strongly recommended approach is to use local transactions in JCSMP, which allow you to atomically consume from the source and publish to the destination in a single commit. This eliminates the window between the publish and the ack where a failure could cause loss. See Publishing Messages in Transactions in the Solace JCSMP API for details on how transactions work.

For general JCSMP best practices around message handling, acknowledgment, and flow configuration, see Best Practices in the Solace JCSMP API.

Sources: