Skip to content

Instantly share code, notes, and snippets.

@jlyman
Created September 26, 2012 21:23
Show Gist options
  • Save jlyman/3790668 to your computer and use it in GitHub Desktop.
Save jlyman/3790668 to your computer and use it in GitHub Desktop.
MassTransit refuses to process errored-out messages after they are moved back to the regular queue.
class Program
{
    /// <summary>
    /// Minimal message contract published and consumed by this repro.
    /// </summary>
    public class YourMessage { public string Text { get; set; } }

    /// <summary>
    /// Subscribes a handler that always throws, publishes one message, and then, on every
    /// key press other than 'q', asks <see cref="MessageQueue"/> to move the failed message
    /// back to the queue it came from.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        // Written by the handler below so the requeue loop knows which message to move back.
        string newMessageId = "";

        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/test_queue");
            sbc.SetDefaultRetryLimit(2);
            sbc.Subscribe(subs =>
            {
                subs.Handler<YourMessage>((cxt, msg) =>
                {
                    newMessageId = cxt.MessageId;

                    // Expected: this line shows up in the console every time the consumer is
                    // triggered and the message is processed.
                    // Observed: it stops showing up once the retry count has been exceeded and
                    // the message has been placed in the error queue, even after the message
                    // is moved back to the original queue.
                    Console.WriteLine("!!! Throwing exception");
                    throw new Exception("Bombing out intentionally.");
                });
            });
        });

        // Publish message
        Bus.Instance.Publish(new YourMessage { Text = "Hi" });

        // Retry failed message
        while (Console.ReadKey().KeyChar != 'q')
        {
            Console.WriteLine("Requeuing failed messages...");

            // The handler may not have run yet; without an id there is nothing to look up,
            // and ReturnMessageToSourceQueue rejects a blank id with an exception.
            if (string.IsNullOrWhiteSpace(newMessageId))
            {
                Console.WriteLine("No message has been received yet; nothing to requeue.");
                continue;
            }

            // MessageQueue opens a RabbitMQ channel per instance; dispose it on every
            // iteration (even when the requeue throws) so channels do not pile up.
            using (MessageQueue queue = new MessageQueue())
            {
                queue.ReturnMessageToSourceQueue(newMessageId);
            }
        }

        Console.WriteLine("--- Finished ---");
    }
}
/// <summary>
/// Represents a service bus message.
/// </summary>
public class MessageDetails
{
    //Constants
    private const string MessageTypePrefix = "urn:message:";
    private const string MessageTypeNamespace = "MyApp.Messages.";
    private const string ErrorQueueSuffix = "_error";
    private const string UnparsedBody = "UNABLE TO PARSE XML";

    //Properties
    /// <summary>
    /// The <c>message</c> element of the envelope as XML text, or the placeholder
    /// <c>UNABLE TO PARSE XML</c> when the payload could not be parsed or has no such element.
    /// </summary>
    public string Body { get; protected set; }

    /// <summary>
    /// The identifier of the message.
    /// </summary>
    public string Id { get; protected set; }

    /// <summary>
    /// The type name of the message if available; otherwise <c>null</c>.
    /// </summary>
    public string TypeName { get; protected set; }

    /// <summary>
    /// The name of the queue where the original message failed to be processed before failing.
    /// Null if it did not fail out of something (ie, is a good, normal message).
    /// </summary>
    public string OriginalQueueName { get; protected set; }

    /// <summary>
    /// The name of the queue this message is currently in.
    /// </summary>
    public string QueueName { get; protected set; }

