Filled in implementations and refactored the classes from DecisionService.cs into separate files. Server communication is currently blocking; it needs to be made asynchronous.

This commit is contained in:
Luong Hoang 2015-01-21 17:52:20 -05:00
Родитель 0b7455c6e2
Коммит d558b4dd66
11 изменённых файлов: 474 добавлений и 194 удалений

Просмотреть файл

@ -1,6 +1,14 @@
<?xml version="1.0" encoding="utf-8" ?>
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
</startup>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-6.0.0.0" newVersion="6.0.0.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>

Просмотреть файл

@ -0,0 +1,28 @@
using System;
namespace DecisionSample
{
/// <summary>
/// Groups the thresholds used to decide when a batch of events is cut.
/// </summary>
/// <remarks>
/// Reaching any single threshold is enough to create a batch.
/// </remarks>
internal class BatchingConfiguration
{
    /// <summary>
    /// Upper bound on the number of events held in one batch.
    /// </summary>
    public int EventCount { get; set; }

    /// <summary>
    /// Upper bound on the size of one batch, in bytes.
    /// </summary>
    public int BufferSize { get; set; }

    /// <summary>
    /// Length of the time window whose events are grouped into one batch.
    /// </summary>
    public TimeSpan Duration { get; set; }
}
}

Просмотреть файл

@ -78,6 +78,19 @@
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Reactive.Core">
<HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
</Reference>
<Reference Include="System.Reactive.Interfaces">
<HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
</Reference>
<Reference Include="System.Reactive.Linq">
<HintPath>..\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll</HintPath>
</Reference>
<Reference Include="System.Reactive.PlatformServices">
<HintPath>..\packages\Rx-PlatformServices.2.2.5\lib\net45\System.Reactive.PlatformServices.dll</HintPath>
</Reference>
<Reference Include="System.Spatial, Version=5.6.2.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\System.Spatial.5.6.2\lib\net40\System.Spatial.dll</HintPath>
@ -89,7 +102,14 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="BatchingConfiguration.cs" />
<Compile Include="DecisionService.cs" />
<Compile Include="DecisionServiceConfiguration.cs" />
<Compile Include="DecisionServicePolicy.cs" />
<Compile Include="DecisionServiceRecorder.cs" />
<Compile Include="Events.cs" />
<Compile Include="Explorers.cs" />
<Compile Include="Extensions.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>

Просмотреть файл

@ -1,39 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MultiWorldTesting;
using Newtonsoft.Json;
using Microsoft.WindowsAzure.Storage;
using MultiWorldTesting;
using System;
namespace DecisionSample
{
/// <summary>
/// Settings consumed by the client decision service: application identity, explorer,
/// batching thresholds and the context serializer.
/// </summary>
class DecisionServiceConfiguration<TContext>
{
    /// <summary>
    /// Installs the defaults: Json.NET serialization of the context and a batch that is
    /// cut after one minute, 10000 events or 4 MiB, whichever comes first.
    /// </summary>
    public DecisionServiceConfiguration()
    {
        BatchConfig = CreateDefaultBatchConfig();
        ContextJsonSerializer = context => JsonConvert.SerializeObject(context);
    }

    public string AppId { get; set; }

    public string AuthorizationToken { get; set; }

    public IExploreAlgorithm<TContext> Explorer { get; set; }

    public bool IsPolicyUpdatable { get; set; }

    public BatchingConfiguration BatchConfig { get; set; }

    public Func<TContext, string> ContextJsonSerializer { get; set; }

