batcher_test.go
3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package tsdb_test
import (
"testing"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
// TestBatch_Size ensures that a batcher generates a batch when the size threshold is reached.
func TestBatch_Size(t *testing.T) {
batchSize := 5
batcher := tsdb.NewPointBatcher(batchSize, 0, time.Hour)
if batcher == nil {
t.Fatal("failed to create batcher for size test")
}
batcher.Start()
var p models.Point
go func() {
for i := 0; i < batchSize; i++ {
batcher.In() <- p
}
}()
batch := <-batcher.Out()
if len(batch) != batchSize {
t.Errorf("received batch has incorrect length exp %d, got %d", batchSize, len(batch))
}
checkPointBatcherStats(t, batcher, -1, batchSize, 1, 0)
}
// TestBatch_Size ensures that a buffered batcher generates a batch when the size threshold is reached.
func TestBatch_SizeBuffered(t *testing.T) {
batchSize := 5
batcher := tsdb.NewPointBatcher(batchSize, 5, time.Hour)
if batcher == nil {
t.Fatal("failed to create batcher for size test")
}
batcher.Start()
var p models.Point
go func() {
for i := 0; i < batchSize; i++ {
batcher.In() <- p
}
}()
batch := <-batcher.Out()
if len(batch) != batchSize {
t.Errorf("received batch has incorrect length exp %d, got %d", batchSize, len(batch))
}
checkPointBatcherStats(t, batcher, -1, batchSize, 1, 0)
}
// TestBatch_Size ensures that a batcher generates a batch when the timeout triggers.
func TestBatch_Timeout(t *testing.T) {
batchSize := 5
batcher := tsdb.NewPointBatcher(batchSize+1, 0, 100*time.Millisecond)
if batcher == nil {
t.Fatal("failed to create batcher for timeout test")
}
batcher.Start()
var p models.Point
go func() {
for i := 0; i < batchSize; i++ {
batcher.In() <- p
}
}()
batch := <-batcher.Out()
if len(batch) != batchSize {
t.Errorf("received batch has incorrect length exp %d, got %d", batchSize, len(batch))
}
checkPointBatcherStats(t, batcher, -1, batchSize, 0, 1)
}
// TestBatch_Flush ensures that a batcher generates a batch when flushed
func TestBatch_Flush(t *testing.T) {
batchSize := 2
batcher := tsdb.NewPointBatcher(batchSize, 0, time.Hour)
if batcher == nil {
t.Fatal("failed to create batcher for flush test")
}
batcher.Start()
var p models.Point
go func() {
batcher.In() <- p
batcher.Flush()
}()
batch := <-batcher.Out()
if len(batch) != 1 {
t.Errorf("received batch has incorrect length exp %d, got %d", 1, len(batch))
}
checkPointBatcherStats(t, batcher, -1, 1, 0, 0)
}
// TestBatch_MultipleBatches ensures that a batcher correctly processes multiple batches.
func TestBatch_MultipleBatches(t *testing.T) {
batchSize := 2
batcher := tsdb.NewPointBatcher(batchSize, 0, 100*time.Millisecond)
if batcher == nil {
t.Fatal("failed to create batcher for size test")
}
batcher.Start()
var p models.Point
var b []models.Point
batcher.In() <- p
batcher.In() <- p
b = <-batcher.Out() // Batch threshold reached.
if len(b) != batchSize {
t.Errorf("received batch (size) has incorrect length exp %d, got %d", batchSize, len(b))
}
batcher.In() <- p
b = <-batcher.Out() // Timeout triggered.
if len(b) != 1 {
t.Errorf("received batch (timeout) has incorrect length exp %d, got %d", 1, len(b))
}
checkPointBatcherStats(t, batcher, -1, 3, 1, 1)
}
func checkPointBatcherStats(t *testing.T, b *tsdb.PointBatcher, batchTotal, pointTotal, sizeTotal, timeoutTotal int) {
stats := b.Stats()
if batchTotal != -1 && stats.BatchTotal != uint64(batchTotal) {
t.Errorf("batch total stat is incorrect: %d", stats.BatchTotal)
}
if pointTotal != -1 && stats.PointTotal != uint64(pointTotal) {
t.Errorf("point total stat is incorrect: %d", stats.PointTotal)
}
if sizeTotal != -1 && stats.SizeTotal != uint64(sizeTotal) {
t.Errorf("size total stat is incorrect: %d", stats.SizeTotal)
}
if timeoutTotal != -1 && stats.TimeoutTotal != uint64(timeoutTotal) {
t.Errorf("timeout total stat is incorrect: %d", stats.TimeoutTotal)
}
}