    //Constructors
    /// <summary>
    /// Constructs a new <see cref="MessageDetails"/> based on a raw RabbitMQ message.
    /// </summary>
    /// <param name="getResult">A raw RabbitMQ message.</param>
    /// <param name="queueName">The name of the queue from which the message came.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="getResult"/> or <paramref name="queueName"/> is <c>null</c>.
    /// </exception>
    public MessageDetails(BasicGetResult getResult, string queueName)
    {
        if (getResult == null)
        {
            throw new ArgumentNullException("getResult");
        }
        if (queueName == null)
        {
            throw new ArgumentNullException("queueName");
        }

        this.QueueName = queueName;
        this.Id = getResult.BasicProperties.MessageId;
        this.Body = UnparsedBody;
        this.OriginalQueueName = GetSourceQueueName(queueName);

        try
        {
            XDocument xml = XDocument.Parse(Encoding.UTF8.GetString(getResult.Body));

            // Treat a missing element as "not available" instead of relying on a
            // NullReferenceException that would be misreported as invalid XML.
            XElement messageElement = xml.Root.Element("message");
            if (messageElement != null)
            {
                this.Body = messageElement.ToString();
            }

            XElement typeElement = xml.Root.Element("messageType");
            if (typeElement != null)
            {
                this.TypeName = typeElement.Value
                    .Replace(MessageDetails.MessageTypePrefix, "")
                    .Replace(MessageDetails.MessageTypeNamespace, "");
            }
        }
        catch (Exception ex)
        {
            // Best effort: an unreadable payload must not prevent the message from being listed.
            // The text is concatenated on purpose; passing the exception as a second argument
            // selects the format-string overload, which drops it (no placeholder) and throws
            // FormatException when the id happens to contain braces.
            Console.WriteLine("Message body is not valid XML (ID=" + this.Id + "): " + ex);
        }
    }

    //Methods
    /// <summary>
    /// Derives the source queue name from an error queue name by removing the trailing
    /// <c>_error</c> suffix.
    /// </summary>
    /// <param name="queueName">The name of the queue the message is currently in.</param>
    /// <returns>
    /// The name without the suffix, or <c>null</c> when <paramref name="queueName"/> does not
    /// end in <c>_error</c>.
    /// </returns>
    private static string GetSourceQueueName(string queueName)
    {
        // Only the suffix marks an error queue; a name that merely contains "_error"
        // somewhere in the middle must not be rewritten.
        if (!queueName.EndsWith(ErrorQueueSuffix, StringComparison.Ordinal))
        {
            return null;
        }

        return queueName.Substring(0, queueName.Length - ErrorQueueSuffix.Length);
    }
}
/// <summary>
/// Thin wrapper around a RabbitMQ channel that can locate a message in the error queue by id
/// and publish it back towards the queue it originally failed in.
/// </summary>
public class MessageQueue : IDisposable
{
    // Fields
    protected string queueName = "test_queue_error";
    protected const string MessageRootNode = "message";
    protected IModel channel;
    private const string ErrorQueueSuffix = "_error";

    //Constructors
    /// <summary>
    /// Creates a new <see cref="MessageQueue"/>.
    /// </summary>
    public MessageQueue()
    {
        ConnectionFactory factory = new ConnectionFactory();
        IConnection connection = factory.CreateConnection();
        this.channel = connection.CreateModel();

        // Set only after the channel exists. NOTE(review): the intent appears to be that the
        // connection closes itself once its last channel is closed, which is why Dispose only
        // closes the channel - confirm against the RabbitMQ client version in use.
        connection.AutoClose = true;
    }

    //Methods
    /// <summary>
    /// Closes the channel. Failures are logged and never thrown.
    /// </summary>
    public void Dispose()
    {
        try
        {
            channel.Close();
        }
        catch (Exception ex)
        {
            // Concatenate instead of passing ex as a second argument: that overload treats the
            // text as a format string and silently drops the exception.
            Console.WriteLine("Unable to close channel to RabbitMQ: " + ex);
        }
    }