    // Builds the default batching thresholds used when the caller supplies none.
    private static BatchingConfiguration CreateDefaultBatchConfig()
    {
        var defaults = new BatchingConfiguration();
        defaults.BufferSize = 4 * 1024 * 1024;
        defaults.Duration = TimeSpan.FromMinutes(1);
        defaults.EventCount = 10000;
        return defaults;
    }
}
/// <summary>
/// Encapsulates logic for recorder with async server communications & policy update.
/// </summary>
@ -41,7 +10,8 @@ namespace DecisionSample
{
public DecisionService(DecisionServiceConfiguration<TContext> config)
{
recorder = new DecisionServiceRecorder<TContext>(config.BatchConfig);
recorder = new DecisionServiceRecorder<TContext>(config.BatchConfig, config.ContextJsonSerializer,
config.ExperimentalUnitDurationInSeconds, config.AuthorizationToken);
policy = new DecisionServicePolicy<TContext>();
mwt = new MwtExplorer<TContext>(config.AppId, recorder);
exploreAlgorithm = config.Explorer;
@ -50,12 +20,12 @@ namespace DecisionSample
/*ReportSimpleReward*/
/// <summary>
/// Forwards a reward value and its unique key to the recorder.
/// </summary>
/// <param name="reward">Reward value to report.</param>
/// <param name="uniqueKey">Key passed through to the recorder together with the reward.</param>
public void ReportReward(float reward, string uniqueKey)
{
    recorder.ReportReward(reward, uniqueKey);
}
/// <summary>
/// Forwards an outcome (as a JSON string) and its unique key to the recorder.
/// </summary>
/// <param name="outcomeJson">Outcome payload; passed through unchanged.</param>
/// <param name="uniqueKey">Key passed through to the recorder together with the outcome.</param>
public void ReportOutcome(string outcomeJson, string uniqueKey)
{
    recorder.ReportOutcome(outcomeJson, uniqueKey);
}
public uint ChooseAction(string uniqueKey, TContext context)
@ -73,161 +43,4 @@ namespace DecisionSample
private DecisionServicePolicy<TContext> policy;
private MwtExplorer<TContext> mwt;
}
/// <summary>
/// Groups the thresholds used to decide when a batch of events is cut.
/// </summary>
/// <remarks>
/// Reaching any single threshold is enough to create a batch.
/// </remarks>
internal class BatchingConfiguration
{
    /// <summary>
    /// Upper bound on the number of events held in one batch.
    /// </summary>
    public uint EventCount { get; set; }

    /// <summary>
    /// Upper bound on the size of one batch, in bytes.
    /// </summary>
    public ulong BufferSize { get; set; }

    /// <summary>
    /// Length of the time window whose events are grouped into one batch.
    /// </summary>
    public TimeSpan Duration { get; set; }
}
// TODO: rename Recorder to Logger?
// TODO: Client can tag event as interaction or observation
/// <summary>
/// Placeholder recorder: satisfies IRecorder but does not yet store or send anything.
/// </summary>
internal class DecisionServiceRecorder<TContext> : IRecorder<TContext>, IDisposable
{
    /// <summary>Accepts the batching configuration; it is not used yet.</summary>
    public DecisionServiceRecorder(BatchingConfiguration batchConfig)
    {
    }

    /// <summary>
    /// Serializes the context to JSON. The serialized text is currently discarded.
    /// </summary>
    public void Record(TContext context, uint action, float probability, string uniqueKey)
    {
        // TODO: at the time of server communication, if the client is out of memory (or meets some predefined upper bound):
        // 1. It can block the execution flow.
        // 2. Or drop events.
        var serializedContext = JsonConvert.SerializeObject(context);
    }

    // TODO: should this also take a float reward?
    /// <summary>Not implemented yet; the call has no effect.</summary>
    public void ReportOutcome(string outcomeJson, float? reward, string uniqueKey)
    {
        // . . .
    }

    // Internally, background tasks can get back latest model version as a return value from the HTTP communication with Ingress worker
    /// <summary>Nothing to release in this placeholder.</summary>
    public void Dispose()
    {
    }
}
/// <summary>
/// Placeholder policy that ignores the context and always selects action 0.
/// </summary>
internal class DecisionServicePolicy<TContext> : IPolicy<TContext>, IDisposable
{
    // Recorder should talk to the Policy to pass over latest model version

    /// <summary>Returns the fixed placeholder action, regardless of <paramref name="context"/>.</summary>
    public uint ChooseAction(TContext context)
    {
        const uint placeholderAction = 0;
        return placeholderAction;
    }

    /// <summary>Nothing to release.</summary>
    public void Dispose()
    {
    }
}
/* Temp classes to support interface */
/// <summary>
/// Describes an exploration algorithm that can hand out an explorer instance.
/// </summary>
/// <typeparam name="TContext">Type of the context the explorer works on.</typeparam>
interface IExploreAlgorithm<TContext>
{
/// <summary>Returns an <c>IExplorer&lt;TContext&gt;</c> for this algorithm.</summary>
IExplorer<TContext> Get();
}
/// <summary>
/// Holds the arguments for <c>MultiWorldTesting.EpsilonGreedyExplorer</c> and builds it on demand.
/// </summary>
class EpsilonGreedyExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <param name="policy">Policy handed to the underlying explorer.</param>
    /// <param name="epsilon">Epsilon value handed to the underlying explorer.</param>
    /// <param name="numActions">Number of actions handed to the underlying explorer.</param>
    public EpsilonGreedyExplorer(IPolicy<TContext> policy, float epsilon, uint numActions)
    {
        Epsilon = epsilon;
        NumActions = numActions;
        Policy = policy;
    }

    public IPolicy<TContext> Policy { get; set; }

    public float Epsilon { get; set; }

    public uint NumActions { get; set; }

    /// <summary>
    /// Creates a new MultiWorldTesting explorer from the current property values
    /// (instead of throwing <see cref="NotImplementedException"/>).
    /// </summary>
    public IExplorer<TContext> Get()
    {
        return new MultiWorldTesting.EpsilonGreedyExplorer<TContext>(Policy, Epsilon, NumActions);
    }
}
/// <summary>
/// Holds the arguments for <c>MultiWorldTesting.TauFirstExplorer</c> and builds it on demand.
/// </summary>
class TauFirstExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <param name="policy">Policy handed to the underlying explorer.</param>
    /// <param name="tau">Tau value handed to the underlying explorer.</param>
    /// <param name="numActions">Number of actions handed to the underlying explorer.</param>
    public TauFirstExplorer(IPolicy<TContext> policy, uint tau, uint numActions)
    {
        Tau = tau;
        NumActions = numActions;
        Policy = policy;
    }

