Hello There, Guest!
View New Posts  |  View Today's Posts
[C#] Basic Generic Publisher/Subscriber Implementation

  • 0 Vote(s) - 0 Average


11-19-2017, 01:56 AM #1
AceInfinity
Developer
*******
Administrators
Posts: 9,733 Threads:1,026 Joined: Jun 2011 Reputation: 76

Basic Generic Publisher/Subscriber Implementation
I had to write a basic but re-usable implementation of a pubsub methodology, since I didn't have access to Rx for .NET or anything like that, so I came up with some basic interfaces to achieve this and wrote up a quick demo before I rolled it into my production module that I was working on for work...

I'll share the test code here, with some undefined types-but they should be obvious as to what they do.

Code:
#region [ Publisher / Subscriber API ]
    
/// <summary>
/// Class for publisher event args.
/// </summary>
/// <typeparam name="T">Base type of the subscriber</typeparam>
public abstract class PublisherEventArgs<T>
{
    public T Sender { get; private set; }
    
    protected PublisherEventArgs(T sender)
    {
        Sender = sender;
    }
}

/// <summary>
/// Interface for a publisher.
/// </summary>
/// <typeparam name="T">Publisher internal type to receive notifications</typeparam>
internal interface IPublisher<T>
{
    LinkedList<ISubscriber<T>> Subscribers { get; }
    void Subscribe(ISubscriber<T> subscriber);
    void Unsubscribe(ISubscriber<T> subscriber);
    void Notify(PublisherEventArgs<T> args);
}

/// <summary>
/// Interface for a subscriber.
/// </summary>
/// <typeparam name="T">Subscriber internal type to receive notifications</typeparam>
internal interface ISubscriber<T>
{
    Guid Id { get; }
    void ReceivedMessage(PublisherEventArgs<T> args);
    void SubscribeTo(IPublisher<T> publisher);
    void UnsubscribeFrom(IPublisher<T> publisher);
}

/// <summary>
/// Class which handles most of the fundamental subscriber base functionality.
/// </summary>
/// <typeparam name="T"></typeparam>
internal abstract class SubscriberBase<T> : ISubscriber<T>
{
    public Guid Id { get; private set; }

    protected SubscriberBase()
    {
        // Generate a new unique identifier for this instance
        Id = Guid.NewGuid();
    }
    
    public abstract void ReceivedMessage(PublisherEventArgs<T> args);

    /// <summary>
    /// Allows this subscriber to subscribe to the specified publisher.
    /// </summary>
    /// <param name="publisher">Publisher to subscribe to</param>
    public void SubscribeTo(IPublisher<T> publisher)
    {
        publisher.Subscribe(this);
    }

    /// <summary>
    /// Allows this subscriber to unsubscribe from the specified publisher.
    /// </summary>
    /// <param name="publisher">Publisher to unsubscribe from</param>
    public void UnsubscribeFrom(IPublisher<T> publisher)
    {
        publisher.Unsubscribe(this);
    }
    
    protected bool Equals(ISubscriber<T> other)
    {
        return Id.Equals(other.Id);
    }

    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj)) return false;
        if (ReferenceEquals(this, obj)) return true;
        return obj.GetType() == GetType()
               && Equals((ISubscriber<T>) obj);
    }

    public override int GetHashCode()
    {
        return Id.GetHashCode();
    }
}

#endregion

/// <summary>
/// Class to represent published module event arguments.
/// </summary>
internal class ModuleNotificationEventArgs : PublisherEventArgs<ModuleInstance>
{
    public bool NotifyAll { get; set; }
    public List<Guid> Guids { get; set; }
    
    public ModuleNotificationEventArgs(ModuleInstance sender)
        : base(sender)
    {
        NotifyAll = true;
    }
}

/// <summary>
/// Class to represent a module notification publisher.
/// </summary>
internal class ModuleNotificationPublisher : IPublisher<ModuleInstance>
{
    private readonly CMutex _mutex = new CMutex();

    public LinkedList<ISubscriber<ModuleInstance>> Subscribers { get; private set; }

    public delegate void NotificationHandler(PublisherEventArgs<ModuleInstance> e);
    private event NotificationHandler NotificationCallback = delegate { };

    public ModuleNotificationPublisher()
    {
        Subscribers = new LinkedList<ISubscriber<ModuleInstance>>();
    }

