Friday 27 May 2011

Numerical calculations in Azure continued

After some help from Steve Spencer I figured out the best way to debug the WorkerRole was not to do the whole thing in unit tests but to pressed the play button on the WorkerRole and use a unit test to feed it with data. It turned out the problem was within the blob storage code. I still have some work to refactor the worker role but I want to get some results so I am going to leave it as it is for the moment:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.StorageClient;
using AzureHelper;
using System.Configuration;
using System.IO;
using System.Text;
using CatModel;
using Contracts;
using Entities;
using Infrastructure;

namespace WorkerRole1
{
    public class WorkerRole : RoleEntryPoint
    {
        private String localPath;
        private String calcsimExePath;

        private EQModel eQModel { get; set; }

        private DateTime Start { get; set; }
        private DateTime Finish { get; set; }
        private TimeSpan Stopwatch { get; set; }
        public ILog Log { set; get; }

        private AzQueueHandler jobInputQueue;
        private AzQueueHandler jobOutputQueue;
        private AzQueueHandler jobLogQueue;

        private AzBlobHandler jobInputBlog;

        private AzBlobHandler dataStoreExposureX;
        private AzBlobHandler dataStoreExposureY;
        private AzBlobHandler dataStoreExposureV;

        private AzBlobHandler dataStoreCatalogSm;
        private AzBlobHandler dataStoreCatalogSx;
        private AzBlobHandler dataStoreCatalogSy;
        private AzBlobHandler dataStoreCatalogSd;

        private AzBlobHandler jobLossBlob;
        private AzBlobHandler jobLogBlob;
        private AzBlobHandler jobOutputBlog;

        private int idleCount;
        private int idleMax;
        private int idleSleepTime;

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // For information on handling configuration changes
            // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

            return base.OnStart();
        }

        private void Init()
        {
            AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(CurrentDomain_UnhandledException);
            idleCount = 0; // how many times the instance have been idle
            idleMax = 10;   // After 10 times (5 minutes = 30 secs *10) being idle, then die
            idleSleepTime = 30 * 1000; // sleep for this number of seconds between each queue poll

            var accountName = ConfigurationManager.AppSettings["AzureAccountName"];
            var accountKey = ConfigurationManager.AppSettings["AzureAccountKey"];

            jobInputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobInputQueue"], accountName, accountKey);
            jobOutputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobOutputQueue"], accountName, accountKey);
            jobLogQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobLogQueue"], accountName, accountKey);
            jobInputBlog = new AzBlobHandler(ConfigurationManager.AppSettings["JobInputBlob"], accountName, accountKey);

            jobLogBlob = new AzBlobHandler(ConfigurationManager.AppSettings["jobLogBlob"], accountName, accountKey);
            dataStoreExposureX = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureX"], accountName, accountKey);
            dataStoreExposureY = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureY"], accountName, accountKey);
            dataStoreExposureV = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureV"], accountName, accountKey);

            dataStoreCatalogSm = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSm"], accountName, accountKey);
            dataStoreCatalogSx = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSx"], accountName, accountKey);
            dataStoreCatalogSy = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSy"], accountName, accountKey);
            dataStoreCatalogSd = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSd"], accountName, accountKey);

            jobLossBlob = new AzBlobHandler(ConfigurationManager.AppSettings["JobLossBlob"], accountName, accountKey);
            jobOutputBlog = new AzBlobHandler(ConfigurationManager.AppSettings["JobOutputBlob"], accountName, accountKey);

            eQModel = new EQModel();
            this.Log = new Log("WorkerRole");

            localPath = Environment.CurrentDirectory;

            // pull EXE file from blob storage to local file system
            calcsimExePath = System.IO.Path.Combine(localPath, "Startup\\calcsim.exe");
        }

        public override void Run()
        {
            // This is a sample worker implementation. Replace with your logic.
            Trace.WriteLine("WorkerRole1 entry point called", "Information");
            Init();

            TraceInfo("AzJobHost::Run() Azure Instance ID {0}, DeploymentId {1}", RoleEnvironment.CurrentRoleInstance.Id, RoleEnvironment.DeploymentId);

            bool more = true;
            //Stopwatch swWrkRoleLifetime = Stopwatch.StartNew();

            // msg pump loop
            string id = ""; string popId = "";
            while (more)
            {
                id = ""; popId = "";
                string msg = jobInputQueue.GetMessage(ref id, ref popId, false);

                if (msg == null)
                {
                    idleCount++;
                    if (idleCount >= idleMax)
                        more = false;
                    else Thread.Sleep(idleSleepTime);
                }
                else
                {
                    ProcessMsg(id, popId, msg);
                }
            }

            //swWrkRoleLifetime.Stop();
            //TraceInfo("AzJobHost::Exit(). Execution time {0}", swWrkRoleLifetime.Elapsed);
        }

        private bool ProcessMsg(string id, string popId, string msg)
        {
            bool rc = true;

            //Stopwatch sw = Stopwatch.StartNew();
            TraceInfo("AzJobHost::ProcessMsg( '{0}', '{1}') - Azure Instance Id: {2}", id, msg, RoleEnvironment.CurrentRoleInstance.Id);

            List<double> x = new List<double>();
            List<double> y = new List<double>();
            List<double> v = new List<double>();
            List<double> sm = new List<double>();
            List<double> sx = new List<double>();
            List<double> sy = new List<double>();
            List<double> sd = new List<double>();

            Start = DateTime.Now;
            dataStoreExposureX.RecieveDataFromStorage<double>(msg, out x);
            dataStoreExposureY.RecieveDataFromStorage<double>(msg, out y);
            dataStoreExposureV.RecieveDataFromStorage<double>(msg, out v);
            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to Upload Exposure to model {0} milliseconds",  Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);

            Start = DateTime.Now;
            dataStoreCatalogSm.RecieveDataFromStorage<double>(msg, out sm);
            dataStoreCatalogSx.RecieveDataFromStorage<double>(msg, out sx);
            dataStoreCatalogSy.RecieveDataFromStorage<double>(msg, out sy);
            dataStoreCatalogSd.RecieveDataFromStorage<double>(msg, out sd);

            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to Upload Catalog to model {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);

            List<double> losses = new List<double>();

            Start = DateTime.Now;
            eQModel.RunModel(x, y, v, sm, sx, sy, sd, out losses);
            Finish = DateTime.Now;
            Stopwatch = Finish.Subtract(Start);
            this.Log.LogMessage(String.Format("Time to Run Japan Earthquake {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);

            this.Log.LogMessage("=== Appending Model Log to Server Log ====");

            List<LogEvent> ModelLogs = eQModel.log.GetLogs();
            this.Log.Add(ModelLogs);

            jobLossBlob.SendDataToStorage<double>(msg, losses);
            jobLogBlob.SendDataToStorage<LogEvent>(msg, this.Log.GetLogs());

            jobOutputQueue.PutMessage(msg);

            jobInputQueue.DeleteMessage(id, popId);
            return rc;
        }

        private void TraceInfo(string format, params object[] args)
        {
            string msg = string.Format(format, args);
            Trace.WriteLine(msg, "Information");
        }

        private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
        {
            // If a configuration setting is changing
            if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
            {
                // Set e.Cancel to true to restart this role instance
                e.Cancel = true;
            }
        }

        void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            TraceInfo("AzJobHost::UnhandledException: " + (Exception)e.ExceptionObject);
            RoleEnvironment.RequestRecycle();
        }

    }
}