Thursday, August 14, 2014

FileSystemWatcher Improvements

A few months ago, I needed the capability to receive files, process them, and send the processed data to another part of the application. We send information around to various parts of our application by a queuing mechanism. This could be implemented with MSMQ (which is how we did it in version 1 of our app) or by mimicking a queue in database tables (version 2). The queuing mechanism isn't important to this post though, so I won't go into any detail about that.

For my first cut of a FileListener class, I simply used the System.IO.FileSystemWatcher class directly in my code. The code handles the Created event and, in that event handler, sends only the file's name through to the queuing process. The queuing process then pops the filename off the queue, opens and reads the file (simply by calling the File.ReadAllText() method), munges the data from the file, and sends the data off to another queue processor. Here's a working example of that, with the queuing process not fleshed out at all but supplemented by Console.WriteLine() calls so you can follow the progress:

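using System;
using System.IO;
using System.Transactions; // requires a reference to the System.Transactions assembly

public class MyFileListener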
{
    protected FileSystemWatcher Watcher = null;
    protected string FilePath = @"F:\CompletePathToDirectory";
    protected bool ErrorProcessingFileName = false; // set to true when a handler hits an error


    // simulating queuing mechanism with a fake QueuingClass
    QueuingClass QueueManager;


    public MyFileListener()
    {
        this.QueueManager = new QueuingClass();
        this.Watcher = new FileSystemWatcher();
        this.Watcher.Path = this.FilePath;
        this.Watcher.Created += new FileSystemEventHandler(Watcher_Created);
        this.Watcher.Error += new ErrorEventHandler(Watcher_Error);
        // start raising events only after the handlers are attached
        this.Watcher.EnableRaisingEvents = true;
        Console.WriteLine("Waiting for files to be deposited in {0}", this.FilePath);
    }
    private void Watcher_Created(object sender, FileSystemEventArgs e)
    {
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions() { IsolationLevel = IsolationLevel.ReadCommitted }))
        {
            try
            {
                this.CreateAndSendMessage(e.FullPath);
                scope.Complete();
            }
            catch (Exception ex)
            {
                // the scope is disposed without Complete(), so the transaction rolls back
                this.ErrorProcessingFileName = true;
                Console.WriteLine("Error queuing {0}: {1}", e.FullPath, ex.Message);
            }
        }
    }
    private void Watcher_Error(object sender, ErrorEventArgs e)
    {
        this.ErrorProcessingFileName = true;
    }


    private void CreateAndSendMessage(string filename)
    {
        Console.WriteLine("Sending FileName {0} To Queue", filename);
        // put code here to send the file to your queuing mechanism
        // for demo purposes, I'm going to simulate sending to a queue
        this.QueueManager.SendMessageToQueue(filename);
    }
}
public class QueuingClass
{
    private string Message;


    public void SendMessageToQueue(string message)
    {
        this.Message = message;
        this.PopMessageOffQueue();
    }
    protected void PopMessageOffQueue()
    {
        try
        {
            string filename = this.Message;
            if (File.Exists(filename))
            {
                string fileData = File.ReadAllText(filename);


                this.ProcessFileDataAndSend(fileData);


                if (File.Exists(filename))
                    File.Delete(filename);
            }
        }
        catch (FileNotFoundException ex)
        {
            // we can go ahead and eat this exception ... it means the file was processed and/or deleted in another thread
            Console.WriteLine("File Already Removed: " + ex.FileName);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
    protected void ProcessFileDataAndSend(string fileData)
    {
        Console.WriteLine("Here is the data retrieved from file {0}:\r\n{1}", this.Message, fileData);
        // This is where you'd process the data from the file
        // and send the processed data off to another queue
        // which I won't bother simulating here.
    }
}

All you have to do to use this MyFileListener class is create an instance of it. Then drop a file (or multiple files) into the directory you've specified and watch the fun. I've just used a Console application to run this ... you can do it however you want to, obviously.

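private void TestFileListener()
{
    MyFileListener listener = new MyFileListener();
    // keep the process (and the listener) alive while files arrive
    Console.ReadLine();
    GC.KeepAlive(listener);
}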

But there's a problem with using the Created event when you're attempting to process a large file. The Created event is fired when the file first starts to get copied to the directory you're listening to, not when the file has finished being copied. When I attempted to read the file as soon as the Created event was raised, I ran into blocked file access: the system sending the files couldn't finish sending them, presumably because my trying to read a file at the same time caused file access issues. I even had the built-in delay of the queuing process (since the file isn't actually read until it is "popped" off the queue). This ended up working OK for small files (there was enough of a delay), but not for larger files. So, I needed to come up with a better idea.
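If you want to see this kind of sharing conflict without setting up a watcher at all, here is a minimal sketch. The LockDemo class and its method name are hypothetical and not part of the application; the sketch assumes Windows file-sharing behavior. One stream holds the file open for writing, much as a copy in progress would, and a read attempted during that time fails with an IOException.

```csharp
using System;
using System.IO;

// Hypothetical demo class, not part of the original application.
public static class LockDemo
{
    // Returns true if reading the file throws an IOException
    // while another stream still holds it open for writing.
    public static bool ReadFailsWhileWriterIsOpen(string path)
    {
        // Simulate a copy in progress: the writer shares the file with nobody.
        using (FileStream writer = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None))
        {
            writer.WriteByte(42);
            try
            {
                // Same call the queue processor uses to read a file.
                File.ReadAllText(path);
                return false; // the read unexpectedly succeeded
            }
            catch (IOException)
            {
                return true; // sharing violation: the file is still in use
            }
        }
    }
}
```

Calling LockDemo.ReadFailsWhileWriterIsOpen with a path in a temp directory should report the conflict; once the writer has been disposed, File.ReadAllText on the same path works normally.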

So, I created my own class, subclassed from FileSystemWatcher, and used that instead. The difference is that I create a new event that I call FileReady and fire it when I know the file copy has completed. How do I know when that happens? In a try/catch, I try to do a File.Open with FileShare set to None. If I can't open the file, then I know it's not done being copied yet, and I keep trying until I can open it (at which point I fire the FileReady event). Here's the code for this new class:

// needs "using System.IO;" at the top of the file
public class MyFileWatcher : FileSystemWatcher
{
    public event FileSystemEventHandler FileReady;


    public MyFileWatcher()
    {
        this.Created += new FileSystemEventHandler(Watcher_Created);
    }
    private void Watcher_Created(object sender, FileSystemEventArgs e)
    {
        System.Threading.ThreadPool.QueueUserWorkItem((n) => { WaitForFileReady(e); });
    }
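    private void WaitForFileReady(FileSystemEventArgs e)
    {
        while (true)
        {
            try
            {
                // an exclusive open only succeeds once no other process has the file open
                using (FileStream fs = File.Open(e.FullPath, FileMode.Open, FileAccess.ReadWrite, FileShare.None))
                {
                    break;
                }
            }
            catch (FileNotFoundException)
            {
                // the file was removed before it became ready, so there is nothing to report
                return;
            }
            catch (Exception)
            {
                // still being copied (or otherwise locked): wait a bit and try again
                System.Threading.Thread.Sleep(100);
            }
        }
        OnFileReady(e);
    }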
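    protected virtual void OnFileReady(FileSystemEventArgs e)
    {
        // copy the delegate first, since this runs on a thread-pool thread
        FileSystemEventHandler handler = FileReady;
        if (this.EnableRaisingEvents && handler != null)
            handler(this, e);
    }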
}
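One thing to be aware of: the loop above retries forever if the file never becomes available (for example, a read-only file can never be opened with ReadWrite access). As a hedged variation, not what my class does, here is a sketch of the same exclusive-open probe with a timeout. FileReadyProbe, WaitUntilReady, and the pollMilliseconds parameter are hypothetical names, and unlike the class above it only retries on IOException and lets other exceptions propagate.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;

// Hypothetical helper, shown as a variation on WaitForFileReady.
public static class FileReadyProbe
{
    // Returns true once the file can be opened exclusively.
    // Returns false if the file does not exist or the timeout elapses first.
    public static bool WaitUntilReady(string path, TimeSpan timeout, int pollMilliseconds = 100)
    {
        Stopwatch elapsed = Stopwatch.StartNew();
        while (true)
        {
            try
            {
                // Same probe as above: an exclusive open fails while anyone else holds the file.
                using (File.Open(path, FileMode.Open, FileAccess.ReadWrite, FileShare.None))
                {
                    return true;
                }
            }
            catch (FileNotFoundException)
            {
                // The file is gone, so it will never become ready.
                return false;
            }
            catch (IOException)
            {
                // Still locked (or the directory is missing): fall through and retry.
            }

            if (elapsed.Elapsed >= timeout)
                return false;

            Thread.Sleep(pollMilliseconds);
        }
    }
}
```

Inside WaitForFileReady, a call such as FileReadyProbe.WaitUntilReady(e.FullPath, TimeSpan.FromMinutes(5)) could replace the while loop, raising FileReady only when it returns true. The five-minute value is just an illustration; pick a timeout that suits the largest files you expect.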

To use this in the MyFileListener class, simply replace the FileSystemWatcher with the MyFileWatcher and handle the FileReady event instead of the Created event (the code inside the event handler will be exactly the same). Here are the changes to make:

// the declaration
protected MyFileWatcher Watcher = null;
// the MyFileListener constructor
this.Watcher = new MyFileWatcher();
this.Watcher.FileReady += new FileSystemEventHandler(Watcher_FileReady);
// the new event handler name
private void Watcher_FileReady(object sender, FileSystemEventArgs e)

I hope this post helps you successfully implement your next application's needs … Happy Coding!

2 comments:

  1. If you copy and drop two files into the watched directory, the process fails to fire the "ready" event and doesn't process (delete) the second file.

    Replies
    1. I don't know what to tell you ... I tested again to be sure it was still working after all these years. I dropped 3 files, 2 small ones with a couple of lines of text each (1KB each), and 1 larger one with a lot of XML text in it, about 1MB. All 3 are processed completely each time. What's your operating system and what version of Visual Studio are you using?