    /// <summary>
    /// Subscribe the subscriber to notifications published by this publisher.
    /// </summary>
    /// <param name="subscriber">Subscriber to add to the subscriptions list</param>
    public void Subscribe(ISubscriber<ModuleInstance> subscriber)
    {
        try
        {
            _mutex.WaitForMutex();

            // Add subscriber to the top of the linked list to avoid interference
            // with current traversals of the list in the event of direct access
            // notifications.
            Subscribers.AddFirst(subscriber);
            NotificationCallback += subscriber.ReceivedMessage;
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }

    /// <summary>
    /// Unsubscribe the subscriber to notifications published by this publisher.
    /// </summary>
    /// <param name="subscriber">Subscriber to remove from the subscriptions list</param>
    public void Unsubscribe(ISubscriber<ModuleInstance> subscriber)
    {
        try
        {
            _mutex.WaitForMutex();

            Subscribers.Remove(subscriber);
            NotificationCallback -= subscriber.ReceivedMessage;
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }

    /// <summary>
    /// Notify all subscribers with publisher event args.
    /// </summary>
    /// <param name="args">Publisher event args</param>
    public void Notify(PublisherEventArgs<ModuleInstance> args)
    {
        try
        {
            _mutex.WaitForMutex();

            if (NotificationCallback != null)
                NotificationCallback(args);
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }
}

/// <summary>
/// Class to represent a module instance.
/// </summary>
internal class ModuleInstance : SubscriberBase<ModuleInstance>
{
    public string Name { get; private set; }
    
    public ModuleInstance(string name)
    {
        Name = name;
    }
    
    public override void ReceivedMessage(PublisherEventArgs<ModuleInstance> args)
    {
        var moduleNotification = args as ModuleNotificationEventArgs;
        if (moduleNotification != null)
        {
            ModuleInstance sender = moduleNotification.Sender;
            if (moduleNotification.NotifyAll)
            {
                if (!Equals(this, sender))
                {
                    Console.WriteLine("{0} Received a global notification from {1}", Name, sender.Name);
                }
            }
            else
            {
                if (moduleNotification.Guids.Contains(Id))
                {
                    Console.WriteLine("{0} Received an ID-specific notification from {1}", Name, sender.Name);
                }
            }
        }
    }
}

internal abstract class Program
{
    // ReSharper disable once UnusedParameter.Global
    public static void Main(string[] args)
    {
        var publisher1 = new ModuleNotificationPublisher();
        var subscriber1a = new ModuleInstance("1A");
        var subscriber1b = new ModuleInstance("1B");
        var subscriber1c = new ModuleInstance("1C");
        
        subscriber1a.SubscribeTo(publisher1);
        subscriber1b.SubscribeTo(publisher1);
        subscriber1c.SubscribeTo(publisher1);
        
        publisher1.Notify(new ModuleNotificationEventArgs(subscriber1a)
        {
            NotifyAll = false,
            Guids = new List<Guid> { subscriber1b.Id }
        });
        Console.WriteLine();
        
        publisher1.Notify(new ModuleNotificationEventArgs(subscriber1a));
        Console.WriteLine();
        
        subscriber1c.UnsubscribeFrom(publisher1);
        publisher1.Notify(new ModuleNotificationEventArgs(subscriber1b));
        Console.WriteLine();
        
        Console.ReadKey();
    }
}


Microsoft MVP .NET Programming - (2012 - Present)
®Crestron DMC-T Certified Automation Programmer

Development Site: aceinfinity.net

 ▲
 ▲ ▲




Forum Jump:


Possibly Related Threads...
Thread Author Replies Views Last Post
   Simple PriorityQueue<T> Implementation AceInfinity 0 24 11-19-2017, 01:50 AM
Last Post: AceInfinity
   Visual Basic Beginners Course- Signup Thread [OPEN] AceInfinity 113 47,210 05-10-2017, 12:04 AM
Last Post: ctthuhuong
  Visual Basic Cookbook Null 0 1,130 11-17-2014, 05:39 PM
Last Post: Null
   Merge Sort Algorithm Implementation AceInfinity 5 3,488 10-11-2013, 08:59 PM
Last Post: AceInfinity
Question  problem why my visual basic program not working ? teck 4 2,426 03-13-2013, 03:48 PM
Last Post: KoBE


Users browsing this thread: 1 Guest(s)