functions.gen.go.tmpl 8.17 KB
package influxql

import (
"sort"
"time"
"math/rand"
)

{{with $types := .}}{{range $k := $types}}

// {{$k.Name}}PointAggregator is implemented by types that accumulate
// {{$k.Name}}Point values one at a time.
type {{$k.Name}}PointAggregator interface {
	// Aggregate{{$k.Name}} folds the point p into the aggregator's state.
	Aggregate{{$k.Name}}(p *{{$k.Name}}Point)
}

// {{$k.Name}}BulkPointAggregator is implemented by aggregators that can accept
// a whole slice of points in a single call. Aggregate{{$k.Name}}Points uses
// this method instead of per-point aggregation when it is available.
type {{$k.Name}}BulkPointAggregator interface {
	// Aggregate{{$k.Name}}Bulk folds every point in points into the aggregator's state.
	Aggregate{{$k.Name}}Bulk(points []{{$k.Name}}Point)
}

// Aggregate{{$k.Name}}Points passes every point in points to the aggregator a.
// When a also implements {{$k.Name}}BulkPointAggregator the whole slice is
// handed over in a single Aggregate{{$k.Name}}Bulk call; otherwise
// Aggregate{{$k.Name}} is invoked once per point.
func Aggregate{{$k.Name}}Points(a {{$k.Name}}PointAggregator, points []{{$k.Name}}Point) {
	if bulk, ok := a.({{$k.Name}}BulkPointAggregator); ok {
		bulk.Aggregate{{$k.Name}}Bulk(points)
		return
	}

	// The aggregator receives the address of the loop's copy of each point,
	// not the address of the slice element itself.
	for _, pt := range points {
		a.Aggregate{{$k.Name}}(&pt)
	}
}

// {{$k.Name}}PointEmitter produces the resulting points from an aggregate.
// Emit returns a slice, so an emitter may yield zero, one, or several points.
type {{$k.Name}}PointEmitter interface {
	// Emit returns the points computed from what has been aggregated so far.
	Emit() []{{$k.Name}}Point
}

{{range $v := $types}}

