After some help from Steve Spencer I figured out the best way to debug the WorkerRole was not to do the whole thing in unit tests but to pressed the play button on the WorkerRole and use a unit test to feed it with data. It turned out the problem was within the blob storage code. I still have some work to refactor the worker role but I want to get some results so I am going to leave it as it is for the moment:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.StorageClient;
using AzureHelper;
using System.Configuration;
using System.IO;
using System.Text;
using CatModel;
using Contracts;
using Entities;
using Infrastructure;
namespace WorkerRole1
{
public class WorkerRole : RoleEntryPoint
{
private String localPath;
private String calcsimExePath;
private EQModel eQModel { get; set; }
private DateTime Start { get; set; }
private DateTime Finish { get; set; }
private TimeSpan Stopwatch { get; set; }
public ILog Log { set; get; }
private AzQueueHandler jobInputQueue;
private AzQueueHandler jobOutputQueue;
private AzQueueHandler jobLogQueue;
private AzBlobHandler jobInputBlog;
private AzBlobHandler dataStoreExposureX;
private AzBlobHandler dataStoreExposureY;
private AzBlobHandler dataStoreExposureV;
private AzBlobHandler dataStoreCatalogSm;
private AzBlobHandler dataStoreCatalogSx;
private AzBlobHandler dataStoreCatalogSy;
private AzBlobHandler dataStoreCatalogSd;
private AzBlobHandler jobLossBlob;
private AzBlobHandler jobLogBlob;
private AzBlobHandler jobOutputBlog;
private int idleCount;
private int idleMax;
private int idleSleepTime;
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
return base.OnStart();
}
private void Init()
{
AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(CurrentDomain_UnhandledException);
idleCount = 0; // how many times the instance have been idle
idleMax = 10; // After 10 times (5 minutes = 30 secs *10) being idle, then die
idleSleepTime = 30 * 1000; // sleep for this number of seconds between each queue poll
var accountName = ConfigurationManager.AppSettings["AzureAccountName"];
var accountKey = ConfigurationManager.AppSettings["AzureAccountKey"];
jobInputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobInputQueue"], accountName, accountKey);
jobOutputQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobOutputQueue"], accountName, accountKey);
jobLogQueue = new AzQueueHandler(ConfigurationManager.AppSettings["JobLogQueue"], accountName, accountKey);
jobInputBlog = new AzBlobHandler(ConfigurationManager.AppSettings["JobInputBlob"], accountName, accountKey);
jobLogBlob = new AzBlobHandler(ConfigurationManager.AppSettings["jobLogBlob"], accountName, accountKey);
dataStoreExposureX = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureX"], accountName, accountKey);
dataStoreExposureY = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureY"], accountName, accountKey);
dataStoreExposureV = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreExposureV"], accountName, accountKey);
dataStoreCatalogSm = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSm"], accountName, accountKey);
dataStoreCatalogSx = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSx"], accountName, accountKey);
dataStoreCatalogSy = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSy"], accountName, accountKey);
dataStoreCatalogSd = new AzBlobHandler(ConfigurationManager.AppSettings["dataStoreCatalogSd"], accountName, accountKey);
jobLossBlob = new AzBlobHandler(ConfigurationManager.AppSettings["JobLossBlob"], accountName, accountKey);
jobOutputBlog = new AzBlobHandler(ConfigurationManager.AppSettings["JobOutputBlob"], accountName, accountKey);
eQModel = new EQModel();
this.Log = new Log("WorkerRole");
localPath = Environment.CurrentDirectory;
// pull EXE file from blob storage to local file system
calcsimExePath = System.IO.Path.Combine(localPath, "Startup\\calcsim.exe");
}
public override void Run()
{
// This is a sample worker implementation. Replace with your logic.
Trace.WriteLine("WorkerRole1 entry point called", "Information");
Init();
TraceInfo("AzJobHost::Run() Azure Instance ID {0}, DeploymentId {1}", RoleEnvironment.CurrentRoleInstance.Id, RoleEnvironment.DeploymentId);
bool more = true;
//Stopwatch swWrkRoleLifetime = Stopwatch.StartNew();
// msg pump loop
string id = ""; string popId = "";
while (more)
{
id = ""; popId = "";
string msg = jobInputQueue.GetMessage(ref id, ref popId, false);
if (msg == null)
{
idleCount++;
if (idleCount >= idleMax)
more = false;
else Thread.Sleep(idleSleepTime);
}
else
{
ProcessMsg(id, popId, msg);
}
}
//swWrkRoleLifetime.Stop();
//TraceInfo("AzJobHost::Exit(). Execution time {0}", swWrkRoleLifetime.Elapsed);
}
private bool ProcessMsg(string id, string popId, string msg)
{
bool rc = true;
//Stopwatch sw = Stopwatch.StartNew();
TraceInfo("AzJobHost::ProcessMsg( '{0}', '{1}') - Azure Instance Id: {2}", id, msg, RoleEnvironment.CurrentRoleInstance.Id);
List<double> x = new List<double>();
List<double> y = new List<double>();
List<double> v = new List<double>();
List<double> sm = new List<double>();
List<double> sx = new List<double>();
List<double> sy = new List<double>();
List<double> sd = new List<double>();
Start = DateTime.Now;
dataStoreExposureX.RecieveDataFromStorage<double>(msg, out x);
dataStoreExposureY.RecieveDataFromStorage<double>(msg, out y);
dataStoreExposureV.RecieveDataFromStorage<double>(msg, out v);
Finish = DateTime.Now;
Stopwatch = Finish.Subtract(Start);
this.Log.LogMessage(String.Format("Time to Upload Exposure to model {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);
Start = DateTime.Now;
dataStoreCatalogSm.RecieveDataFromStorage<double>(msg, out sm);
dataStoreCatalogSx.RecieveDataFromStorage<double>(msg, out sx);
dataStoreCatalogSy.RecieveDataFromStorage<double>(msg, out sy);
dataStoreCatalogSd.RecieveDataFromStorage<double>(msg, out sd);
Finish = DateTime.Now;
Stopwatch = Finish.Subtract(Start);
this.Log.LogMessage(String.Format("Time to Upload Catalog to model {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);
List<double> losses = new List<double>();
Start = DateTime.Now;
eQModel.RunModel(x, y, v, sm, sx, sy, sd, out losses);
Finish = DateTime.Now;
Stopwatch = Finish.Subtract(Start);
this.Log.LogMessage(String.Format("Time to Run Japan Earthquake {0} milliseconds", Stopwatch.TotalMilliseconds), Stopwatch.TotalMilliseconds);
this.Log.LogMessage("=== Appending Model Log to Server Log ====");
List<LogEvent> ModelLogs = eQModel.log.GetLogs();
this.Log.Add(ModelLogs);
jobLossBlob.SendDataToStorage<double>(msg, losses);
jobLogBlob.SendDataToStorage<LogEvent>(msg, this.Log.GetLogs());
jobOutputQueue.PutMessage(msg);
jobInputQueue.DeleteMessage(id, popId);
return rc;
}
private void TraceInfo(string format, params object[] args)
{
string msg = string.Format(format, args);
Trace.WriteLine(msg, "Information");
}
private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
{
// If a configuration setting is changing
if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
{
// Set e.Cancel to true to restart this role instance
e.Cancel = true;
}
}
void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
{
TraceInfo("AzJobHost::UnhandledException: " + (Exception)e.ExceptionObject);
RoleEnvironment.RequestRecycle();
}
}
}