From 059340a0580dcdb770c92419a57c44e954a0cab4 Mon Sep 17 00:00:00 2001 From: Joel Schmid Date: Thu, 29 Apr 2021 18:08:38 +0200 Subject: [PATCH] optimized influx query builder and execution --- storage/influxdb-storage.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/storage/influxdb-storage.go b/storage/influxdb-storage.go index 49ec069..85ecfe1 100644 --- a/storage/influxdb-storage.go +++ b/storage/influxdb-storage.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "log" + "sort" + "strings" "time" "weather-data/config" @@ -62,14 +64,19 @@ func (storage *influxStorage) createFluxQuery(query *WeatherQuery) string { for sensorValueType, value := range query.Values { if value { - fields = fmt.Sprintf("%v %v r._field == \"%v\"", fields, concat, string(sensorValueType)) + fields = fmt.Sprintf("%v %v r[\"_field\"] == \"%v\"", fields, concat, string(sensorValueType)) concat = "or" } } - fields = fmt.Sprintf(" and ( %v )", fields) + fromTemplate := fmt.Sprintf("from(bucket:\"%v\")", storage.config.Bucket) + rangeTemplate := fmt.Sprintf("|> range(start: %v, stop: %v)", query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339)) + measurementTemplate := fmt.Sprintf("|> filter(fn: (r) => r[\"_measurement\"] == \"%v\")", storage.measurement) + sensorIdsTemplate := fmt.Sprintf("|> filter(fn: (r) => r[\"sensorId\"] == \"%v\")", query.SensorId) + fields = fmt.Sprintf("|> filter(fn: (r) => %v )", strings.Trim(fields, " ")) + + fluxQuery := fmt.Sprintf("%v \n %v \n %v \n %v \n %v", fromTemplate, rangeTemplate, measurementTemplate, sensorIdsTemplate, fields) - fluxQuery := fmt.Sprintf("from(bucket:\"%v\")|> range(start: %v, stop: %v) |> filter(fn: (r) => r._measurement == \"%v\" and r.sensorId == \"%v\" %v)", storage.config.Bucket, query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339), storage.measurement, query.SensorId, fields) return fluxQuery } @@ -107,6 +114,12 @@ func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, er } } + //if some attibutes missed in a few datapoints they are not ordered by time + //influx query e.g. first all humidity, than pressure and temperature at last. if there are some datapoints with only pressore and/or temperature they are the last inserted in the array + sort.Slice(queryResults, func(p, q int) bool { + return queryResults[p].TimeStamp.Before(queryResults[q].TimeStamp) + }) + return queryResults, nil }