    public IPolicy<TContext> Policy { get; set; }

    public uint Tau { get; set; }

    public uint NumActions { get; set; }

    /// <summary>
    /// Creates a new MultiWorldTesting explorer from the current property values
    /// (instead of throwing <see cref="NotImplementedException"/>).
    /// </summary>
    public IExplorer<TContext> Get()
    {
        return new MultiWorldTesting.TauFirstExplorer<TContext>(Policy, Tau, NumActions);
    }
}
/// <summary>
/// Holds the arguments for <c>MultiWorldTesting.BootstrapExplorer</c> and builds it on demand.
/// </summary>
class BootstrapExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <param name="policies">Policies handed to the underlying explorer.</param>
    /// <param name="numActions">Number of actions handed to the underlying explorer.</param>
    public BootstrapExplorer(IPolicy<TContext>[] policies, uint numActions)
    {
        NumActions = numActions;
        Policies = policies;
    }

    public IPolicy<TContext>[] Policies { get; set; }

    public uint NumActions { get; set; }

    /// <summary>
    /// Creates a new MultiWorldTesting explorer from the current property values
    /// (instead of throwing <see cref="NotImplementedException"/>).
    /// </summary>
    public IExplorer<TContext> Get()
    {
        return new MultiWorldTesting.BootstrapExplorer<TContext>(Policies, NumActions);
    }
}
/// <summary>
/// Holds the arguments for <c>MultiWorldTesting.SoftmaxExplorer</c> and builds it on demand.
/// </summary>
class SoftmaxExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <param name="scorer">Scorer handed to the underlying explorer.</param>
    /// <param name="lambda">Lambda value handed to the underlying explorer.</param>
    /// <param name="numActions">Number of actions handed to the underlying explorer.</param>
    public SoftmaxExplorer(IScorer<TContext> scorer, float lambda, uint numActions)
    {
        Lambda = lambda;
        NumActions = numActions;
        Scorer = scorer;
    }

    public IScorer<TContext> Scorer { get; set; }

    public float Lambda { get; set; }

    public uint NumActions { get; set; }

    /// <summary>
    /// Creates a new MultiWorldTesting explorer from the current property values
    /// (instead of throwing <see cref="NotImplementedException"/>).
    /// </summary>
    public IExplorer<TContext> Get()
    {
        return new MultiWorldTesting.SoftmaxExplorer<TContext>(Scorer, Lambda, NumActions);
    }
}
/// <summary>
/// Holds the arguments for <c>MultiWorldTesting.GenericExplorer</c> and builds it on demand.
/// </summary>
class GenericExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <param name="scorer">Scorer handed to the underlying explorer.</param>
    /// <param name="numActions">Number of actions handed to the underlying explorer.</param>
    public GenericExplorer(IScorer<TContext> scorer, uint numActions)
    {
        NumActions = numActions;
        Scorer = scorer;
    }

    public IScorer<TContext> Scorer { get; set; }

    public uint NumActions { get; set; }

    /// <summary>
    /// Creates a new MultiWorldTesting explorer from the current property values
    /// (instead of throwing <see cref="NotImplementedException"/>).
    /// </summary>
    public IExplorer<TContext> Get()
    {
        return new MultiWorldTesting.GenericExplorer<TContext>(Scorer, NumActions);
    }
}
}

