diff --git a/Hubs/StreamHub.cs b/Hubs/StreamHub.cs index 3a0ab5e..5a28374 100644 --- a/Hubs/StreamHub.cs +++ b/Hubs/StreamHub.cs @@ -5,39 +5,42 @@ using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; -public class StreamHub : Hub +namespace StreamR { - private readonly StreamManager _streamManager; - - public StreamHub(StreamManager streamManager) + public class StreamHub : Hub { - _streamManager = streamManager; - } + private readonly StreamManager _streamManager; - public List ListStreams() - { - return _streamManager.ListStreams(); - } - - public ChannelReader WatchStream(string streamName, CancellationToken token) - { - return _streamManager.Subscribe(streamName, token); - } - - public async Task StartStream(string streamName, ChannelReader streamContent) - { - try + public StreamHub(StreamManager streamManager) { - var streamTask = _streamManager.RunStreamAsync(streamName, streamContent); - - // Tell everyone about your stream! - await Clients.Others.SendAsync("NewStream", streamName); - - await streamTask; + _streamManager = streamManager; } - finally + + public List ListStreams() { - await Clients.Others.SendAsync("RemoveStream", streamName); + return _streamManager.ListStreams(); + } + + public IAsyncEnumerable WatchStream(string streamName, CancellationToken cancellationToken) + { + return _streamManager.Subscribe(streamName, cancellationToken); + } + + public async Task StartStream(string streamName, ChannelReader streamContent) + { + try + { + var streamTask = _streamManager.RunStreamAsync(streamName, streamContent); + + // Tell everyone about your stream! + await Clients.Others.SendAsync("NewStream", streamName); + + await streamTask; + } + finally + { + await Clients.Others.SendAsync("RemoveStream", streamName); + } } } } \ No newline at end of file diff --git a/Program.cs b/Program.cs index 281e54d..3a4cb00 100644 --- a/Program.cs +++ b/Program.cs @@ -1,12 +1,5 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.AspNetCore; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Hosting; namespace StreamR { @@ -17,8 +10,11 @@ namespace StreamR CreateWebHostBuilder(args).Build().Run(); } - public static IWebHostBuilder CreateWebHostBuilder(string[] args) => - WebHost.CreateDefaultBuilder(args) - .UseStartup(); + public static IHostBuilder CreateWebHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .ConfigureWebHostDefaults(builder => + { + builder.UseStartup(); + }); } } diff --git a/Startup.cs b/Startup.cs index a6da335..5c7a1fa 100644 --- a/Startup.cs +++ b/Startup.cs @@ -1,14 +1,10 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; -using Microsoft.AspNetCore.HttpsPolicy; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; namespace StreamR { @@ -30,22 +26,17 @@ namespace StreamR options.CheckConsentNeeded = context => true; options.MinimumSameSitePolicy = SameSiteMode.None; }); - - services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_3_0); - services.AddSignalR(o => - { - o.EnableDetailedErrors = true; - }); + services.AddSignalR(); services.AddSingleton(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - public void Configure(IApplicationBuilder app, IHostingEnvironment env) + public void Configure(IApplicationBuilder app, IWebHostEnvironment environment) { - if (env.IsDevelopment()) + if (environment.IsDevelopment()) { app.UseDeveloperExceptionPage(); } diff --git a/StreamManager.cs b/StreamManager.cs index db406fb..cd02014 100644 --- a/StreamManager.cs +++ b/StreamManager.cs @@ -5,41 +5,36 @@ using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; -public class StreamManager +namespace StreamR { - private readonly ConcurrentDictionary _streams; - private long _globalClientId; - - public StreamManager() + public class StreamManager { - _streams = new ConcurrentDictionary(); - } + private readonly ConcurrentDictionary _streams = new ConcurrentDictionary(); + private long _globalClientId; - public List ListStreams() - { - var streamList = new List(); - foreach (var item in _streams) + public List ListStreams() { - streamList.Add(item.Key); - } - return streamList; - } - - public async Task RunStreamAsync(string streamName, ChannelReader stream) - { - var streamHolder = new StreamHolder() { Source = stream }; - - // Add before yielding - // This fixes a race where we tell clients a new stream arrives before adding the stream - _streams.TryAdd(streamName, streamHolder); - - await Task.Yield(); - - try - { - while (await stream.WaitToReadAsync()) + var streamList = new List(); + foreach (var item in _streams) { - while (stream.TryRead(out var item)) + streamList.Add(item.Key); + } + return streamList; + } + + public async Task RunStreamAsync(string streamName, ChannelReader stream) + { + var streamHolder = new StreamHolder() { Source = stream }; + + // Add before yielding + // This fixes a race where we tell clients a new stream arrives before adding the stream + _streams.TryAdd(streamName, streamHolder); + + await Task.Yield(); + + try + { + await foreach (var item in stream.ReadAllAsync()) { foreach (var viewer in streamHolder.Viewers) { @@ -51,49 +46,50 @@ public class StreamManager } } } - } - finally - { - RemoveStream(streamName); - } - } - - public void RemoveStream(string streamName) - { - _streams.TryRemove(streamName, out var streamHolder); - foreach (var viewer in streamHolder.Viewers) - { - viewer.Value.Writer.TryComplete(); - } - } - - public ChannelReader Subscribe(string streamName, CancellationToken token) - { - if (!_streams.TryGetValue(streamName, out var source)) - { - throw new HubException("stream doesn't exist"); + finally + { + RemoveStream(streamName); + } } - var id = Interlocked.Increment(ref _globalClientId); - - var channel = Channel.CreateBounded(options: new BoundedChannelOptions(2) { - FullMode = BoundedChannelFullMode.DropOldest - }); - - source.Viewers.TryAdd(id, channel); - - // Register for client closing stream, this token will always fire (handled by SignalR) - token.Register(() => + public void RemoveStream(string streamName) { - source.Viewers.TryRemove(id, out _); - }); + _streams.TryRemove(streamName, out var streamHolder); + foreach (var viewer in streamHolder.Viewers) + { + viewer.Value.Writer.TryComplete(); + } + } - return channel.Reader; - } + public IAsyncEnumerable Subscribe(string streamName, CancellationToken cancellationToken) + { + if (!_streams.TryGetValue(streamName, out var source)) + { + throw new HubException("stream doesn't exist"); + } - private class StreamHolder - { - public ChannelReader Source; - public ConcurrentDictionary> Viewers = new ConcurrentDictionary>(); + var id = Interlocked.Increment(ref _globalClientId); + + var channel = Channel.CreateBounded(options: new BoundedChannelOptions(2) + { + FullMode = BoundedChannelFullMode.DropOldest + }); + + source.Viewers.TryAdd(id, channel); + + // Register for client closing stream, this token will always fire (handled by SignalR) + cancellationToken.Register(() => + { + source.Viewers.TryRemove(id, out _); + }); + + return channel.Reader.ReadAllAsync(); + } + + private class StreamHolder + { + public ChannelReader Source; + public ConcurrentDictionary> Viewers = new ConcurrentDictionary>(); + } } } \ No newline at end of file