Iker Narvaez

error reporter

memo = "ba130e44a2a718c8f858bfbb806c3bcd55868a3315fd8a252326651bac45340f"
[[projects]]
branch = "master"
name = "git.ukko.mx/ukko/influx_client_go.git"
packages = ["."]
revision = "dd2c8fc821245c8eec272fbe1a01e41b8d39bf2d"
[[projects]]
branch = "master"
name = "github.com/influxdata/influxdb"
packages = ["client/v2","models","pkg/escape"]
revision = "a6c543039763c0f08253d71a43aefe3b570ecf37"
## Gopkg.toml example (these lines may be deleted)
## "required" lists a set of packages (not projects) that must be included in
## Gopkg.lock. This list is merged with the set of packages imported by the current
## project. Use it when your project needs a package it doesn't explicitly import -
## including "main" packages.
# required = ["github.com/user/thing/cmd/thing"]
## "ignored" lists a set of packages (not projects) that are ignored when
## dep statically analyzes source code. Ignored packages can be in this project,
## or in a dependency.
# ignored = ["github.com/user/project/badpkg"]
## Dependencies define constraints on dependent projects. They are respected by
## dep whether coming from the Gopkg.toml of the current project or a dependency.
# [[dependencies]]
## Required: the root import path of the project being constrained.
# name = "github.com/user/project"
#
## Recommended: the version constraint to enforce for the project.
## Only one of "branch", "version" or "revision" can be specified.
# version = "1.0.0"
# branch = "master"
# revision = "abc123"
#
## Optional: an alternate location (URL or import path) for the project's source.
# source = "https://github.com/myfork/package.git"
## Overrides have the same structure as [[dependencies]], but supersede all
## [[dependencies]] declarations from all projects. Only the current project's
## [[overrides]] are applied.
##
## Overrides are a sledgehammer. Use them only as a last resort.
# [[overrides]]
## Required: the root import path of the project being constrained.
# name = "github.com/user/project"
#
## Optional: specifying a version constraint override will cause all other
## constraints on this project to be ignored; only the overridden constraint
## need be satisfied.
## Again, only one of "branch", "version" or "revision" can be specified.
# version = "1.0.0"
# branch = "master"
# revision = "abc123"
#
## Optional: specifying an alternate source location as an override will
## enforce that the alternate location is used for that project, regardless of
## what source location any dependent projects specify.
# source = "https://github.com/myfork/package.git"
[[dependencies]]
branch = "master"
name = "git.ukko.mx/ukko/influx_client_go.git"
package error_reporter
import (
"time"
"log"
"os"
"git.ukko.mx/ukko/influx_client_go.git"
)
var (
defaultUsername = os.Getenv("INFLUX_USERNAME")
defaultPassword = os.Getenv("INFLUX_PASSWORD")
defaultDbAddress = os.Getenv("INFLUX_DB_ADDRESS")
defaultSeries = "errors"
influxClient *influx_client.InfluxClient
series string
)
func InitReporting(db string, username string, password string, dbAddress string) {
influxClient = influx_client.CreateClient(db, username, password, dbAddress)
series = defaultSeries
}
func InitDefaultReporting(db string) {
influxClient = influx_client.CreateClient(db, defaultUsername, defaultPassword, defaultDbAddress)
series = defaultSeries
}
func Report(who string, message string, err error) {
log.Printf("%s: %s", message, err) // Printf, not Fatalf: Fatalf would exit before the point is written
tags := errorSystemTags(who, message, err)
fields := map[string]interface{}{
"value": 1,
}
stamp := time.Now()
influxClient.CreatePoint(series, tags, fields, stamp)
}
func errorSystemTags(who string, message string, err error) map[string]string {
host, _ := os.Hostname()
return map[string]string{
"hostname": host,
"process": who,
"message": message,
"exception": err.Error(),
}
}
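// exampleUsage is a minimal sketch of how this package might be called; the
// service name, message and database are placeholders, and InitDefaultReporting
// assumes the INFLUX_* environment variables are set.
func exampleUsage() {
	InitDefaultReporting("metrics")
	Report("orders-service", "failed to open config", os.ErrNotExist)
}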
memo = "efe4a26b5775ea537c0383b685d50fa64ee8fa6eec77406c5326d5f54744423f"
[[projects]]
branch = "master"
name = "github.com/influxdata/influxdb"
packages = ["client/v2","models","pkg/escape"]
revision = "31db9d6f468239346a1fe7464b5cf9c85580488f"
## Gopkg.toml example (these lines may be deleted)
## "required" lists a set of packages (not projects) that must be included in
## Gopkg.lock. This list is merged with the set of packages imported by the current
## project. Use it when your project needs a package it doesn't explicitly import -
## including "main" packages.
# required = ["github.com/user/thing/cmd/thing"]
## "ignored" lists a set of packages (not projects) that are ignored when
## dep statically analyzes source code. Ignored packages can be in this project,
## or in a dependency.
# ignored = ["github.com/user/project/badpkg"]
## Dependencies define constraints on dependent projects. They are respected by
## dep whether coming from the Gopkg.toml of the current project or a dependency.
# [[dependencies]]
## Required: the root import path of the project being constrained.
# name = "github.com/user/project"
#
## Recommended: the version constraint to enforce for the project.
## Only one of "branch", "version" or "revision" can be specified.
# version = "1.0.0"
# branch = "master"
# revision = "abc123"
#
## Optional: an alternate location (URL or import path) for the project's source.
# source = "https://github.com/myfork/package.git"
## Overrides have the same structure as [[dependencies]], but supersede all
## [[dependencies]] declarations from all projects. Only the current project's
## [[overrides]] are applied.
##
## Overrides are a sledgehammer. Use them only as a last resort.
# [[overrides]]
## Required: the root import path of the project being constrained.
# name = "github.com/user/project"
#
## Optional: specifying a version constraint override will cause all other
## constraints on this project to be ignored; only the overridden constraint
## need be satisfied.
## Again, only one of "branch", "version" or "revision" can be specified.
# version = "1.0.0"
# branch = "master"
# revision = "abc123"
#
## Optional: specifying an alternate source location as an override will
## enforce that the alternate location is used for that project, regardless of
## what source location any dependent projects specify.
# source = "https://github.com/myfork/package.git"
[[dependencies]]
branch = "master"
name = "github.com/influxdata/influxdb"
package influx_client
import (
influx "github.com/influxdata/influxdb/client/v2"
"time"
)
type InfluxClient struct {
Client influx.Client
InfluxErr error
Db string
}
func CreateClient(db string, username string, password string, dbAddress string) *InfluxClient {
i := InfluxClient {}
i.Db = db
i.Client, i.InfluxErr = influx.NewHTTPClient(influx.HTTPConfig{
Addr: dbAddress,
Username: username,
Password: password,
})
if i.InfluxErr != nil {
panic(i.InfluxErr)
}
return &i
}
func (i *InfluxClient) CreatePoint(tableName string, tags map[string]string, fields map[string]interface{}, timestamp time.Time) {
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: i.Db,
Precision: "s",
})
pt, err := influx.NewPoint(tableName, tags, fields, timestamp)
if err != nil {
panic(err)
// log.Fatal(err)
}
bp.AddPoint(pt)
// Write the batch
if err := i.Client.Write(bp); err != nil {
panic(err)
}
}
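// exampleUsage is a minimal sketch of how this client might be called; the
// database name, credentials, address and point values are placeholders.
func exampleUsage() {
	c := CreateClient("metrics", "user", "pass", "http://localhost:8086")
	tags := map[string]string{"hostname": "web-01"}
	fields := map[string]interface{}{"value": 1}
	c.CreatePoint("errors", tags, fields, time.Now())
}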
### Directions
_GitHub Issues are reserved for actionable bug reports and feature requests._
_General questions should be sent to the [InfluxDB Community Site](https://community.influxdata.com)._
_Before opening an issue, search for similar bug reports or feature requests on GitHub Issues._
_If no similar issue can be found, fill out either the "Bug Report" or the "Feature Request" section below._
_Erase the other section and everything on and above this line._
### Bug report
__System info:__ [Include InfluxDB version, operating system name, and other relevant details]
__Steps to reproduce:__
1. [First Step]
2. [Second Step]
3. [and so on...]
__Expected behavior:__ [What you expected to happen]
__Actual behavior:__ [What actually happened]
__Additional info:__ [Include gist of relevant config, logs, etc.]
Also, if this is a performance or locking issue, the following commands are useful for creating debug information for the team.
```
curl -o profiles.tar.gz "http://localhost:8086/debug/pprof/all?cpu=true"
curl -o vars.txt "http://localhost:8086/debug/vars"
iostat -xd 1 30 > iostat.txt
```
**Please note** It will take at least 30 seconds for the first cURL command above to return a response.
This is because it will run a CPU profile as part of its information gathering, which takes 30 seconds to collect.
Ideally you should run these commands when you're experiencing problems, so we can capture the state of the system at that time.
If you're concerned about running a CPU profile (which only has a small, temporary impact on performance), then you can set `?cpu=false` or omit `?cpu=true` altogether.
Please run those if possible and link them from a [gist](http://gist.github.com) or simply attach them as a comment to the issue.
*Please note, the quickest way to fix a bug is to open a Pull Request.*
### Feature Request
Opening a feature request kicks off a discussion.
Requests may be closed if we're not actively planning to work on them.
__Proposal:__ [Description of the feature]
__Current behavior:__ [What currently happens]
__Desired behavior:__ [What you would like to happen]
__Use case:__ [Why is this important (helps with prioritizing requests)]
###### Required for all non-trivial PRs
- [ ] Rebased/mergable
- [ ] Tests pass
- [ ] CHANGELOG.md updated
- [ ] Sign [CLA](https://influxdata.com/community/cla/) (if not already signed)
###### Required only if applicable
_You can erase any checkboxes below this note if they are not applicable to your Pull Request._
- [ ] [InfluxQL Spec](https://github.com/influxdata/influxdb/blob/master/influxql/README.md) updated
- [ ] Provide example syntax
- [ ] Update man page when modifying a command
- [ ] Config changes: update sample config (`etc/config.sample.toml`), server `NewDemoConfig` method, and `Diagnostics` methods reporting config settings, if necessary
- [ ] [InfluxData Documentation](https://github.com/influxdata/docs.influxdata.com): issue filed or pull request submitted \<link to issue or pull request\>
# Keep editor-specific, non-project specific ignore rules in global .gitignore:
# https://help.github.com/articles/ignoring-files/#create-a-global-gitignore
*~
src/
config.json
/bin/
/query/a.out*
# ignore generated files.
cmd/influxd/version.go
# executables
*.test
influx_tsm
**/influx_tsm
!**/influx_tsm/
influx_stress
**/influx_stress
!**/influx_stress/
influxd
**/influxd
!**/influxd/
influx
**/influx
!**/influx/
influxdb
**/influxdb
!**/influxdb/
influx_inspect
**/influx_inspect
!**/influx_inspect/
/benchmark-tool
/main
/benchmark-storage
godef
gosym
gocode
inspect-raft
# dependencies
out_rpm/
packages/
# autoconf
autom4te.cache/
config.log
config.status
# log file
influxdb.log
benchmark.log
# config file
config.toml
# test data files
integration/migration_data/
# man outputs
man/*.xml
man/*.1
man/*.1.gz
# test outputs
/test-results.xml
#!/usr/bin/env bash
fmtcount=`git ls-files | grep '.go$' | xargs gofmt -l 2>&1 | wc -l`
if [ $fmtcount -gt 0 ]; then
echo "Some files aren't formatted, please run 'go fmt ./...' to format your source code before committing"
exit 1
fi
vetcount=`go tool vet ./ 2>&1 | wc -l`
if [ $vetcount -gt 0 ]; then
echo "Some files aren't passing vet heuristics, please run 'go vet ./...' to see the errors it flags and correct your source code before committing"
exit 1
fi
exit 0
# Ensure FIXME lines are removed before commit.
fixme_lines=$(git diff --cached | grep ^+ | grep -v pre-commit | grep FIXME | sed 's_^+\s*__g')
if [ "$fixme_lines" != "" ]; then
echo "Please remove the following lines:"
echo -e "$fixme_lines"
exit 1
fi
{
"maxReviewers": 3,
"fileBlacklist": ["CHANGELOG.md"],
"userBlacklist": ["pauldix", "toddboom", "aviau", "mark-rushakoff"],
"requiredOrgs": ["influxdata"]
}
_This document is currently in draft form._
# Background
The goal of this guide is to capture some Do and Don'ts of Go code for the InfluxDB database. When it comes to Go, writing good code is often achieved with the help of tools like `go fmt` and `go vet`. However there are still some practices not enforceable by any tools. This guide lists some specific practices to follow when writing code for the database.
*Like everything, one needs to use good judgment.* There will always be times when it doesn't make sense to follow a guideline outlined in this document. If that case arises, be ready to justify your choices.
# The Guidelines
## Try not to use third-party libraries
A third-party package is defined as one that is not part of the standard Go distribution. Generally speaking, we prefer to minimize our use of third-party packages, and avoid them unless absolutely necessary. We'll often write a little bit of code rather than pull in a third-party package. Of course, we do use some third-party packages -- most importantly we use [BoltDB](https://github.com/boltdb/bolt) in some storage engines. So to maximise the chance your change will be accepted by us, use only the standard libraries, or the third-party packages we have decided to use.
For rationale, check out the post [The Case Against Third Party Libraries](http://blog.gopheracademy.com/advent-2014/case-against-3pl/).
## Always include a default case in a 'switch' statement
The lack of a `default` case in a `switch` statement can be a significant source of bugs. This is particularly true in the case of a type-assertions switch. So always include a `default` statement unless you have an explicit reason not to.
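For example (a contrived sketch; the function and handled types are placeholders), a type switch without a `default` silently ignores unexpected types, whereas the `default` branch turns them into an explicit error:
```
func widthOf(v interface{}) (int, error) {
	switch v := v.(type) {
	case string:
		return len(v), nil
	case []byte:
		return len(v), nil
	default:
		// Without this branch, a newly-added type would be silently dropped.
		return 0, fmt.Errorf("unsupported type %T", v)
	}
}
```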
## When -- and when not -- to set a channel to 'nil'
## Use defer with anonymous functions to handle complex locking
Consider a block of code like the following.
```
mu.Lock()
if foo == "quit" {
mu.Unlock()
return
} else if foo == "continue" {
if bar == "quit" {
mu.Unlock()
return
}
bar = "still going"
} else {
qux = "here at last"
mu.Unlock()
return
}
foo = "more to do"
bar = "still more to do"
mu.Unlock()
qux = "finished now"
return
```
While this is obviously contrived, complex lock control like this is sometimes required, and doesn't lend itself to `defer`. But as the code evolves, it's easy to introduce new cases, and forget to release locks. One way to address this is to use an anonymous function like so:
```
more := func() bool {
mu.Lock()
defer mu.Unlock()
if foo == "quit" {
return false
} else if foo == "continue" {
if bar == "quit" {
return false
}
bar = "still going"
} else {
qux = "here at last"
return false
}
foo = "more to do"
bar = "still more to do"
return true
}()
if more {
qux = "finished"
}
return
```
This allows us to use `defer` but ensures that if any new cases are added to the logic within the anonymous function, the lock will always be released. Another advantage of this approach is that `defer` will still run even in the event of a panic, ensuring the locks will be released even in that case.
## When to call 'panic()'
# Useful links
- [Useful techniques in Go](http://arslan.io/ten-useful-techniques-in-go)
- [Go in production](http://peter.bourgon.org/go-in-production/)
- [Principles of designing Go APIs with channels](https://inconshreveable.com/07-08-2014/principles-of-designing-go-apis-with-channels/)
- [Common mistakes in Golang](http://soryy.com/blog/2014/common-mistakes-with-go-lang/). Especially this section `Loops, Closures, and Local Variables`
FROM ioft/i386-ubuntu:14.04
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
python-software-properties \
software-properties-common \
wget \
git \
mercurial \
make \
ruby \
ruby-dev \
rpm \
zip \
python \
python-boto
RUN gem install fpm
# Install go
ENV GOPATH /root/go
ENV GO_VERSION 1.8.1
ENV GO_ARCH 386
RUN wget https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \
rm /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz
ENV PATH /usr/local/go/bin:$PATH
ENV PROJECT_DIR $GOPATH/src/github.com/influxdata/influxdb
ENV PATH $GOPATH/bin:$PATH
RUN mkdir -p $PROJECT_DIR
WORKDIR $PROJECT_DIR
VOLUME $PROJECT_DIR
ENTRYPOINT [ "/root/go/src/github.com/influxdata/influxdb/build.py" ]
FROM ubuntu:trusty
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
python-software-properties \
software-properties-common \
wget \
git \
mercurial \
make \
ruby \
ruby-dev \
rpm \
zip \
python \
python-boto \
asciidoc \
xmlto \
docbook-xsl
RUN gem install fpm
# Install go
ENV GOPATH /root/go
ENV GO_VERSION 1.8.1
ENV GO_ARCH amd64
RUN wget https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \
rm /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz
ENV PATH /usr/local/go/bin:$PATH
ENV PROJECT_DIR $GOPATH/src/github.com/influxdata/influxdb
ENV PATH $GOPATH/bin:$PATH
RUN mkdir -p $PROJECT_DIR
WORKDIR $PROJECT_DIR
VOLUME $PROJECT_DIR
ENTRYPOINT [ "/root/go/src/github.com/influxdata/influxdb/build.py" ]
FROM ubuntu:trusty
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
python-software-properties \
software-properties-common \
wget \
git \
mercurial \
make \
ruby \
ruby-dev \
rpm \
zip \
python \
python-boto
RUN gem install fpm
# Setup env
ENV GOPATH /root/go
ENV PROJECT_DIR $GOPATH/src/github.com/influxdata/influxdb
ENV PATH $GOPATH/bin:$PATH
RUN mkdir -p $PROJECT_DIR
VOLUME $PROJECT_DIR
# Install go
ENV GO_VERSION 1.8.1
ENV GO_ARCH amd64
RUN wget https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \
rm /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz
# Clone Go tip for compilation
ENV GOROOT_BOOTSTRAP /usr/local/go
RUN git clone https://go.googlesource.com/go
ENV PATH /go/bin:$PATH
# Add script for compiling go
ENV GO_CHECKOUT master
ADD ./gobuild.sh /gobuild.sh
ENTRYPOINT [ "/gobuild.sh" ]
FROM 32bit/ubuntu:14.04
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y python-software-properties software-properties-common git
RUN add-apt-repository ppa:evarlast/golang1.4
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y -o Dpkg::Options::="--force-overwrite" golang-go
ENV GOPATH=/root/go
RUN mkdir -p /root/go/src/github.com/influxdata/influxdb
RUN mkdir -p /tmp/artifacts
VOLUME /root/go/src/github.com/influxdata/influxdb
VOLUME /tmp/artifacts
collectd.org e84e8af5356e7f47485bbc95c96da6dd7984a67e
github.com/BurntSushi/toml 99064174e013895bbd9b025c31100bd1d9b590ca
github.com/bmizerany/pat c068ca2f0aacee5ac3681d68e4d0a003b7d1fd2c
github.com/boltdb/bolt 4b1ebc1869ad66568b313d0dc410e2be72670dda
github.com/cespare/xxhash 4a94f899c20bc44d4f5f807cb14529e72aca99d6
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
github.com/dgrijalva/jwt-go 24c63f56522a87ec5339cc3567883f1039378fdb
github.com/dgryski/go-bits 2ad8d707cc05b1815ce6ff2543bb5e8d8f9298ef
github.com/dgryski/go-bitstream 7d46cd22db7004f0cceb6f7975824b560cf0e486
github.com/gogo/protobuf a9cd0c35b97daf74d0ebf3514c5254814b2703b4
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
The MIT License (MIT)
Copyright (c) 2013-2016 Errplane Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# List
- bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE)
- collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
- github.com/BurntSushi/toml [WTFPL LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING)
- github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license)
- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE)
- github.com/cespare/xxhash [MIT LICENSE](https://github.com/cespare/xxhash/blob/master/LICENSE.txt)
- github.com/clarkduvall/hyperloglog [MIT LICENSE](https://github.com/clarkduvall/hyperloglog/blob/master/LICENSE)
- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE)
- github.com/dgrijalva/jwt-go [MIT LICENSE](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE)
- github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE)
- github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE)
- github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE)
- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE)
- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt)
- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
- github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt)
- github.com/uber-go/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt)
- glyphicons [LICENSE](http://glyphicons.com/license/)
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
- react 0.13.3 [BSD LICENSE](https://github.com/facebook/react/blob/master/LICENSE)
PACKAGES=$(shell find . -name '*.go' -print0 | xargs -0 -n1 dirname | sort --unique)
default:
metalint: deadcode cyclo aligncheck defercheck structcheck lint errcheck
deadcode:
@deadcode $(PACKAGES) 2>&1
cyclo:
@gocyclo -over 10 $(PACKAGES)
aligncheck:
@aligncheck $(PACKAGES)
defercheck:
@defercheck $(PACKAGES)
structcheck:
@structcheck $(PACKAGES)
lint:
@for pkg in $(PACKAGES); do golint $$pkg; done
errcheck:
@for pkg in $(PACKAGES); do \
errcheck -ignorepkg=bytes,fmt -ignore=":(Rollback|Close)" $$pkg; \
done
tools:
go get github.com/remyoudompheng/go-misc/deadcode
go get github.com/alecthomas/gocyclo
go get github.com/opennota/check/...
go get github.com/golang/lint/golint
go get github.com/kisielk/errcheck
go get github.com/sparrc/gdm
.PHONY: default metalint deadcode cyclo aligncheck defercheck structcheck lint errcheck tools
The top level name is called a measurement. These names can contain any characters. Then there are field names, field values, tag keys and tag values, which can also contain any characters. However, if the measurement, field, or tag contains any character other than [A-Z,a-z,0-9,_], or if it starts with a digit, it must be double-quoted. Therefore anywhere a measurement name, field key, or tag key appears it should be wrapped in double quotes.
# Databases & retention policies
```sql
-- create a database
CREATE DATABASE <name>
-- create a retention policy
CREATE RETENTION POLICY <rp-name> ON <db-name> DURATION <duration> REPLICATION <n> [DEFAULT]
-- alter retention policy
ALTER RETENTION POLICY <rp-name> ON <db-name> (DURATION <duration> | REPLICATION <n> | DEFAULT)+
-- drop a database
DROP DATABASE <name>
-- drop a retention policy
DROP RETENTION POLICY <rp-name> ON <db-name>
```
where `<duration>` is either `INF` for infinite retention, or an integer followed by the desired unit of time: u,ms,s,m,h,d,w for microseconds, milliseconds, seconds, minutes, hours, days, or weeks, respectively. `<n>` must be an integer.
If present, `DEFAULT` sets the retention policy as the default retention policy for writes and reads.
# Users and permissions
```sql
-- create user
CREATE USER <name> WITH PASSWORD '<password>'
-- grant privilege on a database
GRANT <privilege> ON <db> TO <user>
-- grant cluster admin privileges
GRANT ALL [PRIVILEGES] TO <user>
-- revoke privilege
REVOKE <privilege> ON <db> FROM <user>
-- revoke all privileges for a DB
REVOKE ALL [PRIVILEGES] ON <db> FROM <user>
-- revoke all privileges including cluster admin
REVOKE ALL [PRIVILEGES] FROM <user>
-- combine db creation with privilege assignment (user must already exist)
CREATE DATABASE <name> GRANT <privilege> TO <user>
CREATE DATABASE <name> REVOKE <privilege> FROM <user>
-- delete a user
DROP USER <name>
```
where `<privilege> := READ | WRITE | ALL`.
Authentication must be enabled in the influxdb.conf file for user permissions to be in effect.
By default, newly created users have no privileges to any databases.
Cluster administration privileges automatically grant full read and write permissions to all databases, regardless of subsequent database-specific privilege revocation statements.
# Select
```sql
SELECT mean(value) from cpu WHERE host = 'serverA' AND time > now() - 4h GROUP BY time(5m)
SELECT mean(value) from cpu WHERE time > now() - 4h GROUP BY time(5m), region
```
## Group By
# Delete
# Series
## Destroy
```sql
DROP MEASUREMENT <name>
DROP MEASUREMENT cpu WHERE region = 'uswest'
```
## Show
Show series queries are for pulling out individual series from measurement names and tag data. They're useful for discovery.
```sql
-- show all databases
SHOW DATABASES
-- show measurement names
SHOW MEASUREMENTS
SHOW MEASUREMENTS LIMIT 15
SHOW MEASUREMENTS LIMIT 10 OFFSET 40
SHOW MEASUREMENTS WHERE service = 'redis'
-- LIMIT and OFFSET can be applied to any of the SHOW type queries
-- show all series across all measurements/tagsets
SHOW SERIES
-- get a show of all series for any measurements where tag key region = tag value 'uswest'
SHOW SERIES WHERE region = 'uswest'
SHOW SERIES FROM cpu_load WHERE region = 'uswest' LIMIT 10
-- returns rows 100 - 109 of the result. SHOW SERIES returns series grouped by
-- measurement; each series counts as a row, so you could see only a
-- single measurement returned, but 10 series within it.
SHOW SERIES FROM cpu_load WHERE region = 'uswest' LIMIT 10 OFFSET 100
-- show all retention policies on a database
SHOW RETENTION POLICIES ON mydb
-- get a show of all tag keys across all measurements
SHOW TAG KEYS
-- show all the tag keys for a given measurement
SHOW TAG KEYS FROM cpu
SHOW TAG KEYS FROM temperature, wind_speed
-- show all the tag values. note that a single WHERE TAG KEY = '...' clause is required
SHOW TAG VALUES WITH TAG KEY = 'region'
SHOW TAG VALUES FROM cpu WHERE region = 'uswest' WITH TAG KEY = 'host'
-- and you can do stuff against fields
SHOW FIELD KEYS FROM cpu
-- but you can't do this
SHOW FIELD VALUES
-- we don't index field values, so this query should be invalid.
-- show all users
SHOW USERS
```
Note that `FROM` and `WHERE` are optional clauses in most of the show series queries.
And the show series output looks like this:
```json
[
{
"name": "cpu",
"columns": ["id", "region", "host"],
"values": [
1, "uswest", "servera",
2, "uswest", "serverb"
]
},
{
"name": "reponse_time",
"columns": ["id", "application", "host"],
"values": [
3, "myRailsApp", "servera"
]
}
]
```
# Continuous Queries
Continuous queries are going to be inspired by MySQL `TRIGGER` syntax:
http://dev.mysql.com/doc/refman/5.0/en/trigger-syntax.html
Instead of having automatically-assigned ids, named continuous queries allow for some level of duplication prevention,
particularly in the case where creation is scripted.
## Create
CREATE CONTINUOUS QUERY <name> AS SELECT ... FROM ...
## Destroy
DROP CONTINUOUS QUERY <name>
## List
SHOW CONTINUOUS QUERIES
# InfluxDB [![Circle CI](https://circleci.com/gh/influxdata/influxdb/tree/master.svg?style=svg)](https://circleci.com/gh/influxdata/influxdb/tree/master) [![Go Report Card](https://goreportcard.com/badge/github.com/influxdata/influxdb)](https://goreportcard.com/report/github.com/influxdata/influxdb) [![Docker pulls](https://img.shields.io/docker/pulls/library/influxdb.svg)](https://hub.docker.com/_/influxdb/)
## An Open-Source Time Series Database
InfluxDB is an open source **time series database** with
**no external dependencies**. It's useful for recording metrics,
events, and performing analytics.
## Features
* Built-in [HTTP API](https://docs.influxdata.com/influxdb/latest/guides/writing_data/) so you don't have to write any server side code to get up and running.
* Data can be tagged, allowing very flexible querying.
* SQL-like query language.
* Simple to install and manage, and fast to get data in and out.
* It aims to answer queries in real-time. That means every data point is
indexed as it comes in and is immediately available in queries that
should return in < 100ms.
## Installation
We recommend installing InfluxDB using one of the [pre-built packages](https://influxdata.com/downloads/#influxdb). Then start InfluxDB using:
* `service influxdb start` if you have installed InfluxDB using an official Debian or RPM package.
* `systemctl start influxdb` if you have installed InfluxDB using an official Debian or RPM package, and are running a distro with `systemd`. For example, Ubuntu 15 or later.
* `$GOPATH/bin/influxd` if you have built InfluxDB from source.
## Getting Started
### Create your first database
```
curl -XPOST 'http://localhost:8086/query' --data-urlencode "q=CREATE DATABASE mydb"
```
### Insert some data
```
curl -XPOST 'http://localhost:8086/write?db=mydb' \
-d 'cpu,host=server01,region=uswest load=42 1434055562000000000'
curl -XPOST 'http://localhost:8086/write?db=mydb' \
-d 'cpu,host=server02,region=uswest load=78 1434055562000000000'
curl -XPOST 'http://localhost:8086/write?db=mydb' \
-d 'cpu,host=server03,region=useast load=15.4 1434055562000000000'
```
### Query for the data
```
curl -G http://localhost:8086/query?pretty=true --data-urlencode "db=mydb" \
--data-urlencode "q=SELECT * FROM cpu WHERE host='server01' AND time < now() - 1d"
```
### Analyze the data
```
curl -G http://localhost:8086/query?pretty=true --data-urlencode "db=mydb" \
--data-urlencode "q=SELECT mean(load) FROM cpu WHERE region='uswest'"
```
## Documentation
* Read more about the [design goals and motivations of the project](https://docs.influxdata.com/influxdb/latest/).
* Follow the [getting started guide](https://docs.influxdata.com/influxdb/latest/introduction/getting_started/) to learn the basics in just a few minutes.
* Learn more about [InfluxDB's key concepts](https://docs.influxdata.com/influxdb/latest/guides/writing_data/).
## Contributing
If you're feeling adventurous and want to contribute to InfluxDB, see our [contributing doc](https://github.com/influxdata/influxdb/blob/master/CONTRIBUTING.md) for info on how to make feature requests, build from source, and run tests.
## Looking for Support?
InfluxDB offers a number of services to help your project succeed. We offer Developer Support for organizations in active development, Managed Hosting to make it easy to move into production, and Enterprise Support for companies requiring the best response times, SLAs, and technical fixes. Visit our [support page](https://influxdata.com/services/) or contact [sales@influxdb.com](mailto:sales@influxdb.com) to learn how we can best help you succeed.
# TODO
## v2
TODO list for v2. Here is a list of things we want to add to v1, but can't because they would be a breaking change.
- [#1834](https://github.com/influxdata/influxdb/issues/1834): Disallow using time as a tag key or field key.
- [#2124](https://github.com/influxdata/influxdb/issues/2124): Prohibit writes with precision, but without an explicit timestamp.
- [#4461](https://github.com/influxdata/influxdb/issues/4461): Change default time boundaries.
version: 0.{build}
pull_requests:
do_not_increment_build_number: true
branches:
only:
- master
os: Windows Server 2012 R2
# Custom clone folder (variables are not expanded here).
clone_folder: c:\gopath\src\github.com\influxdata\influxdb
# Environment variables
environment:
GOROOT: C:\go17
GOPATH: C:\gopath
# Scripts that run after cloning repository
install:
- set PATH=%GOROOT%\bin;%GOPATH%\bin;%PATH%
- rmdir c:\go /s /q
- echo %PATH%
- echo %GOPATH%
- cd C:\gopath\src\github.com\influxdata\influxdb
- go version
- go env
- go get github.com/sparrc/gdm
- cd C:\gopath\src\github.com\influxdata\influxdb
- gdm restore
# To run your custom scripts instead of automatic MSBuild
build_script:
- go get -t -v ./...
- go test -race -v ./...
# To disable deployment
deploy: off
#!/bin/bash
# Run the build utility via Docker
set -e
# Make sure our working dir is the dir of the script
DIR=$(cd $(dirname ${BASH_SOURCE[0]}) && pwd)
cd $DIR
# Build new docker image
docker build -f Dockerfile_build_ubuntu64 -t influxdb-builder $DIR
echo "Running build.py"
# Run docker
docker run --rm \
-e AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" \
-e AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
-v $HOME/.aws.conf:/root/.aws.conf \
-v $DIR:/root/go/src/github.com/influxdata/influxdb \
influxdb-builder \
"$@"
#!/bin/bash
#
# This is the InfluxDB test script for CircleCI; it is a light wrapper around ./test.sh.
# Exit if any command fails
set -e
# Get dir of script and make it our working directory.
DIR=$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)
cd $DIR
export OUTPUT_DIR="$CIRCLE_ARTIFACTS"
# Don't delete the container since CircleCI doesn't have permission to do so.
export DOCKER_RM="false"
# Get number of test environments.
count=$(./test.sh count)
# Check that we aren't wasting CircleCI nodes.
if [ $CIRCLE_NODE_TOTAL -gt $count ]
then
echo "More CircleCI nodes allocated than tests environments to run!"
exit 1
fi
# Map CircleCI nodes to test environments.
tests=$(seq 0 $((count - 1)))
for i in $tests
do
mine=$(( $i % $CIRCLE_NODE_TOTAL ))
if [ $mine -eq $CIRCLE_NODE_INDEX ]
then
echo "Running test env index: $i"
./test.sh $i
fi
done
# Copy the JUnit test XML to the test reports folder.
mkdir -p $CIRCLE_TEST_REPORTS/reports
cp test-results.xml $CIRCLE_TEST_REPORTS/reports/test-results.xml
machine:
services:
- docker
environment:
GODIST: "go1.8.1.linux-amd64.tar.gz"
post:
- mkdir -p download
- test -e download/$GODIST || curl -o download/$GODIST https://storage.googleapis.com/golang/$GODIST
- sudo rm -rf /usr/local/go
- sudo tar -C /usr/local -xzf download/$GODIST
dependencies:
cache_directories:
- "~/docker"
- ~/download
override:
- ./test.sh save:
# building the docker images can take a long time, hence caching
timeout: 1800
test:
override:
- bash circle-test.sh:
parallel: true
# Race tests using 960s timeout
timeout: 960
deployment:
release:
tag: /^v[0-9]+(\.[0-9]+)*(\S*)$/
commands:
- >
docker run
-e "AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID"
-e "AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY"
-v $(pwd):/root/go/src/github.com/influxdata/influxdb
influxdb_build_ubuntu64
--release
--package
--platform all
--arch all
--upload
--bucket dl.influxdata.com/influxdb/releases
# InfluxDB Client
[![GoDoc](https://godoc.org/github.com/influxdata/influxdb?status.svg)](http://godoc.org/github.com/influxdata/influxdb/client/v2)
## Description
**NOTE:** The Go client library now has a "v2" version, with the old version
being deprecated. The new version can be imported at
`import "github.com/influxdata/influxdb/client/v2"`. It is not backwards-compatible.
A Go client library written and maintained by the **InfluxDB** team.
This package provides convenience functions to read and write time series data.
It uses the HTTP protocol to communicate with your **InfluxDB** cluster.
## Getting Started
### Connecting To Your Database
Connecting to an **InfluxDB** database is straightforward. You will need a host
name, a port and the cluster user credentials if applicable. The default port is
8086. You can customize these settings to your specific installation via the
**InfluxDB** configuration file.
Though not necessary for experimentation, you may want to create a new user
and authenticate the connection to your database.
For more information please check out the
[Admin Docs](https://docs.influxdata.com/influxdb/latest/administration/).
For the impatient, you can create a new admin user _bubba_ by firing off the
[InfluxDB CLI](https://github.com/influxdata/influxdb/blob/master/cmd/influx/main.go).
```shell
influx
> create user bubba with password 'bumblebeetuna'
> grant all privileges to bubba
```
And now for good measure set the credentials in your shell environment.
In the example below we will use $INFLUX_USER and $INFLUX_PWD.
Now with the administrivia out of the way, let's connect to our database.
NOTE: If you've opted out of creating a user, you can omit Username and Password in
the configuration below.
```go
package main
import (
"log"
"time"
"github.com/influxdata/influxdb/client/v2"
)
const (
MyDB = "square_holes"
username = "bubba"
password = "bumblebeetuna"
)
func main() {
// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
Username: username,
Password: password,
})
if err != nil {
log.Fatal(err)
}
// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: MyDB,
Precision: "s",
})
if err != nil {
log.Fatal(err)
}
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
// Write the batch
if err := c.Write(bp); err != nil {
log.Fatal(err)
}
}
```
### Inserting Data
Time series data aka *points* are written to the database using batch inserts.
The mechanism is to create one or more points and then create a batch aka
*batch points* and write these to a given database and series. A series is a
combination of a measurement (time/values) and a set of tags.
In this sample we will create a batch of 1,000 points. Each point has a time, two fields
(_idle_ and _busy_) and three tags (_cpu_, _host_ and _region_). We write these points
to a database called _systemstats_ using a measurement named _cpu_usage_.
NOTE: You can specify a RetentionPolicy as part of the batch points. If not
provided InfluxDB will use the database _default_ retention policy.
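For instance, a retention policy can be set on the batch config like this (a small sketch; the policy name is a placeholder and must already exist on the database):
```go
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
	Database:        "systemstats",
	RetentionPolicy: "two_weeks", // placeholder policy name
	Precision:       "us",
})
```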
```go
func writePoints(clnt client.Client) {
sampleSize := 1000
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "systemstats",
Precision: "us",
})
if err != nil {
log.Fatal(err)
}
rand.Seed(time.Now().UnixNano())
for i := 0; i < sampleSize; i++ {
regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
tags := map[string]string{
"cpu": "cpu-total",
"host": fmt.Sprintf("host%d", rand.Intn(1000)),
"region": regions[rand.Intn(len(regions))],
}
idle := rand.Float64() * 100.0
fields := map[string]interface{}{
"idle": idle,
"busy": 100.0 - idle,
}
pt, err := client.NewPoint(
"cpu_usage",
tags,
fields,
time.Now(),
)
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
}
if err := clnt.Write(bp); err != nil {
log.Fatal(err)
}
}
```
### Querying Data
One nice advantage of using **InfluxDB** is the ability to query your data using familiar
SQL constructs. In this example we can create a convenience function to query the database
as follows:
```go
// queryDB convenience function to query the database
func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) {
q := client.Query{
Command: cmd,
Database: MyDB,
}
if response, err := clnt.Query(q); err == nil {
if response.Error() != nil {
return res, response.Error()
}
res = response.Results
} else {
return res, err
}
return res, nil
}
```
#### Creating a Database
```go
_, err := queryDB(clnt, fmt.Sprintf("CREATE DATABASE %s", MyDB))
if err != nil {
log.Fatal(err)
}
```
#### Count Records
```go
q := fmt.Sprintf("SELECT count(%s) FROM %s", "value", MyMeasurement)
res, err := queryDB(clnt, q)
if err != nil {
log.Fatal(err)
}
count := res[0].Series[0].Values[0][1]
log.Printf("Found a total of %v records\n", count)
```
#### Find the last 10 _shapes_ records
```go
q := fmt.Sprintf("SELECT * FROM %s LIMIT %d", MyMeasurement, 20)
res, err = queryDB(clnt, q)
if err != nil {
log.Fatal(err)
}
for i, row := range res[0].Series[0].Values {
t, err := time.Parse(time.RFC3339, row[0].(string))
if err != nil {
log.Fatal(err)
}
val := row[1].(string)
log.Printf("[%2d] %s: %s\n", i, t.Format(time.Stamp), val)
}
```
### Using the UDP Client
The **InfluxDB** client also supports writing over UDP.
```go
func WriteUDP() {
// Make client
c, err := client.NewUDPClient("localhost:8089")
if err != nil {
panic(err.Error())
}
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
panic(err.Error())
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}
```
### Point Splitting
The UDP client now supports splitting single points that exceed the configured
payload size. The logic for processing each point is listed here, starting with
an empty payload.
1. If adding the point to the current (non-empty) payload would exceed the
configured size, send the current payload. Otherwise, add it to the current
payload.
1. If the point is smaller than the configured size, add it to the payload.
1. If the point has no timestamp, just try to send the entire point as a single
UDP payload, and process the next point.
1. Since the point has a timestamp, re-use the existing measurement name,
tagset, and timestamp and create multiple new points by splitting up the
fields. The per-point length will be kept close to the configured size,
staying under it if possible. This does mean that one large field, maybe a
long string, could be sent as a larger-than-configured payload.
The above logic attempts to respect configured payload sizes without sacrificing
any data integrity. Points without a timestamp can't be split, as that may
cause fields to have differing timestamps when processed by the server.
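A rough sketch of that flow, using stand-in types rather than the client's internals (`linePoint`, `splitFields` and the send step here are illustrative placeholders):
```go
package main

import "fmt"

// linePoint is a stand-in for the client's point type: just its encoded
// line-protocol text and whether it carries a timestamp.
type linePoint struct {
	line    string
	hasTime bool
}

// splitFields stands in for breaking one oversized point into several smaller
// points that share measurement, tagset and timestamp; stubbed out here.
func splitFields(p linePoint, max int) []linePoint { return []linePoint{p} }

func writeUDP(points []linePoint, max int) {
	var payload []byte
	flush := func() {
		if len(payload) > 0 {
			fmt.Printf("send %d bytes\n", len(payload)) // stand-in for conn.Write
			payload = payload[:0]
		}
	}
	add := func(line string) {
		payload = append(payload, line...)
		payload = append(payload, '\n')
	}
	for _, p := range points {
		size := len(p.line) + 1 // newline included
		// 1. Flush the current (non-empty) payload if this point would overflow it.
		if len(payload) > 0 && len(payload)+size > max {
			flush()
		}
		// 2./3. Small points, and points without a timestamp, go out whole.
		if size <= max || !p.hasTime {
			add(p.line)
			continue
		}
		// 4. Oversized timestamped points are split by their fields.
		for _, sp := range splitFields(p, max-1) {
			if len(payload) > 0 && len(payload)+len(sp.line)+1 > max {
				flush()
			}
			add(sp.line)
		}
	}
	flush()
}

func main() {
	writeUDP([]linePoint{{line: "cpu,host=a idle=10", hasTime: true}}, 512)
}
```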
## Go Docs
Please refer to
[http://godoc.org/github.com/influxdata/influxdb/client/v2](http://godoc.org/github.com/influxdata/influxdb/client/v2)
for documentation.
## See Also
You can also examine how the client library is used by the
[InfluxDB CLI](https://github.com/influxdata/influxdb/blob/master/cmd/influx/main.go).
package client_test
import (
"fmt"
"log"
"math/rand"
"net/url"
"os"
"strconv"
"time"
"github.com/influxdata/influxdb/client"
)
func ExampleNewClient() {
host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
if err != nil {
log.Fatal(err)
}
// NOTE: this assumes you've set up a user and have set up shell env variables,
// namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
conf := client.Config{
URL: *host,
Username: os.Getenv("INFLUX_USER"),
Password: os.Getenv("INFLUX_PWD"),
}
con, err := client.NewClient(conf)
if err != nil {
log.Fatal(err)
}
log.Println("Connection", con)
}
func ExampleClient_Ping() {
host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
dur, ver, err := con.Ping()
if err != nil {
log.Fatal(err)
}
log.Printf("Happy as a hippo! %v, %s", dur, ver)
}
func ExampleClient_Query() {
host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
q := client.Query{
Command: "select count(value) from shapes",
Database: "square_holes",
}
if response, err := con.Query(q); err == nil && response.Error() == nil {
log.Println(response.Results)
}
}
func ExampleClient_Write() {
host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
if err != nil {
log.Fatal(err)
}
con, err := client.NewClient(client.Config{URL: *host})
if err != nil {
log.Fatal(err)
}
var (
shapes = []string{"circle", "rectangle", "square", "triangle"}
colors = []string{"red", "blue", "green"}
sampleSize = 1000
pts = make([]client.Point, sampleSize)
)
rand.Seed(42)
for i := 0; i < sampleSize; i++ {
pts[i] = client.Point{
Measurement: "shapes",
Tags: map[string]string{
"color": strconv.Itoa(rand.Intn(len(colors))),
"shape": strconv.Itoa(rand.Intn(len(shapes))),
},
Fields: map[string]interface{}{
"value": rand.Intn(sampleSize),
},
Time: time.Now(),
Precision: "s",
}
}
bps := client.BatchPoints{
Points: pts,
Database: "BumbeBeeTuna",
RetentionPolicy: "default",
}
_, err = con.Write(bps)
if err != nil {
log.Fatal(err)
}
}
package client_test
import (
"fmt"
"math/rand"
"os"
"time"
"github.com/influxdata/influxdb/client/v2"
)
// Create a new client
func ExampleClient() {
// NOTE: this assumes you've set up a user and have set up shell env variables,
// namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
_, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
Username: os.Getenv("INFLUX_USER"),
Password: os.Getenv("INFLUX_PWD"),
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
}
// Write a point using the UDP client
func ExampleClient_uDP() {
// Make client
config := client.UDPConfig{Addr: "localhost:8089"}
c, err := client.NewUDPClient(config)
if err != nil {
fmt.Println("Error: ", err.Error())
}
defer c.Close()
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}
// Ping the cluster using the HTTP client
func ExampleClient_Ping() {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()
_, _, err = c.Ping(0)
if err != nil {
fmt.Println("Error pinging InfluxDB Cluster: ", err.Error())
}
}
// Write a point using the HTTP client
func ExampleClient_write() {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: "BumbleBeeTuna",
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}
// Create a batch and add a point
func ExampleBatchPoints() {
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: "BumbleBeeTuna",
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
}
// Using the BatchPoints setter functions
func ExampleBatchPoints_setters() {
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{})
bp.SetDatabase("BumbleBeeTuna")
bp.SetPrecision("ms")
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
}
// Create a new point with a timestamp
func ExamplePoint() {
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err == nil {
fmt.Println("We created a point: ", pt.String())
}
}
// Create a new point without a timestamp
func ExamplePoint_withoutTime() {
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields)
if err == nil {
fmt.Println("We created a point w/o time: ", pt.String())
}
}
// Write 1000 points
func ExampleClient_write1000() {
sampleSize := 1000
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()
rand.Seed(42)
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: "systemstats",
Precision: "us",
})
for i := 0; i < sampleSize; i++ {
regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
tags := map[string]string{
"cpu": "cpu-total",
"host": fmt.Sprintf("host%d", rand.Intn(1000)),
"region": regions[rand.Intn(len(regions))],
}
idle := rand.Float64() * 100.0
fields := map[string]interface{}{
"idle": idle,
"busy": 100.0 - idle,
}
pt, err := client.NewPoint(
"cpu_usage",
tags,
fields,
time.Now(),
)
if err != nil {
println("Error:", err.Error())
continue
}
bp.AddPoint(pt)
}
err = c.Write(bp)
if err != nil {
fmt.Println("Error: ", err.Error())
}
}
// Make a Query
func ExampleClient_query() {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()
q := client.NewQuery("SELECT count(value) FROM shapes", "square_holes", "ns")
if response, err := c.Query(q); err == nil && response.Error() == nil {
fmt.Println(response.Results)
}
}
// Create a Database with a query
func ExampleClient_createDatabase() {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()
q := client.NewQuery("CREATE DATABASE telegraf", "", "")
if response, err := c.Query(q); err == nil && response.Error() == nil {
fmt.Println(response.Results)
}
}
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type udpclient struct {
conn io.WriteCloser
payloadSize int
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b = b[:0]
}
}
for _, p := range bp.Points() {
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"
checkBuffer(pointSize)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
return delayedError
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}
package cli
import "testing"
func TestParseCommand_InsertInto(t *testing.T) {
t.Parallel()
c := CommandLine{}
tests := []struct {
cmd, db, rp string
}{
{
cmd: `INSERT INTO test cpu,host=serverA,region=us-west value=1.0`,
db: "",
rp: "test",
},
{
cmd: ` INSERT INTO .test cpu,host=serverA,region=us-west value=1.0`,
db: "",
rp: "test",
},
{
cmd: `INSERT INTO "test test" cpu,host=serverA,region=us-west value=1.0`,
db: "",
rp: "test test",
},
{
cmd: `Insert iNTO test.test cpu,host=serverA,region=us-west value=1.0`,
db: "test",
rp: "test",
},
{
cmd: `insert into "test test" cpu,host=serverA,region=us-west value=1.0`,
db: "",
rp: "test test",
},
{
cmd: `insert into "d b"."test test" cpu,host=serverA,region=us-west value=1.0`,
db: "d b",
rp: "test test",
},
}
for _, test := range tests {
t.Logf("command: %s", test.cmd)
bp, err := c.parseInsert(test.cmd)
if err != nil {
t.Fatal(err)
}
if bp.Database != test.db {
t.Fatalf(`Command "insert into" db parsing failed, expected: %q, actual: %q`, test.db, bp.Database)
}
if bp.RetentionPolicy != test.rp {
t.Fatalf(`Command "insert into" rp parsing failed, expected: %q, actual: %q`, test.rp, bp.RetentionPolicy)
}
}
}
package cli
import (
"bytes"
"fmt"
)
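// parseDatabaseAndRetentionPolicy splits an identifier of the form db.rp into
// its database and retention policy parts; either part may be double-quoted,
// in which case dots inside the quotes are treated literally.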
func parseDatabaseAndRetentionPolicy(stmt []byte) (string, string, error) {
var db, rp []byte
var quoted bool
var seperatorCount int
stmt = bytes.TrimSpace(stmt)
for _, b := range stmt {
if b == '"' {
quoted = !quoted
continue
}
if b == '.' && !quoted {
seperatorCount++
if seperatorCount > 1 {
return "", "", fmt.Errorf("unable to parse database and retention policy from %s", string(stmt))
}
continue
}
if seperatorCount == 1 {
rp = append(rp, b)
continue
}
db = append(db, b)
}
return string(db), string(rp), nil
}
package cli
import (
"errors"
"testing"
)
func Test_parseDatabaseAndretentionPolicy(t *testing.T) {
tests := []struct {
stmt string
db string
rp string
err error
}{
{
stmt: `foo`,
db: "foo",
},
{
stmt: `"foo.bar"`,
db: "foo.bar",
},
{
stmt: `"foo.bar".`,
db: "foo.bar",
},
{
stmt: `."foo.bar"`,
rp: "foo.bar",
},
{
stmt: `foo.bar`,
db: "foo",
rp: "bar",
},
{
stmt: `"foo".bar`,
db: "foo",
rp: "bar",
},
{
stmt: `"foo"."bar"`,
db: "foo",
rp: "bar",
},
{
stmt: `"foo.bin"."bar"`,
db: "foo.bin",
rp: "bar",
},
{
stmt: `"foo.bin"."bar.baz...."`,
db: "foo.bin",
rp: "bar.baz....",
},
{
stmt: ` "foo.bin"."bar.baz...." `,
db: "foo.bin",
rp: "bar.baz....",
},
{
stmt: `"foo.bin"."bar".boom`,
err: errors.New("foo"),
},
{
stmt: "foo.bar.",
err: errors.New("foo"),
},
}
for _, test := range tests {
db, rp, err := parseDatabaseAndRetentionPolicy([]byte(test.stmt))
if err != nil && test.err == nil {
t.Errorf("unexpected error: got %s", err)
continue
}
if test.err != nil && err == nil {
t.Errorf("expected err: got: nil, exp: %s", test.err)
continue
}
if db != test.db {
t.Errorf("unexpected database: got: %s, exp: %s", db, test.db)
}
if rp != test.rp {
t.Errorf("unexpected retention policy: got: %s, exp: %s", rp, test.rp)
}
}
}
// The influx command is a CLI client to InfluxDB.
package main
import (
"flag"
"fmt"
"os"
"github.com/influxdata/influxdb/client"
"github.com/influxdata/influxdb/cmd/influx/cli"
)
// These variables are populated via the Go linker.
var (
version string
)
const (
// defaultFormat is the default format of the results when issuing queries
defaultFormat = "column"
// defaultPrecision is the default timestamp format of the results when issuing queries
defaultPrecision = "ns"
// defaultPPS is the default points per second that the import will throttle at
// by default it's 0, which means it will not throttle
defaultPPS = 0
)
func init() {
// If version is not set, make that clear.
if version == "" {
version = "unknown"
}
}
func main() {
c := cli.New(version)
fs := flag.NewFlagSet("InfluxDB shell version "+version, flag.ExitOnError)
fs.StringVar(&c.Host, "host", client.DefaultHost, "Influxdb host to connect to.")
fs.IntVar(&c.Port, "port", client.DefaultPort, "Influxdb port to connect to.")
fs.StringVar(&c.ClientConfig.UnixSocket, "socket", "", "Influxdb unix socket to connect to.")
fs.StringVar(&c.ClientConfig.Username, "username", "", "Username to connect to the server.")
fs.StringVar(&c.ClientConfig.Password, "password", "", `Password to connect to the server. Leaving blank will prompt for password (--password="").`)
fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.")
fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.")
fs.BoolVar(&c.ClientConfig.UnsafeSsl, "unsafeSsl", false, "Set this when connecting to the cluster over https without verifying the SSL certificate.")
fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.")
fs.StringVar(&c.ClientConfig.Precision, "precision", defaultPrecision, "Precision specifies the format of the timestamp: rfc3339,h,m,s,ms,u or ns.")
fs.StringVar(&c.ClientConfig.WriteConsistency, "consistency", "all", "Set write consistency level: any, one, quorum, or all.")
fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.")
fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.")
fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.")
fs.BoolVar(&c.Import, "import", false, "Import a previous database.")
fs.IntVar(&c.ImporterConfig.PPS, "pps", defaultPPS, "How many points per second the import will allow. By default it is zero and will not throttle importing.")
fs.StringVar(&c.ImporterConfig.Path, "path", "", "path to the file to import")
fs.BoolVar(&c.ImporterConfig.Compressed, "compressed", false, "set to true if the import file is compressed")
// Define our own custom usage to print
fs.Usage = func() {
fmt.Println(`Usage of influx:
-version
Display the version and exit.
-host 'host name'
Host to connect to.
-port 'port #'
Port to connect to.
-socket 'unix domain socket'
Unix socket to connect to.
-database 'database name'
Database to connect to the server.
-password 'password'
Password to connect to the server. Leaving blank will prompt for password (--password '').
-username 'username'
Username to connect to the server.
-ssl
Use https for requests.
-unsafeSsl
Set this when connecting to the cluster over https without verifying the SSL certificate.
-execute 'command'
Execute command and quit.
-format 'json|csv|column'
Format specifies the format of the server responses: json, csv, or column.
-precision 'rfc3339|h|m|s|ms|u|ns'
Precision specifies the format of the timestamp: rfc3339, h, m, s, ms, u or ns.
-consistency 'any|one|quorum|all'
Set write consistency level: any, one, quorum, or all
-pretty
Turns on pretty print for the json format.
-import
Import a previous database export from file
-pps
How many points per second the import will allow. By default it is zero and will not throttle importing.
-path
Path to file to import
-compressed
Set to true if the import file is compressed
Examples:
# Use influx in a non-interactive mode to query the database "metrics" and pretty print json:
$ influx -database 'metrics' -execute 'select * from cpu' -format 'json' -pretty
# Connect to a specific database on startup and set database context:
$ influx -database 'metrics' -host 'localhost' -port '8086'
`)
}
fs.Parse(os.Args[1:])
if c.ShowVersion {
c.Version()
os.Exit(0)
}
if err := c.Run(); err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err)
os.Exit(1)
}
}
# `influx_inspect`
## Ways to run
### `influx_inspect`
Will print usage for the tool.
### `influx_inspect report`
Displays series metadata for all shards. Default location: [$HOME/.influxdb]
### `influx_inspect dumptsm`
Dumps low-level details about tsm1 files
#### Flags
##### `-index` bool
Dump raw index data.
`default` = false
##### `-blocks` bool
Dump raw block data.
`default` = false
##### `-all`
Dump all data. Caution: This may print a lot of information.
`default` = false
##### `-filter-key`
Only display index and block data that match this key substring.
`default` = ""
### `influx_inspect export`
Exports all tsm files to line protocol. This output file can be imported via the [influx](https://github.com/influxdata/influxdb/tree/master/importer#running-the-import-command) command.
#### `-datadir` string
Data storage path.
`default` = "$HOME/.influxdb/data"
#### `-waldir` string
WAL storage path.
`default` = "$HOME/.influxdb/wal"
#### `-out` string
Destination file to export to.
`default` = "$HOME/.influxdb/export"
#### `-database` string (optional)
Database to export.
`default` = ""
#### `-retention` string (optional)
Retention policy to export.
`default` = ""
#### `-start` string (optional)
The start of the time range to export.
#### `-end` string (optional)
The end of the time range to export.
#### `-compress` bool (optional)
Compress the output.
`default` = false
#### Sample Commands
Export entire database and compress output:
```
influx_inspect export --compress
```
Export specific retention policy:
```
influx_inspect export --database mydb --retention autogen
```
##### Sample Data
This is a sample of what the output will look like.
```
# DDL
CREATE DATABASE MY_DB_NAME
CREATE RETENTION POLICY autogen ON MY_DB_NAME DURATION inf REPLICATION 1
# DML
# CONTEXT-DATABASE:MY_DB_NAME
# CONTEXT-RETENTION-POLICY:autogen
randset value=97.9296104805 1439856000000000000
randset value=25.3849066842 1439856100000000000
```
# Caveats
The system does not have access to the meta store when exporting TSM shards. As such, it always creates the retention policy with infinite duration and replication factor of 1.
End users may want to change this prior to re-importing if they are importing to a cluster or want a different duration for retention.
// Package dumptsm inspects low-level details about tsm1 files.
package dumptsm
import (
"encoding/binary"
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Command represents the program execution for "influxd dumptsm".
type Command struct {
// Standard input/output, overridden for testing.
Stderr io.Writer
Stdout io.Writer
dumpIndex bool
dumpBlocks bool
dumpAll bool
filterKey string
path string
}
// NewCommand returns a new instance of Command.
func NewCommand() *Command {
return &Command{
Stderr: os.Stderr,
Stdout: os.Stdout,
}
}
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("file", flag.ExitOnError)
fs.BoolVar(&cmd.dumpIndex, "index", false, "Dump raw index data")
fs.BoolVar(&cmd.dumpBlocks, "blocks", false, "Dump raw block data")
fs.BoolVar(&cmd.dumpAll, "all", false, "Dump all data. Caution: This may print a lot of information")
fs.StringVar(&cmd.filterKey, "filter-key", "", "Only display index and block data that match this key substring")
fs.SetOutput(cmd.Stdout)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return err
}
if fs.Arg(0) == "" {
fmt.Printf("TSM file not specified\n\n")
fs.Usage()
return nil
}
cmd.path = fs.Args()[0]
cmd.dumpBlocks = cmd.dumpBlocks || cmd.dumpAll || cmd.filterKey != ""
cmd.dumpIndex = cmd.dumpIndex || cmd.dumpAll || cmd.filterKey != ""
return cmd.dump()
}
func (cmd *Command) dump() error {
var errors []error
f, err := os.Open(cmd.path)
if err != nil {
return err
}
// Get the file size
stat, err := f.Stat()
if err != nil {
return err
}
b := make([]byte, 8)
r, err := tsm1.NewTSMReader(f)
if err != nil {
return fmt.Errorf("Error opening TSM files: %s", err.Error())
}
defer r.Close()
minTime, maxTime := r.TimeRange()
keyCount := r.KeyCount()
blockStats := &blockStats{}
println("Summary:")
fmt.Printf(" File: %s\n", cmd.path)
fmt.Printf(" Time Range: %s - %s\n",
time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
)
fmt.Printf(" Duration: %s ", time.Unix(0, maxTime).Sub(time.Unix(0, minTime)))
fmt.Printf(" Series: %d ", keyCount)
fmt.Printf(" File Size: %d\n", stat.Size())
println()
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
if cmd.dumpIndex {
println("Index:")
tw.Flush()
println()
fmt.Fprintln(tw, " "+strings.Join([]string{"Pos", "Min Time", "Max Time", "Ofs", "Size", "Key", "Field"}, "\t"))
var pos int
for i := 0; i < keyCount; i++ {
key, _ := r.KeyAt(i)
for _, e := range r.Entries(string(key)) {
pos++
split := strings.Split(string(key), "#!~#")
// Possible corruption? Try to read as much as we can and point to the problem.
measurement := split[0]
field := split[1]
if cmd.filterKey != "" && !strings.Contains(string(key), cmd.filterKey) {
continue
}
fmt.Fprintln(tw, " "+strings.Join([]string{
strconv.FormatInt(int64(pos), 10),
time.Unix(0, e.MinTime).UTC().Format(time.RFC3339Nano),
time.Unix(0, e.MaxTime).UTC().Format(time.RFC3339Nano),
strconv.FormatInt(int64(e.Offset), 10),
strconv.FormatInt(int64(e.Size), 10),
measurement,
field,
}, "\t"))
tw.Flush()
}
}
}
tw = tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Chk", "Ofs", "Len", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t"))
// Starting at 5 because the magic number is 4 bytes + 1 byte version
i := int64(5)
var blockCount, pointCount, blockSize int64
indexSize := r.IndexSize()
// Start at the beginning and read every block
for j := 0; j < keyCount; j++ {
key, _ := r.KeyAt(j)
for _, e := range r.Entries(string(key)) {
f.Seek(int64(e.Offset), 0)
f.Read(b[:4])
chksum := binary.BigEndian.Uint32(b[:4])
buf := make([]byte, e.Size-4)
f.Read(buf)
blockSize += int64(e.Size)
if cmd.filterKey != "" && !strings.Contains(string(key), cmd.filterKey) {
i += int64(e.Size)
blockCount++
continue
}
blockType := buf[0]
encoded := buf[1:]
var v []tsm1.Value
v, err := tsm1.DecodeBlock(buf, v)
if err != nil {
return err
}
startTime := time.Unix(0, v[0].UnixNano())
pointCount += int64(len(v))
// Length of the timestamp block
tsLen, j := binary.Uvarint(encoded)
// Unpack the timestamp bytes
ts := encoded[int(j) : int(j)+int(tsLen)]
// Unpack the value bytes
values := encoded[int(j)+int(tsLen):]
tsEncoding := timeEnc[int(ts[0]>>4)]
vEncoding := encDescs[int(blockType+1)][values[0]>>4]
typeDesc := blockTypes[blockType]
blockStats.inc(0, ts[0]>>4)
blockStats.inc(int(blockType+1), values[0]>>4)
blockStats.size(len(buf))
if cmd.dumpBlocks {
fmt.Fprintln(tw, " "+strings.Join([]string{
strconv.FormatInt(blockCount, 10),
strconv.FormatUint(uint64(chksum), 10),
strconv.FormatInt(i, 10),
strconv.FormatInt(int64(len(buf)), 10),
typeDesc,
startTime.UTC().Format(time.RFC3339Nano),
strconv.FormatInt(int64(len(v)), 10),
fmt.Sprintf("%s/%s", tsEncoding, vEncoding),
fmt.Sprintf("%d/%d", len(ts), len(values)),
}, "\t"))
}
i += int64(e.Size)
blockCount++
}
}
if cmd.dumpBlocks {
println("Blocks:")
tw.Flush()
println()
}
var blockSizeAvg int64
if blockCount > 0 {
blockSizeAvg = blockSize / blockCount
}
fmt.Printf("Statistics\n")
fmt.Printf(" Blocks:\n")
fmt.Printf(" Total: %d Size: %d Min: %d Max: %d Avg: %d\n",
blockCount, blockSize, blockStats.min, blockStats.max, blockSizeAvg)
fmt.Printf(" Index:\n")
fmt.Printf(" Total: %d Size: %d\n", blockCount, indexSize)
fmt.Printf(" Points:\n")
fmt.Printf(" Total: %d", pointCount)
println()
println(" Encoding:")
for i, counts := range blockStats.counts {
if len(counts) == 0 {
continue
}
fmt.Printf(" %s: ", strings.Title(fieldType[i]))
for j, v := range counts {
fmt.Printf("\t%s: %d (%d%%) ", encDescs[i][j], v, int(float64(v)/float64(blockCount)*100))
}
println()
}
fmt.Printf(" Compression:\n")
fmt.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount))
fmt.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount))
if len(errors) > 0 {
println()
fmt.Printf("Errors (%d):\n", len(errors))
for _, err := range errors {
fmt.Printf(" * %v\n", err)
}
println()
return fmt.Errorf("error count %d", len(errors))
}
return nil
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := `Dumps low-level details about tsm1 files.
Usage: influx_inspect dumptsm [flags] <path>
-index
Dump raw index data
-blocks
Dump raw block data
-all
Dump all data. Caution: This may print a lot of information
-filter-key <name>
Only display index and block data that match this key substring
`
fmt.Fprintf(cmd.Stdout, usage)
}
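// Lookup tables mapping TSM block types and encoding IDs to the
// human-readable names used in the dump output.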
var (
fieldType = []string{
"timestamp", "float", "int", "bool", "string",
}
blockTypes = []string{
"float64", "int64", "bool", "string",
}
timeEnc = []string{
"none", "s8b", "rle",
}
floatEnc = []string{
"none", "gor",
}
intEnc = []string{
"none", "s8b", "rle",
}
boolEnc = []string{
"none", "bp",
}
stringEnc = []string{
"none", "snpy",
}
encDescs = [][]string{
timeEnc, floatEnc, intEnc, boolEnc, stringEnc,
}
)
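// blockStats tracks the smallest and largest block sizes seen and counts how
// often each encoding is used, per block type.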
type blockStats struct {
min, max int
counts [][]int
}
func (b *blockStats) inc(typ int, enc byte) {
for len(b.counts) <= typ {
b.counts = append(b.counts, []int{})
}
for len(b.counts[typ]) <= int(enc) {
b.counts[typ] = append(b.counts[typ], 0)
}
b.counts[typ][enc]++
}
func (b *blockStats) size(sz int) {
if b.min == 0 || sz < b.min {
b.min = sz
}
if b.max == 0 || sz > b.max {
b.max = sz
}
}
package export
import (
"bytes"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"testing"
"github.com/golang/snappy"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
type corpus map[string][]tsm1.Value
var (
basicCorpus = corpus{
tsm1.SeriesFieldKey("floats,k=f", "f"): []tsm1.Value{
tsm1.NewValue(1, float64(1.5)),
tsm1.NewValue(2, float64(3)),
},
tsm1.SeriesFieldKey("ints,k=i", "i"): []tsm1.Value{
tsm1.NewValue(10, int64(15)),
tsm1.NewValue(20, int64(30)),
},
tsm1.SeriesFieldKey("bools,k=b", "b"): []tsm1.Value{
tsm1.NewValue(100, true),
tsm1.NewValue(200, false),
},
tsm1.SeriesFieldKey("strings,k=s", "s"): []tsm1.Value{
tsm1.NewValue(1000, "1k"),
tsm1.NewValue(2000, "2k"),
},
}
basicCorpusExpLines = []string{
"floats,k=f f=1.5 1",
"floats,k=f f=3 2",
"ints,k=i i=15i 10",
"ints,k=i i=30i 20",
"bools,k=b b=true 100",
"bools,k=b b=false 200",
`strings,k=s s="1k" 1000`,
`strings,k=s s="2k" 2000`,
}
escapeStringCorpus = corpus{
tsm1.SeriesFieldKey("t", "s"): []tsm1.Value{
tsm1.NewValue(1, `1. "quotes"`),
tsm1.NewValue(2, `2. back\slash`),
tsm1.NewValue(3, `3. bs\q"`),
},
}
escCorpusExpLines = []string{
`t s="1. \"quotes\"" 1`,
`t s="2. back\\slash" 2`,
`t s="3. bs\\q\"" 3`,
}
)
func Test_exportWALFile(t *testing.T) {
for _, c := range []struct {
corpus corpus
lines []string
}{
{corpus: basicCorpus, lines: basicCorpusExpLines},
{corpus: escapeStringCorpus, lines: escCorpusExpLines},
} {
walFile := writeCorpusToWALFile(c.corpus)
defer os.Remove(walFile.Name())
var out bytes.Buffer
if err := newCommand().exportWALFile(walFile.Name(), &out, func() {}); err != nil {
t.Fatal(err)
}
lines := strings.Split(out.String(), "\n")
for _, exp := range c.lines {
found := false
for _, l := range lines {
if exp == l {
found = true
break
}
}
if !found {
t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String())
}
}
}
}
func Test_exportTSMFile(t *testing.T) {
for _, c := range []struct {
corpus corpus
lines []string
}{
{corpus: basicCorpus, lines: basicCorpusExpLines},
{corpus: escapeStringCorpus, lines: escCorpusExpLines},
} {
tsmFile := writeCorpusToTSMFile(c.corpus)
defer os.Remove(tsmFile.Name())
var out bytes.Buffer
if err := newCommand().exportTSMFile(tsmFile.Name(), &out); err != nil {
t.Fatal(err)
}
lines := strings.Split(out.String(), "\n")
for _, exp := range c.lines {
found := false
for _, l := range lines {
if exp == l {
found = true
break
}
}
if !found {
t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String())
}
}
}
}
var sink interface{}
func benchmarkExportTSM(c corpus, b *testing.B) {
// Garbage collection is relatively likely to happen during export, so track allocations.
b.ReportAllocs()
f := writeCorpusToTSMFile(c)
defer os.Remove(f.Name())
cmd := newCommand()
var out bytes.Buffer
b.ResetTimer()
b.StartTimer()
for i := 0; i < b.N; i++ {
if err := cmd.exportTSMFile(f.Name(), &out); err != nil {
b.Fatal(err)
}
sink = out.Bytes()
out.Reset()
}
}
func BenchmarkExportTSMFloats_100s_250vps(b *testing.B) {
benchmarkExportTSM(makeFloatsCorpus(100, 250), b)
}
func BenchmarkExportTSMInts_100s_250vps(b *testing.B) {
benchmarkExportTSM(makeIntsCorpus(100, 250), b)
}
func BenchmarkExportTSMBools_100s_250vps(b *testing.B) {
benchmarkExportTSM(makeBoolsCorpus(100, 250), b)
}
func BenchmarkExportTSMStrings_100s_250vps(b *testing.B) {
benchmarkExportTSM(makeStringsCorpus(100, 250), b)
}
func benchmarkExportWAL(c corpus, b *testing.B) {
// Garbage collection is relatively likely to happen during export, so track allocations.
b.ReportAllocs()
f := writeCorpusToWALFile(c)
defer os.Remove(f.Name())
cmd := newCommand()
var out bytes.Buffer
b.ResetTimer()
b.StartTimer()
for i := 0; i < b.N; i++ {
if err := cmd.exportWALFile(f.Name(), &out, func() {}); err != nil {
b.Fatal(err)
}
sink = out.Bytes()
out.Reset()
}
}
func BenchmarkExportWALFloats_100s_250vps(b *testing.B) {
benchmarkExportWAL(makeFloatsCorpus(100, 250), b)
}
func BenchmarkExportWALInts_100s_250vps(b *testing.B) {
benchmarkExportWAL(makeIntsCorpus(100, 250), b)
}
func BenchmarkExportWALBools_100s_250vps(b *testing.B) {
benchmarkExportWAL(makeBoolsCorpus(100, 250), b)
}
func BenchmarkExportWALStrings_100s_250vps(b *testing.B) {
benchmarkExportWAL(makeStringsCorpus(100, 250), b)
}
// newCommand returns a command that discards its output and that accepts all timestamps.
func newCommand() *Command {
return &Command{
Stderr: ioutil.Discard,
Stdout: ioutil.Discard,
startTime: math.MinInt64,
endTime: math.MaxInt64,
}
}
// makeCorpus returns a new corpus filled with values generated by fn.
// The RNG passed to fn is seeded with numSeries * numValuesPerSeries, for predictable output.
func makeCorpus(numSeries, numValuesPerSeries int, fn func(*rand.Rand) interface{}) corpus {
rng := rand.New(rand.NewSource(int64(numSeries) * int64(numValuesPerSeries)))
var unixNano int64
corpus := make(corpus, numSeries)
for i := 0; i < numSeries; i++ {
vals := make([]tsm1.Value, numValuesPerSeries)
for j := 0; j < numValuesPerSeries; j++ {
vals[j] = tsm1.NewValue(unixNano, fn(rng))
unixNano++
}
k := fmt.Sprintf("m,t=%d", i)
corpus[tsm1.SeriesFieldKey(k, "x")] = vals
}
return corpus
}
func makeFloatsCorpus(numSeries, numFloatsPerSeries int) corpus {
return makeCorpus(numSeries, numFloatsPerSeries, func(rng *rand.Rand) interface{} {
return rng.Float64()
})
}
func makeIntsCorpus(numSeries, numIntsPerSeries int) corpus {
return makeCorpus(numSeries, numIntsPerSeries, func(rng *rand.Rand) interface{} {
// This will only return positive integers. That's probably okay.
return rng.Int63()
})
}
func makeBoolsCorpus(numSeries, numBoolsPerSeries int) corpus {
return makeCorpus(numSeries, numBoolsPerSeries, func(rng *rand.Rand) interface{} {
return rng.Int63n(2) == 1
})
}
func makeStringsCorpus(numSeries, numStringsPerSeries int) corpus {
return makeCorpus(numSeries, numStringsPerSeries, func(rng *rand.Rand) interface{} {
// The string will randomly have 2-6 parts
parts := make([]string, rng.Intn(4)+2)
for i := range parts {
// Each part is a random base36-encoded number
parts[i] = strconv.FormatInt(rng.Int63(), 36)
}
// Join the individual parts with underscores.
return strings.Join(parts, "_")
})
}
// writeCorpusToWALFile writes the given corpus as a WAL file, and returns a handle to that file.
// It is the caller's responsibility to remove the returned temp file.
// writeCorpusToWALFile will panic on any error that occurs.
func writeCorpusToWALFile(c corpus) *os.File {
walFile, err := ioutil.TempFile("", "export_test_corpus_wal")
if err != nil {
panic(err)
}
e := &tsm1.WriteWALEntry{Values: c}
b, err := e.Encode(nil)
if err != nil {
panic(err)
}
w := tsm1.NewWALSegmentWriter(walFile)
if err := w.Write(e.Type(), snappy.Encode(nil, b)); err != nil {
panic(err)
}
if err := w.Flush(); err != nil {
panic(err)
}
// (*tsm1.WALSegmentWriter).sync isn't exported, but it only Syncs the file anyway.
if err := walFile.Sync(); err != nil {
panic(err)
}
return walFile
}
// writeCorpusToTSMFile writes the given corpus as a TSM file, and returns a handle to that file.
// It is the caller's responsibility to remove the returned temp file.
// writeCorpusToTSMFile will panic on any error that occurs.
func writeCorpusToTSMFile(c corpus) *os.File {
tsmFile, err := ioutil.TempFile("", "export_test_corpus_tsm")
if err != nil {
panic(err)
}
w, err := tsm1.NewTSMWriter(tsmFile)
if err != nil {
panic(err)
}
// Write the series in alphabetical order so that each test run is comparable,
// given an identical corpus.
keys := make([]string, 0, len(c))
for k := range c {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write(k, c[k]); err != nil {
panic(err)
}
}
if err := w.WriteIndex(); err != nil {
panic(err)
}
if err := w.Close(); err != nil {
panic(err)
}
return tsmFile
}
// Package help contains the help for the influx_inspect command.
package help
import (
"fmt"
"io"
"os"
"strings"
)
// Command displays help for command-line sub-commands.
type Command struct {
Stdout io.Writer
}
// NewCommand returns a new instance of Command.
func NewCommand() *Command {
return &Command{
Stdout: os.Stdout,
}
}
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fmt.Fprintln(cmd.Stdout, strings.TrimSpace(usage))
return nil
}
const usage = `
Usage: influx_inspect [[command] [arguments]]
The commands are:
dumptsm dumps low-level details about tsm1 files.
export exports raw data from a shard to line protocol
help display this help message
report displays a shard level report
verify verifies integrity of TSM files
"help" is the default command.
Use "influx_inspect [command] -help" for more information about a command.
`
// The influx_inspect command displays detailed information about InfluxDB data files.
package main
import (
"fmt"
"io"
"log"
"os"
"github.com/influxdata/influxdb/cmd"
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsi"
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm"
"github.com/influxdata/influxdb/cmd/influx_inspect/export"
"github.com/influxdata/influxdb/cmd/influx_inspect/help"
"github.com/influxdata/influxdb/cmd/influx_inspect/report"
"github.com/influxdata/influxdb/cmd/influx_inspect/verify"
_ "github.com/influxdata/influxdb/tsdb/engine"
)
func main() {
m := NewMain()
if err := m.Run(os.Args[1:]...); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
// Main represents the program execution.
type Main struct {
Logger *log.Logger
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
// NewMain returns a new instance of Main.
func NewMain() *Main {
return &Main{
Logger: log.New(os.Stderr, "[influx_inspect] ", log.LstdFlags),
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
}
}
// Run determines and runs the command specified by the CLI args.
func (m *Main) Run(args ...string) error {
name, args := cmd.ParseCommandName(args)
// Extract name from args.
switch name {
case "", "help":
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help: %s", err)
}
case "dumptsi":
name := dumptsi.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("dumptsi: %s", err)
}
case "dumptsmdev":
fmt.Fprintf(m.Stderr, "warning: dumptsmdev is deprecated, use dumptsm instead.\n")
fallthrough
case "dumptsm":
name := dumptsm.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("dumptsm: %s", err)
}
case "export":
name := export.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("export: %s", err)
}
case "report":
name := report.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("report: %s", err)
}
case "verify":
name := verify.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("verify: %s", err)
}
default:
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influx_inspect help' for usage`+"\n\n", name)
}
return nil
}
// Package report reports statistics about TSM files.
package report
import (
"flag"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/retailnext/hllpp"
)
// Command represents the program execution for "influxd report".
type Command struct {
Stderr io.Writer
Stdout io.Writer
dir string
pattern string
detailed bool
}
// NewCommand returns a new instance of Command.
func NewCommand() *Command {
return &Command{
Stderr: os.Stderr,
Stdout: os.Stdout,
}
}
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("report", flag.ExitOnError)
fs.StringVar(&cmd.pattern, "pattern", "", "Include only files matching a pattern")
fs.BoolVar(&cmd.detailed, "detailed", false, "Report detailed cardinality estimates")
fs.SetOutput(cmd.Stdout)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return err
}
cmd.dir = fs.Arg(0)
start := time.Now()
files, err := filepath.Glob(filepath.Join(cmd.dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension)))
if err != nil {
return err
}
var filtered []string
if cmd.pattern != "" {
for _, f := range files {
if strings.Contains(f, cmd.pattern) {
filtered = append(filtered, f)
}
}
files = filtered
}
if len(files) == 0 {
return fmt.Errorf("no tsm files at %v", cmd.dir)
}
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
fmt.Fprintln(tw, strings.Join([]string{"File", "Series", "Load Time"}, "\t"))
totalSeries := hllpp.New()
tagCardinalities := map[string]*hllpp.HLLPP{}
measCardinalities := map[string]*hllpp.HLLPP{}
fieldCardinalities := map[string]*hllpp.HLLPP{}
for _, f := range files {
file, err := os.OpenFile(f, os.O_RDONLY, 0600)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", f, err)
continue
}
loadStart := time.Now()
reader, err := tsm1.NewTSMReader(file)
if err != nil {
fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
continue
}
loadTime := time.Since(loadStart)
seriesCount := reader.KeyCount()
for i := 0; i < seriesCount; i++ {
key, _ := reader.KeyAt(i)
totalSeries.Add([]byte(key))
if cmd.detailed {
sep := strings.Index(string(key), "#!~#")
seriesKey, field := key[:sep], key[sep+4:]
measurement, tags, _ := models.ParseKey(seriesKey)
measCount, ok := measCardinalities[measurement]
if !ok {
measCount = hllpp.New()
measCardinalities[measurement] = measCount
}
measCount.Add([]byte(key))
fieldCount, ok := fieldCardinalities[measurement]
if !ok {
fieldCount = hllpp.New()
fieldCardinalities[measurement] = fieldCount
}
fieldCount.Add([]byte(field))
for _, t := range tags {
tagCount, ok := tagCardinalities[string(t.Key)]
if !ok {
tagCount = hllpp.New()
tagCardinalities[string(t.Key)] = tagCount
}
tagCount.Add(t.Value)
}
}
}
reader.Close()
fmt.Fprintln(tw, strings.Join([]string{
filepath.Base(file.Name()),
strconv.FormatInt(int64(seriesCount), 10),
loadTime.String(),
}, "\t"))
tw.Flush()
}
tw.Flush()
println()
fmt.Printf("Statistics\n")
fmt.Printf("\tSeries:\n")
fmt.Printf("\t\tTotal (est): %d\n", totalSeries.Count())
if cmd.detailed {
fmt.Printf("\tMeasurements (est):\n")
for _, t := range sortKeys(measCardinalities) {
fmt.Printf("\t\t%v: %d (%d%%)\n", t, measCardinalities[t].Count(), int((float64(measCardinalities[t].Count())/float64(totalSeries.Count()))*100))
}
fmt.Printf("\tFields (est):\n")
for _, t := range sortKeys(fieldCardinalities) {
fmt.Printf("\t\t%v: %d\n", t, fieldCardinalities[t].Count())
}
fmt.Printf("\tTags (est):\n")
for _, t := range sortKeys(tagCardinalities) {
fmt.Printf("\t\t%v: %d\n", t, tagCardinalities[t].Count())
}
}
fmt.Printf("Completed in %s\n", time.Since(start))
return nil
}
// sortKeys is a quick helper to return the sorted set of a map's keys
func sortKeys(vals map[string]*hllpp.HLLPP) (keys []string) {
for k := range vals {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := `Displays shard level report.
Usage: influx_inspect report [flags]
-pattern <pattern>
Include only files matching a pattern.
-detailed
Report detailed cardinality estimates.
Defaults to "false".
`
fmt.Fprintf(cmd.Stdout, usage)
}
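The report command above estimates cardinalities with HyperLogLog++ rather than exact sets, which keeps memory bounded even for very high series counts. A minimal sketch of the `hllpp` calls it relies on (the keys below are placeholders):

```go
package main

import (
	"fmt"

	"github.com/retailnext/hllpp"
)

func main() {
	// Each key is hashed into the sketch; Count returns an approximate
	// number of distinct keys added so far.
	h := hllpp.New()
	h.Add([]byte("cpu,host=server01#!~#value"))
	h.Add([]byte("cpu,host=server02#!~#value"))
	h.Add([]byte("cpu,host=server01#!~#value")) // duplicates do not increase the count
	fmt.Println(h.Count())
}
```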
// Package verify verifies integrity of TSM files.
package verify
import (
"flag"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Command represents the program execution for "influx_inspect verify".
type Command struct {
Stderr io.Writer
Stdout io.Writer
}
// NewCommand returns a new instance of Command.
func NewCommand() *Command {
return &Command{
Stderr: os.Stderr,
Stdout: os.Stdout,
}
}
// Run executes the command.
func (cmd *Command) Run(args ...string) error {
var path string
fs := flag.NewFlagSet("verify", flag.ExitOnError)
fs.StringVar(&path, "dir", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]")
fs.SetOutput(cmd.Stdout)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return err
}
start := time.Now()
dataPath := filepath.Join(path, "data")
brokenBlocks := 0
totalBlocks := 0
// No need to do this in a loop
ext := fmt.Sprintf(".%s", tsm1.TSMFileExtension)
// Get all TSM files by walking through the data dir
files := []string{}
err := filepath.Walk(dataPath, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}
if filepath.Ext(path) == ext {
files = append(files, path)
}
return nil
})
if err != nil {
panic(err)
}
tw := tabwriter.NewWriter(cmd.Stdout, 16, 8, 0, '\t', 0)
// Verify the checksums of every block in every file
for _, f := range files {
file, err := os.OpenFile(f, os.O_RDONLY, 0600)
if err != nil {
return err
}
reader, err := tsm1.NewTSMReader(file)
if err != nil {
return err
}
blockItr := reader.BlockIterator()
brokenFileBlocks := 0
count := 0
for blockItr.Next() {
totalBlocks++
key, _, _, _, checksum, buf, err := blockItr.Read()
if err != nil {
brokenBlocks++
brokenFileBlocks++
fmt.Fprintf(tw, "%s: could not get checksum for key %v block %d due to error: %q\n", f, key, count, err)
} else if expected := crc32.ChecksumIEEE(buf); checksum != expected {
brokenBlocks++
brokenFileBlocks++
fmt.Fprintf(tw, "%s: got %d but expected %d for key %v, block %d\n", f, checksum, expected, key, count)
}
count++
}
if brokenFileBlocks == 0 {
fmt.Fprintf(tw, "%s: healthy\n", f)
}
reader.Close()
}
fmt.Fprintf(tw, "Broken Blocks: %d / %d, in %vs\n", brokenBlocks, totalBlocks, time.Since(start).Seconds())
tw.Flush()
return nil
}
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
usage := fmt.Sprintf(`Verifies the integrity of TSM files.
Usage: influx_inspect verify [flags]
-dir <path>
Root storage path
Defaults to "%[1]s/.influxdb".
`, os.Getenv("HOME"))
fmt.Fprintf(cmd.Stdout, usage)
}
# `influx_stress`
If you run into any issues with this tool please mention @jackzampolin when you create an issue.
## Ways to run
### `influx_stress`
This runs a basic stress test with the [default config](https://github.com/influxdata/influxdb/blob/master/stress/stress.toml). For more information on the configuration file, please see the default.
### `influx_stress -config someConfig.toml`
This runs the stress test with a valid configuration file located at `someConfig.toml`
### `influx_stress -v2 -config someConfig.iql`
This runs the stress test with a valid `v2` configuration file. For more information about the `v2` stress test see the [v2 stress README](https://github.com/influxdata/influxdb/blob/master/stress/v2/README.md).
## Flags
If flags are set, they override the corresponding values from any config file passed in.
### `-addr` string
IP address and port of database where response times will persist (e.g., localhost:8086)
`default` = "http://localhost:8086"
### `-config` string
The relative path to the stress test configuration file.
`default` = [config](https://github.com/influxdata/influxdb/blob/master/stress/stress.toml)
### `-cpuprofile` filename
Writes the result of Go's CPU profiler to filename
`default` = no profiling
### `-database` string
Name of the database on `-addr` to which `influx_stress` will persist write and query response times
`default` = "stress"
### `-tags` value
A comma-separated list of tags to add to write and query response times.
`default` = ""
# This section can be removed
[provision]
# The basic provisioner simply deletes and creates the database.
# If `reset_database` is false, it will not attempt to delete the database
[provision.basic]
# If enabled the provisioner will actually run
enabled = true
# Address of the instance that is to be provisioned
address = "localhost:8086"
# Database that will be created/deleted
database = "stress"
# Attempt to delete database
reset_database = true
# This section cannot be commented out
# To prevent writes set `enabled=false`
# in [write.influx_client.basic]
[write]
[write.point_generator]
# The basic point generator will generate points of the form
# `cpu,host=server-%v,location=us-west value=234 123456`
[write.point_generator.basic]
# number of points that will be written for each of the series
point_count = 100
# number of series
series_count = 100000
# How much time between each timestamp
tick = "10s"
# Randomize timestamp a bit (not functional)
jitter = true
# Precision of points that are being written
precision = "s"
# name of the measurement that will be written
measurement = "cpu"
# The date for the first point that is written into influx
start_date = "2006-Jan-02"
# Defines a tag for a series
[[write.point_generator.basic.tag]]
key = "host"
value = "server"
[[write.point_generator.basic.tag]]
key = "location"
value = "us-west"
# Defines a field for a series
[[write.point_generator.basic.field]]
key = "value"
value = "float64" # supported types: float64, int, bool
[write.influx_client]
[write.influx_client.basic]
# If enabled the writer will actually write
enabled = true
# Addresses is an array of the Influxdb instances
addresses = ["localhost:8086"] # stress_test_server runs on port 1234
# Database that is being written to
database = "stress"
# Precision of points that are being written
precision = "s"
# Size of batches that are sent to db
batch_size = 10000
# Interval between each batch
batch_interval = "0s"
# How many concurrent writers to the db
concurrency = 10
# ssl enabled?
ssl = false
# format of points that are written to influxdb
format = "line_http" # line_udp (not supported yet), graphite_tcp (not supported yet), graphite_udp (not supported yet)
# This section can be removed
[read]
[read.query_generator]
[read.query_generator.basic]
# Template of the query that will be run against the instance
template = "SELECT count(value) FROM cpu where host='server-%v'"
# How many times the templated query will be run
query_count = 250
[read.query_client]
[read.query_client.basic]
# if enabled the reader will actually read
enabled = true
# Address of the instance that will be queried
addresses = ["localhost:8086"]
# Database that will be queried
database = "stress"
# Interval between queries
query_interval = "100ms"
# Number of concurrent queriers
concurrency = 1
// Command influx_stress is deprecated; use github.com/influxdata/influx-stress instead.
package main
import (
"flag"
"fmt"
"log"
"os"
"runtime/pprof"
"github.com/influxdata/influxdb/stress"
v2 "github.com/influxdata/influxdb/stress/v2"
)
var (
useV2 = flag.Bool("v2", false, "Use version 2 of stress tool")
config = flag.String("config", "", "The stress test file")
cpuprofile = flag.String("cpuprofile", "", "Write the cpu profile to `filename`")
db = flag.String("db", "", "target database within test system for write and query load")
)
func main() {
o := stress.NewOutputConfig()
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
fmt.Println(err)
return
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
if *useV2 {
if *config != "" {
v2.RunStress(*config)
} else {
v2.RunStress("stress/v2/iql/file.iql")
}
} else {
c, err := stress.NewConfig(*config)
if err != nil {
log.Fatal(err)
return
}
if *db != "" {
c.Provision.Basic.Database = *db
c.Write.InfluxClients.Basic.Database = *db
c.Read.QueryClients.Basic.Database = *db
}
w := stress.NewWriter(c.Write.PointGenerators.Basic, &c.Write.InfluxClients.Basic)
r := stress.NewQuerier(&c.Read.QueryGenerators.Basic, &c.Read.QueryClients.Basic)
s := stress.NewStressTest(&c.Provision.Basic, w, r)
bw := stress.NewBroadcastChannel()
bw.Register(c.Write.InfluxClients.Basic.BasicWriteHandler)
bw.Register(o.HTTPHandler("write"))
br := stress.NewBroadcastChannel()
br.Register(c.Read.QueryClients.Basic.BasicReadHandler)
br.Register(o.HTTPHandler("read"))
s.Start(bw.Handle, br.Handle)
}
}
# Converting b1 and bz1 shards to tsm1
`influx_tsm` is a tool for converting b1 and bz1 shards to tsm1
format. Converting shards to tsm1 format results in a very significant
reduction in disk usage and significantly improved write throughput
when writing data into those shards.
Conversion can be controlled on a database-by-database basis. By
default a database is backed up before it is converted, allowing you
to roll back any changes. Because of the backup process, ensure the
host system has at least as much free disk space as the disk space
consumed by the _data_ directory of your InfluxDB system.
The tool automatically ignores tsm1 shards, and can be run
idempotently on any database.
Conversion is an offline process, and the InfluxDB system must be
stopped during conversion. However the conversion process reads and
writes shards directly on disk and should be fast.
## Steps
Follow these steps to perform a conversion.
* Identify the databases you wish to convert. You can convert one or more databases at a time. By default all databases are converted.
* Decide on parallel operation. By default the conversion operation performs each operation in a serial manner. This minimizes load on the host system performing the conversion, but also takes the most time. If you wish to minimize the time conversion takes, enable parallel mode. Conversion will then perform as many operations as possible in parallel, but the process may place significant load on the host system (CPU, disk, and RAM usage will all increase).
* Stop all write-traffic to your InfluxDB system.
* Restart the InfluxDB service and wait until all WAL data is flushed to disk -- this has completed when the system responds to queries. This is to ensure all data is present in shards.
* Stop the InfluxDB service. It should not be restarted until conversion is complete.
* Run conversion tool. Depending on the size of the data directory, this might be a lengthy operation. Consider running the conversion tool under a "screen" session to avoid any interruptions.
* Unless you ran the conversion tool as the same user that runs InfluxDB, you may need to set the correct read-and-write permissions on the new tsm1 directories.
* Restart the node and ensure the data looks correct.
* If everything looks OK, you may then wish to remove or archive the backed-up databases.
* Restart write traffic.
## Example session
Below is an example session, showing a database being converted.
```
$ # Create a backup location that the `influxdb` user has full access to
$ mkdir -m 0777 /path/to/influxdb_backup
$ sudo -u influxdb influx_tsm -backup /path/to/influxdb_backup -parallel /var/lib/influxdb/data
b1 and bz1 shard conversion.
-----------------------------------
Data directory is: /var/lib/influxdb/data
Backup directory is: /path/to/influxdb_backup
Databases specified: all
Database backups enabled: yes
Parallel mode enabled (GOMAXPROCS): yes (8)
Found 1 shards that will be converted.
Database Retention Path Engine Size
_internal monitor /var/lib/influxdb/data/_internal/monitor/1 bz1 65536
These shards will be converted. Proceed? y/N: y
Conversion starting....
Backing up 1 databases...
2016/01/28 12:23:43.699266 Backup of databse '_internal' started
2016/01/28 12:23:43.699883 Backing up file /var/lib/influxdb/data/_internal/monitor/1
2016/01/28 12:23:43.700052 Database _internal backed up (851.776µs)
2016/01/28 12:23:43.700320 Starting conversion of shard: /var/lib/influxdb/data/_internal/monitor/1
2016/01/28 12:23:43.706276 Conversion of /var/lib/influxdb/data/_internal/monitor/1 successful (6.040148ms)
Summary statistics
========================================
Databases converted: 1
Shards converted: 1
TSM files created: 1
Points read: 369
Points written: 369
NaN filtered: 0
Inf filtered: 0
Points without fields filtered: 0
Disk usage pre-conversion (bytes): 65536
Disk usage post-conversion (bytes): 11000
Reduction factor: 83%
Bytes per TSM point: 29.81
Total conversion time: 7.330443ms
$ # restart node, verify data
$ sudo rm -r /path/to/influxdb_backup
```
Note that the tool first lists the shards that will be converted,
before asking for confirmation. You can abort the conversion process
at this step if you just wish to see what would be converted, or if
the list of shards does not look correct.
__WARNING:__ If you run the `influx_tsm` tool as a user other than the
`influxdb` user (or the user that the InfluxDB process runs under),
please make sure to verify the shard permissions are correct prior to
starting InfluxDB. If needed, shard permissions can be corrected with
the `chown` command. For example:
```
sudo chown -R influxdb:influxdb /var/lib/influxdb
```
## Rolling back a conversion
After a successful backup (the message `Database XYZ backed up` was
logged), you have a duplicate of that database in the _backup_
directory you provided on the command line. If, when checking your
data after a successful conversion, you notice things missing or
something just isn't right, you can "undo" the conversion:
- Shut down your node (this is very important)
- Remove the database's directory from the influxdb `data` directory (default: `~/.influxdb/data/XYZ` for binary installations or `/var/lib/influxdb/data/XYZ` for packaged installations)
- Copy (to really make sure the shard is preserved) the database's directory from the backup directory you created into the `data` directory.
Using the same directories as above, and assuming a database named `stats`:
```
$ sudo rm -r /var/lib/influxdb/data/stats
$ sudo cp -r /path/to/influxdb_backup/stats /var/lib/influxdb/data/
$ # restart influxd node
```
#### How to avoid downtime when upgrading shards
*Identify non-`tsm1` shards*
Non-`tsm1` shards are files of the form: `data/<database>/<retention_policy>/<shard_id>`.
`tsm1` shards are files of the form: `data/<database>/<retention_policy>/<shard_id>/<file>.tsm`.
*Determine which `b1`/`bz1` shards are cold for writes*
Run the `SHOW SHARDS` query to see the start and end dates for shards.
If the date range for a shard does not span the current time then the shard is said to be cold for writes.
This means that no new points are expected to be added to the shard.
The shard whose date range spans now is said to be hot for writes.
You can only safely convert cold shards without stopping the InfluxDB process.
*Convert cold shards*
1. Copy each of the cold shards you'd like to convert to a new directory with the structure `/tmp/data/<database>/<retention_policy>/<shard_id>`.
2. Run the `influx_tsm` tool on the copied files:
```
influx_tsm -parallel /tmp/data/
```
3. Remove the existing cold `b1`/`bz1` shards from the production data directory.
4. Move the new `tsm1` shards into the original directory, overwriting the existing `b1`/`bz1` shards of the same name. Do this simultaneously with step 3 to avoid any query errors.
5. Wait an hour, a day, or a week (depending on your retention period) for any hot `b1`/`bz1` shards to become cold and repeat steps 1 through 4 on the newly cold shards.
> **Note:** Any points written to the cold shards after making a copy will be lost when the `tsm1` shard overwrites the existing cold shard.
Nothing in InfluxDB will prevent writes to cold shards, they are merely unexpected, not impossible.
It is your responsibility to prevent writes to cold shards to prevent data loss.
// Package b1 reads data from b1 shards.
package b1 // import "github.com/influxdata/influxdb/cmd/influx_tsm/b1"
import (
"encoding/binary"
"math"
"sort"
"time"
"github.com/boltdb/bolt"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// DefaultChunkSize is the size of chunks read from the b1 shard
const DefaultChunkSize int = 1000
var excludedBuckets = map[string]bool{
"fields": true,
"meta": true,
"series": true,
"wal": true,
}
// Reader is used to read all data from a b1 shard.
type Reader struct {
path string
db *bolt.DB
tx *bolt.Tx
cursors []*cursor
currCursor int
keyBuf string
values []tsm1.Value
valuePos int
fields map[string]*tsdb.MeasurementFields
codecs map[string]*tsdb.FieldCodec
stats *stats.Stats
}
// NewReader returns a reader for the b1 shard at path.
func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader {
r := &Reader{
path: path,
fields: make(map[string]*tsdb.MeasurementFields),
codecs: make(map[string]*tsdb.FieldCodec),
stats: stats,
}
if chunkSize <= 0 {
chunkSize = DefaultChunkSize
}
r.values = make([]tsm1.Value, chunkSize)
return r
}
// Open opens the reader.
func (r *Reader) Open() error {
// Open underlying storage.
db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return err
}
r.db = db
// Load fields.
if err := r.db.View(func(tx *bolt.Tx) error {
meta := tx.Bucket([]byte("fields"))
c := meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
mf := &tsdb.MeasurementFields{}
if err := mf.UnmarshalBinary(v); err != nil {
return err
}
r.fields[string(k)] = mf
r.codecs[string(k)] = tsdb.NewFieldCodec(mf.Fields)
}
return nil
}); err != nil {
return err
}
seriesSet := make(map[string]bool)
// ignore series index and find all series in this shard
if err := r.db.View(func(tx *bolt.Tx) error {
tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
key := string(name)
if !excludedBuckets[key] {
seriesSet[key] = true
}
return nil
})
return nil
}); err != nil {
return err
}
r.tx, err = r.db.Begin(false)
if err != nil {
return err
}
// Create cursor for each field of each series.
for s := range seriesSet {
measurement := tsdb.MeasurementFromSeriesKey(s)
fields := r.fields[measurement]
if fields == nil {
r.stats.IncrFiltered()
continue
}
for _, f := range fields.Fields {
c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
c.SeekTo(0)
r.cursors = append(r.cursors, c)
}
}
sort.Sort(cursors(r.cursors))
return nil
}
// Next returns whether any data remains to be read. It must be called before
// the next call to Read().
func (r *Reader) Next() bool {
r.valuePos = 0
OUTER:
for {
if r.currCursor >= len(r.cursors) {
// All cursors drained. No more data remains.
return false
}
cc := r.cursors[r.currCursor]
r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
for {
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if r.valuePos == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue OUTER
}
// There is some data available. Indicate that it should be read.
return true
}
if f, ok := v.(float64); ok {
if math.IsInf(f, 0) {
r.stats.AddPointsRead(1)
r.stats.IncrInf()
continue
}
if math.IsNaN(f) {
r.stats.AddPointsRead(1)
r.stats.IncrNaN()
continue
}
}
r.values[r.valuePos] = tsm1.NewValue(k, v)
r.valuePos++
if r.valuePos >= len(r.values) {
return true
}
}
}
}
// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
// emitted completely for every field, in every series, before the next field is processed.
// Data from Read() adheres to the requirements for writing to tsm1 shards
func (r *Reader) Read() (string, []tsm1.Value, error) {
return r.keyBuf, r.values[:r.valuePos], nil
}
// Close closes the reader.
func (r *Reader) Close() error {
r.tx.Rollback()
return r.db.Close()
}
// cursor provides ordered iteration across a series.
type cursor struct {
// Bolt cursor and readahead buffer.
cursor *bolt.Cursor
keyBuf int64
valBuf interface{}
series string
field string
dec *tsdb.FieldCodec
}
// Cursor returns an iterator for a key over a single field.
func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor {
cur := &cursor{
keyBuf: -2,
series: series,
field: field,
dec: dec,
}
// Retrieve series bucket.
b := tx.Bucket([]byte(series))
if b != nil {
cur.cursor = b.Cursor()
}
return cur
}
// Seek moves the cursor to a position.
func (c *cursor) SeekTo(seek int64) {
var seekBytes [8]byte
binary.BigEndian.PutUint64(seekBytes[:], uint64(seek))
k, v := c.cursor.Seek(seekBytes[:])
c.keyBuf, c.valBuf = tsdb.DecodeKeyValue(c.field, c.dec, k, v)
}
// Next returns the next key/value pair from the cursor.
func (c *cursor) Next() (key int64, value interface{}) {
for {
k, v := func() (int64, interface{}) {
if c.keyBuf != -2 {
k, v := c.keyBuf, c.valBuf
c.keyBuf = -2
return k, v
}
k, v := c.cursor.Next()
if k == nil {
return -1, nil
}
return tsdb.DecodeKeyValue(c.field, c.dec, k, v)
}()
if k != -1 && v == nil {
// There is a point in the series at the next timestamp,
// but not for this cursor's field. Go to the next point.
continue
}
return k, v
}
}
// Sort b1 cursors in correct order for writing to TSM files.
type cursors []*cursor
func (a cursors) Len() int { return len(a) }
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a cursors) Less(i, j int) bool {
if a[i].series == a[j].series {
return a[i].field < a[j].field
}
return a[i].series < a[j].series
}
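A minimal sketch (not part of the conversion tool itself) of how a b1 Reader is driven by the converter: open the shard, then alternate Next and Read until Next reports no more data. The shard path below is a placeholder, and it assumes the zero value of `stats.Stats` is ready to use.

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/cmd/influx_tsm/b1"
	"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
)

func main() {
	st := &stats.Stats{} // assumed usable zero value
	r := b1.NewReader("/var/lib/influxdb/data/mydb/default/1", st, b1.DefaultChunkSize)
	if err := r.Open(); err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	// Next buffers up to one chunk of values for a single series/field key;
	// Read hands that chunk back as tsm1 values ready to be written.
	for r.Next() {
		key, values, err := r.Read()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s: %d values\n", key, len(values))
	}
}
```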
// Package bz1 reads data from bz1 shards.
package bz1 // import "github.com/influxdata/influxdb/cmd/influx_tsm/bz1"
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"sort"
"time"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// DefaultChunkSize is the size of chunks read from the bz1 shard
const DefaultChunkSize = 1000
// Reader is used to read all data from a bz1 shard.
type Reader struct {
path string
db *bolt.DB
tx *bolt.Tx
cursors []*cursor
currCursor int
keyBuf string
values []tsm1.Value
valuePos int
fields map[string]*tsdb.MeasurementFields
codecs map[string]*tsdb.FieldCodec
stats *stats.Stats
}
// NewReader returns a reader for the bz1 shard at path.
func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader {
r := &Reader{
path: path,
fields: make(map[string]*tsdb.MeasurementFields),
codecs: make(map[string]*tsdb.FieldCodec),
stats: stats,
}
if chunkSize <= 0 {
chunkSize = DefaultChunkSize
}
r.values = make([]tsm1.Value, chunkSize)
return r
}
// Open opens the reader.
func (r *Reader) Open() error {
// Open underlying storage.
db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return err
}
r.db = db
seriesSet := make(map[string]bool)
if err := r.db.View(func(tx *bolt.Tx) error {
var data []byte
meta := tx.Bucket([]byte("meta"))
if meta == nil {
// No data in this shard.
return nil
}
pointsBucket := tx.Bucket([]byte("points"))
if pointsBucket == nil {
return nil
}
if err := pointsBucket.ForEach(func(key, _ []byte) error {
seriesSet[string(key)] = true
return nil
}); err != nil {
return err
}
buf := meta.Get([]byte("fields"))
if buf == nil {
// No data in this shard.
return nil
}
data, err = snappy.Decode(nil, buf)
if err != nil {
return err
}
if err := json.Unmarshal(data, &r.fields); err != nil {
return err
}
return nil
}); err != nil {
return err
}
// Build the codec for each measurement.
for k, v := range r.fields {
r.codecs[k] = tsdb.NewFieldCodec(v.Fields)
}
r.tx, err = r.db.Begin(false)
if err != nil {
return err
}
// Create cursor for each field of each series.
for s := range seriesSet {
measurement := tsdb.MeasurementFromSeriesKey(s)
fields := r.fields[measurement]
if fields == nil {
r.stats.IncrFiltered()
continue
}
for _, f := range fields.Fields {
c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
if c == nil {
continue
}
c.SeekTo(0)
r.cursors = append(r.cursors, c)
}
}
sort.Sort(cursors(r.cursors))
return nil
}
// Next returns whether there is any more data to be read.
func (r *Reader) Next() bool {
r.valuePos = 0
OUTER:
for {
if r.currCursor >= len(r.cursors) {
// All cursors drained. No more data remains.
return false
}
cc := r.cursors[r.currCursor]
r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
for {
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if r.valuePos == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue OUTER
}
// There is some data available. Indicate that it should be read.
return true
}
if f, ok := v.(float64); ok {
if math.IsInf(f, 0) {
r.stats.AddPointsRead(1)
r.stats.IncrInf()
continue
}
if math.IsNaN(f) {
r.stats.AddPointsRead(1)
r.stats.IncrNaN()
continue
}
}
r.values[r.valuePos] = tsm1.NewValue(k, v)
r.valuePos++
if r.valuePos >= len(r.values) {
return true
}
}
}
}
// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
// emitted completely for every field, in every series, before the next field is processed.
// Data from Read() adheres to the requirements for writing to tsm1 shards
func (r *Reader) Read() (string, []tsm1.Value, error) {
return r.keyBuf, r.values[:r.valuePos], nil
}
// Close closes the reader.
func (r *Reader) Close() error {
r.tx.Rollback()
return r.db.Close()
}
// cursor provides ordered iteration across a series.
type cursor struct {
cursor *bolt.Cursor
buf []byte // uncompressed buffer
off int // buffer offset
fieldIndices []int
index int
series string
field string
dec *tsdb.FieldCodec
keyBuf int64
valBuf interface{}
}
// newCursor returns an instance of a bz1 cursor.
func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor {
// Retrieve points bucket. Ignore if there is no bucket.
b := tx.Bucket([]byte("points")).Bucket([]byte(series))
if b == nil {
return nil
}
return &cursor{
cursor: b.Cursor(),
series: series,
field: field,
dec: dec,
keyBuf: -2,
}
}
// Seek moves the cursor to a position.
func (c *cursor) SeekTo(seek int64) {
var seekBytes [8]byte
binary.BigEndian.PutUint64(seekBytes[:], uint64(seek))
// Move cursor to appropriate block and set to buffer.
k, v := c.cursor.Seek(seekBytes[:])
if v == nil { // get the last block, it might have this time
_, v = c.cursor.Last()
} else if seek < int64(binary.BigEndian.Uint64(k)) { // the seek key is less than this block, go back one and check
_, v = c.cursor.Prev()
// if the previous block max time is less than the seek value, reset to where we were originally
if v == nil || seek > int64(binary.BigEndian.Uint64(v[0:8])) {
_, v = c.cursor.Seek(seekBytes[:])
}
}
c.setBuf(v)
// Read current block up to seek position.
c.seekBuf(seekBytes[:])
// Return current entry.
c.keyBuf, c.valBuf = c.read()
}
// seekBuf moves the cursor to a position within the current buffer.
func (c *cursor) seekBuf(seek []byte) (key, value []byte) {
for {
// Slice off the current entry.
buf := c.buf[c.off:]
// Exit if current entry's timestamp is on or after the seek.
if len(buf) == 0 {
return
}
if bytes.Compare(buf[0:8], seek) != -1 {
return
}
c.off += entryHeaderSize + entryDataSize(buf)
}
}
// Next returns the next key/value pair from the cursor. If there are no values
// remaining, -1 is returned.
func (c *cursor) Next() (int64, interface{}) {
for {
k, v := func() (int64, interface{}) {
if c.keyBuf != -2 {
k, v := c.keyBuf, c.valBuf
c.keyBuf = -2
return k, v
}
// Ignore if there is no buffer.
if len(c.buf) == 0 {
return -1, nil
}
// Move forward to next entry.
c.off += entryHeaderSize + entryDataSize(c.buf[c.off:])
// If no items left then read first item from next block.
if c.off >= len(c.buf) {
_, v := c.cursor.Next()
c.setBuf(v)
}
return c.read()
}()
if k != -1 && v == nil {
// There is a point in the series at the next timestamp,
// but not for this cursor's field. Go to the next point.
continue
}
return k, v
}
}
// setBuf saves a compressed block to the buffer.
func (c *cursor) setBuf(block []byte) {
// Clear if the block is empty.
if len(block) == 0 {
c.buf, c.off, c.fieldIndices, c.index = c.buf[0:0], 0, c.fieldIndices[0:0], 0
return
}
// Otherwise decode block into buffer.
// Skip over the first 8 bytes since they are the max timestamp.
buf, err := snappy.Decode(nil, block[8:])
if err != nil {
c.buf = c.buf[0:0]
fmt.Printf("block decode error: %s\n", err)
}
c.buf, c.off = buf, 0
}
// read reads the current key and value from the current block.
func (c *cursor) read() (key int64, value interface{}) {
// Return -1 (and a nil value) if the offset is at the end of the buffer.
if c.off >= len(c.buf) {
return -1, nil
}
// Otherwise read the current entry.
buf := c.buf[c.off:]
dataSize := entryDataSize(buf)
return tsdb.DecodeKeyValue(c.field, c.dec, buf[0:8], buf[entryHeaderSize:entryHeaderSize+dataSize])
}
// cursors sorts bz1 cursors into the correct order for writing to TSM files.
type cursors []*cursor
func (a cursors) Len() int { return len(a) }
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a cursors) Less(i, j int) bool {
if a[i].series == a[j].series {
return a[i].field < a[j].field
}
return a[i].series < a[j].series
}
// entryHeaderSize is the number of bytes required for the header.
const entryHeaderSize = 8 + 4
// entryDataSize returns the size of an entry's data field, in bytes.
func entryDataSize(v []byte) int { return int(binary.BigEndian.Uint32(v[8:12])) }
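// Illustrative only (not part of the original tool): a standalone sketch of
// the bz1 entry layout walked by the cursor code above, namely an 8-byte
// big-endian timestamp, a 4-byte big-endian data length, then the data
// itself. The local helpers mirror entryHeaderSize and entryDataSize.
package main

import (
	"encoding/binary"
	"fmt"
)

const exampleHeaderSize = 8 + 4 // timestamp + data length, as above

func exampleDataSize(v []byte) int { return int(binary.BigEndian.Uint32(v[8:12])) }

// buildEntry lays out one entry in the format described above.
func buildEntry(ts int64, data []byte) []byte {
	buf := make([]byte, exampleHeaderSize+len(data))
	binary.BigEndian.PutUint64(buf[0:8], uint64(ts))
	binary.BigEndian.PutUint32(buf[8:12], uint32(len(data)))
	copy(buf[exampleHeaderSize:], data)
	return buf
}

func main() {
	e := buildEntry(1000, []byte{0x01, 0x02, 0x03})
	fmt.Println(exampleDataSize(e)) // prints 3
}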
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
const (
maxBlocksPerKey = 65535
)
// KeyIterator is used to iterate over b* keys for conversion to tsm keys.
type KeyIterator interface {
Next() bool
Read() (string, []tsm1.Value, error)
}
// Converter encapsulates the logic for converting b*1 shards to tsm1 shards.
type Converter struct {
path string
maxTSMFileSize uint32
sequence int
stats *stats.Stats
}
// NewConverter returns a new instance of the Converter.
func NewConverter(path string, sz uint32, stats *stats.Stats) *Converter {
return &Converter{
path: path,
maxTSMFileSize: sz,
stats: stats,
}
}
// Process writes the data provided by iter to a tsm1 shard.
func (c *Converter) Process(iter KeyIterator) error {
// Ensure the tsm1 directory exists.
if err := os.MkdirAll(c.path, 0777); err != nil {
return err
}
// Iterate until no more data remains.
var w tsm1.TSMWriter
var keyCount map[string]int
for iter.Next() {
k, v, err := iter.Read()
if err != nil {
return err
}
if w == nil {
w, err = c.nextTSMWriter()
if err != nil {
return err
}
keyCount = map[string]int{}
}
if err := w.Write(k, v); err != nil {
return err
}
keyCount[k]++
c.stats.AddPointsRead(len(v))
c.stats.AddPointsWritten(len(v))
// If the file exceeds the maximum TSM file size, or this key has reached the per-key block limit, roll over to a new TSM file.
if w.Size() > c.maxTSMFileSize || keyCount[k] == maxBlocksPerKey {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
c.stats.AddTSMBytes(w.Size())
if err := w.Close(); err != nil {
return err
}
w = nil
}
}
if w != nil {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
c.stats.AddTSMBytes(w.Size())
if err := w.Close(); err != nil {
return err
}
}
return nil
}
// nextTSMWriter returns the next TSMWriter for the Converter.
func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
c.sequence++
fileName := filepath.Join(c.path, fmt.Sprintf("%09d-%09d.%s", 1, c.sequence, tsm1.TSMFileExtension))
fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
}
// Create the writer for the new TSM file.
w, err := tsm1.NewTSMWriter(fd)
if err != nil {
return nil, err
}
c.stats.IncrTSMFileCount()
return w, nil
}
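// A minimal sketch (not part of the original tool) of the Process contract,
// assumed to live alongside the Converter above: a hypothetical in-memory
// KeyIterator stands in for the real bz1/b1 shard Reader, and the output
// path and file-size cap are assumptions.
type memIterator struct {
	keys   []string
	values [][]tsm1.Value
	pos    int
}

func (m *memIterator) Next() bool { return m.pos < len(m.keys) }

func (m *memIterator) Read() (string, []tsm1.Value, error) {
	k, v := m.keys[m.pos], m.values[m.pos]
	m.pos++
	return k, v, nil
}

func convertExample() error {
	iter := &memIterator{
		keys:   []string{tsm1.SeriesFieldKey("cpu,host=a", "value")},
		values: [][]tsm1.Value{{tsm1.NewValue(1000, 42.0)}},
	}
	st := &stats.Stats{}
	c := NewConverter("/tmp/tsm-out", 2*1024*1024*1024, st) // 2 GiB per TSM file
	return c.Process(iter)
}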
// Package stats contains statistics for converting non-TSM shards to TSM.
package stats
import (
"sync/atomic"
"time"
)
// Stats are the statistics captured while converting non-TSM shards to TSM.
type Stats struct {
NanFiltered uint64
InfFiltered uint64
FieldsFiltered uint64
PointsWritten uint64
PointsRead uint64
TsmFilesCreated uint64
TsmBytesWritten uint64
CompletedShards uint64
TotalTime time.Duration
}
// AddPointsRead increments the number of read points.
func (s *Stats) AddPointsRead(n int) {
atomic.AddUint64(&s.PointsRead, uint64(n))
}
// AddPointsWritten increments the number of written points.
func (s *Stats) AddPointsWritten(n int) {
atomic.AddUint64(&s.PointsWritten, uint64(n))
}
// AddTSMBytes increments the number of TSM Bytes.
func (s *Stats) AddTSMBytes(n uint32) {
atomic.AddUint64(&s.TsmBytesWritten, uint64(n))
}
// IncrTSMFileCount increments the number of TSM files created.
func (s *Stats) IncrTSMFileCount() {
atomic.AddUint64(&s.TsmFilesCreated, 1)
}
// IncrNaN increments the number of NaNs filtered.
func (s *Stats) IncrNaN() {
atomic.AddUint64(&s.NanFiltered, 1)
}
// IncrInf increments the number of Infs filtered.
func (s *Stats) IncrInf() {
atomic.AddUint64(&s.InfFiltered, 1)
}
// IncrFiltered increments the number of fields filtered.
func (s *Stats) IncrFiltered() {
atomic.AddUint64(&s.FieldsFiltered, 1)
}
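// A minimal sketch (not part of the original package) showing how a caller
// can poll these counters safely from another goroutine: the writers above
// use atomic adds, so readers should use atomic loads, as the tracker's
// StatusUpdate below does. The snapshot helper is hypothetical.
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
)

func snapshot(s *stats.Stats) (read, written uint64) {
	return atomic.LoadUint64(&s.PointsRead), atomic.LoadUint64(&s.PointsWritten)
}

func main() {
	s := &stats.Stats{}
	s.AddPointsRead(10)
	s.AddPointsWritten(8)
	r, w := snapshot(s)
	fmt.Printf("points read=%d written=%d\n", r, w)
}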
package main
import (
"fmt"
"log"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
)
// tracker orchestrates and tracks the conversion of non-TSM shards to TSM.
type tracker struct {
Stats stats.Stats
shards tsdb.ShardInfos
opts options
pg ParallelGroup
wg sync.WaitGroup
}
// newTracker sets up and returns a clean tracker instance.
func newTracker(shards tsdb.ShardInfos, opts options) *tracker {
t := &tracker{
shards: shards,
opts: opts,
pg: NewParallelGroup(runtime.GOMAXPROCS(0)),
}
return t
}
func (t *tracker) Run() error {
conversionStart := time.Now()
// Backup each directory.
if !opts.SkipBackup {
databases := t.shards.Databases()
fmt.Printf("Backing up %d databases...\n", len(databases))
t.wg.Add(len(databases))
for i := range databases {
db := databases[i]
go t.pg.Do(func() {
defer t.wg.Done()
start := time.Now()
log.Printf("Backup of database '%v' started", db)
err := backupDatabase(db)
if err != nil {
log.Fatalf("Backup of database %v failed: %v\n", db, err)
}
log.Printf("Database %v backed up (%v)\n", db, time.Since(start))
})
}
t.wg.Wait()
} else {
fmt.Println("Database backup disabled.")
}
t.wg.Add(len(t.shards))
for i := range t.shards {
si := t.shards[i]
go t.pg.Do(func() {
defer func() {
atomic.AddUint64(&t.Stats.CompletedShards, 1)
t.wg.Done()
}()
start := time.Now()
log.Printf("Starting conversion of shard: %v", si.FullPath(opts.DataPath))
if err := convertShard(si, t); err != nil {
log.Fatalf("Failed to convert %v: %v\n", si.FullPath(opts.DataPath), err)
}
log.Printf("Conversion of %v successful (%v)\n", si.FullPath(opts.DataPath), time.Since(start))
})
}
done := make(chan struct{})
go func() {
t.wg.Wait()
close(done)
}()
WAIT_LOOP:
for {
select {
case <-done:
break WAIT_LOOP
case <-time.After(opts.UpdateInterval):
t.StatusUpdate()
}
}
t.Stats.TotalTime = time.Since(conversionStart)
return nil
}
func (t *tracker) StatusUpdate() {
shardCount := atomic.LoadUint64(&t.Stats.CompletedShards)
pointCount := atomic.LoadUint64(&t.Stats.PointsRead)
pointWritten := atomic.LoadUint64(&t.Stats.PointsWritten)
log.Printf("Still Working: Completed Shards: %d/%d Points read/written: %d/%d", shardCount, len(t.shards), pointCount, pointWritten)
}
func (t *tracker) PrintStats() {
preSize := t.shards.Size()
postSize := int64(t.Stats.TsmBytesWritten)
fmt.Printf("\nSummary statistics\n========================================\n")
fmt.Printf("Databases converted: %d\n", len(t.shards.Databases()))
fmt.Printf("Shards converted: %d\n", len(t.shards))
fmt.Printf("TSM files created: %d\n", t.Stats.TsmFilesCreated)
fmt.Printf("Points read: %d\n", t.Stats.PointsRead)
fmt.Printf("Points written: %d\n", t.Stats.PointsWritten)
fmt.Printf("NaN filtered: %d\n", t.Stats.NanFiltered)
fmt.Printf("Inf filtered: %d\n", t.Stats.InfFiltered)
fmt.Printf("Points without fields filtered: %d\n", t.Stats.FieldsFiltered)
fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize)
fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize)
fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize)
fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.Stats.PointsWritten))
fmt.Printf("Total conversion time: %v\n", t.Stats.TotalTime)
fmt.Println()
}
package tsdb
import (
"encoding/binary"
"errors"
"fmt"
"math"
)
const (
fieldFloat = 1
fieldInteger = 2
fieldBoolean = 3
fieldString = 4
)
var (
// ErrFieldNotFound is returned when a field cannot be found.
ErrFieldNotFound = errors.New("field not found")
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
// for which there is no mapping.
ErrFieldUnmappedID = errors.New("field ID not mapped")
)
// FieldCodec provides encoding and decoding functionality for the fields of a given
// Measurement.
type FieldCodec struct {
fieldsByID map[uint8]*Field
fieldsByName map[string]*Field
}
// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// an RLock that protects the Measurement.
func NewFieldCodec(fields map[string]*Field) *FieldCodec {
fieldsByID := make(map[uint8]*Field, len(fields))
fieldsByName := make(map[string]*Field, len(fields))
for _, f := range fields {
fieldsByID[f.ID] = f
fieldsByName[f.Name] = f
}
return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName}
}
// FieldIDByName returns the ID for the given field.
func (f *FieldCodec) FieldIDByName(s string) (uint8, error) {
fi := f.fieldsByName[s]
if fi == nil {
return 0, ErrFieldNotFound
}
return fi.ID, nil
}
// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and returns that value.
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
var value interface{}
for {
if len(b) == 0 {
// No more bytes.
return nil, ErrFieldNotFound
}
field := f.fieldsByID[b[0]]
if field == nil {
// This can happen, though is very unlikely. If this node receives encoded data, to be written
// to disk, and is queried for that data before its metastore is updated, there will be no field
// mapping for the data during decode. All this can happen because data is encoded by the node
// that first received the write request, not the node that actually writes the data to disk.
// So if this happens, the read must be aborted.
return nil, ErrFieldUnmappedID
}
switch field.Type {
case fieldFloat:
if field.ID == targetID {
value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
}
b = b[9:]
case fieldInteger:
if field.ID == targetID {
value = int64(binary.BigEndian.Uint64(b[1:9]))
}
b = b[9:]
case fieldBoolean:
if field.ID == targetID {
value = b[1] == 1
}
b = b[2:]
case fieldString:
length := binary.BigEndian.Uint16(b[1:3])
if field.ID == targetID {
value = string(b[3 : 3+length])
}
b = b[3+length:]
default:
panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))
}
if value != nil {
return value, nil
}
}
}
// DecodeByName scans a byte slice for a field with the given name, converts it to its
// expected type, and returns that value.
func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) {
fi := f.FieldByName(name)
if fi == nil {
return 0, ErrFieldNotFound
}
return f.DecodeByID(fi.ID, b)
}
// FieldByName returns the field by its name. It returns nil if the field is not found.
func (f *FieldCodec) FieldByName(name string) *Field {
return f.fieldsByName[name]
}
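// Illustrative only (not part of the original package): the per-field wire
// layout that DecodeByID above walks. A float or integer field is a 1-byte
// field ID followed by 8 value bytes; a boolean is an ID byte plus 1 value
// byte; a string is an ID byte, a 2-byte big-endian length, then the string
// bytes. This standalone sketch encodes and decodes one float field by hand.
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

func main() {
	// Encode field ID 1 carrying the float value 3.5.
	b := make([]byte, 9)
	b[0] = 1
	binary.BigEndian.PutUint64(b[1:9], math.Float64bits(3.5))

	// Decode it the way the fieldFloat branch of DecodeByID does.
	id := b[0]
	v := math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
	fmt.Printf("field %d = %v\n", id, v) // field 1 = 3.5
}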