Просмотреть файл

@ -0,0 +1,31 @@
using Newtonsoft.Json;
using System;
namespace DecisionSample
{
/// <summary>
/// Settings consumed by the client decision service: application identity, explorer,
/// experimental-unit duration, batching thresholds and the context serializer.
/// </summary>
class DecisionServiceConfiguration<TContext>
{
    /// <summary>
    /// Installs the defaults: Json.NET serialization of the context and a batch that is
    /// cut after one minute, 10000 events or 4 MiB, whichever comes first.
    /// </summary>
    public DecisionServiceConfiguration()
    {
        BatchConfig = CreateDefaultBatchConfig();
        ContextJsonSerializer = context => JsonConvert.SerializeObject(context);
    }

    public string AppId { get; set; }

    public string AuthorizationToken { get; set; }

    public IExploreAlgorithm<TContext> Explorer { get; set; }

    public int ExperimentalUnitDurationInSeconds { get; set; }

    public bool IsPolicyUpdatable { get; set; }

    public BatchingConfiguration BatchConfig { get; set; }

    public Func<TContext, string> ContextJsonSerializer { get; set; }

    // Builds the default batching thresholds used when the caller supplies none.
    private static BatchingConfiguration CreateDefaultBatchConfig()
    {
        var defaults = new BatchingConfiguration();
        defaults.BufferSize = 4 * 1024 * 1024;
        defaults.Duration = TimeSpan.FromMinutes(1);
        defaults.EventCount = 10000;
        return defaults;
    }
}
}

Просмотреть файл

@ -0,0 +1,17 @@
using MultiWorldTesting;
using System;
namespace DecisionSample
{
/// <summary>
/// Placeholder policy that ignores the context and always selects action 0.
/// </summary>
internal class DecisionServicePolicy<TContext> : IPolicy<TContext>, IDisposable
{
    // Recorder should talk to the Policy to pass over latest model version

    /// <summary>Returns the fixed placeholder action, regardless of <paramref name="context"/>.</summary>
    public uint ChooseAction(TContext context)
    {
        const uint placeholderAction = 0;
        return placeholderAction;
    }

    /// <summary>Nothing to release.</summary>
    public void Dispose()
    {
    }
}
}

Просмотреть файл

@ -0,0 +1,122 @@
using MultiWorldTesting;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Newtonsoft.Json;
using System.Net.Http;
using System.Net.Http.Headers;
using System.IO;
using System.Text;
using System.Threading.Tasks;
namespace DecisionSample
{
// TODO: rename Recorder to Logger?
/// <summary>
/// Recorder that queues interaction and observation events, groups them into batches and
/// posts each batch as JSON to the decision service over HTTP.
/// </summary>
/// <remarks>
/// Events are pushed into an Rx subject, split into time windows of
/// <c>BatchingConfiguration.Duration</c>, and each window is cut further by event count and
/// measured size. The upload itself is still synchronous: it blocks whichever thread closes a
/// batch. TODO: move the upload off the calling thread once flush-on-dispose semantics are settled.
/// NOTE(review): nothing here synchronizes concurrent callers of Record/Report* — confirm the
/// expected threading model.
/// </remarks>
internal class DecisionServiceRecorder<TContext> : IRecorder<TContext>, IDisposable
{
    /// <param name="batchConfig">Thresholds (duration, event count, size) that cut a batch.</param>
    /// <param name="contextSerializer">Turns a context into the string stored on the interaction event.</param>
    /// <param name="experimentalUnitDurationInSeconds">Value sent with every batch.</param>
    /// <param name="authorizationToken">Token sent as the parameter of the Authorization header.</param>
    public DecisionServiceRecorder(BatchingConfiguration batchConfig,
        Func<TContext, string> contextSerializer,
        int experimentalUnitDurationInSeconds,
        string authorizationToken)
    {
        this.batchConfig = batchConfig;
        this.contextSerializer = contextSerializer;
        this.experimentalUnitDurationInSeconds = experimentalUnitDurationInSeconds;
        this.authorizationToken = authorizationToken;

        // A single client is reused for every upload instead of creating one per batch.
        this.httpClient = new HttpClient();
        this.httpClient.BaseAddress = new Uri(this.ServiceAddress);
        this.httpClient.Timeout = TimeSpan.FromSeconds(this.ConnectionTimeOutInSeconds);
        this.httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(this.AuthenticationScheme, this.authorizationToken);

        this.batch = new Subject<IEvent>();

        // The subscription is kept so Dispose can tear the pipeline down.
        this.subscription = this.batch.Window(batchConfig.Duration)
            .Select(w => w.Buffer(batchConfig.EventCount, batchConfig.BufferSize, ev => ev.Measure()))
            .SelectMany(buffer => buffer)
            .Subscribe(events => this.UploadBatch(events));
    }

