Tech in the 603, The Granite State Hacker

Intro to Rx.NET (Reactive Extensions)

Thanks to the gang for joining me at the Microsoft Store in Salem NH for my preso on “Intro to Rx.NET”   Being that it’s a toolkit I’ve been digging a lot at work lately, I had a feeling folks might appreciate a broad brush into to it.

Please check out the Granite State (NH) Windows Platform App Devs (#WPDevNH) on meetup.com to connect with the group and maybe even participate, yourself.  In addition to the core presentation topic, we had a great debate in speculation on how Microsoft’s purchase of Xamarin might settle out.  Also, I’ll be attending Build 2016, so we’re talking about having a special meeting early in April to recap and consider future presentations. (stay tuned!)

Rx reminds me a lot of other declarative language elements (XSL, XAML) in that it seems really natural, then you start looking at more advanced stuff and the complexity becomes boggling… then you start to really understand the abstractions and it feels natural again.

Without further ado, here’s my slides for the presentation:

[office src=”https://onedrive.live.com/embed?cid=90A564D76FC99F8F&resid=90A564D76FC99F8F%21824879&authkey=AC01r6jwJX5ng5M&em=2″ width=”402″ height=”327″]
I’d like to thank the folks at http://IntroToRx.com, I referenced them more than any other source putting this together.
Finally, for the code I demoed, please check out the post I mentioned, here:
Hope to see you soon!
-Jim Wilcox
The Granite State Hacker
Tech in the 603, The Granite State Hacker

Rate Limiting Events with the Reactive Extensions Rx

A fun little challenge that came up at work…  we have a stream of events that we publish / subscribe via a Reactive IObservable.   One of the things we’re testing for is swamping the subscribers with events for periods of time.  The tools in System.Reactive.Linq namespace include utilities like .Throttle(…), .Sample(…).  None of these supported our needs.

For our needs in this particular event stream, in this point in our stream, we can’t afford to drop events.  

Sample(…) picks off an item at various intervals, dropping the rest.  Throttle(…) sets up a window of time. It won’t release any object until there’s been a case where only one object was buffered in the given time window.  If you get another while the window’s open, the window widens to the original timespan.

Then there’s .Buffer(…) which can store event objects for a window, and then release them.  That amounts to releasing all the events in periodic bursts, but it’s not a rate limit.

Finally there’s the .Delay(…) method… which, ironically, delays publishing objects by an offset amount…  but that delays all objects by the same time offset.  If you have three events come in, 1 millisecond apart each, and put a 1 minute delay on them, they’ll enter the collection, and one minute later, will be published out… in a three-millisecond burst.

I want to be able to constrain my publisher such that I only want n number of entities per second. 

My solution separates the pub/sub.  It loads a queue with incoming events, and emits them immediately, up to the limit on the subscriber side. On the publisher side, it resets the counter and emits any overflow objects in the queue, also up to the limit.   

Yes, this model has problems, so address your risks appropriately… (“use at your own risk”).    You can run out of memory if the origin provides more items than the limiter is allowed to emit over long periods of time.

Anyway, here’s a Program.cs with the RxHelper extension RateLimit(…).  The program has a decent little before/after RateLimit(…).



using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

using System.Reactive.Linq;

 
namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
 
            // simulate very fast (no delay) data feed
            var unThrottledFeed = GetFastObservableSequence();
 

            unThrottledFeed.Subscribe(Console.WriteLine);

            Console.WriteLine("That was an example of an event stream (with 100 events) only constrained by system resources.");

            Console.WriteLine();


            Console.WriteLine("Now rate-limiting at 10 items per second...\n");


            const int itemsPerSecond = 10;
 
            var throttledFeed = GetFastObservableSequence()
                    .RateLimit(itemsPerSecond);
 
 
            throttledFeed.Subscribe(Console.WriteLine);
 
            Console.WriteLine("END OF LINE");
            Console.WriteLine("Note that the Main() method would be done here, were it not for the ReadKey(), the RateLimit subscriber is scheduled.");
            Console.WriteLine();
            Console.WriteLine("Rate limited events will appear here:");
            Console.ReadKey(true);
 
 
        }
 
        #region Example Artifacts
        private static IObservable<TestClass> GetFastObservableSequence()
        {
            var counter = 0;
            var rnd = new Random();
            return Observable.Defer(() =>
                Observable.Return(0)
                    .Select(p =>;
                    {
                        counter++;
                        var x = new TestClass(30.0, counter);
                        x.Value += Math.Round(rnd.NextDouble(), 2);
                        return x;
                    })
                    .Repeat(100));
        }
 
        private class TestClass
        {
            public TestClass(double value, int instance)
            {
                Value = value;
                Instance = instance;
            }
            private int Instance { get; set; }
            public double Value { get; set; }
            public override string ToString()
            {
                return $"{Instance}: {Value}";
            }
        }
        #endregion
    }
 
 
   

    internal static class RxHelper
    {
        public static IObservable<TSource> RateLimit<TSource>(
            this IObservable<TSource> source,
            int itemsPerSecond,
            ISchedulerscheduler = null)
        {
            scheduler = scheduler ?? Scheduler.Default;
            var timeSpan = TimeSpan.FromSeconds(1);
            var itemsEmitted = 0L;
            return Observable.Create<TSource>(
                observer =>
                {
                    var buffer = new ConcurrentQueue<TSource>();
                    Action emit = delegate()
                    {
                        while (Interlocked.Read(ref itemsEmitted) < itemsPerSecond)
                        {
                            TSourceitem;
                            if (!buffer.TryDequeue(out item))
                                break;
                            observer.OnNext(item);
                            Interlocked.Increment(ref itemsEmitted);
                        }
                    };
                   
                    var sourceSub = source
                        .Subscribe(x =>
                        {
                            buffer.Enqueue(x);
                            emit();
                        });
                    var timer = Observable.Interval(timeSpan, scheduler)
                        .Subscribe(x =>
                        {
                            Interlocked.Exchange(ref itemsEmitted, 0);
                            emit();
                        }, observer.OnError, observer.OnCompleted);
                    return new CompositeDisposable(sourceSub, timer);
                });
        }
    }


}
 
 

Edit 1/27/2016:  Had to tweak RateLimiter(…) to immediately emit objects as long as it hadn’t hit it’s limit for the time span.  It always queues, just in case, to maintain order.