// {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func is the function called by a {{$k.Name}}Point reducer.
// prev is the reducer's current reduced point (nil if the reducer has none yet)
// and curr is the incoming point. The returned time, value and auxiliary
// fields are stored in the reduced point.
type {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func func(prev *{{$v.Name}}Point, curr *{{$k.Name}}Point) (t int64, v {{$v.Type}}, aux []interface{})

// {{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer is a reducer that reduces
// the passed in points to a single point using a reduce function.
type {{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer struct {
	// prev is the reduced point so far; it may be nil until the first point is aggregated.
	prev *{{$v.Name}}Point
	// fn is invoked for every aggregated point to compute the next reduced point.
	fn   {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func
}

// New{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer creates a new {{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer
// that reduces with fn. prev is the initial reduced point; it may be nil, in
// which case the first call to Aggregate{{$k.Name}} allocates it.
func New{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Func, prev *{{$v.Name}}Point) *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer {
	r := new({{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer)
	r.fn = fn
	r.prev = prev
	return r
}

// Aggregate{{$k.Name}} calls the reduce function with the current reduced point
// and p, then stores the returned time, value and auxiliary fields in the
// reduced point. The reduced point is allocated on first use if it was nil.
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
	// The reduce function must see the old state (possibly nil) before the
	// reduced point is allocated or overwritten.
	ts, val, auxFields := r.fn(r.prev, p)
	if r.prev == nil {
		r.prev = new({{$v.Name}}Point)
	}

	out := r.prev
	out.Time, out.Value, out.Aux = ts, val, auxFields

	// A point that does not already stand for several points counts as one.
	if p.Aggregated <= 1 {
		out.Aggregated++
		return
	}
	out.Aggregated += p.Aggregated
}

// Emit emits the point that was generated when reducing the points fed in with Aggregate{{$k.Name}}.
// It returns nil when there is no reduced point, which happens when the reducer
// was created without an initial point and nothing has been aggregated yet.
func (r *{{$k.Name}}Func{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() []{{$v.Name}}Point {
	// Guard against dereferencing a nil reduced point.
	if r.prev == nil {
		return nil
	}
	return []{{$v.Name}}Point{*r.prev}
}

// {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc is the function called by a {{$k.Name}}Point slice reducer.
// It receives all of the buffered points in a and returns the reduced points.
type {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc func(a []{{$k.Name}}Point) []{{$v.Name}}Point

// {{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer is a reducer that aggregates
// the passed in points and then invokes the function to reduce the points when they are emitted.
type {{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer struct {
	// points buffers every aggregated point until Emit is called.
	points []{{$k.Name}}Point
	// fn reduces the buffered points when Emit is called.
	fn     {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc
}

// New{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer creates a new {{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer
// that reduces its buffered points with fn. The point buffer starts out empty.
func New{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer(fn {{$k.Name}}Reduce{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}SliceFunc) *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer {
	r := new({{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer)
	r.fn = fn
	return r
}

// Aggregate{{$k.Name}} appends a clone of p to the internal slice. The buffered
// points are handed to the reduce function when Emit is called.
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
	cloned := p.Clone()
	r.points = append(r.points, *cloned)
}

// Aggregate{{$k.Name}}Bulk performs a bulk copy of {{$k.Name}}Points into the internal slice.
// This is a more efficient version of calling Aggregate{{$k.Name}} on each point.
// Unlike Aggregate{{$k.Name}}, the points are appended as-is, without calling Clone on them.
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Aggregate{{$k.Name}}Bulk(points []{{$k.Name}}Point) {
	r.points = append(r.points, points...)
}

// Emit invokes the reduce function on the buffered points and returns its result.
// This method does not clear the points from the internal slice, so the
// reduce function receives the same points again on a later call unless
// more have been aggregated in between.
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() []{{$v.Name}}Point {
	return r.fn(r.points)
}
{{end}}

// {{$k.Name}}DistinctReducer returns the distinct points in a series.
type {{$k.Name}}DistinctReducer struct {
	// m maps each distinct value to the first point aggregated with that value.
	m map[{{$k.Type}}]{{$k.Name}}Point
}

// New{{$k.Name}}DistinctReducer creates a new {{$k.Name}}DistinctReducer with an
// empty set of seen values.
func New{{$k.Name}}DistinctReducer() *{{$k.Name}}DistinctReducer {
	seen := make(map[{{$k.Type}}]{{$k.Name}}Point)
	return &{{$k.Name}}DistinctReducer{m: seen}
}

// Aggregate{{$k.Name}} records p if no point with the same value has been seen
// yet. Later points with an already recorded value are ignored.
func (r *{{$k.Name}}DistinctReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
	if _, seen := r.m[p.Value]; seen {
		return
	}
	r.m[p.Value] = *p
}

// Emit returns one point per distinct value that has been aggregated, sorted
// with sort.Sort. Only the Time and Value of each recorded point are copied
// into the result.
func (r *{{$k.Name}}DistinctReducer) Emit() []{{$k.Name}}Point {
	distinct := make({{$k.name}}Points, 0, len(r.m))
	for _, seen := range r.m {
		distinct = append(distinct, {{$k.Name}}Point{Time: seen.Time, Value: seen.Value})
	}
	// Map iteration order is random, so sort to get a deterministic result.
	sort.Sort(distinct)
	return distinct
}

// {{$k.Name}}ElapsedReducer calculates the elapsed time between the two most
// recently aggregated points.
type {{$k.Name}}ElapsedReducer struct {
	// unitConversion is the divisor applied to the time difference in Emit.
	unitConversion int64
	// prev is the point aggregated before curr; its Nil flag is set until two points have been seen.
	prev           {{$k.Name}}Point
	// curr is the most recently aggregated point.
	curr           {{$k.Name}}Point
}

// New{{$k.Name}}ElapsedReducer creates a new {{$k.Name}}ElapsedReducer whose
// results are expressed in units of interval.Duration. Emit divides by that
// duration, so it must not be zero. Both the previous and the current point
// start out marked as Nil.
func New{{$k.Name}}ElapsedReducer(interval Interval) *{{$k.Name}}ElapsedReducer {
	r := &{{$k.Name}}ElapsedReducer{unitConversion: int64(interval.Duration)}
	r.prev.Nil = true
	r.curr.Nil = true
	return r
}

// Aggregate{{$k.Name}} slides the two-point window forward: the former current
// point becomes the previous point and a copy of p becomes the current point.
func (r *{{$k.Name}}ElapsedReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
	r.prev, r.curr = r.curr, *p
}

// Emit returns a single IntegerPoint stamped with the current point's time
// whose value is the time difference between the current and previous points,
// integer-divided by the reducer's unit. It returns nil while the previous
// point is still marked Nil.
func (r *{{$k.Name}}ElapsedReducer) Emit() []IntegerPoint {
	if r.prev.Nil {
		return nil
	}
	delta := r.curr.Time - r.prev.Time
	return []IntegerPoint{
		{Time: r.curr.Time, Value: delta / r.unitConversion},
	}
}

// {{$k.Name}}SampleReducer implements reservoir sampling to select a random
// subset of the aggregated points.
type {{$k.Name}}SampleReducer struct {
	count int // number of points aggregated so far
	rng   *rand.Rand // random number generator owned by this reducer

	points {{$k.name}}Points // the reservoir; its length is the sample size
}

// New{{$k.Name}}SampleReducer creates a new {{$k.Name}}SampleReducer whose
// reservoir holds size points. Each reducer gets its own random number
// generator.
func New{{$k.Name}}SampleReducer(size int) *{{$k.Name}}SampleReducer {
	// Seed with the current time as suggested by https://golang.org/pkg/math/rand/.
	seed := time.Now().UnixNano()
	return &{{$k.Name}}SampleReducer{
		rng:    rand.New(rand.NewSource(seed)),
		points: make({{$k.name}}Points, size),
	}
}

// Aggregate{{$k.Name}} offers p to the reservoir. The first len(r.points)
// points fill the reservoir in order; each later point replaces a randomly
// chosen slot with probability len(r.points)/count.
func (r *{{$k.Name}}SampleReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
	r.count++

	// While the reservoir is not yet full, store the point in the next free slot.
	if slot := r.count - 1; slot < len(r.points) {
		p.CopyTo(&r.points[slot])
		return
	}

	// Draw an index uniformly from [0, count). If it lands inside the
	// reservoir, p replaces the point stored there; otherwise p is dropped.
	if slot := r.rng.Intn(r.count); slot < len(r.points) {
		p.CopyTo(&r.points[slot])
	}
}

// Emit returns the sampled points, sorted with sort.Sort. If fewer points
// than the reservoir size have been aggregated, only the filled part of the
// reservoir is returned. The result shares storage with the reservoir, which
// is sorted in place.
func (r *{{$k.Name}}SampleReducer) Emit() []{{$k.Name}}Point {
	filled := r.count
	if size := len(r.points); filled > size {
		filled = size
	}
	sample := r.points[:filled]
	sort.Sort(sample)
	return sample
}


{{end}}{{end}}