module vivaldi.latency_filter;

import vivaldi.coordinate;

import std.traits;

/**
 * A helper for constructing and properly initializing a Buffer.
 *
 * Returns: a pointer to a newly allocated `Buffer!(T, window)` on which
 * `initialize` has already been called.
 */
private auto buffer(T, size_t window)()
{
    auto buf = new Buffer!(T, window);
    buf.initialize();
    return buf;
}

/**
 * Median filter based on "Better Than Average" by Paul Ekstrom.
 *
 * The window is stored in a fixed-size ring buffer whose nodes are also
 * threaded onto a sorted, circular, doubly linked list. Each `push`
 * overwrites the oldest slot, relinks it at its sorted position and
 * re-locates the median in a single walk of at most `window` nodes, so a
 * push costs O(n) time (n = `window`) and the structure uses constant
 * space. A naive implementation that re-sorts the whole window on every
 * push costs O(n log n) per datum instead.
 *
 * Unused slots hold NaN, so pushed data must not be NaN.
 *
 * Params:
 *     T = The datum type.
 *     window = The number of data points in the filter window.
 */
private struct Buffer(T, size_t window)
    if (isFloatingPoint!(T) && window > 0)
{
    /// One ring-buffer slot, also linked into the sorted list.
    struct Node {
        T value;     // The datum, or NaN if the slot is unused.
        size_t prev; // Index of the preceding node in ascending order.
        size_t next; // Index of the following node in ascending order.
    }

    Node[window] buffer;

    // Cursor points at the next insertion point in the ring buffer.
    size_t cursor = 0;

    // Head points at the smallest value in the linked list.
    size_t head = 0;

    // Median points at the median value. For an even number of data
    // this is the upper of the two middle values.
    size_t median = 0;

    /**
     * Initializes the linked list and sets the ring buffer values to NaN.
     *
     * This method must be called prior to `push`, but may be called
     * again to reset the state of the buffer.
     */
    void initialize() pure nothrow @safe @nogc {
        foreach (i, ref node; buffer) {
            node.value = T.nan;
            node.prev = (i + window - 1) % window;
            node.next = (i + 1) % window;
        }
    }

    /**
     * Returns the minimum datum within this buffer. If no data has
     * been pushed, returns NaN.
     */
    T min() const pure nothrow @safe @nogc {
        return buffer[head].value;
    }

    /**
     * Returns the maximum datum within this buffer. If no data has
     * been pushed, returns NaN.
     *
     * Walks the sorted list from the head until it reaches an unused
     * (NaN) slot or wraps back to the head, then returns the value just
     * before that point.
     */
    T max() const pure nothrow @safe @nogc {
        import std.math : isNaN;

        size_t cur = buffer[head].next;

        while (!isNaN(buffer[cur].value) && cur != head) {
            cur = buffer[cur].next;
        }

        auto prev = buffer[cur].prev;
        return buffer[prev].value;
    }

    /**
     * Pushes a new datum into the ring buffer, and updates the head
     * and median indexes. The oldest datum is evicted once the buffer
     * is full.
     *
     * Params:
     *     datum = The value to record; must not be NaN.
     *
     * Returns: the median after the datum has been pushed.
     */
    T push(const T datum) pure nothrow @safe @nogc {
        import std.math : isNaN;

        // If the current head will be overwritten, move it to the
        // next node.
        if (cursor == head) {
            head = buffer[head].next;
        }

        // Remove the node at cursor; it will be overwritten.

        const pred = buffer[cursor].prev;
        const succ = buffer[cursor].next;

        buffer[pred].next = succ;

        buffer[cursor].value = T.nan;
        buffer[cursor].prev = size_t.max;
        buffer[cursor].next = size_t.max;

        buffer[succ].prev = pred;

        // Point the median at the minimum value in the list.
        median = head;

        auto cur = head;
        auto inserted = false;

        for (size_t i = 0; i < window; i++) {
            if (!inserted) {
                auto shouldInsert = true;

                // Insert before the first value >= datum, before the
                // first unused slot, or (when the list is full and has
                // wrapped) at the very end.
                if (!isNaN(buffer[cur].value)) {
                    shouldInsert = (i + 1 == window) ||
                        buffer[cur].value >= datum;
                }

                if (shouldInsert) {
                    // Insert the removed node with its new value.
                    insert(datum, cur);
                    inserted = true;
                }
            }

            // Shift the median on every other node. It will
            // eventually end up in the middle. This is similar to
            // Floyd's "tortoise and hare" cycle detection
            // algorithm. Here, i is the hare, and the median pointer
            // is the tortoise.
            if (i % 2 == 1) {
                median = buffer[median].next;
            }

            // Break once an unallocated node has been reached.
            if (isNaN(buffer[cur].value)) {
                break;
            }

            cur = buffer[cur].next;
        }

        const hd = buffer[head].value;
        auto updateHead = true;

        // Update the head if the new datum is the minimum value.
        if (!isNaN(hd)) {
            updateHead = datum <= hd;
        }

        if (updateHead) {
            head = cursor;
            // Move the median pointer back if a new minimum was
            // inserted.
            median = buffer[median].prev;
        }

        cursor = (cursor + 1) % window;

        assert(!isNaN(buffer[median].value));
        return buffer[median].value;
    }

    /**
     * Stores `datum` in the node at `cursor` and links that node into
     * the sorted list immediately before the node at `index`.
     *
     * The node at `cursor` must already have been unlinked from the
     * list, as `push` does before calling this.
     */
    private void insert(const T datum, const size_t index) pure nothrow @safe @nogc {
        const succ = index;
        const pred = buffer[index].prev;

        // With a single slot the cursor node is its own neighbour.
        static if (window > 1) {
            assert(index != cursor);
        }

        buffer[pred].next = cursor;

        buffer[cursor].value = datum;
        buffer[cursor].prev = pred;
        buffer[cursor].next = succ;

        buffer[succ].prev = cursor;
    }
}

