Thursday, 26 May 2011

Numerical calculations in Azure

Here are the steps I took in looking at Azure and getting a feel for how we could use it. Recently we made a number of benchmark tests with a simplified Japan earthquake model, which seems a natural place to start. The idea is to use the compute power of the cloud to run earthquake simulations. Here is some pseudo-code describing what I intend to do:

1.    The client loads exposure into cloud storage
2.    The client loads the earthquake catalogue into cloud storage
3.    The client adds a reference to the exposure and earthquake catalogue data to a queue
4.    The worker node listens on the input queue
5.    The worker node dequeues a reference to data for download
6.    The worker node downloads data from cloud storage
7.    The worker node processes the data
8.    The worker node saves loss data using a reference
9.    The worker node adds a reference to the loss data to a queue
10.    The client listens on the output queue
11.    The client dequeues the reference to result data
12.    The client downloads results and logs

Looking at this sequence, the first thing I want to do is make CRUD operations on queues and blobs. I found some good examples of how this could be done, but I wanted to extend them to be more generic. Here is a handy pair of generic functions that sends lists of serializable objects to and from blob storage.

        public void RecieveDataFromStorage<T>(string key, out List<T> data)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(_blobContainer.Name);
            CloudBlob blob = container.GetBlobReference(key);

            // Download the raw bytes and deserialize them back into a List<T>
            byte[] bdata = blob.DownloadByteArray();
            using (MemoryStream stream = new MemoryStream(bdata))
            {
                BinaryFormatter formatter = new BinaryFormatter();
                data = (List<T>)formatter.Deserialize(stream);
            }
        }

        public void SendDataToStorage<T>(string key, List<T> data)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(_blobContainer.Name);
            CloudBlob blob = container.GetBlobReference(key);

            // Serialize the list into a byte array and upload it as a blob
            using (MemoryStream stream = new MemoryStream())
            {
                BinaryFormatter formatter = new BinaryFormatter();
                formatter.Serialize(stream, data);
                blob.UploadByteArray(stream.ToArray());
            }
        }

Here T can be any serializable type (it must carry the [Serializable] attribute for the BinaryFormatter to work). With these functions we have an easy way to transfer data to and from cloud storage.
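The QueueStorage helper that appears throughout the rest of the post is never listed. Here is a rough sketch of what it could look like against the v1.x Microsoft.WindowsAzure.StorageClient queue API; the class name and the constructor's (connection string, queue name, create flag) shape are inferred from how it is called below, so treat this as an assumption rather than the actual implementation.

using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;

public class QueueStorage
{
    private readonly CloudQueue _queue;

    public QueueStorage(string connectionString, string queueName, bool createIfMissing)
    {
        CloudStorageAccount account = CloudStorageAccount.Parse(connectionString);
        _queue = account.CreateCloudQueueClient().GetQueueReference(queueName);
        if (createIfMissing)
        {
            _queue.CreateIfNotExist();
        }
    }

    // Enqueue a key that references data already uploaded to blob storage
    public void SendDataToStorage(string key)
    {
        _queue.AddMessage(new CloudQueueMessage(key));
    }

    // Dequeue a key; returns "" when the queue is empty so callers can poll
    public void RecieveDataFromStorage(out string key)
    {
        CloudQueueMessage message = _queue.GetMessage();
        if (message == null)
        {
            key = "";
            return;
        }
        key = message.AsString;
        _queue.DeleteMessage(message);
    }
}

Note that the queue only ever carries a small key; the heavy exposure and catalogue data stay in blob storage, which keeps the messages well under the queue message size limit.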

The next step is to do some calculations. The simplified Japan earthquake algorithm is single-threaded, so to make use of a varying number of processors we need to make it multi-core enabled. Normally when parallelizing an algorithm you first need to really understand what the algorithm is trying to do. The next step is to think, still in the problem domain, about how to split the work, for example by data or by process. Then this is reviewed in the context of the hardware and technology available. Finally you write the code, applying patterns that take account of things like false sharing between processor caches. In our case we just want to get a feeling for what various CPU configurations can bring, so I simply made the inner loop parallel. The algorithm falls into the category of "embarrassingly parallel" code because the loop iterations are almost completely independent of each other; only the line marked with an arrow below has a cross-iteration dependency. The trick is how to sum the losses. Below is the code before and after…

            for (i = 1; i <= ns; i++)
            {
                loss[i] = 0;

                if ((sm[i] > 3))
                {
                    dkrit = 0.5 * sm[i] * sm[i] * sm[i];

                    for (j = 1; j <= nr; j++)
                    {
                        rr[j] = 0.001 * Math.Sqrt((sx[i] - rx[j]) * (sx[i] - rx[j]) + (sy[i] - ry[j]) * (sy[i] - ry[j]));
                        rr[j] = Math.Sqrt(sd[i] * sd[i] + rr[j] * rr[j]);

                        if ((rr[j] < dkrit))
                        {
                            rlog[j] = Math.Log(rr[j]);
                            mmi[j] = 981 * Math.Exp(c1 + c2 * sm[i] + c3 * sm[i] * sm[i] + c4 * rlog[j] + c5 * rr[j]);
                            if ((mmi[j] > 50))
                            {
                                mmi[j] = 3.66 * Math.Log10(mmi[j]) - 1.66;
                                lmmi = Math.Log(mmi[j]);
                                mdr[j] = 0.01 * Math.Exp(v1 * lmmi * lmmi + v2 * lmmi + v3);
                                loss[i] = loss[i] + mdr[j] * value[j]; // <---------- cross-iteration dependency: every j accumulates into loss[i]
                            }
                        }
                    }
                }

                    // Clear the per-iteration partial sums first; each index of
                    // localsum is written by at most one iteration, so the writes
                    // themselves need no locking.
                    Array.Clear(localsum, 0, localsum.Length);

                    Parallel.For(1, nr + 1,
                        (jj) => {
                                    // Locals replace the shared arrays rr, rlog, mmi and mdr
                                    double rr = 0.001 * Math.Sqrt((sx[i] - rx[jj]) * (sx[i] - rx[jj]) + (sy[i] - ry[jj]) * (sy[i] - ry[jj]));
                                    rr = Math.Sqrt(sd[i] * sd[i] + rr * rr);

                                    if (rr < dkrit)
                                    {
                                        double rlog = Math.Log(rr);
                                        double mmi = 981 * Math.Exp(c1 + c2 * sm[i] + c3 * sm[i] * sm[i] + c4 * rlog + c5 * rr);
                                        if (mmi > 50)
                                        {
                                            mmi = 3.66 * Math.Log10(mmi) - 1.66;
                                            // lmmi must be local too, otherwise the threads race on it
                                            double lmmi = Math.Log(mmi);
                                            double mdr = 0.01 * Math.Exp(v1 * lmmi * lmmi + v2 * lmmi + v3);
                                            localsum[jj] = mdr * value[jj];
                                        }
                                    }
                            });
                    loss[i] = localsum.Sum();

There is a lot of scope for further optimization. For example, adjacent elements of localsum share cache lines, so neighbouring iterations running on different cores cause false sharing; padding the array so that each thread's writes land on separate cache lines would avoid that.
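As a sketch of one alternative (not what I actually did): the Parallel.For overload with thread-local state sidesteps the shared localsum array entirely, so there is nothing to pad. The variables (sx, sy, rx, ry, sm, sd, value, the c and v coefficients, nr, dkrit, i and loss) are assumed to be the same ones as in the loop above.

            double total = 0;
            object totalLock = new object();

            Parallel.For(1, nr + 1,
                () => 0.0,                          // per-thread partial sum
                (jj, loopState, partial) =>
                {
                    double rr = 0.001 * Math.Sqrt((sx[i] - rx[jj]) * (sx[i] - rx[jj]) + (sy[i] - ry[jj]) * (sy[i] - ry[jj]));
                    rr = Math.Sqrt(sd[i] * sd[i] + rr * rr);
                    if (rr < dkrit)
                    {
                        double rlog = Math.Log(rr);
                        double mmi = 981 * Math.Exp(c1 + c2 * sm[i] + c3 * sm[i] * sm[i] + c4 * rlog + c5 * rr);
                        if (mmi > 50)
                        {
                            mmi = 3.66 * Math.Log10(mmi) - 1.66;
                            double lmmi = Math.Log(mmi);
                            partial += 0.01 * Math.Exp(v1 * lmmi * lmmi + v2 * lmmi + v3) * value[jj];
                        }
                    }
                    return partial;                 // each thread accumulates privately
                },
                partial => { lock (totalLock) { total += partial; } });  // merge once per thread

            loss[i] = total;

Here's what I did with the base class: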

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Entities;

namespace Contracts
{
    public abstract class JobProcessorBase : IJobProcessor
    {
        private DateTime Start { get; set; }
        private DateTime Finish { get; set; }
        private TimeSpan Stopwatch { get; set; }
        public List<double> LossList { get; set; }

        public ILog Log { set; get; }

        public StorageType StorageType { get; private set; }

        public void ProcessJob()
        {
            // Template method: run the fixed sequence of steps, timing each one.
            // Derived classes supply the storage-specific implementations.
            string key = "";
            TimedAction(() => key = DequeueAndDeleteKey(), "DequeueAndDeleteKey");

            this.StorageType = StorageType.None;
            if (key.ToLower().IndexOf("blob") == 0)
            {
                this.StorageType = StorageType.Blob;
            }

            TimedAction(() => GetExposure(key), "GetExposure");
            TimedAction(() => GetEventCatalogue(key), "GetEventCatalogue");
            TimedAction(() => RunModel(), "RunModel");
            TimedAction(() => SaveLosses(key), "SaveLosses");
            TimedAction(() => SaveLog(key), "SaveLog");

        }

        private void TimedAction(Action ToDo, string MethodName)
        {
            Start = DateTime.Now;
            ToDo.Invoke();
            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to {0} {1} milliseconds",MethodName, Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);
        }

        public abstract string DequeueAndDeleteKey(); 
        public abstract void GetExposure(string key); 
        public abstract void GetEventCatalogue(string key); 
        public abstract void RunModel(); 
        public abstract void SaveLosses(string key); 
        public abstract void SaveLog(string key); 
    }
}
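A side note on the timing: TimedAction measures with DateTime.Now, which on Windows has a resolution of roughly 15 milliseconds. If the individual steps ever get short enough for that to matter, System.Diagnostics.Stopwatch is a drop-in replacement for the method above; a minimal sketch:

        using System.Diagnostics;

        private void TimedAction(Action toDo, string methodName)
        {
            // Stopwatch uses the high-resolution performance counter when available
            Stopwatch watch = Stopwatch.StartNew();
            toDo.Invoke();
            watch.Stop();
            this.Log.LogMessage(
                String.Format("Time to {0} {1} milliseconds", methodName, watch.Elapsed.TotalMilliseconds),
                watch.Elapsed.TotalMilliseconds);
        }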

This base class implements a template method pattern that times each individual step. In the true spirit of test-driven development I slowly built up an end-to-end test that loaded the data, ran the model and downloaded the results.

        [TestMethod]
        public void RunBlobModel()
        {
            BlobJobDispatcher blobJobDispatcher = new BlobJobDispatcher();
            string key = "";

            // Load data into the cloud
            blobJobDispatcher.CreateJob(out key);

            // Pretend to be the Worker Role
            LoadData loadData = new LoadData();
            JobProcessor jobProcessor = new JobProcessor(loadData);
            jobProcessor.ProcessJob();

            // Receive the results back from the cloud
            blobJobDispatcher.RecieveResults(out key);
        }
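The dispatcher methods CreateJob and RecieveResults live in JobDispatcherBase, which isn't listed in this post. Given the abstract members that BlobJobDispatcher overrides below, they presumably chain the steps in the same template-method style as ProcessJob; the following is a hypothetical reconstruction, not the actual code.

        // Hypothetical sketch, inferred from the abstract members overridden below
        public void CreateJob(out string key)
        {
            // The "blob" prefix is what the worker's ProcessJob checks
            // to decide the storage type
            key = "blob" + Guid.NewGuid().ToString("N");

            ReadExposure();
            ReadEventCatalogue();
            SendExposureToCload(key);
            SendEventCatalogueToCload(key);
            SubmitJobToCload(key);
        }

        public void RecieveResults(out string key)
        {
            key = WaitForKey();

            LossList = GetLossListFromCloud(key);
            ClientLog.Add(GetLogsFromCloud(key));

            PersistLossList();
            PersistLogs();
        }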

Here's what the client implementation looked like:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Threading;
using Contracts;
using Entities;

namespace Client
{
    public class BlobJobDispatcher : JobDispatcherBase
    {

        private IJobProcessor JobProcessor;
        private LoadData loadData { get; set; }
        private IDataStore dataStoreExposureX { get; set; }
        private IDataStore dataStoreExposureY { get; set; }
        private IDataStore dataStoreExposureV { get; set; }
        private QueueStorage dataStoreExposureQueue { get; set; }

        private IDataStore dataStoreCatalogSm { get; set; }
        private IDataStore dataStoreCatalogSx { get; set; }
        private IDataStore dataStoreCatalogSy { get; set; }
        private IDataStore dataStoreCatalogSd { get; set; }
        private QueueStorage dataStoreCatalogQueue { get; set; }

        private QueueStorage jobQueue { get; set; }

        private QueueStorage resultsQueue { get; set; }

        private IDataStore dataStoreLosses { get; set; }
        private IDataStore dataStoreLog { get; set; }

        public BlobJobDispatcher()
        {
            this.ClientLog = new Log("BlobJobDispatcher");
            loadData = new LoadData();
            string storageConnectionString = ConfigurationManager.ConnectionStrings["Storage"].ConnectionString;
            dataStoreExposureX = new BlobStorage(storageConnectionString, "exposurex", false);
            dataStoreExposureY = new BlobStorage(storageConnectionString, "exposurey", false);
            dataStoreExposureV = new BlobStorage(storageConnectionString, "exposurev", false);
            dataStoreExposureQueue = new QueueStorage(storageConnectionString, "exposurequeue", true);

            dataStoreCatalogSm = new BlobStorage(storageConnectionString, "catalogsm", false);
            dataStoreCatalogSx = new BlobStorage(storageConnectionString, "catalogsx", false);
            dataStoreCatalogSy = new BlobStorage(storageConnectionString, "catalogsy", false);
            dataStoreCatalogSd = new BlobStorage(storageConnectionString, "catalogsd", false);
            dataStoreCatalogQueue = new QueueStorage(storageConnectionString, "catalogqueue", true);

            jobQueue = new QueueStorage(storageConnectionString, "jobqueue", true);

            resultsQueue = new QueueStorage(storageConnectionString, "resultsqueue", true);
            // Must match the container the worker writes losses to
            dataStoreLosses = new BlobStorage(storageConnectionString, "joblossblob", false);
            dataStoreLog = new BlobStorage(storageConnectionString, "log", false);
        }

        #region JobDispatcherBase Members

        public override void ReadExposure()
        {
            loadData.ReadExposure();
        }

        public override void ReadEventCatalogue()
        {
            loadData.ReadEventCatalogue();
        }

        public override void SendExposureToCload(string key)
        {
            this.dataStoreExposureX.SendDataToStorage<double>(key, this.loadData.rxList);
            this.dataStoreExposureY.SendDataToStorage<double>(key, this.loadData.ryList);
            this.dataStoreExposureV.SendDataToStorage<double>(key, this.loadData.valueList);
            this.dataStoreExposureQueue.SendDataToStorage(key);
        }

        public override void SendEventCatalogueToCload(string key)
        {
            dataStoreCatalogSm.SendDataToStorage<double>(key, this.loadData.smList);
            dataStoreCatalogSx.SendDataToStorage<double>(key, this.loadData.sxList);
            dataStoreCatalogSy.SendDataToStorage<double>(key, this.loadData.syList);
            dataStoreCatalogSd.SendDataToStorage<double>(key, this.loadData.sdList);
            this.dataStoreCatalogQueue.SendDataToStorage(key);
        }

        public override void SubmitJobToCload(string key)
        {
            jobQueue.SendDataToStorage(key);
        }

        public override string WaitForKey()
        {
            string recievedKey = "";
            while (recievedKey == "")
            {
                resultsQueue.RecieveDataFromStorage(out recievedKey);
                if (recievedKey == "")
                {
                    // Poll once a second until the worker posts a result key
                    Thread.Sleep(1000);
                }
            }
            return recievedKey;
        }

        public override List<double> GetLossListFromCloud(string key)
        {
            List<double> recievedLosses = new List<double>();
            dataStoreLosses.RecieveDataFromStorage<double>(key,out recievedLosses);
            return recievedLosses;
        }

        public override List<LogEvent> GetLogsFromCloud(string key)
        {
            List<LogEvent> recievedLogs = new List<LogEvent>();
            dataStoreLog.RecieveDataFromStorage<LogEvent>(key, out recievedLogs);
            return recievedLogs;
        }

        public override void PersistLogs()
        {
            this.ClientLog.Save("C:\\Azure\\PartnerRe\\CloudInfra\\Data\\FlatFileLogg.txt",false);
        }

        public override void PersistLossList()
        {
            StreamWriter FileOut = new StreamWriter("C:\\Azure\\PartnerRe\\CloudInfra\\Data\\LossFile.txt");
            foreach (double l in LossList)
            {
                FileOut.WriteLine(string.Format("{0}", l));
            }
            FileOut.Close();
        }
        #endregion

    }
}

Here's what the server-side implementation looked like:

using System.Collections.Generic;
using System.Configuration;
using System.Threading;
using Contracts;
using Entities;

namespace Server
{
    public class JobProcessor : JobProcessorBase
    {
        public LoadData loadData {get; set;}

        private EQModel eQModel { get; set; }

        private IDataStore dataStoreExposureX { get; set; }
        private IDataStore dataStoreExposureY { get; set; }
        private IDataStore dataStoreExposureV { get; set; }
        private QueueStorage dataStoreExposureQueue { get; set; }

        private IDataStore dataStoreCatalogSm { get; set; }
        private IDataStore dataStoreCatalogSx { get; set; }
        private IDataStore dataStoreCatalogSy { get; set; }
        private IDataStore dataStoreCatalogSd { get; set; }
        private QueueStorage dataStoreCatalogQueue { get; set; }

        private QueueStorage jobQueue { get; set; }

        private QueueStorage resultsQueue { get; set; }

        private IDataStore dataStoreLosses { get; set; }
        private IDataStore dataStoreLog { get; set; }

        public JobProcessor(LoadData LoadData)
        {
            this.loadData = LoadData;
            Log = new Log("FlatFileJobProcessor");
            eQModel = new EQModel();

            string storageConnectionString = ConfigurationManager.ConnectionStrings["Storage"].ConnectionString;
            dataStoreExposureX = new BlobStorage(storageConnectionString, "exposurex", false);
            dataStoreExposureY = new BlobStorage(storageConnectionString, "exposurey", false);
            dataStoreExposureV = new BlobStorage(storageConnectionString, "exposurev", false);
            dataStoreExposureQueue = new QueueStorage(storageConnectionString, "exposurequeue", false);

            dataStoreCatalogSm = new BlobStorage(storageConnectionString, "catalogsm", false);
            dataStoreCatalogSx = new BlobStorage(storageConnectionString, "catalogsx", false);
            dataStoreCatalogSy = new BlobStorage(storageConnectionString, "catalogsy", false);
            dataStoreCatalogSd = new BlobStorage(storageConnectionString, "catalogsd", false);
            dataStoreCatalogQueue = new QueueStorage(storageConnectionString, "catalogqueue", false);

            jobQueue = new QueueStorage(storageConnectionString, "jobqueue", false);

            resultsQueue = new QueueStorage(storageConnectionString, "resultsqueue", false);
            dataStoreLosses = new BlobStorage(storageConnectionString, "joblossblob", false);
            dataStoreLog = new BlobStorage(storageConnectionString, "log", false);

        }

        public override string DequeueAndDeleteKey()
        {
            string recievedKey = "";
            while (recievedKey == "")
            {
                jobQueue.RecieveDataFromStorage(out recievedKey);
                if (recievedKey == "")
                {
                    Thread.Sleep(1000);
                }
            }
            return recievedKey;
        }

        public override void GetExposure(string key)
        {
            switch ( this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:
                    List<double> recievedx = new List<double>();
                    List<double> recievedy = new List<double>();
                    List<double> recievedv = new List<double>();

                    dataStoreExposureX.RecieveDataFromStorage<double>(key, out recievedx);
                    dataStoreExposureY.RecieveDataFromStorage<double>(key, out recievedy);
                    dataStoreExposureV.RecieveDataFromStorage<double>(key, out recievedv);

                    this.loadData.rxList = recievedx;
                    this.loadData.ryList = recievedy;
                    this.loadData.valueList = recievedv;

                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }

        public override void GetEventCatalogue(string key)
        {
            switch (this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:

                    List<double> recievedsm = new List<double>();
                    List<double> recievedsx = new List<double>();
                    List<double> recievedsy = new List<double>();
                    List<double> recievedsd = new List<double>();

                    dataStoreCatalogSm.RecieveDataFromStorage<double>(key, out recievedsm);
                    dataStoreCatalogSx.RecieveDataFromStorage<double>(key, out recievedsx);
                    dataStoreCatalogSy.RecieveDataFromStorage<double>(key, out recievedsy);
                    dataStoreCatalogSd.RecieveDataFromStorage<double>(key, out recievedsd);

                    this.loadData.sxList = recievedsx;
                    this.loadData.syList = recievedsy;
                    this.loadData.smList= recievedsm;
                    this.loadData.sdList = recievedsd;
                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }

        public override void RunModel()
        {
            this.Log.LogMessage("Start earthquake model");

            List<double> result = new List<double>();

            eQModel.RunModel(loadData.rxList,
                             loadData.ryList,
                             loadData.valueList,
                             loadData.smList,
                             loadData.sxList,
                             loadData.syList,
                             loadData.sdList,
                             out result);
            this.LossList = result;

            this.Log.LogMessage("=== Appending Model Log to Server Log ====");

            List<LogEvent> ModelLogs = eQModel.log.GetLogs();
            this.Log.Add(ModelLogs);

        }

        public override void SaveLosses(string key)
        {
            switch (this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:
                    this.dataStoreLosses.SendDataToStorage<double>(key,this.LossList);
                    this.resultsQueue.SendDataToStorage(key);
                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }

        public override void SaveLog(string key)
        {
            switch (this.StorageType)
            {
                case StorageType.None:
                    break;
                case StorageType.Blob:
                    this.dataStoreLog.SendDataToStorage<LogEvent>(key,this.Log.GetLogs());
                    break;
                case StorageType.SQLAzure:
                    break;
                case StorageType.Table:
                    break;
            }
            return;
        }
    }
}

And the unit test worked. My next step was to implement a worker role that executes the ProcessJob method, and to change the unit test to look like this:

        [TestMethod]
        public void RunBlobModel()
        {
            BlobJobDispatcher blobJobDispatcher = new BlobJobDispatcher();
            string key = "";
            blobJobDispatcher.CreateJob(out key);
            //LoadData loadData = new LoadData();
            //JobProcessor jobProcessor = new JobProcessor(loadData);
            //jobProcessor.ProcessJob();

            blobJobDispatcher.RecieveResults(out key);
        }

I thought the last step, replacing the processing part with a worker role, would be trivial. It wasn't. My main problem was that when I published my worker role, the role would not start, and as far as I can see there are no logs to help figure out what is going wrong. So instead I found an example of a worker role that was deployable and slowly refactored it to the point where it would process my earthquake models. This is quite a slow process, because each round trip to test whether the worker role can be deployed takes around 9 minutes.
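For reference, the worker role itself ends up being a thin shell around ProcessJob. This is a minimal sketch assuming the Azure SDK 1.x ServiceRuntime API (RoleEntryPoint); the LoadData and JobProcessor types are the ones from this post, and the namespaces are assumptions.

using System;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure.ServiceRuntime;
using Server;

public class WorkerRole : RoleEntryPoint
{
    public override void Run()
    {
        // ProcessJob blocks polling the job queue, so looping forever
        // turns this role into a simple one-job-at-a-time worker.
        while (true)
        {
            try
            {
                LoadData loadData = new LoadData();
                JobProcessor jobProcessor = new JobProcessor(loadData);
                jobProcessor.ProcessJob();
            }
            catch (Exception)
            {
                // Swallowing here keeps the role alive; a failed job
                // should be logged once diagnostics are wired up.
                Thread.Sleep(5000);
            }
        }
    }

    public override bool OnStart()
    {
        // A missing or malformed ServiceConfiguration setting (for example
        // the storage connection string) is a common reason for a role
        // that silently refuses to start.
        ServicePointManager.DefaultConnectionLimit = 12;
        return base.OnStart();
    }
}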