Add channels to the EventAggregator

This commit is contained in:
Antony Male 2014-06-17 17:44:35 +01:00
parent ad4aa28b64
commit 6f63c5cc2d
2 changed files with 200 additions and 38 deletions

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
@ -37,20 +38,23 @@ namespace Stylet
/// Register an instance as wanting to receive events. Implement IHandle{T} for each event type you want to receive.
/// </summary>
/// <param name="handler">Instance that will be registered with the EventAggregator</param>
void Subscribe(IHandle handler);
/// <param name="channels">Channel(s) which should be subscribed to. Defaults to EventAggregator.DefaultChannel if none given</param>
void Subscribe(IHandle handler, params string[] channels);
/// <summary>
/// Unregister as wanting to receive events. The instance will no longer receive events after this is called.
/// </summary>
/// <param name="handler">Instance to unregister</param>
void Unsubscribe(IHandle handler);
/// <param name="channels">Channel(s) to unsubscribe from. Unsubscribes from everything if no channels given</param>
void Unsubscribe(IHandle handler, params string[] channels);
/// <summary>
/// Publish an event to all subscribers, using the specified dispatcher
/// </summary>
/// <param name="message">Event to publish</param>
/// <param name="dispatcher">Dispatcher to use to call each subscriber's handle method(s)</param>
void PublishWithDispatcher(object message, Action<Action> dispatcher);
/// <param name="channels">Channel(s) to publish the message to. Defaults to EventAggregator.DefaultChannel none given</param>
void PublishWithDispatcher(object message, Action<Action> dispatcher, params string[] channels);
}
/// <summary>
@ -58,6 +62,11 @@ namespace Stylet
/// </summary>
public class EventAggregator : IEventAggregator
{
/// <summary>
/// Channel which handlers are subscribed to / messages are published to, if no channels are named
/// </summary>
public static readonly string DefaultChannel = "DefaultChannel";
private readonly List<Handler> handlers = new List<Handler>();
private readonly object handlersLock = new object();
@ -65,15 +74,17 @@ namespace Stylet
/// Register an instance as wanting to receive events. Implement IHandle{T} for each event type you want to receive.
/// </summary>
/// <param name="handler">Instance that will be registered with the EventAggregator</param>
public void Subscribe(IHandle handler)
/// <param name="channels">Channel(s) which should be subscribed to. Defaults to EventAggregator.DefaultChannel if none given</param>
public void Subscribe(IHandle handler, params string[] channels)
{
lock (this.handlersLock)
{
// Is it already subscribed?
if (this.handlers.Any(x => x.IsHandlerForInstance(handler)))
return;
this.handlers.Add(new Handler(handler));
var subscribed = this.handlers.FirstOrDefault(x => x.IsHandlerForInstance(handler));
if (subscribed == null)
this.handlers.Add(new Handler(handler, channels)); // Adds default topic if appropriate
else
subscribed.SubscribeToChannels(channels);
}
}
@ -81,14 +92,19 @@ namespace Stylet
/// Unregister as wanting to receive events. The instance will no longer receive events after this is called.
/// </summary>
/// <param name="handler">Instance to unregister</param>
public void Unsubscribe(IHandle handler)
/// <param name="channels">Channel(s) to unsubscribe from. Unsubscribes from everything if no channels given</param>
public void Unsubscribe(IHandle handler, params string[] channels)
{
lock (this.handlersLock)
{
var existingHandler = this.handlers.FirstOrDefault(x => x.IsHandlerForInstance(handler));
if (existingHandler != null)
{
if (existingHandler.UnsubscribeFromChannels(channels)) // Handles default topic appropriately
this.handlers.Remove(existingHandler);
}
}
}
/// <summary>
@ -96,12 +112,13 @@ namespace Stylet
/// </summary>
/// <param name="message">Event to publish</param>
/// <param name="dispatcher">Dispatcher to use to call each subscriber's handle method(s)</param>
public void PublishWithDispatcher(object message, Action<Action> dispatcher)
/// <param name="channels">Channel(s) to publish the message to. Defaults to EventAggregator.DefaultChannel none given</param>
public void PublishWithDispatcher(object message, Action<Action> dispatcher, params string[] channels)
{
lock (this.handlersLock)
{
var messageType = message.GetType();
var deadHandlers = this.handlers.Where(x => !x.Handle(messageType, message, dispatcher)).ToArray();
var deadHandlers = this.handlers.Where(x => !x.Handle(messageType, message, dispatcher, channels)).ToArray();
foreach (var deadHandler in deadHandlers)
{
this.handlers.Remove(deadHandler);
@ -113,8 +130,9 @@ namespace Stylet
{
private readonly WeakReference target;
private readonly List<HandlerInvoker> invokers = new List<HandlerInvoker>();
private HashSet<string> channels = new HashSet<string>();
public Handler(object handler)
public Handler(object handler, string[] channels)
{
var handlerType = handler.GetType();
this.target = new WeakReference(handler);
@ -124,6 +142,10 @@ namespace Stylet
var messageType = implementation.GetGenericArguments()[0];
this.invokers.Add(new HandlerInvoker(handlerType, messageType, implementation.GetMethod("Handle")));
}
if (channels.Length == 0)
channels = new[] { EventAggregator.DefaultChannel };
this.SubscribeToChannels(channels);
}
public bool IsHandlerForInstance(object subscriber)
@ -131,12 +153,33 @@ namespace Stylet
return this.target.Target == subscriber;
}
public bool Handle(Type messageType, object message, Action<Action> dispatcher)
public void SubscribeToChannels(string[] channels)
{
this.channels.UnionWith(channels);
}
public bool UnsubscribeFromChannels(string[] channels)
{
// If channels is empty, unsubscribe from everything
if (channels.Length == 0)
return true;
this.channels.ExceptWith(channels);
return this.channels.Count == 0;
}
public bool Handle(Type messageType, object message, Action<Action> dispatcher, string[] channels)
{
var target = this.target.Target;
if (target == null)
return false;
if (channels.Length == 0)
channels = new[] { EventAggregator.DefaultChannel };
// We're not subscribed to any of the channels
if (!channels.All(x => this.channels.Contains(x)))
return true;
foreach (var invoker in this.invokers)
{
invoker.Invoke(target, messageType, message, dispatcher);
@ -180,9 +223,10 @@ namespace Stylet
/// </summary>
/// <param name="eventAggregator">EventAggregator to publish the message with</param>
/// <param name="message">Event to publish</param>
public static void PublishOnUIThread(this IEventAggregator eventAggregator, object message)
/// <param name="channels">Channel(s) to publish the message to. Defaults to EventAggregator.DefaultChannel none given</param>
public static void PublishOnUIThread(this IEventAggregator eventAggregator, object message, params string[] channels)
{
eventAggregator.PublishWithDispatcher(message, Execute.OnUIThread);
eventAggregator.PublishWithDispatcher(message, Execute.OnUIThread, channels);
}
/// <summary>
@ -190,9 +234,10 @@ namespace Stylet
/// </summary>
/// <param name="eventAggregator">EventAggregator to publish the message with</param>
/// <param name="message">Event to publish</param>
public static void Publish(this IEventAggregator eventAggregator, object message)
/// <param name="channels">Channel(s) to publish the message to. Defaults to EventAggregator.DefaultChannel none given</param>
public static void Publish(this IEventAggregator eventAggregator, object message, params string[] channels)
{
eventAggregator.PublishWithDispatcher(message, a => a());
eventAggregator.PublishWithDispatcher(message, a => a(), channels);
}
}
}

View File

@ -34,21 +34,28 @@ namespace StyletUnitTests
public void Handle(M1 message) { throw new Exception("Should not be called. Ever"); }
}
private EventAggregator ea;
[TestFixtureSetUp]
public void SetUpFixture()
{
Execute.TestExecuteSynchronously = true;
}
[SetUp]
public void SetUp()
{
this.ea = new EventAggregator();
}
[Test]
public void SubscribesAndDeliversExactMessage()
{
var ea = new EventAggregator();
var target = new C1();
ea.Subscribe(target);
this.ea.Subscribe(target);
var message = new M1();
ea.Publish(message);
this.ea.Publish(message);
Assert.AreEqual(message, target.ReceivedMessage);
}
@ -56,12 +63,11 @@ namespace StyletUnitTests
[Test]
public void DeliversToAllHandlersIncludingDerived()
{
var ea = new EventAggregator();
var target = new C2();
ea.Subscribe(target);
this.ea.Subscribe(target);
var message = new M2();
ea.Publish(message);
this.ea.Publish(message);
Assert.AreEqual(message, target.ReceivedM1);
Assert.AreEqual(message, target.ReceivedM2);
@ -70,13 +76,12 @@ namespace StyletUnitTests
[Test]
public void UnsubscribeUnsubscribes()
{
var ea = new EventAggregator();
var target = new C1();
ea.Subscribe(target);
ea.Unsubscribe(target);
this.ea.Subscribe(target);
this.ea.Unsubscribe(target);
var message = new M1();
ea.Publish(message);
this.ea.Publish(message);
Assert.IsNull(target.ReceivedMessage);
}
@ -84,29 +89,27 @@ namespace StyletUnitTests
[Test]
public void TargetReferenceIsWeak()
{
var ea = new EventAggregator();
var target = new C3();
var weaktarget = new WeakReference(target);
ea.Subscribe(target);
this.ea.Subscribe(target);
// Ugly, but it's the only way to test a WeakReference...
target = null;
GC.Collect();
Assert.DoesNotThrow(() => ea.Publish(new M1()));
Assert.DoesNotThrow(() => this.ea.Publish(new M1()));
Assert.IsNull(weaktarget.Target);
}
[Test]
public void SubscribingTwiceDoesNothing()
{
var ea = new EventAggregator();
var target = new C1();
ea.Subscribe(target);
ea.Subscribe(target);
this.ea.Subscribe(target);
this.ea.Subscribe(target);
var message = new M1();
ea.Publish(message);
this.ea.Publish(message);
Assert.AreEqual(1, target.ReceivedMessageCount);
}
@ -114,14 +117,128 @@ namespace StyletUnitTests
[Test]
public void PublishOnUIThreadPublishedOnUIThread()
{
var ea = new EventAggregator();
var target = new C1();
ea.Subscribe(target);
this.ea.Subscribe(target);
var message = new M1();
ea.PublishOnUIThread(message);
this.ea.PublishOnUIThread(message);
Assert.AreEqual(message, target.ReceivedMessage);
}
[Test]
public void TargetReceivesMessagesOnTopicsItIsSubscribedTo()
{
var target = new C1();
this.ea.Subscribe(target, "C1", "C2");
var message = new M1();
this.ea.Publish(message, "C1");
this.ea.Publish(message, "C2");
Assert.AreEqual(2, target.ReceivedMessageCount);
}
[Test]
public void TargetDoesNotReceiveMessagesOnTopicsItIsNotSubscribedTo()
{
var target = new C1();
this.ea.Subscribe(target, "C1");
var message = new M1();
this.ea.Publish(message, "C2");
Assert.AreEqual(0, target.ReceivedMessageCount);
}
[Test]
public void SubscribesToDefaultChannelByDefault()
{
var target = new C1();
this.ea.Subscribe(target);
var message = new M1();
this.ea.Publish(message, EventAggregator.DefaultChannel);
Assert.AreEqual(message, target.ReceivedMessage);
}
[Test]
public void DoesNotSubscribeToDefaultChannelIfAChannelIsGiven()
{
var target = new C1();
this.ea.Subscribe(target, "C1");
var message = new M1();
this.ea.Publish(message);
Assert.AreEqual(0, target.ReceivedMessageCount);
}
[Test]
public void DoesNotPublishToDefaultChannelIfAChannelIsGiven()
{
var target = new C1();
this.ea.Subscribe(target);
var message = new M1();
this.ea.Publish(message, "C1");
Assert.AreEqual(0, target.ReceivedMessageCount);
}
[Test]
public void AdditionalChannelsCanBeSubscribedTo()
{
var target = new C1();
this.ea.Subscribe(target, "C1");
this.ea.Subscribe(target, "C2");
var message = new M1();
this.ea.Publish(message, "C1");
this.ea.Publish(message, "C2");
Assert.AreEqual(2, target.ReceivedMessageCount);
}
[Test]
public void IndividualChannelsCanBeUnsubscribedFrom()
{
var target = new C1();
this.ea.Subscribe(target, "C1", "C2");
this.ea.Unsubscribe(target, "C1");
var message = new M1();
this.ea.Publish(message, "C1");
Assert.AreEqual(0, target.ReceivedMessageCount);
this.ea.Publish(message, "C2");
Assert.AreEqual(1, target.ReceivedMessageCount);
}
[Test]
public void UnsubscribeUnsubscribesFromEverythingIfNoChannelsGiven()
{
var target = new C1();
this.ea.Subscribe(target, "C1", "C2");
this.ea.Unsubscribe(target);
var message = new M1();
this.ea.Publish(message, "C1", "C2");
Assert.AreEqual(0, target.ReceivedMessageCount);
}
[Test]
public void MessagePublishedToMultipleChannelsGetsDeliveredOnce()
{
var target = new C1();
this.ea.Subscribe(target, "C1", "C2");
var message = new M1();
this.ea.Publish(message, "C1", "C2");
Assert.AreEqual(1, target.ReceivedMessageCount);
}
}
}