version(unittest) {
    /**
     * Pushes every value of `input` through a fresh buffer of window `n`,
     * checking `min` and `max` against a brute-force scan after each push.
     *
     * Returns: the medians returned by each `push`, in order.
     */
    double[] compute(size_t n)(const double[] input) {
        import std.algorithm;
        import std.math : isNaN;

        auto buf = buffer!(double, n);
        double[] output;

        foreach (i; input) {
            output ~= buf.push(i);

            // Unused slots hold NaN. Exclude them so the brute-force
            // reference does not depend on how min/max treat NaN, which
            // differs between compilers (min(NaN, ...) == NaN on LDC).
            auto values = buf.buffer[]
                .map!(a => a.value)
                .filter!(v => !isNaN(v));

            assert(buf.min == values.reduce!min);
            assert(buf.max == values.reduce!max);
        }

        return output;
    }
}

@("attributes")
nothrow @safe @nogc unittest {
    auto buf = Buffer!(double, 4)();

    buf.initialize();

    assert(buf.push(10) == 10);
    assert(buf.min == 10);
    assert(buf.max == 10);
}

@("single peak 4")
unittest {
    double[] input = [10, 20, 30, 100, 30, 20, 10];
    double[] output = [10, 20, 20, 30, 30, 30, 30];

    assert(compute!4(input) == output);
}

@("single peak 5")
unittest {
    double[] input = [10, 20, 30, 100, 30, 20, 10];
    double[] output = [10, 20, 20, 30, 30, 30, 30];

    assert(compute!5(input) == output);
}

@("single valley 4")
unittest {
    double[] input = [90, 80, 70, 10, 70, 80, 90];
    double[] output = [90, 90, 80, 80, 70, 70, 80];

    assert(compute!4(input) == output);
}

@("single valley 5")
unittest {
    double[] input = [90, 80, 70, 10, 70, 80, 90];
    double[] output = [90, 90, 80, 80, 70, 70, 70];

    assert(compute!5(input) == output);
}

