Friday, September 29, 2017

TransactionScope and Database Queues

Are you familiar with using a database as a data queue? Basically, you have a table that acts as a queue ... you pop a row out of the table and use the information in the row to do some processing. You delete the row from the table after you're done with the processing. This can be done one row at a time, or the table can be accessed from multiple threads simultaneously (which will usually speed up your queue processing). Of course, whether you can use multiple threads depends on whether the order in which you process the data is important.

We have a server-side application (multi-threaded, running as a Windows Service) that uses this queue processing mechanism quite extensively. The main use is to receive messages from other servers (usually via HTTP, but could also be via TCP) and then immediately write the message to a queue database table for incoming messages. Then, on other threads, we pop the data off the queue and call various methods to process it. Another way we use queuing is in the opposite direction, where during our processing of data, we determine that we need to send data out to another server. Rather than have the method that's doing the processing also handle sending it out to another server (which could compromise the method's processing speed), the method simply writes it to another queue database table used for outgoing messages.

Here is some simple code for getting the data from your database queue table ... let's call the table "incoming". The actual data is contained in a class (let's call it the MyMessage class) that is serialized and stored in a column in the incoming table. Let's call the column "message" and define it as varchar(max). Typically, the class that contains the following code will run it in a loop, possibly with multiple threads. I didn't include that part of the code:

// see my blog post for this static Utils method: https://geek-goddess-bonnie.blogspot.com/2010/12/transactionscope-and-sqlserver.html
using (TransactionScope scope = Utils.GetTransactionScope())
{
    MyMessage message = null;
    try
    {
        // see my blog for several posts about DataAccess classes
        // https://geek-goddess-bonnie.blogspot.com/search?q=DataAccess+class
        using (DataAccess da = new DataAccess(this.MyConnectionString))
        {
            // In my real application, I use Typed DataSets, but a plain DataTable is fine for this blog post
            DataTable dt = this.GetMessageFromQueue(da);

            // Look for a row in the DataTable
            // This is a TOP 1 query and will only return a single row
            // No results means there's nothing "on this queue"
            if (dt.Rows.Count > 0)
            {
                // we can delete it right away, it won't actually be deleted
                // until the scope.Complete() executes
                this.RemoveMessageFromQueue(dt, da);
                // Your MyMessage class should be marked [Serializable] and will
                // need static methods to serialize (GetString) and deserialize (GetObject)
                message = MyMessage.GetObject(dt.Rows[0]["message"]);
                // Send the message to the ProcessMessage() method, which gets overridden in derived classes
                if (this.ProcessMessage(message))
                    scope.Complete();

                this.Found = true;
            }
            else
            {
                // In my real application, as I mentioned, I run this code in a loop, possibly with multiple threads.
                // Don't sleep the thread if the last time returned a result, but otherwise sleep to keep from spinning endlessly
                if (this.Found == true)
                    this.Found = false;
                else
                    Thread.Sleep(100);
            }
        }
    }
    catch (Exception)
    {
        // Log the exception here (logging is application-specific and not shown).
        // Because scope.Complete() was never called, the transaction rolls back
        // and the message stays on the queue.
    }
}
// the da.commands below are part of the sample DataAccess class described
// in Part 2 and Part 3 of my blog post series
protected virtual DataTable GetMessageFromQueue(DataAccess da)
{
    // although I typically use Stored Procs, for this example, I am not.
    da.IsStoredProc = false;

    DataTable dt = new DataTable();
    da.ClearParameters();
    // UPDLOCK keeps the selected row locked until the transaction ends;
    // READPAST lets other threads skip rows that are already locked instead of waiting on them
    string query = "SELECT TOP 1 incomingkey, message FROM incoming " +
                   "WITH (UPDLOCK, READPAST) ORDER BY 1";
    da.FillData(dt, query);
    return dt;
}
protected virtual void RemoveMessageFromQueue(DataTable dt, DataAccess da)
{
    da.ClearParameters();
    da.AddParm("@incomingkey", dt.Rows[0]["incomingkey"]);
    da.ExecuteCommand("DELETE FROM incoming WHERE incomingkey = @incomingkey");
}
// This must be overridden in the sub-class
protected virtual bool ProcessMessage(MyMessage message)
{
    throw new NotImplementedException();
}
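
The loop that I left out could look something like the following. This is only a rough sketch, not the code from my real application: the QueueWorker and DemoWorker classes and the PopAndProcess() method are made-up names, and PopAndProcess() stands in for the whole TransactionScope block above. To keep the sketch runnable without a database, the demo pops from an in-memory ConcurrentQueue instead of the incoming table, and each thread simply sleeps whenever it finds nothing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical base class: runs PopAndProcess() in a loop on one or more threads.
public abstract class QueueWorker
{
    private readonly List<Thread> threads = new List<Thread>();
    private volatile bool stopping;

    // Stand-in for the TransactionScope block: returns true if a message was found.
    protected abstract bool PopAndProcess();

    public void Start(int threadCount)
    {
        stopping = false;
        for (int i = 0; i < threadCount; i++)
        {
            Thread t = new Thread(this.Run);
            t.IsBackground = true;
            threads.Add(t);
            t.Start();
        }
    }

    // Signals the loops to end and waits for any in-flight message to finish.
    public void Stop()
    {
        stopping = true;
        foreach (Thread t in threads)
            t.Join();
        threads.Clear();
    }

    private void Run()
    {
        while (!stopping)
        {
            // Sleep only when the queue was empty, to keep from spinning endlessly
            if (!this.PopAndProcess())
                Thread.Sleep(100);
        }
    }
}

// Demo only: an in-memory queue takes the place of the "incoming" table.
public class DemoWorker : QueueWorker
{
    private readonly ConcurrentQueue<string> queue;
    public int Processed;

    public DemoWorker(ConcurrentQueue<string> queue)
    {
        this.queue = queue;
    }

    protected override bool PopAndProcess()
    {
        string item;
        if (!queue.TryDequeue(out item))
            return false;
        Interlocked.Increment(ref Processed);
        return true;
    }
}

public static class QueueWorkerDemo
{
    public static void Main()
    {
        ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
        for (int i = 0; i < 10; i++)
            queue.Enqueue("message " + i);

        DemoWorker worker = new DemoWorker(queue);
        worker.Start(3);
        while (!queue.IsEmpty)
            Thread.Sleep(10);
        worker.Stop();

        Console.WriteLine("Processed: " + worker.Processed);
    }
}
```

With the real database queue, the READPAST hint in the SELECT is what keeps several of these threads from blocking each other on the same row.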

Notice the virtual ProcessMessage() method. Different sub-classes of this class could implement anything they want in this overridden method. That could mean using another TransactionScope (or not) ... and if you *do* use another TransactionScope, you have the option of having it participate in the current Transaction (by using TransactionScopeOption.Required), or not (by using TransactionScopeOption.RequiresNew or TransactionScopeOption.Suppress).
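
To see what those three options do, here is a small sketch that needs no database at all. The options live in the TransactionScopeOption enum, and the sketch compares the identifier of the ambient transaction inside a nested scope with the one from the outer scope. The ScopeOptionsDemo class and its method names are made up for this illustration, and it uses a plain new TransactionScope() rather than my Utils.GetTransactionScope() method.

```csharp
using System;
using System.Transactions;

public static class ScopeOptionsDemo
{
    // Identifier of the ambient transaction, or "(none)" if there isn't one.
    public static string AmbientId()
    {
        return Transaction.Current == null
            ? "(none)"
            : Transaction.Current.TransactionInformation.LocalIdentifier;
    }

    // Opens a nested scope with the given option and reports which transaction it sees.
    public static string IdInsideNestedScope(TransactionScopeOption option)
    {
        using (TransactionScope nested = new TransactionScope(option))
        {
            string id = AmbientId();
            nested.Complete();
            return id;
        }
    }

    public static void Main()
    {
        using (TransactionScope outer = new TransactionScope())
        {
            string outerId = AmbientId();

            // Required joins the outer transaction, so the identifiers match
            string required = IdInsideNestedScope(TransactionScopeOption.Required);
            Console.WriteLine("Required joins outer: " + (required == outerId));

            // RequiresNew starts its own, independent transaction
            string requiresNew = IdInsideNestedScope(TransactionScopeOption.RequiresNew);
            Console.WriteLine("RequiresNew joins outer: " + (requiresNew == outerId));

            // Suppress runs the nested block with no ambient transaction at all
            string suppress = IdInsideNestedScope(TransactionScopeOption.Suppress);
            Console.WriteLine("Suppress sees: " + suppress);

            outer.Complete();
        }
    }
}
```

For the queue, this means that work done under Required gets rolled back along with the DELETE if the outer scope never completes, while work done under RequiresNew or Suppress does not.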

Now, on to the part where TransactionScope becomes really handy. In most cases, we'll want whatever we do in the ProcessMessage() method to participate in that active Transaction. If there are problems in ProcessMessage(), we return false and everything gets "undone", because we don't execute the scope.Complete() ... that means the Message is *not* removed from the queue and is available to be popped off and tried again (you may need to implement a way to only retry a certain number of times before logging an error and returning true).
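
One possible way to limit the retries is sketched below. This is an assumption on my part about how you might do it, not code from my application: the RetryTracker class is hypothetical, and it assumes the incomingkey is an int that is available wherever the failure is counted. The important point is that the count is kept in memory, outside the Transaction. If you stored the count in the database inside the same Transaction, it would be rolled back along with everything else.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: counts failed attempts per queue key, in memory.
public class RetryTracker
{
    private readonly Dictionary<int, int> attempts = new Dictionary<int, int>();
    private readonly object sync = new object();
    private readonly int maxAttempts;

    public RetryTracker(int maxAttempts)
    {
        this.maxAttempts = maxAttempts;
    }

    // Records one failure for the key.
    // Returns true once the key has failed maxAttempts times, meaning "give up on it".
    public bool RegisterFailure(int key)
    {
        lock (sync)
        {
            int count;
            attempts.TryGetValue(key, out count); // count stays 0 if the key is new
            count++;
            if (count >= maxAttempts)
            {
                attempts.Remove(key);
                return true;
            }
            attempts[key] = count;
            return false;
        }
    }

    // Forgets the key after it has been processed successfully.
    public void RegisterSuccess(int key)
    {
        lock (sync)
        {
            attempts.Remove(key);
        }
    }
}

public static class RetryDemo
{
    public static void Main()
    {
        RetryTracker tracker = new RetryTracker(3);
        int incomingKey = 42; // made-up key value for the demo
        for (int i = 1; i <= 3; i++)
            Console.WriteLine("Failure " + i + ", give up: " + tracker.RegisterFailure(incomingKey));
    }
}
```

When RegisterFailure() returns true, you would log the error and return true from ProcessMessage(), so that scope.Complete() runs and the bad message finally leaves the queue. Since the counts live in memory, they start over if the service restarts.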

There is something to be aware of in all of this ... and that is elevated Transactions (that is, Transactions that get promoted to distributed Transactions). If you are creating any database connections in your ProcessMessage() method (which is quite likely, and we do it all the time in our application), then the Transaction will be elevated and you will need to be sure you have enabled Distributed Transactions (MSDTC). It's not too hard to do. I had originally posted a link here to another person's blog about how to configure MSDTC, but 3 years after writing this, I discovered that the post had been hijacked or had disappeared. Luckily, I have been able to recreate that post in my own blog and have updated this post with the new link:

https://geek-goddess-bonnie.blogspot.com/2020/10/configure-msdtc-for-distributed.html

I think that this is enough to get you started with this concept. If you have any questions about any of this, please leave me a comment.

Happy Coding!  =0)