    /// <summary>Queues an interaction event built from the given decision data.</summary>
    public void Record(TContext context, uint action, float probability, string uniqueKey)
    {
        this.batch.OnNext(new Interaction {
            ID = uniqueKey,
            Action = (int)action,
            Probability = probability,
            Context = this.contextSerializer(context)
        });
    }

    /// <summary>Queues an observation event whose value is the reward formatted as text.</summary>
    public void ReportReward(float reward, string uniqueKey)
    {
        this.batch.OnNext(new Observation {
            ID = uniqueKey,
            // Invariant culture keeps the decimal separator stable regardless of the machine's locale.
            Value = reward.ToString(System.Globalization.CultureInfo.InvariantCulture)
        });
    }

    /// <summary>Queues an observation event whose value is the given outcome JSON.</summary>
    public void ReportOutcome(string outcomeJson, string uniqueKey)
    {
        this.batch.OnNext(new Observation {
            ID = uniqueKey,
            Value = outcomeJson
        });
    }

    // TODO: at the time of server communication, if the client is out of memory (or meets some predefined upper bound):
    // 1. It can block the execution flow.
    // 2. Or drop events.
    /// <summary>
    /// Serializes one batch to JSON and posts it to the service, blocking until the response arrives.
    /// A non-success response is written to the trace output; transport failures surface as the
    /// exception thrown by the blocked task.
    /// </summary>
    private void UploadBatch(IList<IEvent> events)
    {
        using (var jsonMemStream = new MemoryStream())
        using (var jsonWriter = new JsonTextWriter(new StreamWriter(jsonMemStream)))
        {
            JsonSerializer ser = new JsonSerializer();
            ser.Serialize(jsonWriter, new EventBatch
            {
                Events = events,
                ExperimentalUnitDurationInSeconds = this.experimentalUnitDurationInSeconds
            });

            // Push buffered text into the stream and rewind so the whole payload is sent.
            jsonWriter.Flush();
            jsonMemStream.Position = 0;

            Task<HttpResponseMessage> taskPost = this.httpClient.PostAsync(this.ServicePostAddress, new StreamContent(jsonMemStream));
            using (HttpResponseMessage response = taskPost.Result)
            {
                if (!response.IsSuccessStatusCode)
                {
                    string responseMessage = response.Content.ReadAsStringAsync().Result;

                    // TODO: throw exception with custom message?
                    System.Diagnostics.Trace.TraceError(
                        "DecisionServiceRecorder: batch upload failed with status {0}: {1}",
                        (int)response.StatusCode,
                        responseMessage);
                }
            }
        }
    }

    // Internally, background tasks can get back latest model version as a return value from the HTTP communication with Ingress worker
    /// <summary>
    /// Completes the event stream so that events still buffered are uploaded, then releases the
    /// pipeline subscription and the HTTP client. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (this.disposed)
        {
            return;
        }
        this.disposed = true;

        try
        {
            // Completion propagates through the window/buffer pipeline, which emits any pending batch.
            this.batch.OnCompleted();
        }
        finally
        {
            this.subscription.Dispose();
            this.httpClient.Dispose();
        }
    }

    #region Members
    private BatchingConfiguration batchConfig;
    private Func<TContext, string> contextSerializer;
    private Subject<IEvent> batch;
    private int experimentalUnitDurationInSeconds;
    private string authorizationToken;
    private readonly HttpClient httpClient;
    private readonly IDisposable subscription;
    private bool disposed;
    #endregion

    #region Constants
    //private readonly string ServiceAddress = "http://decisionservice.cloudapp.net";
    private readonly string ServiceAddress = "http://localhost:1362";
    private readonly string ServicePostAddress = "/DecisionService.svc/PostExperimentalUnits";
    private readonly int ConnectionTimeOutInSeconds = 60;
    private readonly string AuthenticationScheme = "Bearer";
    #endregion
}
}

90
DecisionSample/Events.cs Normal file
Просмотреть файл

@ -0,0 +1,90 @@
using Newtonsoft.Json;
using System.Collections.Generic;
using System.Text;
namespace DecisionSample
{
/// <summary>
/// Distinguishes the two kinds of events defined in this file.
/// </summary>
public enum EventType
{
    /// <summary>Tag returned by <c>Interaction.Type</c>.</summary>
    Interaction = 0,

    /// <summary>Tag returned by <c>Observation.Type</c>.</summary>
    Observation = 1
}
/// <summary>
/// Common shape of an event: a type tag, an identifier and a size figure.
/// </summary>
public interface IEvent
{
/// <summary>Kind of event; annotated with the JSON property name "t".</summary>
[JsonProperty(PropertyName = "t")]
EventType Type { get; }
/// <summary>Identifier of the event; annotated with the JSON property name "i".</summary>
[JsonProperty(PropertyName = "i")]
string ID { get; set; }
/// <summary>
/// Returns a size figure for the event. The implementations in this file add up
/// field sizes in bytes.
/// </summary>
int Measure();
}
/// <summary>
/// Event describing one decision: the chosen action, its probability and the serialized context.
/// </summary>
public class Interaction : IEvent
{
    /// <summary>Always <see cref="EventType.Interaction"/>.</summary>
    public EventType Type
    {
        get
        {
            return EventType.Interaction;
        }
    }

    /// <summary>Identifier of the event.</summary>
    public string ID { get; set; }

    /// <summary>Chosen action; JSON property name "a".</summary>
    [JsonProperty(PropertyName = "a")]
    public int Action { get; set; }

    /// <summary>Probability associated with the action; JSON property name "p".</summary>
    [JsonProperty(PropertyName = "p")]
    public double Probability { get; set; }

    /// <summary>Serialized context; JSON property name "c".</summary>
    [JsonProperty(PropertyName = "c")]
    public string Context { get; set; }

    /// <summary>
    /// Estimates the size of this event in bytes: the fixed-size fields plus the UTF-16 byte
    /// count of <see cref="ID"/> and <see cref="Context"/>. A null string counts as zero bytes
    /// rather than throwing.
    /// </summary>
    public int Measure()
    {
        // TODO: Reflection? potential perf hit
        return
            sizeof(EventType) +
            sizeof(int) +
            sizeof(double) +
            ByteCount(ID) +
            ByteCount(Context);
    }

    // Encoding.GetByteCount rejects null, so unset strings are measured as empty.
    private static int ByteCount(string text)
    {
        return text == null ? 0 : Encoding.Unicode.GetByteCount(text);
    }
}
/// <summary>
/// Event carrying a reported value (a reward or an outcome) as text.
/// </summary>
public class Observation : IEvent
{
    /// <summary>Always <see cref="EventType.Observation"/>.</summary>
    public EventType Type
    {
        get
        {
            return EventType.Observation;
        }
    }

    /// <summary>Identifier of the event.</summary>
    public string ID { get; set; }

    /// <summary>Reported value as text; JSON property name "v".</summary>
    [JsonProperty(PropertyName = "v")]
    public string Value { get; set; }

    /// <summary>
    /// Estimates the size of this event in bytes: the type tag plus the UTF-16 byte count of
    /// <see cref="ID"/> and <see cref="Value"/>. A null string counts as zero bytes rather
    /// than throwing.
    /// </summary>
    public int Measure()
    {
        // TODO: Reflection? potential perf hit
        return
            sizeof(EventType) +
            ByteCount(ID) +
            ByteCount(Value);
    }

    // Encoding.GetByteCount rejects null, so unset strings are measured as empty.
    private static int ByteCount(string text)
    {
        return text == null ? 0 : Encoding.Unicode.GetByteCount(text);
    }
}
/// <summary>
/// Payload object holding a list of events together with the experimental unit duration.
/// </summary>
public class EventBatch
{
/// <summary>Events in this batch; JSON property name "e".</summary>
[JsonProperty(PropertyName = "e")]
public IList<IEvent> Events { get; set; }
/// <summary>Experimental unit duration in seconds; JSON property name "d".</summary>
[JsonProperty(PropertyName = "d")]
public long ExperimentalUnitDurationInSeconds { get; set; }
}
}

102
DecisionSample/Explorers.cs Normal file
Просмотреть файл

@ -0,0 +1,102 @@
using MultiWorldTesting;
using System;
namespace DecisionSample
{
/* Temp classes to support interface */
/// <summary>
/// Describes an exploration algorithm that can hand out an explorer instance.
/// </summary>
/// <typeparam name="TContext">Type of the context the explorer works on.</typeparam>
interface IExploreAlgorithm<TContext>
{
/// <summary>Returns an <c>IExplorer&lt;TContext&gt;</c> for this algorithm.</summary>
IExplorer<TContext> Get();
}
/// <summary>
/// Captures the arguments for <c>MultiWorldTesting.EpsilonGreedyExplorer</c> and constructs it on request.
/// </summary>
class EpsilonGreedyExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <summary>Stores the three constructor arguments in the matching properties.</summary>
    public EpsilonGreedyExplorer(IPolicy<TContext> policy, float epsilon, uint numActions)
    {
        this.Policy = policy;
        this.Epsilon = epsilon;
        this.NumActions = numActions;
    }

    public IPolicy<TContext> Policy { get; set; }

    public float Epsilon { get; set; }

    public uint NumActions { get; set; }

    /// <summary>Builds a fresh underlying explorer from the current property values.</summary>
    public IExplorer<TContext> Get()
    {
        var explorer = new MultiWorldTesting.EpsilonGreedyExplorer<TContext>(this.Policy, this.Epsilon, this.NumActions);
        return explorer;
    }
}
/// <summary>
/// Captures the arguments for <c>MultiWorldTesting.TauFirstExplorer</c> and constructs it on request.
/// </summary>
class TauFirstExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <summary>Stores the three constructor arguments in the matching properties.</summary>
    public TauFirstExplorer(IPolicy<TContext> policy, uint tau, uint numActions)
    {
        this.Policy = policy;
        this.Tau = tau;
        this.NumActions = numActions;
    }

    public IPolicy<TContext> Policy { get; set; }

    public uint Tau { get; set; }

    public uint NumActions { get; set; }

    /// <summary>Builds a fresh underlying explorer from the current property values.</summary>
    public IExplorer<TContext> Get()
    {
        var explorer = new MultiWorldTesting.TauFirstExplorer<TContext>(this.Policy, this.Tau, this.NumActions);
        return explorer;
    }
}
/// <summary>
/// Captures the arguments for <c>MultiWorldTesting.BootstrapExplorer</c> and constructs it on request.
/// </summary>
class BootstrapExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <summary>Stores both constructor arguments in the matching properties.</summary>
    public BootstrapExplorer(IPolicy<TContext>[] policies, uint numActions)
    {
        this.Policies = policies;
        this.NumActions = numActions;
    }

    public IPolicy<TContext>[] Policies { get; set; }

    public uint NumActions { get; set; }

    /// <summary>Builds a fresh underlying explorer from the current property values.</summary>
    public IExplorer<TContext> Get()
    {
        var explorer = new MultiWorldTesting.BootstrapExplorer<TContext>(this.Policies, this.NumActions);
        return explorer;
    }
}
/// <summary>
/// Captures the arguments for <c>MultiWorldTesting.SoftmaxExplorer</c> and constructs it on request.
/// </summary>
class SoftmaxExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <summary>Stores the three constructor arguments in the matching properties.</summary>
    public SoftmaxExplorer(IScorer<TContext> scorer, float lambda, uint numActions)
    {
        this.Scorer = scorer;
        this.Lambda = lambda;
        this.NumActions = numActions;
    }

    public IScorer<TContext> Scorer { get; set; }

    public float Lambda { get; set; }

    public uint NumActions { get; set; }

    /// <summary>Builds a fresh underlying explorer from the current property values.</summary>
    public IExplorer<TContext> Get()
    {
        var explorer = new MultiWorldTesting.SoftmaxExplorer<TContext>(this.Scorer, this.Lambda, this.NumActions);
        return explorer;
    }
}
/// <summary>
/// Captures the arguments for <c>MultiWorldTesting.GenericExplorer</c> and constructs it on request.
/// </summary>
class GenericExplorer<TContext> : IExploreAlgorithm<TContext>
{
    /// <summary>Stores both constructor arguments in the matching properties.</summary>
    public GenericExplorer(IScorer<TContext> scorer, uint numActions)
    {
        this.Scorer = scorer;
        this.NumActions = numActions;
    }