    /// <summary>
    /// Fetches every message currently available in <paramref name="queueName"/> without
    /// acknowledging any of them.
    /// </summary>
    /// <param name="queueName">The queue to read; it is declared first so it is known to exist.</param>
    /// <returns>
    /// The fetched messages, possibly empty. The caller is responsible for acking or nacking
    /// each returned delivery.
    /// </returns>
    protected BasicGetResult[] PeekMessages(string queueName)
    {
        List<BasicGetResult> peekedMessages = new List<BasicGetResult>();
        try
        {
            // Ensure queue is there
            channel.QueueDeclare(queueName, true, false, false, null);

            BasicGetResult getResult;
            while ((getResult = channel.BasicGet(queueName, false)) != null)
            {
                peekedMessages.Add(getResult);
            }
        }
        catch (OperationInterruptedException ex)
        {
            // Best effort: return whatever was fetched before the interruption.
            Console.WriteLine("Can't peek messages in queue " + queueName + ": " + ex);
        }
        return peekedMessages.ToArray();
    }

    /// <summary>
    /// Removes the message identified by <paramref name="messageId"/> from
    /// <paramref name="queueName"/> and returns it; every other fetched message is requeued.
    /// </summary>
    /// <param name="messageId">The identifier to look for.</param>
    /// <param name="queueName">The queue to search.</param>
    /// <returns>The matching message, or <c>null</c> when no message has that id.</returns>
    protected BasicGetResult GetById(string messageId, string queueName)
    {
        BasicGetResult message = null;
        BasicGetResult[] peekedMessages = this.PeekMessages(queueName);
        foreach (BasicGetResult peekedMessage in peekedMessages)
        {
            bool isFirstMatch = message == null
                && peekedMessage.BasicProperties.MessageId == messageId;

            if (isFirstMatch)
            {
                message = peekedMessage;
                channel.BasicAck(peekedMessage.DeliveryTag, false);
            }
            else
            {
                // Requeue everything that is not handed back to the caller. This includes
                // further messages with the same id: acking those as well would delete them
                // although only one message is returned.
                channel.BasicNack(peekedMessage.DeliveryTag, false, true);
            }
        }
        return message;
    }

    /// <summary>
    /// Returns the error message identified by <paramref name="messageId"/> to the source
    /// queue where the failure happened.
    /// </summary>
    /// <param name="messageId">The identifier of the error message.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="messageId"/> is null, empty or whitespace.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// No message with the given id is currently in the error queue.
    /// </exception>
    /// <remarks>
    /// NOTE(review): the message is acknowledged (removed from the error queue) by
    /// <see cref="GetById"/> before it is republished, so a failure while publishing loses it.
    /// </remarks>
    public void ReturnMessageToSourceQueue(string messageId)
    {
        if (string.IsNullOrWhiteSpace(messageId))
        {
            throw new ArgumentNullException("messageId");
        }

        Console.WriteLine("Returning message id (" + messageId + ") to source queue...");

        BasicGetResult message = GetById(messageId, queueName);
        if (message == null)
        {
            // Fail with a meaningful error instead of a NullReferenceException below.
            throw new InvalidOperationException(
                "Message id (" + messageId + ") was not found in queue " + queueName + ".");
        }

        // Published with an empty routing key to the name derived from the error queue.
        // NOTE(review): this presumably relies on an exchange of that name that routes to the
        // source queue - verify against the broker topology.
        this.channel.BasicPublish(GetSourceQueueName(queueName), "", message.BasicProperties, message.Body);

        Console.WriteLine("Returned message id (" + messageId + ") to source queue.");
    }

    /// <summary>
    /// Removes a trailing <c>_error</c> suffix from <paramref name="errorQueueName"/>.
    /// </summary>
    /// <param name="errorQueueName">The name of the error queue.</param>
    /// <returns>The name without the suffix, or the unchanged name when it has no such suffix.</returns>
    private static string GetSourceQueueName(string errorQueueName)
    {
        // Strip the suffix only; replacing every "_error" occurrence would also rewrite
        // names that contain it in the middle.
        if (!errorQueueName.EndsWith(ErrorQueueSuffix, StringComparison.Ordinal))
        {
            return errorQueueName;
        }

        return errorQueueName.Substring(0, errorQueueName.Length - ErrorQueueSuffix.Length);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment