- Put things in namespaces
- Use 3.0 template
- Use IAsyncEnumerable
This commit is contained in:
David Fowler 2019-03-10 22:19:26 -07:00 коммит произвёл GitHub
Родитель d5c4ac0451
Коммит ded6a1da64
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 107 добавлений и 121 удалений

Просмотреть файл

@ -5,39 +5,42 @@ using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
public class StreamHub : Hub
namespace StreamR
{
private readonly StreamManager _streamManager;
public StreamHub(StreamManager streamManager)
public class StreamHub : Hub
{
_streamManager = streamManager;
}
private readonly StreamManager _streamManager;
public List<string> ListStreams()
{
return _streamManager.ListStreams();
}
public ChannelReader<string> WatchStream(string streamName, CancellationToken token)
{
return _streamManager.Subscribe(streamName, token);
}
public async Task StartStream(string streamName, ChannelReader<string> streamContent)
{
try
public StreamHub(StreamManager streamManager)
{
var streamTask = _streamManager.RunStreamAsync(streamName, streamContent);
// Tell everyone about your stream!
await Clients.Others.SendAsync("NewStream", streamName);
await streamTask;
_streamManager = streamManager;
}
finally
public List<string> ListStreams()
{
await Clients.Others.SendAsync("RemoveStream", streamName);
return _streamManager.ListStreams();
}
public IAsyncEnumerable<string> WatchStream(string streamName, CancellationToken cancellationToken)
{
return _streamManager.Subscribe(streamName, cancellationToken);
}
public async Task StartStream(string streamName, ChannelReader<string> streamContent)
{
try
{
var streamTask = _streamManager.RunStreamAsync(streamName, streamContent);
// Tell everyone about your stream!
await Clients.Others.SendAsync("NewStream", streamName);
await streamTask;
}
finally
{
await Clients.Others.SendAsync("RemoveStream", streamName);
}
}
}
}

Просмотреть файл

@ -1,12 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
namespace StreamR
{
@ -17,8 +10,11 @@ namespace StreamR
CreateWebHostBuilder(args).Build().Run();
}
public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
public static IHostBuilder CreateWebHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(builder =>
{
builder.UseStartup<Startup>();
});
}
}

Просмотреть файл

@ -1,14 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace StreamR
{
@ -30,22 +26,17 @@ namespace StreamR
options.CheckConsentNeeded = context => true;
options.MinimumSameSitePolicy = SameSiteMode.None;
});
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_3_0);
services.AddSignalR(o =>
{
o.EnableDetailedErrors = true;
});
services.AddSignalR();
services.AddSingleton<StreamManager>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
public void Configure(IApplicationBuilder app, IWebHostEnvironment environment)
{
if (env.IsDevelopment())
if (environment.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}

Просмотреть файл

@ -5,41 +5,36 @@ using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
public class StreamManager
namespace StreamR
{
private readonly ConcurrentDictionary<string, StreamHolder> _streams;
private long _globalClientId;
public StreamManager()
public class StreamManager
{
_streams = new ConcurrentDictionary<string, StreamHolder>();
}
private readonly ConcurrentDictionary<string, StreamHolder> _streams = new ConcurrentDictionary<string, StreamHolder>();
private long _globalClientId;
public List<string> ListStreams()
{
var streamList = new List<string>();
foreach (var item in _streams)
public List<string> ListStreams()
{
streamList.Add(item.Key);
}
return streamList;
}
public async Task RunStreamAsync(string streamName, ChannelReader<string> stream)
{
var streamHolder = new StreamHolder() { Source = stream };
// Add before yielding
// This fixes a race where we tell clients a new stream arrives before adding the stream
_streams.TryAdd(streamName, streamHolder);
await Task.Yield();
try
{
while (await stream.WaitToReadAsync())
var streamList = new List<string>();
foreach (var item in _streams)
{
while (stream.TryRead(out var item))
streamList.Add(item.Key);
}
return streamList;
}
public async Task RunStreamAsync(string streamName, ChannelReader<string> stream)
{
var streamHolder = new StreamHolder() { Source = stream };
// Add before yielding
// This fixes a race where we tell clients a new stream arrives before adding the stream
_streams.TryAdd(streamName, streamHolder);
await Task.Yield();
try
{
await foreach (var item in stream.ReadAllAsync())
{
foreach (var viewer in streamHolder.Viewers)
{
@ -51,49 +46,50 @@ public class StreamManager
}
}
}
}
finally
{
RemoveStream(streamName);
}
}
public void RemoveStream(string streamName)
{
_streams.TryRemove(streamName, out var streamHolder);
foreach (var viewer in streamHolder.Viewers)
{
viewer.Value.Writer.TryComplete();
}
}
public ChannelReader<string> Subscribe(string streamName, CancellationToken token)
{
if (!_streams.TryGetValue(streamName, out var source))
{
throw new HubException("stream doesn't exist");
finally
{
RemoveStream(streamName);
}
}
var id = Interlocked.Increment(ref _globalClientId);
var channel = Channel.CreateBounded<string>(options: new BoundedChannelOptions(2) {
FullMode = BoundedChannelFullMode.DropOldest
});
source.Viewers.TryAdd(id, channel);
// Register for client closing stream, this token will always fire (handled by SignalR)
token.Register(() =>
public void RemoveStream(string streamName)
{
source.Viewers.TryRemove(id, out _);
});
_streams.TryRemove(streamName, out var streamHolder);
foreach (var viewer in streamHolder.Viewers)
{
viewer.Value.Writer.TryComplete();
}
}
return channel.Reader;
}
public IAsyncEnumerable<string> Subscribe(string streamName, CancellationToken cancellationToken)
{
if (!_streams.TryGetValue(streamName, out var source))
{
throw new HubException("stream doesn't exist");
}
private class StreamHolder
{
public ChannelReader<string> Source;
public ConcurrentDictionary<long, Channel<string>> Viewers = new ConcurrentDictionary<long, Channel<string>>();
var id = Interlocked.Increment(ref _globalClientId);
var channel = Channel.CreateBounded<string>(options: new BoundedChannelOptions(2)
{
FullMode = BoundedChannelFullMode.DropOldest
});
source.Viewers.TryAdd(id, channel);
// Register for client closing stream, this token will always fire (handled by SignalR)
cancellationToken.Register(() =>
{
source.Viewers.TryRemove(id, out _);
});
return channel.Reader.ReadAllAsync();
}
private class StreamHolder
{
public ChannelReader<string> Source;
public ConcurrentDictionary<long, Channel<string>> Viewers = new ConcurrentDictionary<long, Channel<string>>();
}
}
}