Friday 27 May 2011

Results of running a simplified earthquake model in the cloud

Here are some preliminary results of how long it takes to upload and run 10000 iterations of a cat (catastrophe) model under various worker role configurations:

Extra Large with 8 cores took 26s
Large with 4 cores took 34s
Medium with 2 cores took 49s
Small with 1 core took 82s

For comparison, here are some run times on a D20 desktop:

VB.NET compiled with VS2010 running on 1 thread took 104s
F90 compiled with the Intel compiler running on 1 thread took 162s
C++ compiled with the Intel compiler with optimization running on 1 thread took 58s

The time needed to upload exposure and events was about 100s. Within the cloud it took about 1s to download the data from blob storage.

The initial flat text files had the following sizes and numbers of points:

Exposure file: 7.3MB, 385560 points
Event catalogue: 4.8MB, 194510 events

To reduce the run times we cut the number of events from 194510 to 10000, so run times for the full catalogue would be roughly 20 times longer (194510 / 10000 ≈ 19.5).

I have not yet received the bill for these tests, but I expect it to be quite cheap. I think there is a lot to be said for cloud computing, because I only had to worry about my code; the infrastructure was completely abstracted away. In fact, setting up a new node takes about 8 minutes, and the billing terms are very friendly: you pay only for the time that the role is active.

Numerical calculations in Azure continued

After some help from Steve Spencer I figured out that the best way to debug the WorkerRole was not to do the whole thing in unit tests, but to press the play button on the WorkerRole and use a unit test to feed it with data (a sketch of such a feeder test follows the listing below). It turned out the problem was within the blob storage code. I still have some work to do to refactor the worker role, but I want to get some results, so I am going to leave it as it is for the moment:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.StorageClient;
using AzureHelper;
using System.Configuration;
using System.IO;
using System.Text;
using CatModel;
using Contracts;
using Entities;
using Infrastructure;

namespace WorkerRole1
{
    public class WorkerRole : RoleEntryPoint
    {
        private String localPath;
        private String calcsimExePath;

        private EQModel eQModel { get; set; }

        private DateTime Start { get; set; }
        private DateTime Finish { get; set; }
        private TimeSpan Stopwatch { get; set; }
        public ILog Log { set; get; }

        private AzQueueHandler jobInputQueue;
        private AzQueueHandler jobOutputQueue;
        private AzQueueHandler jobLogQueue;

        private AzBlobHandler jobInputBlob;

        private AzBlobHandler dataStoreExposureX;
        private AzBlobHandler dataStoreExposureY;
        private AzBlobHandler dataStoreExposureV;

        private AzBlobHandler dataStoreCatalogSm;
        private AzBlobHandler dataStoreCatalogSx;
        private AzBlobHandler dataStoreCatalogSy;
        private AzBlobHandler dataStoreCatalogSd;

        private AzBlobHandler jobLossBlob;
        private AzBlobHandler jobLogBlob;
        private AzBlobHandler jobOutputBlob;

        private int idleCount;
        private int idleMax;
        private int idleSleepTime;

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // For information on handling configuration changes
            // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

            return base.OnStart();
        }

        private void Init()
        {
            AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(CurrentDomain_UnhandledException);
            idleCount = 0;             // consecutive polls that found the queue empty
            idleMax = 10;              // exit after 10 idle polls (10 x 30s = 5 minutes)
            idleSleepTime = 30 * 1000; // milliseconds to sleep between queue polls

            var accountName = ConfigurationManager.AppSettings["AzureAccountName"];
            var accountKey = ConfigurationManager.AppSettings["AzureAccountKey"];

            jobInputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobInputQueue"], accountName, accountKey);
            jobOutputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobOutputQueue"], accountName, accountKey);
            jobLogQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobLogQueue"], accountName, accountKey);
            jobInputBlob = new AzBlobHandler(ConfigurationManager.AppSettings["JobInputBlob"], accountName, accountKey);

            jobLogBlob = new AzBlobHandler(ConfigurationManager.AppSettings["jobLogBlob"], accountName, accountKey);
            dataStoreExposureX = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureX"], accountName, accountKey);
            dataStoreExposureY = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureY"], accountName, accountKey);
            dataStoreExposureV = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureV"], accountName, accountKey);

            dataStoreCatalogSm = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSm"], accountName, accountKey);
            dataStoreCatalogSx = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSx"], accountName, accountKey);
            dataStoreCatalogSy = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSy"], accountName, accountKey);
            dataStoreCatalogSd = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSd"], accountName, accountKey);

            jobLossBlob = new AzBlobHandler(ConfigurationManager.AppSettings["JobLossBlob"], accountName, accountKey);
            jobOutputBlob = new AzBlobHandler(ConfigurationManager.AppSettings["JobOutputBlob"], accountName, accountKey);

            eQModel = new EQModel();
            this.Log = new Log("WorkerRole");

            localPath = Environment.CurrentDirectory;

            // local path to calcsim.exe (deployed into the role's Startup folder)
            calcsimExePath = System.IO.Path.Combine(localPath, "Startup\\calcsim.exe");
        }

        public override void Run()
        {
            // This is a sample worker implementation. Replace with your logic.
            Trace.WriteLine("WorkerRole1 entry point called", "Information");
            Init();

            TraceInfo("AzJobHost::Run() Azure Instance ID {0}, DeploymentId {1}", RoleEnvironment.CurrentRoleInstance.Id, RoleEnvironment.DeploymentId);

            bool more = true;
            //Stopwatch swWrkRoleLifetime = Stopwatch.StartNew();

            // msg pump loop
            string id = ""; string popId = "";
            while (more)
            {
                id = ""; popId = "";
                string msg = jobInputQueue.GetMessage(ref id, ref popId, false);

                if (msg == null)
                {
                    idleCount++;
                    if (idleCount >= idleMax)
                        more = false;
                    else Thread.Sleep(idleSleepTime);
                }
                else
                {
                    ProcessMsg(id, popId, msg);
                }
            }

            //swWrkRoleLifetime.Stop();
            //TraceInfo("AzJobHost::Exit(). Execution time {0}", swWrkRoleLifetime.Elapsed);
        }

        private bool ProcessMsg(string id, string popId, string msg)
        {
            bool rc = true;

            //Stopwatch sw = Stopwatch.StartNew();
            TraceInfo("AzJobHost::ProcessMsg( '{0}', '{1}') - Azure Instance Id: {2}", id, msg, RoleEnvironment.CurrentRoleInstance.Id);

            List<double> x = new List<double>();
            List<double> y = new List<double>();
            List<double> v = new List<double>();
            List<double> sm = new List<double>();
            List<double> sx = new List<double>();
            List<double> sy = new List<double>();
            List<double> sd = new List<double>();

            Start = DateTime.Now;
            dataStoreExposureX.RecieveDataFromStorage<double>(msg, out x);
            dataStoreExposureY.RecieveDataFromStorage<double>(msg, out y);
            dataStoreExposureV.RecieveDataFromStorage<double>(msg, out v);
            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to Upload Exposure to model {0} milliseconds",  Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);

            Start = DateTime.Now;
            dataStoreCatalogSm.RecieveDataFromStorage<double>(msg, out sm);
            dataStoreCatalogSx.RecieveDataFromStorage<double>(msg, out sx);
            dataStoreCatalogSy.RecieveDataFromStorage<double>(msg, out sy);
            dataStoreCatalogSd.RecieveDataFromStorage<double>(msg, out sd);

            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to Upload Catalog to model {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);

            List<double> losses = new List<double>();

            Start = DateTime.Now;
            eQModel.RunModel(x, y, v, sm, sx, sy, sd, out losses);
            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to Run Japan Earthquake {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);

            this.Log.LogMessage("=== Appending Model Log to Server Log ====");

            List<LogEvent> ModelLogs = eQModel.log.GetLogs();
            this.Log.Add(ModelLogs);

            jobLossBlob.SendDataToStorage<double>(msg, losses);
            jobLogBlob.SendDataToStorage<LogEvent>(msg, this.Log.GetLogs());

            jobOutputQueue.PutMessage(msg);

            jobInputQueue.DeleteMessage(id, popId);
            return rc;
        }

        private void TraceInfo(string format, params object[] args)
        {
            string msg = string.Format(format, args);
            Trace.WriteLine(msg, "Information");
        }

        private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
        {
            // If a configuration setting is changing
            if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
            {
                // Set e.Cancel to true to restart this role instance
                e.Cancel = true;
            }
        }

        void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            TraceInfo("AzJobHost::UnhandledException: " + (Exception)e.ExceptionObject);
            RoleEnvironment.RequestRecycle();
        }

    }
}
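
For reference, the feeder test mentioned above might look something like this. It is only a sketch: it assumes the WorkerRole is already running under the compute emulator (the play button), that the exposure and catalogue blobs for the key have already been uploaded, and the key value itself is made up.

        [TestMethod]
        public void FeedRunningWorkerRole()
        {
            var accountName = ConfigurationManager.AppSettings["AzureAccountName"];
            var accountKey = ConfigurationManager.AppSettings["AzureAccountKey"];

            var inputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobInputQueue"], accountName, accountKey);
            var outputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobOutputQueue"], accountName, accountKey);

            // the message is simply the blob key under which the role finds its input data
            string key = "testjob-001"; // hypothetical key; the blobs must already exist
            inputQueue.PutMessage(key);

            // poll the output queue until the role reports the job done
            string id = "", popId = "", result = null;
            while (result == null)
            {
                result = outputQueue.GetMessage(ref id, ref popId, false);
                if (result == null) Thread.Sleep(1000);
            }
            Assert.AreEqual(key, result);
        }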

Thursday 26 May 2011

Numerical calculations in Azure

Here are the steps that I took in looking at Azure and getting a feel for how we could use it. Recently we have made a number of benchmark tests with a simplified Japan earthquake model, so this seems a natural place to start. The idea is to utilize the compute power of the cloud to run earthquake simulations. Here is some pseudo code for what I intend to do:

1.    The client loads exposure into cloud storage
2.    The client loads earthquake catalog into cloud storage
3.    The client adds a reference to the exposure and earthquake catalogue data to a queue
4.    The worker node listens on the input queue
5.    The worker node dequeues a reference to data for download
6.    The worker node downloads data from cloud storage
7.    The worker node processes the data
8.    The worker node saves loss data using a reference
9.    The worker node adds a reference to the loss data to a queue
10.    The client listens on the output queue
11.    The client dequeues the reference to result data
12.    The client downloads results and logs

Looking at this sequence, the first thing I want to do is to be able to make CRUD operations on queues and blobs. I found some good examples of how this could be done, but I wanted to extend these examples to be more generic. Here's a handy pair of generic functions that move lists of serializable objects to and from blob storage.

        // These belong to a small BlobStorage helper class; they need System.IO,
        // System.Runtime.Serialization.Formatters.Binary and Microsoft.WindowsAzure.StorageClient.
        public void RecieveDataFromStorage<T>(string key, out List<T> data)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(_blobContainer.Name);
            CloudBlob blob = container.GetBlobReference(key);
            byte[] bdata = blob.DownloadByteArray();

            // wrap the downloaded bytes in a stream and deserialize the list
            using (MemoryStream f = new MemoryStream(bdata))
            {
                BinaryFormatter sf = new BinaryFormatter();
                data = (List<T>)sf.Deserialize(f);
            }
        }

        public void SendDataToStorage<T>(string key, List<T> data)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(_blobContainer.Name);
            CloudBlob blob = container.GetBlobReference(key);

            // serialize the list to a byte array and upload it
            using (MemoryStream f = new MemoryStream())
            {
                BinaryFormatter sf = new BinaryFormatter();
                sf.Serialize(f, data);
                blob.UploadByteArray(f.ToArray());
            }
        }

Here T can be any serializable entity. With these two functions we have an easy way to transfer data to and from cloud storage.
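
For example, a round trip with a list of doubles might look like this (the "exposurex" container and the "Storage" connection string follow the client code later in this post; the job key is made up):

        string cs = ConfigurationManager.ConnectionStrings["Storage"].ConnectionString;
        BlobStorage store = new BlobStorage(cs, "exposurex", false);

        // send a small list up to blob storage under a job key...
        List<double> outbound = new List<double> { 1.0, 2.0, 3.0 };
        store.SendDataToStorage<double>("job-42", outbound);

        // ...and pull it back down again
        List<double> inbound;
        store.RecieveDataFromStorage<double>("job-42", out inbound); // inbound now holds 1.0, 2.0, 3.0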

The next step is to do some calculations. The simplified Japan earthquake algorithm is single threaded; in order to make use of varying numbers of processors we need to make it multi-core enabled. Normally, when making an algorithm multi-core enabled, you first need to really understand what the algorithm is trying to do. The next step is to think, still in the problem domain, about how to split the work, for example by data or by process. Then this is reviewed in the context of the hardware and technology available. Finally you would code, applying patterns that account for things like cache invalidation. In our case we just want to get a feeling for what various CPU configurations can bring, so I just made the inner loop parallel. The algorithm falls under the category of "embarrassingly parallelizable" code because the loop iterations are almost completely independent of each other; only the line marked with an arrow below has a cross-iteration dependency. The trick is how to sum the losses. Below is the code before and after…

            for (i = 1; i <= ns; i++)
            {
                loss[i] = 0;

                if ((sm[i] > 3))
                {
                    dkrit = 0.5 * sm[i] * sm[i] * sm[i];

                    for (j = 1; j <= nr; j++)
                    {
                        rr[j] = 0.001 * Math.Sqrt((sx[i] - rx[j]) * (sx[i] - rx[j]) + (sy[i] - ry[j]) * (sy[i] - ry[j]));
                        rr[j] = Math.Sqrt(sd[i] * sd[i] + rr[j] * rr[j]);

                        if ((rr[j] < dkrit))
                        {
                            rlog[j] = Math.Log(rr[j]);
                            mmi[j] = 981 * Math.Exp(c1 + c2 * sm[i] + c3 * sm[i] * sm[i] + c4 * rlog[j] + c5 * rr[j]);
                            if ((mmi[j] > 50))
                            {
                                mmi[j] = 3.66 * Math.Log10(mmi[j]) - 1.66;
                                lmmi = Math.Log(mmi[j]);
                                mdr[j] = 0.01 * Math.Exp(v1 * lmmi * lmmi + v2 * lmmi + v3);
                                loss[i] = loss[i] + mdr[j] * value[j]; // <----------
                            }
                        }
                    }
                }

                Parallel.For(1, nr + 1, // Parallel.For's upper bound is exclusive, so nr + 1 matches j <= nr
                    (jj) =>
                    {
                        localsum[jj] = 0; // clear any value left over from the previous event i

                        double rr = 0.001 * Math.Sqrt((sx[i] - rx[jj]) * (sx[i] - rx[jj]) + (sy[i] - ry[jj]) * (sy[i] - ry[jj]));
                        rr = Math.Sqrt(sd[i] * sd[i] + rr * rr);

                        if (rr < dkrit)
                        {
                            double rlog = Math.Log(rr);
                            double mmi = 981 * Math.Exp(c1 + c2 * sm[i] + c3 * sm[i] * sm[i] + c4 * rlog + c5 * rr);
                            if (mmi > 50)
                            {
                                mmi = 3.66 * Math.Log10(mmi) - 1.66;
                                double lmmi = Math.Log(mmi); // local to this iteration so threads do not share it
                                double mdr = 0.01 * Math.Exp(v1 * lmmi * lmmi + v2 * lmmi + v3);
                                localsum[jj] = mdr * value[jj];
                            }
                        }
                    });
                loss[i] = localsum.Sum();

There is a lot of scope for further optimization. For example, neighbouring elements of localsum sit on the same cache line, so threads writing adjacent entries invalidate each other's caches (false sharing); we could pad the array, or give each thread its own accumulator, as sketched below.
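
A minimal sketch of the thread-local idea, using the localInit/localFinally overload of Parallel.For, so that each thread accumulates into a private partial sum and the partial sums are merged once per thread. Variable names follow the loop above; this is an untested alternative rather than what I actually ran:

                double total = 0;
                object sync = new object();
                Parallel.For(1, nr + 1,
                    () => 0.0, // per-thread initial partial sum
                    (jj, state, partial) =>
                    {
                        double rr = 0.001 * Math.Sqrt((sx[i] - rx[jj]) * (sx[i] - rx[jj]) + (sy[i] - ry[jj]) * (sy[i] - ry[jj]));
                        rr = Math.Sqrt(sd[i] * sd[i] + rr * rr);
                        if (rr < dkrit)
                        {
                            double rlog = Math.Log(rr);
                            double mmi = 981 * Math.Exp(c1 + c2 * sm[i] + c3 * sm[i] * sm[i] + c4 * rlog + c5 * rr);
                            if (mmi > 50)
                            {
                                mmi = 3.66 * Math.Log10(mmi) - 1.66;
                                double lmmi = Math.Log(mmi);
                                partial += 0.01 * Math.Exp(v1 * lmmi * lmmi + v2 * lmmi + v3) * value[jj];
                            }
                        }
                        return partial; // carried to this thread's next iteration
                    },
                    partial => { lock (sync) { total += partial; } }); // one locked merge per thread
                loss[i] = total;

Here's an example of what I did with the base class.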

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Contracts;
using Entities;

namespace Contracts
{
    public abstract class JobProcessorBase : IJobProcessor
    {
        private DateTime Start { get; set; }
        private DateTime Finish { get; set; }
        private TimeSpan Stopwatch { get; set; }
        public List<double> LossList { get; set; }

        public ILog Log { set; get; }

        public StorageType StorageType { get; private set; }

        public void ProcessJob()
        {

            string key = "";
            TimedAction(() => key = DequeueAndDeleteKey(), "DequeueAndDeleteKey");

            this.StorageType = StorageType.None;
            if (key.ToLower().IndexOf("blob") == 0)
            {
                this.StorageType = StorageType.Blob;
            }

            TimedAction(() => GetExposure(key), "GetExposure");
            TimedAction(() => GetEventCatalogue(key), "GetEventCatalogue");
            TimedAction(() => RunModel(), "RunModel");
            TimedAction(() => SaveLosses(key), "SaveLosses");
            TimedAction(() => SaveLog(key), "SaveLog");

        }

        private void TimedAction(Action ToDo, string MethodName)
        {
            Start = DateTime.Now;
            ToDo.Invoke();
            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to {0} {1} milliseconds",MethodName, Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);
        }

        public abstract string DequeueAndDeleteKey(); 
        public abstract void GetExposure(string key); 
        public abstract void GetEventCatalogue(string key); 
        public abstract void RunModel(); 
        public abstract void SaveLosses(string key); 
        public abstract void SaveLog(string key); 
    }
}

Next I started to implement the base classes, which use a template method pattern to time each individual step. In the true spirit of test-driven development I slowly built up an end-to-end test that loaded the data, ran the model and downloaded the results.

        [TestMethod]
        public void RunBlobModel()
        {
            BlobJobDispatcher blobJobDispatcher = new BlobJobDispatcher();
            string key = "";

            // Load data into the cloud
            blobJobDispatcher.CreateJob(out key);

            // Pretend to be the Worker Role
            LoadData loadData = new LoadData();
            JobProcessor jobProcessor = new JobProcessor(loadData);
            jobProcessor.ProcessJob();

            // Receive results back from the cloud
            blobJobDispatcher.RecieveResults(out key);
        }

Here's what the client implementation looked like:

namespace Client
{
    public class BlobJobDispatcher : JobDispatcherBase
    {

        private IJobProcessor JobProcessor;
        private LoadData loadData { get; set; }
        private IDataStore dataStoreExposureX { get; set; }
        private IDataStore dataStoreExposureY { get; set; }
        private IDataStore dataStoreExposureV { get; set; }
        private QueueStorage dataStoreExposureQueue { get; set; }

        private IDataStore dataStoreCatalogSm { get; set; }
        private IDataStore dataStoreCatalogSx { get; set; }
        private IDataStore dataStoreCatalogSy { get; set; }
        private IDataStore dataStoreCatalogSd { get; set; }
        private QueueStorage dataStoreCatalogQueue { get; set; }

        private QueueStorage jobQueue { get; set; }

        private QueueStorage resultsQueue { get; set; }

        private IDataStore dataStoreLosses { get; set; }
        private IDataStore dataStoreLog { get; set; }

        public BlobJobDispatcher()
        {
            this.ClientLog = new Log("BlobJobDispatcher");
            loadData = new LoadData();
            string storageConnectionString = ConfigurationManager.ConnectionStrings["Storage"].ConnectionString;
            dataStoreExposureX = new BlobStorage(storageConnectionString, "exposurex", false);
            dataStoreExposureY = new BlobStorage(storageConnectionString, "exposurey", false);
            dataStoreExposureV = new BlobStorage(storageConnectionString, "exposurev", false);
            dataStoreExposureQueue = new QueueStorage(storageConnectionString, "exposurequeue", true);

            dataStoreCatalogSm = new BlobStorage(storageConnectionString, "catalogsm", false);
            dataStoreCatalogSx = new BlobStorage(storageConnectionString, "catalogsx", false);
            dataStoreCatalogSy = new BlobStorage(storageConnectionString, "catalogsy", false);
            dataStoreCatalogSd = new BlobStorage(storageConnectionString, "catalogsd", false);
            dataStoreCatalogQueue = new QueueStorage(storageConnectionString, "catalogqueue", true);

            jobQueue = new QueueStorage(storageConnectionString, "jobqueue", true);

            resultsQueue = new QueueStorage(storageConnectionString, "resultsqueue", true);
            dataStoreLosses = new BlobStorage(storageConnectionString, "joblossblob", false); // the container the server writes losses to
            dataStoreLog = new BlobStorage(storageConnectionString, "log", false);
        }

        #region JobDispatcherBase Members

        public override void ReadExposure()
        {
            loadData.ReadExposure();
        }

        public override void ReadEventCatalogue()
        {
            loadData.ReadEventCatalogue();
        }

        public override void SendExposureToCload(string key)
        {
            this.dataStoreExposureX.SendDataToStorage<double>(key, this.loadData.rxList);
            this.dataStoreExposureY.SendDataToStorage<double>(key, this.loadData.ryList);
            this.dataStoreExposureV.SendDataToStorage<double>(key, this.loadData.valueList);
            this.dataStoreExposureQueue.SendDataToStorage(key);
        }

        public override void SendEventCatalogueToCload(string key)
        {
            dataStoreCatalogSm.SendDataToStorage<double>(key, this.loadData.smList);
            dataStoreCatalogSx.SendDataToStorage<double>(key, this.loadData.sxList);
            dataStoreCatalogSy.SendDataToStorage<double>(key, this.loadData.syList);
            dataStoreCatalogSd.SendDataToStorage<double>(key, this.loadData.sdList);
            this.dataStoreCatalogQueue.SendDataToStorage(key);
        }

        public override void SubmitJobToCload(string key)
        {
            jobQueue.SendDataToStorage(key);
        }

        public override string WaitForKey()
        {
            string recievedKey = "";
            while (recievedKey == "")
            {
                resultsQueue.RecieveDataFromStorage(out recievedKey);
                Thread.Sleep(1000);
            }
            return recievedKey;
        }

        public override List<double> GetLossListFromCloud(string key)
        {
            List<double> recievedLosses = new List<double>();
            dataStoreLosses.RecieveDataFromStorage<double>(key,out recievedLosses);
            return recievedLosses;
        }

        public override List<LogEvent> GetLogsFromCloud(string key)
        {
            List<LogEvent> recievedLogs = new List<LogEvent>();
            dataStoreLog.RecieveDataFromStorage<LogEvent>(key, out recievedLogs);
            return recievedLogs;
        }

        public override void PersistLogs()
        {
            this.ClientLog.Save("C:\\Azure\\PartnerRe\\CloudInfra\\Data\\FlatFileLogg.txt",false);
        }

        public override void PersistLossList()
        {
            StreamWriter FileOut = new StreamWriter("C:\\Azure\\PartnerRe\\CloudInfra\\Data\\LossFile.txt");
            foreach (double l in LossList)
            {
                FileOut.WriteLine(string.Format("{0}", l));
            }
            FileOut.Close();
        }
        #endregion

    }
}

Here's what the server-side implementation looked like:

namespace Server
{
    public class JobProcessor : JobProcessorBase
    {
        public LoadData loadData {get; set;}

        private EQModel eQModel { get; set; }

        private IDataStore dataStoreExposureX { get; set; }
        private IDataStore dataStoreExposureY { get; set; }
        private IDataStore dataStoreExposureV { get; set; }
        private QueueStorage dataStoreExposureQueue { get; set; }

        private IDataStore dataStoreCatalogSm { get; set; }
        private IDataStore dataStoreCatalogSx { get; set; }
        private IDataStore dataStoreCatalogSy { get; set; }
        private IDataStore dataStoreCatalogSd { get; set; }
        private QueueStorage dataStoreCatalogQueue { get; set; }

        private QueueStorage jobQueue { get; set; }

        private QueueStorage resultsQueue { get; set; }

        private IDataStore dataStoreLosses { get; set; }
        private IDataStore dataStoreLog { get; set; }

        public JobProcessor(LoadData LoadData)
        {
            this.loadData = LoadData;
            Log = new Log("FlatFileJobProcessor");
            eQModel = new EQModel();

            string storageConnectionString = ConfigurationManager.ConnectionStrings["Storage"].ConnectionString;
            dataStoreExposureX = new BlobStorage(storageConnectionString, "exposurex", false);
            dataStoreExposureY = new BlobStorage(storageConnectionString, "exposurey", false);
            dataStoreExposureV = new BlobStorage(storageConnectionString, "exposurev", false);
            dataStoreExposureQueue = new QueueStorage(storageConnectionString, "exposurequeue", false);

            dataStoreCatalogSm = new BlobStorage(storageConnectionString, "catalogsm", false);
            dataStoreCatalogSx = new BlobStorage(storageConnectionString, "catalogsx", false);
            dataStoreCatalogSy = new BlobStorage(storageConnectionString, "catalogsy", false);
            dataStoreCatalogSd = new BlobStorage(storageConnectionString, "catalogsd", false);
            dataStoreCatalogQueue = new QueueStorage(storageConnectionString, "catalogqueue", false);

            jobQueue = new QueueStorage(storageConnectionString, "jobqueue", false);

            resultsQueue = new QueueStorage(storageConnectionString, "resultsqueue", false);
            dataStoreLosses = new BlobStorage(storageConnectionString, "joblossblob", false);
            dataStoreLog = new BlobStorage(storageConnectionString, "log", false);

        }

        public override string DequeueAndDeleteKey()
        {
            string recievedKey = "";
            while (recievedKey == "")
            {
                jobQueue.RecieveDataFromStorage(out recievedKey);
                if (recievedKey == "")
                {
                    Thread.Sleep(1000);
                }
            }
            return recievedKey;
        }

        public override void GetExposure(string key)
        {
            switch ( this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:
                    List<double> recievedx = new List<double>();
                    List<double> recievedy = new List<double>();
                    List<double> recievedv = new List<double>();

                    dataStoreExposureX.RecieveDataFromStorage<double>(key, out recievedx);
                    dataStoreExposureY.RecieveDataFromStorage<double>(key, out recievedy);
                    dataStoreExposureV.RecieveDataFromStorage<double>(key, out recievedv);

                    this.loadData.rxList = recievedx;
                    this.loadData.ryList = recievedy;
                    this.loadData.valueList = recievedv;

                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }

        public override void GetEventCatalogue(string key)
        {
            switch (this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:

                    List<double> recievedsm = new List<double>();
                    List<double> recievedsx = new List<double>();
                    List<double> recievedsy = new List<double>();
                    List<double> recievedsd = new List<double>();

                    dataStoreCatalogSm.RecieveDataFromStorage<double>(key, out recievedsm);
                    dataStoreCatalogSx.RecieveDataFromStorage<double>(key, out recievedsx);
                    dataStoreCatalogSy.RecieveDataFromStorage<double>(key, out recievedsy);
                    dataStoreCatalogSd.RecieveDataFromStorage<double>(key, out recievedsd);

                    this.loadData.sxList = recievedsx;
                    this.loadData.syList = recievedsy;
                    this.loadData.smList= recievedsm;
                    this.loadData.sdList = recievedsd;
                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }

        public override void RunModel()
        {
            this.Log.LogMessage("Start earthquake model");

            List<double> result = new List<double>();

            eQModel.RunModel(loadData.rxList,
                             loadData.ryList,
                             loadData.valueList,
                             loadData.smList,
                             loadData.sxList,
                             loadData.syList,
                             loadData.sdList,
                             out result);
            this.LossList = result;

            this.Log.LogMessage("=== Appending Model Log to Server Log ====");

            List<LogEvent> ModelLogs = eQModel.log.GetLogs();
            this.Log.Add(ModelLogs);

        }

        public override void SaveLosses(string key)
        {
            switch (this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:
                    this.dataStoreLosses.SendDataToStorage<double>(key,this.LossList);
                    this.resultsQueue.SendDataToStorage(key);
                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }

        public override void SaveLog(string key)
        {
            switch (this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:
                    this.dataStoreLog.SendDataToStorage<LogEvent>(key,this.Log.GetLogs());
                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }
    }
}

And the unit test worked. My next step was to implement a worker role that executes the ProcessJob method, and to change the unit test to look like this:

        [TestMethod]
        public void RunBlobModel()
        {
            BlobJobDispatcher blobJobDispatcher = new BlobJobDispatcher();
            string key = "";
            blobJobDispatcher.CreateJob(out key);
            //LoadData loadData = new LoadData();
            //JobProcessor jobProcessor = new JobProcessor(loadData);
            //jobProcessor.ProcessJob();

            blobJobDispatcher.RecieveResults(out key);
        }

I thought that making the last step of replacing the processing part with a worker role would be trivial. But it wasn't. My main problem was that when I published my worker role, the role would not start, and as far as I can see there are no logs to help figure out what is going wrong. So instead I found an example of a worker role that was deployable and slowly refactored it into a state where it would process my earthquake models. This is quite a slow process, because each time I wanted to test whether the worker role could be deployed it took around 9 minutes.
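
For what it's worth, one thing that should help with the lack of logs is switching on Azure Diagnostics in OnStart, so that anything written with Trace.WriteLine gets shipped to table storage. Below is a minimal sketch; it assumes the role has the Diagnostics plugin connection string configured (the setting name is the SDK default, not something from my project):

        public override bool OnStart()
        {
            // ship trace output to table storage (WADLogsTable) every minute
            DiagnosticMonitorConfiguration config = DiagnosticMonitor.GetDefaultInitialConfiguration();
            config.Logs.ScheduledTransferLogLevelFilter = LogLevel.Verbose;
            config.Logs.ScheduledTransferPeriod = TimeSpan.FromMinutes(1.0);
            DiagnosticMonitor.Start("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString", config);

            ServicePointManager.DefaultConnectionLimit = 12;
            return base.OnStart();
        }

This only helps once the role code actually starts running, but at least the WADLogsTable can then be inspected with any storage browser.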