@("single outlier 4")
unittest {
    double[] input = [10, 10, 10, 100, 10, 10, 10];
    double[] output = [10, 10, 10, 10, 10, 10, 10];

    assert(compute!4(input) == output);
}

@("single outlier 5")
unittest {
    double[] input = [10, 10, 10, 100, 10, 10, 10];
    double[] output = [10, 10, 10, 10, 10, 10, 10];

    assert(compute!5(input) == output);
}

@("triple outlier 4")
unittest {
    double[] input = [10, 10, 100, 100, 100, 10, 10];
    double[] output = [10, 10, 10, 100, 100, 100, 100];

    assert(compute!4(input) == output);
}

@("triple outlier 5")
unittest {
    double[] input = [10, 10, 100, 100, 100, 10, 10];
    double[] output = [10, 10, 10, 100, 100, 100, 100];

    assert(compute!5(input) == output);
}

@("quintuple outlier 4")
unittest {
    double[] input = [10, 100, 100, 100, 100, 100, 10];
    double[] output = [10, 100, 100, 100, 100, 100, 100];

    assert(compute!4(input) == output);
}

@("quintuple outlier 5")
unittest {
    double[] input = [10, 100, 100, 100, 100, 100, 10];
    double[] output = [10, 100, 100, 100, 100, 100, 100];

    assert(compute!5(input) == output);
}

@("alternating 4")
unittest {
    double[] input = [10, 20, 10, 20, 10, 20, 10];
    double[] output = [10, 20, 10, 20, 20, 20, 20];

    assert(compute!4(input) == output);
}

@("alternating 5")
unittest {
    double[] input = [10, 20, 10, 20, 10, 20, 10];
    double[] output = [10, 20, 10, 20, 10, 20, 10];

    assert(compute!5(input) == output);
}

@("ascending 4")
unittest {
    double[] input = [10, 20, 30, 40, 50, 60, 70];
    double[] output = [10, 20, 20, 30, 40, 50, 60];

    assert(compute!4(input) == output);
}

@("ascending 5")
unittest {
    double[] input = [10, 20, 30, 40, 50, 60, 70];
    double[] output = [10, 20, 20, 30, 30, 40, 50];

    assert(compute!5(input) == output);
}

@("descending 4")
unittest {
    double[] input = [70, 60, 50, 40, 30, 20, 10];
    double[] output = [70, 70, 60, 60, 50, 40, 30];

    assert(compute!4(input) == output);
}

@("descending 5")
unittest {
    double[] input = [70, 60, 50, 40, 30, 20, 10];
    double[] output = [70, 70, 60, 60, 50, 40, 30];

    assert(compute!5(input) == output);
}

/**
 * Tests whether a type K is suitable for use as a hash key, under the following
 * constraints:
 *   - is it a non-void basic (scalar) type?
 *   - is it a narrow string (string/char[]/wchar[])?
 *   - is it a struct or class which implements size_t toHash and bool opEquals?
 */
private enum bool isHashKey(K) =
    (!is(K : void) &&
     isBasicType!(K)) ||
    isNarrowString!(K) ||
    (isAggregateType!(K) &&
     is(ReturnType!((K k) => k.toHash) == size_t) &&
     is(ReturnType!((K k) => k.opEquals(k)) == bool));

/**
 * A latency filter tracks a stream of latency measurements involving
 * a remote node, and returns an expected latency value using a moving
 * median filter.
 *
 * Filter buffers for each node are allocated on the GC heap, but
 * operate in constant space thereafter.
 *
 * See "Network Coordinates in the Wild" by Jonathan Ledlie, Paul
 * Gardner, and Margo Seltzer, Section 7.2.
 *
 * Params:
 *     T = The node type.
 *     U = The datum type.
 *     window = The size of the moving filter window.
 */
