index.go
3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package tsdb
import (
"fmt"
"os"
"regexp"
"sort"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
)
type Index interface {
Open() error
Close() error
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error
InitializeSeries(key, name []byte, tags models.Tags) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(key []byte) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
HasTagKey(name, key []byte) (bool, error)
TagSets(name []byte, options influxql.IteratorOptions) ([]*influxql.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL system iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64) error
RemoveShard(shardID uint64)
Type() string
}
// IndexFormat represents the format for an index.
type IndexFormat int
const (
// InMemFormat is the format used by the original in-memory shared index.
InMemFormat IndexFormat = 1
// TSI1Format is the format used by the tsi1 index.
TSI1Format IndexFormat = 2
)
// NewIndexFunc creates a new index.
type NewIndexFunc func(id uint64, path string, options EngineOptions) Index
// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)
// RegisterIndex registers a storage index initializer by name.
func RegisterIndex(name string, fn NewIndexFunc) {
if _, ok := newIndexFuncs[name]; ok {
panic("index already registered: " + name)
}
newIndexFuncs[name] = fn
}
// RegisteredIndexs returns the slice of currently registered indexes.
func RegisteredIndexes() []string {
a := make([]string, 0, len(newIndexFuncs))
for k := range newIndexFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewIndex(id uint64, path string, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
format = "tsi1"
}
// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, path, options), nil
}
func MustOpenIndex(id uint64, path string, options EngineOptions) Index {
idx, err := NewIndex(id, path, options)
if err != nil {
panic(err)
} else if err := idx.Open(); err != nil {
panic(err)
}
return idx
}