1 module vivaldi.latency_filter;
2 
3 import vivaldi.coordinate;
4 
5 import std.traits;
6 
/**
 * Allocates a Buffer with `new` and links its nodes before handing it out,
 * so callers never see a Buffer that has not been initialized.
 *
 * Params:
 *      T = The datum type.
 *      window = The number of data points in the filter window.
 *
 * Returns: A pointer to the freshly initialized Buffer.
 */
private auto buffer(T, size_t window)()
{
    alias Ring = Buffer!(T, window);

    Ring* fresh = new Ring;
    fresh.initialize();

    return fresh;
}
16 
/**
 * Median filter based on "Better Than Average" by Paul Ekstrom.
 *
 * The `window` most recent data points live in a fixed-size ring
 * buffer. A doubly linked list threaded through the same nodes keeps
 * them in ascending order, so a push walks the list at most once
 * (O(window)) instead of re-sorting the window.
 *
 * Slots that have not received a datum yet hold NaN; the code treats
 * the first NaN node it meets as the end of the sorted list.
 *
 * Params:
 *      T = The datum type.
 *      window = The number of data points in the filter window.
 */
private struct Buffer(T, size_t window)
     if (isFloatingPoint!(T) && window > 0)
{
    // One ring-buffer slot. `prev` and `next` are indexes into `buffer`
    // that thread the slot into the sorted linked list.
    struct Node {
        T value;
        size_t prev;
        size_t next;
    }

    Node[window] buffer;

    // Cursor points at the next insertion point in the ring buffer.
    size_t cursor = 0;

    // Head points at the smallest value in the linked list.
    size_t head = 0;

    // Median points at the median value.
    size_t median = 0;

    /**
     * Initializes the linked list, sets the ring buffer values to NaN
     * and rewinds the cursor, head and median indexes to slot 0.
     *
     * This method must be called prior to `push`, but may be called
     * again to reset the state of the buffer.
     */
    void initialize() pure nothrow @safe @nogc {
        foreach (i, ref node; buffer) {
            node.value = T.nan;
            // Link every slot to its ring neighbours (wrapping around).
            node.prev = (i + window - 1) % window;
            node.next = (i + 1) % window;
        }

        // Rewind the indexes as well, so a reset buffer is
        // indistinguishable from a freshly initialized one.
        cursor = 0;
        head = 0;
        median = 0;
    }

    /**
     * Returns the minimum datum within this buffer. If no data has
     * been pushed, returns NaN.
     */
    T min() const pure nothrow @safe @nogc {
        return buffer[head].value;
    }

    /**
     * Returns the maximum datum within this buffer. If no data has
     * been pushed, returns NaN.
     */
    T max() const pure nothrow @safe @nogc {
        import std.math : isNaN;

        size_t cur = buffer[head].next;

        // Walk forward until the first empty (NaN) slot, or until the
        // walk wraps back around to the head when the window is full.
        while (!isNaN(buffer[cur].value) && cur != head) {
            cur = buffer[cur].next;
        }

        // The node just before the stopping point holds the largest value.
        auto prev = buffer[cur].prev;
        return buffer[prev].value;
    }

    /**
     * Pushes a new datum into the ring buffer, and updates the head
     * and median indexes.
     *
     * The datum must not be NaN: NaN marks empty slots, so storing it
     * would corrupt the sorted list.
     *
     * Returns the median after the datum has been pushed. With an even
     * number of data points this is the upper of the two middle values.
     */
    T push(const T datum) pure nothrow @safe @nogc {
        import std.math : isNaN;

        // Fail fast, before any link is touched.
        assert(!isNaN(datum), "cannot push NaN into a median filter");

        // If the current head will be overwritten, move it to the
        // next node.
        if (cursor == head) {
            head = buffer[head].next;
        }

        // Remove the node at cursor; it will be overwritten.

        const pred = buffer[cursor].prev;
        const succ = buffer[cursor].next;

        buffer[pred].next = succ;

        // Poison the detached node so stale links cannot be followed.
        buffer[cursor].value = T.nan;
        buffer[cursor].prev = size_t.max;
        buffer[cursor].next = size_t.max;

        buffer[succ].prev = pred;

        // Point the median at the minimum value in the list.
        median = head;

        auto cur = head;
        auto inserted = false;

        for (size_t i = 0; i < window; i++) {
            if (!inserted) {
                // An empty slot marks the end of the live values, so the
                // datum always goes in front of it.
                auto shouldInsert = true;

                if (!isNaN(buffer[cur].value)) {
                    // Insert in front of the first value that is not
                    // smaller, or on the last iteration, when the walk has
                    // wrapped around and the datum is the new maximum.
                    shouldInsert = (i + 1 == window) ||
                        buffer[cur].value >= datum;
                }

                if (shouldInsert) {
                    // Insert the removed node with its new value.
                    insert(datum, cur);
                    inserted = true;
                }
            }

            // Shift the median on every other node. It will
            // eventually end up in the middle. This is similar to
            // Floyd's "tortoise and hare" cycle detection
            // algorithm. Here, i is the hare, and the median pointer
            // is the tortoise.
            if (i % 2 == 1) {
                median = buffer[median].next;
            }

            // Break once an unallocated node has been reached.
            if (isNaN(buffer[cur].value)) {
                break;
            }

            cur = buffer[cur].next;
        }

        const hd = buffer[head].value;
        // A NaN head means the list was empty before this push, so the
        // new node becomes the head unconditionally.
        auto updateHead = true;

        // Update the head if the new datum is the minimum value.
        if (!isNaN(hd)) {
            updateHead = datum <= hd;
        }

        if (updateHead) {
            head = cursor;
            // Move the median pointer back if a new minimum was
            // inserted.
            median = buffer[median].prev;
        }

        cursor = (cursor + 1) % window;

        assert(!isNaN(buffer[median].value));
        return buffer[median].value;
    }

    /**
     * Stores `datum` in the slot at `cursor` and links that slot into
     * the list immediately before the node at `index`.
     *
     * The slot at `cursor` must already have been unlinked, which is
     * what `push` does before calling this.
     */
    void insert(const T datum, const size_t index) pure nothrow @safe @nogc {
        const succ = index;
        const pred = buffer[index].prev;

        // With a single slot the cursor is the only node there is, so
        // the check only makes sense for larger windows.
        static if (window > 1) {
            assert(index != cursor);
        }

        buffer[pred].next = cursor;

        buffer[cursor].value = datum;
        buffer[cursor].prev = pred;
        buffer[cursor].next = succ;

        buffer[succ].prev = cursor;
    }
}
192 
version(unittest) {
    /**
     * Test helper: feeds `input` through a median filter with a window
     * of `n` data points and collects the median returned by each push.
     *
     * After every push the filter's `min` and `max` are cross-checked
     * against a scan over the values currently stored in the buffer.
     *
     * Returns: One median per element of `input`, in input order.
     */
    double[] compute(size_t n)(const double[] input) {
        import std.algorithm;
        import std.array : array;
        import std.math : isNaN;

        auto buf = buffer!(double, n);
        double[] output;

        foreach (datum; input) {
            output ~= buf.push(datum);

            // Empty slots hold NaN. Drop them before reducing so the
            // expected values do not depend on how a particular
            // compiler's min/max treat NaN operands.
            double[] live = buf.buffer[]
                .map!(a => a.value)
                .filter!(v => !isNaN(v))
                .array;

            assert(buf.min == live.reduce!min);
            assert(buf.max == live.reduce!max);
        }

        return output;
    }
}
222 
@("attributes")
nothrow @safe @nogc unittest {
    // The unittest attributes require initialize, push, min and max to
    // be callable from nothrow, @safe and @nogc code.
    Buffer!(double, 4) ring;
    ring.initialize();

    const median = ring.push(10);

    assert(median == 10);
    assert(ring.min == 10);
    assert(ring.max == 10);
}
233 
@("single peak 4")
unittest {
    // A lone spike (100) never shows up as the median of a 4-point window.
    const double[] samples = [10, 20, 30, 100, 30, 20, 10];
    const double[] medians = [10, 20, 20, 30, 30, 30, 30];

    assert(compute!4(samples) == medians);
}
241 
@("single peak 5")
unittest {
    // A lone spike (100) never shows up as the median of a 5-point window.
    const double[] samples = [10, 20, 30, 100, 30, 20, 10];
    const double[] medians = [10, 20, 20, 30, 30, 30, 30];

    assert(compute!5(samples) == medians);
}
249 
@("single valley 4")
unittest {
    // A lone dip (10) never shows up as the median of a 4-point window.
    const double[] samples = [90, 80, 70, 10, 70, 80, 90];
    const double[] medians = [90, 90, 80, 80, 70, 70, 80];

    assert(compute!4(samples) == medians);
}
257 
@("single valley 5")
unittest {
    // A lone dip (10) never shows up as the median of a 5-point window.
    const double[] samples = [90, 80, 70, 10, 70, 80, 90];
    const double[] medians = [90, 90, 80, 80, 70, 70, 70];

    assert(compute!5(samples) == medians);
}
265 
@("single outlier 4")
unittest {
    // One outlier in an otherwise flat series is filtered out entirely.
    const double[] samples = [10, 10, 10, 100, 10, 10, 10];
    const double[] medians = [10, 10, 10, 10, 10, 10, 10];

    assert(compute!4(samples) == medians);
}
273 
@("single outlier 5")
unittest {
    // One outlier in an otherwise flat series is filtered out entirely.
    const double[] samples = [10, 10, 10, 100, 10, 10, 10];
    const double[] medians = [10, 10, 10, 10, 10, 10, 10];

    assert(compute!5(samples) == medians);
}
281 
@("triple outlier 4")
unittest {
    // Three consecutive 100s pull the median of a 4-point window up to 100.
    const double[] samples = [10, 10, 100, 100, 100, 10, 10];
    const double[] medians = [10, 10, 10, 100, 100, 100, 100];

    assert(compute!4(samples) == medians);
}
289 
@("triple outlier 5")
unittest {
    // Three consecutive 100s pull the median of a 5-point window up to 100.
    const double[] samples = [10, 10, 100, 100, 100, 10, 10];
    const double[] medians = [10, 10, 10, 100, 100, 100, 100];

    assert(compute!5(samples) == medians);
}
297 
@("quintuple outlier 4")
unittest {
    // Five consecutive 100s: the median stays at 100 from the second push on.
    const double[] samples = [10, 100, 100, 100, 100, 100, 10];
    const double[] medians = [10, 100, 100, 100, 100, 100, 100];

    assert(compute!4(samples) == medians);
}
305 
@("quintuple outlier 5")
unittest {
    // Five consecutive 100s: the median stays at 100 from the second push on.
    const double[] samples = [10, 100, 100, 100, 100, 100, 10];
    const double[] medians = [10, 100, 100, 100, 100, 100, 100];

    assert(compute!5(samples) == medians);
}
313 
@("alternating 4")
unittest {
    // Once the 4-point window is full it holds two 10s and two 20s, and
    // the expected median is 20.
    const double[] samples = [10, 20, 10, 20, 10, 20, 10];
    const double[] medians = [10, 20, 10, 20, 20, 20, 20];

    assert(compute!4(samples) == medians);
}
321 
@("alternating 5")
unittest {
    // With a 5-point window the expected median keeps alternating along
    // with the input.
    const double[] samples = [10, 20, 10, 20, 10, 20, 10];
    const double[] medians = [10, 20, 10, 20, 10, 20, 10];

    assert(compute!5(samples) == medians);
}
329 
@("ascending 4")
unittest {
    // Steadily rising input through a 4-point window.
    const double[] samples = [10, 20, 30, 40, 50, 60, 70];
    const double[] medians = [10, 20, 20, 30, 40, 50, 60];

    assert(compute!4(samples) == medians);
}
337 
@("ascending 5")
unittest {
    // Steadily rising input through a 5-point window.
    const double[] samples = [10, 20, 30, 40, 50, 60, 70];
    const double[] medians = [10, 20, 20, 30, 30, 40, 50];

    assert(compute!5(samples) == medians);
}
345 
@("descending 4")
unittest {
    // Steadily falling input through a 4-point window.
    const double[] samples = [70, 60, 50, 40, 30, 20, 10];
    const double[] medians = [70, 70, 60, 60, 50, 40, 30];

    assert(compute!4(samples) == medians);
}
353 
@("descending 5")
unittest {
    // Steadily falling input through a 5-point window.
    const double[] samples = [70, 60, 50, 40, 30, 20, 10];
    const double[] medians = [70, 70, 60, 60, 50, 40, 30];

    assert(compute!5(samples) == medians);
}
361 
/**
 * Compile-time check for whether a type K may be used as a hash key.
 * K qualifies when it is one of:
 *   - a basic (scalar) type other than void;
 *   - a narrow string (string/char[]/wchar[]);
 *   - a struct or class whose `toHash` yields size_t and whose
 *     `opEquals` yields bool.
 */
