Tuesday, January 25, 2011

Blog Post: Running Totals

To my complete and utter surprise, I discovered that articles on my blog do not write themselves magically after I started with the first one! Nobody told me and I was convinced that a blog just grows like a weed, but, alas, I actually need to put some work into it. So here we go…

The StreamInsight team recently posted about the behavior of snapshot windows, and today I'd like to talk about a nice query pattern that highlights the versatility of this window type. The business question is this:

Compute the sum of some payload value within one day, but update the result throughout the day.

Or graphically:

image

 

Reading that sentence only up to the comma, one would assume that it implies a tumbling window with a length of one day. But such a window would only produce a result after it has ended, hence we would only see the sum once, after the day has passed (i.e., a CTI has been received with a timestamp past midnight). This computation needs to be event-driven: we want to see a result whenever it changes, or in other words, whenever we receive new input that causes the sum within that day to change. We call this pattern "running totals".

We still need a window, of course, since sum is an aggregate, and aggregates require windows that define the sets to aggregate on. Here comes the snapshot window: it is event-based by nature, as opposed to the timeline-based hopping/tumbling windows. The remaining question is how to prepare the original event stream for the snapshot window so that we get the desired result.

Let's think about that for a moment. At each point throughout the day, we want to compute the sum over all events that happened so far. The aggregate function needs to be able to "see" every event that happened so far on that day, until midnight, when we want to reset again. From this insight it's only a small step to the solution: we have to extend the duration of each event until the following midnight, and then run the snapshot+aggregation over that modified stream. Here is a figure to illustrate this behavior:

image

 

Feeding this interval-shaped result into a point output adapter will yield the result displayed in the first diagram, as soon as a new sum is available.
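
To make the mechanics concrete, here is a minimal sketch in plain C# that imitates the idea without using StreamInsight at all. It stretches every point event to the following midnight, cuts the timeline at every start and end time (roughly what a snapshot window does), and sums the events that are alive in each slice. The names RunningTotalSketch, EndOfDay and RunningTotals are made up for this illustration, and the snapshot emulation is a simplification, not the engine's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of StreamInsight.
public static class RunningTotalSketch
{
    // The midnight that follows the given timestamp.
    // An event exactly at midnight lasts a full day.
    public static DateTime EndOfDay(DateTime start) => start.Date.AddDays(1);

    // Emulates "extend to midnight, then snapshot window + Sum".
    public static List<(DateTime Start, DateTime End, long Total)> RunningTotals(
        IEnumerable<(DateTime Start, long Value)> events)
    {
        // Step 1: turn each point event into an interval ending at midnight.
        var intervals = events
            .Select(e => (Start: e.Start, End: EndOfDay(e.Start), Value: e.Value))
            .ToList();

        // Step 2: every distinct start or end time is a snapshot boundary.
        var edges = intervals
            .SelectMany(i => new[] { i.Start, i.End })
            .Distinct()
            .OrderBy(t => t)
            .ToList();

        // Step 3: sum the events alive between two neighboring boundaries.
        var result = new List<(DateTime Start, DateTime End, long Total)>();
        for (int k = 0; k + 1 < edges.Count; k++)
        {
            DateTime windowStart = edges[k];
            DateTime windowEnd = edges[k + 1];
            var alive = intervals
                .Where(i => i.Start <= windowStart && windowStart < i.End)
                .ToList();
            if (alive.Count > 0) // slices without any event produce no output
            {
                result.Add((windowStart, windowEnd, alive.Sum(i => i.Value)));
            }
        }
        return result;
    }
}
```

For example, with values 5 at 09:00 and 3 at 14:00 on one day, and 4 at 08:00 on the next day, the sketch produces the totals 5, 8, and then 4: the sum grows during the first day and starts over after midnight.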

The trickiest part when formalizing this query in LINQ is to find the proper expression that extends the events as needed. There are various ways to do this; here is a really simple one:

    var result = from win in source
                     .AlterEventDuration(e => TimeSpan.FromHours(24).Subtract(e.StartTime.TimeOfDay))
                     .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                 select new { runningTotal = win.Sum(e => e.Value) };

 

However, if I want to change the resetting period from one day to, say, one hour, I need to change the TimeSpan methods. I like the following one better, because it is a little more generic:

    long PeriodAsTicks = TimeSpan.TicksPerHour * 24;

    var result = from win in source
                     .AlterEventDuration(e => TimeSpan.FromTicks(PeriodAsTicks - (e.StartTime.Ticks % PeriodAsTicks)))
                     .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                 select new { runningTotal = win.Sum(e => e.Value) };
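
If you want to convince yourself that the tick arithmetic does the right thing, the expression can be tried out in isolation. The following is a small sketch in plain C#, again without StreamInsight; the names PeriodMath and UntilNextBoundary are hypothetical and exist only for this illustration.

```csharp
using System;

// Hypothetical helper, not part of StreamInsight.
public static class PeriodMath
{
    // Time left from 'start' until the next multiple of 'period',
    // counted from tick zero of the .NET calendar (midnight, 0001-01-01).
    public static TimeSpan UntilNextBoundary(DateTime start, TimeSpan period)
    {
        long periodAsTicks = period.Ticks;
        return TimeSpan.FromTicks(periodAsTicks - (start.Ticks % periodAsTicks));
    }
}
```

For an event at 14:30, a 24-hour period gives a duration of 9 hours 30 minutes (until midnight), and a one-hour period gives 30 minutes (until 15:00). Note that the boundaries are multiples of the period counted from tick zero, so they line up with the wall clock only for periods that divide a day evenly, such as one hour or 15 minutes.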

 

You can find the end-to-end example as a LINQPad query attached.

Regards,
Roman