struct LatencyFilter(T, U, size_t window)
    if (isHashKey!T && isFloatingPoint!U && window > 0)
{
    private alias B = Buffer!(U, window);

    /**
     * Pushes a new latency datum into the filter window for a node,
     * and returns the current median value from the filter.
     *
     * A buffer for `node` is allocated on first use. `rtt` must not be
     * NaN (checked by an assertion).
     */
    U push(const T node, const U rtt) @safe {
        import std.math : isNaN;

        assert(!isNaN(rtt));

        // `require` evaluates its second argument lazily, so a buffer is
        // only allocated when the node has none yet.
        B* buf = data.require(node, buffer!(U, window));

        return buf.push(rtt);
    }

    /**
     * Returns the current median latency for a node. If no data has
     * been recorded for the node, returns NaN.
     */
    U get(const T node) const pure nothrow @safe @nogc {
        const(B*)* p = node in data;

        if (p is null) {
            return U.nan;
        }

        auto buf = *p;

        // NB. This may return NaN if a buffer has been allocated, but
        // no data has been recorded.
        return buf.buffer[buf.median].value;
    }

    /**
     * Discards data collected for a node. Does nothing if the node is
     * unknown.
     */
    void discard(const T node) nothrow @safe @nogc {
        data.remove(node);
    }

    /**
     * Clears the latency filter of all data collected.
     */
    void clear() nothrow {
        data.clear();
    }

private:

    // Per-node filter buffers, keyed by node.
    B*[const(T)] data;
}

@("type parameters")
unittest {
    class A;
    struct B;

    class C {
        override size_t toHash() nothrow {
            return 42;
        }

        override bool opEquals(Object o) {
            return false;
        }
    }

    struct D {
        size_t toHash() const @safe pure nothrow {
            return 42;
        }

        bool opEquals(ref const D s) const @safe pure nothrow {
            return false;
        }
    }

    assert(!__traits(compiles, new LatencyFilter!(void, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(int, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(float, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(char, double, 5)));

    // A bit non-sensical, but sure.
    assert(__traits(compiles, new LatencyFilter!(bool, double, 5)));

    assert(__traits(compiles, new LatencyFilter!(string, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(char[], double, 5)));
    assert(__traits(compiles, new LatencyFilter!(wchar[], double, 5)));
    assert(!__traits(compiles, new LatencyFilter!(dchar[], double, 5)));

    assert(!__traits(compiles, new LatencyFilter!(A, double, 5)));
    assert(!__traits(compiles, new LatencyFilter!(B, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(C, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(D, double, 5)));

    assert(!__traits(compiles, new LatencyFilter!(string, int, 5)));
    assert(__traits(compiles, new LatencyFilter!(string, float, 5)));
    assert(__traits(compiles, new LatencyFilter!(string, double, 5)));
    assert(__traits(compiles, new LatencyFilter!(string, real, 5)));

    assert(__traits(compiles, new LatencyFilter!(string, double, 1)));
    assert(!__traits(compiles, new LatencyFilter!(string, double, 0)));
}

@("usage")
unittest {
    import std.math : isNaN;

    auto filter = new LatencyFilter!(string, double, 5);

    double[] input = [3, 2, 4, 6, 5, 1];
    double[] output;

    foreach (i; input) {
        output ~= filter.push("10.0.0.1", i);
    }

    assert(output == [3, 3, 3, 4, 4, 4]);
    assert(filter.get("10.0.0.1") == 4);

    filter.push("10.0.0.2", 100);
    assert(filter.get("10.0.0.2") == 100);

    filter.discard("10.0.0.1");
    assert(isNaN(filter.get("10.0.0.1")));
    assert(!isNaN(filter.get("10.0.0.2")));

    filter.clear();
    assert(isNaN(filter.get("10.0.0.1")));
    assert(isNaN(filter.get("10.0.0.2")));
}

@("push attributes")
@safe unittest {
    auto filter = new LatencyFilter!(string, double, 5);

    filter.push("10.0.0.1", 42);
}

@("get/discard attributes")
nothrow @safe @nogc unittest {
    import std.math : isNaN;

    auto filter = LatencyFilter!(string, double, 5)();

    assert(isNaN(filter.get("10.0.0.1")));

    filter.discard("10.0.0.1");
}

@("clear attributes")
nothrow unittest {
    auto filter = LatencyFilter!(string, double, 5)();
    filter.clear();
}