emitter.go 5.32 KB
package influxql

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/models"
)

// Emitter groups values together by name, tags, and time.
type Emitter struct {
	buf       []Point
	itrs      []Iterator
	ascending bool
	chunkSize int

	tags Tags
	row  *models.Row

	// The columns to attach to each row.
	Columns []string

	// The time zone location.
	Location *time.Location

	// Removes the "time" column from output.
	// Used for meta queries where time does not apply.
	OmitTime bool
}

// NewEmitter returns a new instance of Emitter that pulls from itrs.
func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter {
	return &Emitter{
		buf:       make([]Point, len(itrs)),
		itrs:      itrs,
		ascending: ascending,
		chunkSize: chunkSize,
		Location:  time.UTC,
	}
}

// Close closes the underlying iterators.
func (e *Emitter) Close() error {
	return Iterators(e.itrs).Close()
}

// Emit returns the next row from the iterators.
func (e *Emitter) Emit() (*models.Row, bool, error) {
	// Immediately end emission if there are no iterators.
	if len(e.itrs) == 0 {
		return nil, false, nil
	}

	// Continually read from iterators until they are exhausted.
	for {
		// Fill buffer. Return row if no more points remain.
		t, name, tags, err := e.loadBuf()
		if err != nil {
			return nil, false, err
		} else if t == ZeroTime {
			row := e.row
			e.row = nil
			return row, false, nil
		}

		// Read next set of values from all iterators at a given time/name/tags.
		// If no values are returned then return row.
		values := e.readAt(t, name, tags)
		if values == nil {
			row := e.row
			e.row = nil
			return row, false, nil
		}

		// If there's no row yet then create one.
		// If the name and tags match the existing row, append to that row if
		// the number of values doesn't exceed the chunk size.
		// Otherwise return existing row and add values to next emitted row.
		if e.row == nil {
			e.createRow(name, tags, values)
		} else if e.row.Name == name && e.tags.Equals(&tags) {
			if e.chunkSize > 0 && len(e.row.Values) >= e.chunkSize {
				row := e.row
				row.Partial = true
				e.createRow(name, tags, values)
				return row, true, nil
			}
			e.row.Values = append(e.row.Values, values)
		} else {
			row := e.row
			e.createRow(name, tags, values)
			return row, true, nil
		}
	}
}

// loadBuf reads in points into empty buffer slots.
// Returns the next time/name/tags to emit for.
func (e *Emitter) loadBuf() (t int64, name string, tags Tags, err error) {
	t = ZeroTime

	for i := range e.itrs {
		// Load buffer, if empty.
		if e.buf[i] == nil {
			e.buf[i], err = e.readIterator(e.itrs[i])
			if err != nil {
				break
			}
		}

		// Skip if buffer is empty.
		p := e.buf[i]
		if p == nil {
			continue
		}
		itrTime, itrName, itrTags := p.time(), p.name(), p.tags()

		// Initialize range values if not set.
		if t == ZeroTime {
			t, name, tags = itrTime, itrName, itrTags
			continue
		}

		// Update range values if lower and emitter is in time ascending order.
		if e.ascending {
			if (itrName < name) || (itrName == name && itrTags.ID() < tags.ID()) || (itrName == name && itrTags.ID() == tags.ID() && itrTime < t) {
				t, name, tags = itrTime, itrName, itrTags
			}
			continue
		}

		// Update range values if higher and emitter is in time descending order.
		if (itrName > name) || (itrName == name && itrTags.ID() > tags.ID()) || (itrName == name && itrTags.ID() == tags.ID() && itrTime > t) {
			t, name, tags = itrTime, itrName, itrTags
		}
	}
	return
}

// createRow creates a new row attached to the emitter.
func (e *Emitter) createRow(name string, tags Tags, values []interface{}) {
	e.tags = tags
	e.row = &models.Row{
		Name:    name,
		Tags:    tags.KeyValues(),
		Columns: e.Columns,
		Values:  [][]interface{}{values},
	}
}

// readAt returns the next slice of values from the iterators at time/name/tags.
// Returns nil values once the iterators are exhausted.
func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} {
	offset := 1
	if e.OmitTime {
		offset = 0
	}

	values := make([]interface{}, len(e.itrs)+offset)
	if !e.OmitTime {
		values[0] = time.Unix(0, t).In(e.Location)
	}
	e.readInto(t, name, tags, values[offset:])
	return values
}

func (e *Emitter) readInto(t int64, name string, tags Tags, values []interface{}) {
	for i, p := range e.buf {
		// Skip if buffer is empty.
		if p == nil {
			values[i] = nil
			continue
		}

		// Skip point if it doesn't match time/name/tags.
		pTags := p.tags()
		if p.time() != t || p.name() != name || !pTags.Equals(&tags) {
			values[i] = nil
			continue
		}

		// Read point value.
		values[i] = p.value()

		// Clear buffer.
		e.buf[i] = nil
	}
}

// readIterator reads the next point from itr.
func (e *Emitter) readIterator(itr Iterator) (Point, error) {
	if itr == nil {
		return nil, nil
	}

	switch itr := itr.(type) {
	case FloatIterator:
		if p, err := itr.Next(); err != nil {
			return nil, err
		} else if p != nil {
			return p, nil
		}
	case IntegerIterator:
		if p, err := itr.Next(); err != nil {
			return nil, err
		} else if p != nil {
			return p, nil
		}
	case StringIterator:
		if p, err := itr.Next(); err != nil {
			return nil, err
		} else if p != nil {
			return p, nil
		}
	case BooleanIterator:
		if p, err := itr.Next(); err != nil {
			return nil, err
		} else if p != nil {
			return p, nil
		}
	default:
		panic(fmt.Sprintf("unsupported iterator: %T", itr))
	}
	return nil, nil
}