Circular buffers

A circular buffer is a common data structure for real-time data processing. Internally, it holds a fixed-size buffer.
Once that buffer is full and a new item is added, it drops the oldest item to make room for the new one.
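
Before we get to the channel-based version later in this post, here is a minimal sketch of the classic slice-backed form of the idea. Ring, NewRing, Push and Items are hypothetical names I picked for illustration; they are not part of the application we're about to build.

```go
package main

import "fmt"

// Ring is a hypothetical fixed-size circular buffer, shown for illustration only.
type Ring[T any] struct {
	items []T
	head  int // index of the oldest stored item
	size  int // number of items currently stored
}

// NewRing creates a ring that can hold up to capacity items.
func NewRing[T any](capacity int) *Ring[T] {
	return &Ring[T]{items: make([]T, capacity)}
}

// Push stores v. When the ring is full, v overwrites the oldest item.
func (r *Ring[T]) Push(v T) {
	if len(r.items) == 0 {
		return // a zero-capacity ring cannot store anything
	}
	tail := (r.head + r.size) % len(r.items)
	r.items[tail] = v
	if r.size == len(r.items) {
		// The slot we just wrote held the oldest item, so advance head.
		r.head = (r.head + 1) % len(r.items)
	} else {
		r.size++
	}
}

// Items returns the stored items ordered from oldest to newest.
func (r *Ring[T]) Items() []T {
	out := make([]T, 0, r.size)
	for i := 0; i < r.size; i++ {
		out = append(out, r.items[(r.head+i)%len(r.items)])
	}
	return out
}

func main() {
	r := NewRing[int](3)
	for v := 1; v <= 5; v++ {
		r.Push(v)
	}
	fmt.Println(r.Items()) // prints [3 4 5]
}
```

With a capacity of 3, pushing the values 1 through 5 leaves only the three most recent ones in the buffer.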

In this post, I'm going to build a small example application to illustrate when this data structure can be useful!

Stock exchange

The application we'll be creating reads from a data stream that emits stock price changes. The prices update at
random intervals of less than 4 milliseconds. However, our application has to perform an advanced trend line
calculation with each price change, and this calculation has a fixed processing time of 2 milliseconds.

That means that sometimes the prices are going to change faster than we're able to process them. This is known as
backpressure, and it could blow up our system if it were to happen over an extended period of time.

To begin, we'll create one function to generate the stock prices, and another to simulate the trend line calculation:

package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// producePrices writes 100 random stock prices to ch, and then closes it.
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		// To simulate bursts of price changes we'll add a random delay of 0 to 3ms
		// (rand.IntN(4) never returns 4).
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}

// calculateTrendLine sleeps for 2ms to simulate a calculation.
func calculateTrendLine() {
	time.Sleep(2 * time.Millisecond)
}

Next, we'll set up the main function to consume these price changes:

func main() {
	stockPriceStream := make(chan int)
	go producePrices(stockPriceStream)

	for v := range stockPriceStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}

When we run this code using go run . we'll see some output like this:

The stock exchange has opened for the day.
Updated the trend line with value: 6622
Updated the trend line with value: 4554
Updated the trend line with value: 739
...
The stock exchange is closing for the day.
Updated the trend line with value: 3665

Even though the output looks fine, our application has some serious flaws. To expose them, we can increase the
processing time to 2 seconds and then run the application again:

func calculateTrendLine() {
	// time.Sleep(2 * time.Millisecond)
	time.Sleep(2000 * time.Millisecond)
}

As you've probably noticed, the writes to our stock price channel are blocking, which isn't good: producePrices
can't emit a new price until main has finished processing the previous one. We want to be doing our trend line
calculation as close to real-time as possible. Otherwise, we might be buying when we should be selling and vice versa!

One way to address this issue could be to make the producePrices function perform non-blocking writes:

func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		// ch <- rand.IntN(9999)
		select {
		case ch <- rand.IntN(9999):
		default:
			// The consumer isn't ready, so this price is dropped.
		}
		// To simulate bursts of price changes we'll add a random delay of 0 to 3ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}

Running the application again, I was only able to process 54 out of 100 price changes. That means that we dropped
46% of all price changes. If you recall, the stock prices are changing at random intervals of less than 4
milliseconds, and our calculation takes 2 milliseconds.
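
To measure how many prices get dropped, one option is to wrap the non-blocking send in a small helper that reports whether the value was delivered. The sketch below is an assumption on my part: trySend is a hypothetical name, and it uses a tiny buffered channel without a reader so that the result is deterministic.

```go
package main

import "fmt"

// trySend is a hypothetical helper. It attempts a non-blocking send of v on ch
// and reports whether the value was delivered.
func trySend[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		// Nobody is ready to receive and there is no free buffer slot.
		return false
	}
}

func main() {
	// Assumption: a channel with room for two values and no reader at all.
	ch := make(chan int, 2)
	dropped := 0
	for v := 1; v <= 5; v++ {
		if !trySend(ch, v) {
			dropped++
		}
	}
	fmt.Printf("sent %d, dropped %d\n", 5-dropped, dropped) // prints "sent 2, dropped 3"
}
```

Counting the false results inside producePrices would be one way to arrive at a drop rate like the one above.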

Our processing time is simply too slow to consume all of the data in real time. However, we should be able to make
our trend line less fragmented if we could "catch up" whenever more than 2 milliseconds pass between price changes.

To achieve that, we'll start by creating a second, buffered channel that we'll read from, so that the producePrices
function can go back to performing blocking writes:

func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}

func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)

	// Simulate a stream of data that is going to produce stock prices at a high pace.
	go producePrices(originalStream)

	// Note: nothing writes to bufferedStream yet. We'll connect the two streams next.
	for v := range bufferedStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}

Now we need to connect these two streams in such a way that the original stream never gets blocked, while ensuring
that the buffered stream emits the most recent price changes.

To achieve that, we'll create a new CircularBuffer type:

// CircularBuffer forwards values from inputStream to outputStream, dropping
// the oldest buffered value whenever outputStream is full.
type CircularBuffer[T any] struct {
	inputStream  <-chan T
	outputStream chan T
}

func NewCircularBuffer[T any](inputStream <-chan T, outputStream chan T) *CircularBuffer[T] {
	return &CircularBuffer[T]{
		inputStream:  inputStream,
		outputStream: outputStream,
	}
}

// Run forwards values until inputStream is closed, and then closes outputStream.
func (cb *CircularBuffer[T]) Run() {
	for v := range cb.inputStream {
		select {
		case cb.outputStream <- v:
		default:
			fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
			cb.outputStream <- v
		}
	}
	fmt.Println("The input stream was closed. Closing the output stream.")
	close(cb.outputStream)
}

The interesting part that we should be focusing on here is the select statement:

select {
case cb.outputStream <- v:
default:
	fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
	cb.outputStream <- v
}

If the buffer is full, the send can't proceed, so we fall into the default case. There, we pop and discard the
oldest value, and then simply write the new value again.
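
One thing to keep in mind is that the consumer might empty the buffer in the short window between the failed send and the receive in the default case, and a plain receive would then block. Below is a slightly more defensive sketch of the same drop-oldest idea. pushDropOldest is a hypothetical helper, and it assumes a buffered channel with a capacity of at least 1 and a single goroutine writing to it.

```go
package main

import "fmt"

// pushDropOldest is a hypothetical helper. It sends v on out without blocking
// on a full buffer: when the buffer is full it discards the oldest buffered
// value and retries. It returns the number of values it discarded.
// Assumptions: cap(out) >= 1, and only one goroutine writes to out.
func pushDropOldest[T any](out chan T, v T) int {
	dropped := 0
	for {
		select {
		case out <- v:
			return dropped
		default:
		}
		// The buffer looked full. Try to discard the oldest value, but don't
		// block if the consumer has emptied the buffer in the meantime.
		select {
		case <-out:
			dropped++
		default:
		}
	}
}

func main() {
	out := make(chan int, 3)
	dropped := 0
	for v := 1; v <= 5; v++ {
		dropped += pushDropOldest(out, v)
	}
	close(out)
	fmt.Println("dropped:", dropped) // prints "dropped: 2"
	for v := range out {
		fmt.Println(v) // prints 3, 4 and 5 on separate lines
	}
}
```

Because both steps are non-blocking, the loop simply retries the send if the consumer happened to make room first.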

With that, we can head back to the main function and use the CircularBuffer to connect our two streams:

func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)

	cb := NewCircularBuffer(originalStream, bufferedStream)
	go cb.Run()

	// The code here stays the same...
}

Then we run the program again:

go run .

The stock exchange has opened for the day.
Updated the trend line with value: 5690
Updated the trend line with value: 4011
Updated the trend line with value: 2018
The buffer is full. Dropping the oldest value: 1294
Updated the trend line with value: 9617
...
The stock exchange is closing for the day.
The input stream was closed. Closing the output stream.

This time, we only dropped 21 prices. That is a huge improvement compared to the original 46. The cost, of course,
is that our trend line calculation could be up to 3 price changes behind (our buffer size).

By using our CircularBuffer, we can tip the scale between real-time data (small buffer size) and less data
fragmentation (large buffer size), without having to worry about backpressure.
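
To illustrate that trade-off without any timing noise, here is a small deterministic simulation. It is a simplified model rather than a measurement of the program above: simulateDrops is a hypothetical helper, time is counted in whole milliseconds, and the arrival times are made up.

```go
package main

import "fmt"

// simulateDrops is a hypothetical helper. It replays arrival times (in ms,
// ascending) against a consumer that needs serviceMs per value, and returns
// how many values a drop-oldest buffer of the given capacity discards.
func simulateDrops(arrivals []int, serviceMs, capacity int) int {
	var queue []int // arrival times of the values waiting in the buffer
	freeAt := 0     // time at which the consumer can start its next value
	drops := 0
	for _, t := range arrivals {
		// Let the consumer start every waiting value it had time for.
		for len(queue) > 0 && freeAt <= t {
			queue = queue[1:]
			freeAt += serviceMs
		}
		if freeAt <= t {
			// The consumer is idle and the buffer is empty: hand over directly.
			freeAt = t + serviceMs
			continue
		}
		if capacity <= 0 {
			drops++ // no buffer at all: the new value is lost
			continue
		}
		if len(queue) == capacity {
			queue = queue[1:] // buffer full: discard the oldest value
			drops++
		}
		queue = append(queue, t)
	}
	return drops
}

func main() {
	// Assumption: ten prices, one per millisecond, 2ms of processing each.
	arrivals := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	for _, capacity := range []int{1, 3, 10} {
		fmt.Printf("capacity %d: dropped %d\n", capacity, simulateDrops(arrivals, 2, capacity))
	}
}
```

In this model the capacities 1, 3 and 10 drop 4, 2 and 0 prices respectively, while a larger buffer also means that the value being processed can be further behind the latest price.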

This concludes the post. I hope you enjoyed it!

The end

I usually tweet something when I've finished writing a new post. You can find me on Twitter.