    public IScorer<TContext> Scorer { get; set; }

    public uint NumActions { get; set; }

    /// <summary>Builds a fresh underlying explorer from the current property values.</summary>
    public IExplorer<TContext> Get()
    {
        var explorer = new MultiWorldTesting.GenericExplorer<TContext>(this.Scorer, this.NumActions);
        return explorer;
    }
}
}

Просмотреть файл

@ -0,0 +1,44 @@
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Subjects;
namespace DecisionSample
{
/// <summary>
/// Rx helpers used by the decision service client.
/// </summary>
public static class ExtensionMethods
{
    /// <summary>
    /// Groups the items of <paramref name="source"/> into lists, emitting the current list as
    /// soon as it holds at least <paramref name="maxCount"/> items or its accumulated measure
    /// reaches <paramref name="maxSize"/>.
    /// </summary>
    /// <remarks>
    /// The item that crosses a threshold is included in the emitted list, so a list can exceed
    /// <paramref name="maxSize"/>. When the source completes, a non-empty remainder is emitted
    /// before completion is forwarded. The source is subscribed to once per subscriber of the
    /// returned sequence, and that subscription is released when the subscriber unsubscribes.
    /// </remarks>
    /// <param name="source">Sequence to batch.</param>
    /// <param name="maxCount">Item count at which a list is emitted.</param>
    /// <param name="maxSize">Accumulated measure at which a list is emitted.</param>
    /// <param name="measure">Returns the size contribution of one item.</param>
    /// <returns>A sequence of item lists in arrival order.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="measure"/> is null.
    /// </exception>
    public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int maxCount, int maxSize, Func<T, int> measure)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (measure == null)
        {
            throw new ArgumentNullException("measure");
        }

        // Observable.Create ties the source subscription to the consumer's lifetime and keeps
        // the buffering state private to each subscriber.
        return System.Reactive.Linq.Observable.Create<IList<T>>(observer =>
        {
            var pending = new List<T>();
            var pendingSize = 0;

            return source.Subscribe(Observer.Create<T>(
                onNext: item =>
                {
                    pendingSize += measure(item);
                    pending.Add(item);

                    if (pendingSize >= maxSize || pending.Count >= maxCount)
                    {
                        // Hand the full list downstream and start a new one; the emitted list is not reused.
                        observer.OnNext(pending);
                        pending = new List<T>();
                        pendingSize = 0;
                    }
                },
                onError: error => observer.OnError(error),
                onCompleted: () =>
                {
                    if (pending.Count > 0)
                    {
                        observer.OnNext(pending);
                    }
                    observer.OnCompleted();
                }));
        });
    }
}
}

Просмотреть файл

@ -5,6 +5,11 @@
<package id="Microsoft.Data.Services.Client" version="5.6.2" targetFramework="net45" />
<package id="Microsoft.WindowsAzure.ConfigurationManager" version="1.8.0.0" targetFramework="net45" />
<package id="Newtonsoft.Json" version="6.0.7" targetFramework="net45" />
<package id="Rx-Core" version="2.2.5" targetFramework="net45" />
<package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
<package id="Rx-Linq" version="2.2.5" targetFramework="net45" />
<package id="Rx-Main" version="2.2.5" targetFramework="net45" />
<package id="Rx-PlatformServices" version="2.2.5" targetFramework="net45" />
<package id="System.Spatial" version="5.6.2" targetFramework="net45" />
<package id="WindowsAzure.Storage" version="4.3.0" targetFramework="net45" />
</packages>