private template isHashKey(K)
{
    static if (isNarrowString!(K)) {
        enum bool isHashKey = true;
    } else static if (isBasicType!(K)) {
        // void counts as a basic type, so it is excluded explicitly.
        enum bool isHashKey = !is(K : void);
    } else static if (isAggregateType!(K)) {
        enum bool isHashKey =
            is(ReturnType!((K k) => k.toHash) == size_t) &&
            is(ReturnType!((K k) => k.opEquals(k)) == bool);
    } else {
        enum bool isHashKey = false;
    }
}
376 
/**
 * Per-node moving median over a stream of latency measurements.
 *
 * Every remote node gets its own median filter window; pushing a
 * measurement for a node yields the expected latency for that node.
 *
 * Filter buffers for each node are allocated on the GC heap, but
 * operate in constant space thereafter.
 *
 * See "Network Coordinates in the Wild" by Jonathan Ledlie, Paul
 * Gardner, and Margo Seltzer, Section 7.2.
 *
 * Params:
 *      T = The node type.
 *      U = The datum type.
 *      window = The size of the moving filter window.
 */
struct LatencyFilter(T, U, size_t window)
     if (isHashKey!T && isFloatingPoint!U && window > 0)
{
    private alias B = Buffer!(U, window);

    // One filter buffer per node, created on that node's first push.
    private B*[const(T)] data;

    /**
     * Records the round-trip time `rtt` for `node` and returns the
     * median of that node's filter window after the update.
     *
     * `rtt` must not be NaN.
     */
    U push(const T node, const U rtt) @safe {
        import std.math : isNaN;

        assert(!isNaN(rtt));

        // Reuse the node's buffer, creating it first if the node is new.
        return data.require(node, buffer!(U, window)).push(rtt);
    }

    /**
     * Looks up the current median latency for `node`. Yields NaN when
     * nothing has been recorded for the node.
     */
    U get(const T node) const pure nothrow @safe @nogc {
        if (auto slot = node in data) {
            auto buf = *slot;

            // NB. This may return NaN if a buffer has been allocated, but
            // no data has been recorded.
            return buf.buffer[buf.median].value;
        }

        return U.nan;
    }

    /**
     * Drops everything collected for `node`.
     */
    void discard(const T node) nothrow @safe @nogc {
        data.remove(node);
    }

    /**
     * Drops the data collected for every node.
     */
    void clear() nothrow {
        data.clear();
    }
}
448 
@("type parameters")
unittest {
    // Opaque aggregates: no toHash/opEquals visible.
    class A;
    struct B;

    // Class key providing both toHash and opEquals.
    class C {
        override size_t toHash() nothrow {
            return 42;
        }

        override bool opEquals(Object o) {
            return false;
        }
    }

    // Struct key providing both toHash and opEquals.
    struct D {
        size_t toHash() const @safe pure nothrow {
            return 42;
        }

        bool opEquals(ref const D s) const @safe pure nothrow {
            return false;
        }
    }

    // Node type: scalars are accepted, void is not.
    assert(__traits(compiles, new LatencyFilter!(int, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(float, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(char, double, 5)));
    // A bit non-sensical, but sure.
    assert(__traits(compiles, new LatencyFilter!(bool, double, 5)));
    assert(!__traits(compiles, new LatencyFilter!(void, double, 5)));

    // Node type: narrow strings are accepted, dchar[] is not.
    assert(__traits(compiles, new LatencyFilter!(string, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(char[], double, 5)));
    assert(__traits(compiles, new LatencyFilter!(wchar[], double, 5)));
    assert(!__traits(compiles, new LatencyFilter!(dchar[], double, 5)));

    // Node type: aggregates need both toHash and opEquals.
    assert(__traits(compiles, new LatencyFilter!(C, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(D, double, 5)));
    assert(!__traits(compiles, new LatencyFilter!(A, double, 5)));
    assert(!__traits(compiles, new LatencyFilter!(B, double, 5)));

    // Datum type: floating point only.
    assert(__traits(compiles, new LatencyFilter!(string, float, 5)));
    assert(__traits(compiles, new LatencyFilter!(string, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(string, real, 5)));
    assert(!__traits(compiles, new LatencyFilter!(string, int, 5)));

    // Window size: must be greater than zero.
    assert(__traits(compiles, new LatencyFilter!(string, double, 1)));
    assert(!__traits(compiles, new LatencyFilter!(string, double, 0)));
}
500 
@("usage")
unittest {
    import std.math : isNaN;

    enum first = "10.0.0.1";
    enum second = "10.0.0.2";

    auto latencies = new LatencyFilter!(string, double, 5);

    // Stream six measurements for the first node, recording each median.
    double[] medians;

    foreach (rtt; [3.0, 2, 4, 6, 5, 1]) {
        medians ~= latencies.push(first, rtt);
    }

    assert(medians == [3, 3, 3, 4, 4, 4]);
    assert(latencies.get(first) == 4);

    // A second node is tracked independently of the first.
    latencies.push(second, 100);
    assert(latencies.get(second) == 100);

    // Discarding one node leaves the other untouched.
    latencies.discard(first);
    assert(isNaN(latencies.get(first)));
    assert(!isNaN(latencies.get(second)));

    // Clearing forgets every node.
    latencies.clear();
    assert(isNaN(latencies.get(first)));
    assert(isNaN(latencies.get(second)));
}
528 
@("push attributes")
@safe unittest {
    // push has to be callable from @safe code.
    auto latencies = new LatencyFilter!(string, double, 5);

    latencies.push("10.0.0.1", 42);
}
535 
@("get/discard attributes")
nothrow @safe @nogc unittest {
    import std.math : isNaN;

    // get and discard have to be callable from nothrow, @safe and
    // @nogc code.
    LatencyFilter!(string, double, 5) latencies;

    const median = latencies.get("10.0.0.1");
    assert(isNaN(median));

    latencies.discard("10.0.0.1");
}
546 
@("clear attributes")
nothrow unittest {
    // clear has to be callable from nothrow code.
    LatencyFilter!(string, double, 5) latencies;

    latencies.clear();
}