package storage import ( "context" "fmt" "log" "sort" "strings" "time" "weather-data/config" "github.com/google/uuid" influxdb2 "github.com/influxdata/influxdb-client-go/v2" ) //influxStorage is the Storage implementation for InfluxDB type influxStorage struct { config config.InfluxConfig measurement string client influxdb2.Client } //NewInfluxStorage Factory func NewInfluxStorage(cfg config.InfluxConfig) (*influxStorage, error) { influx := new(influxStorage) influx.config = cfg influx.client = influxdb2.NewClient(cfg.Host, cfg.Token) influx.measurement = "weather-data" log.Print("Successfully created influx-client") return influx, nil } //Save WeatherData to InfluxDB func (storage *influxStorage) Save(data *WeatherData) error { tags := map[string]string{ "sensorId": data.SensorId.String()} fields := make(map[string]interface{}) for k, v := range data.Values { fields[string(k)] = v } datapoint := influxdb2.NewPoint(storage.measurement, tags, fields, data.TimeStamp) writeAPI := storage.client.WriteAPI(storage.config.Organization, storage.config.Bucket) writeAPI.WritePoint(datapoint) return nil } //GetData datapoints from InfluxDB func (storage *influxStorage) GetData(query *WeatherQuery) ([]*WeatherData, error) { fluxQuery := storage.createFluxQuery(query) res, err := storage.executeFluxQuery(fluxQuery) return res, err } func (storage *influxStorage) createFluxQuery(query *WeatherQuery) string { fields := "" concat := "" sensorIds := "" for sensorValueType, value := range query.Values { if value { fields = fmt.Sprintf("%v %v r[\"_field\"] == \"%v\"", fields, concat, string(sensorValueType)) concat = "or" } } concat = "" for _, id := range query.SensorIds { sensorIds = fmt.Sprintf("%v %v r[\"sensorId\"] == \"%v\"", sensorIds, concat, id) concat = "or" } fromTemplate := fmt.Sprintf("from(bucket:\"%v\")", storage.config.Bucket) rangeTemplate := fmt.Sprintf("|> range(start: %v, stop: %v)", query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339)) measurementTemplate := fmt.Sprintf("|> filter(fn: (r) => r[\"_measurement\"] == \"%v\")", storage.measurement) sensorIdsTemplate := "" if len(sensorIds) > 0 { sensorIdsTemplate = fmt.Sprintf("|> filter(fn: (r) => %v )", strings.Trim(sensorIds, " ")) } fieldsTemplate := "" if len(fields) > 0 { fieldsTemplate = fmt.Sprintf("|> filter(fn: (r) => %v )", strings.Trim(fields, " ")) } fluxQuery := fmt.Sprintf("%v \n %v \n %v \n %v \n %v", fromTemplate, rangeTemplate, measurementTemplate, sensorIdsTemplate, fieldsTemplate) return fluxQuery } func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, error) { queryAPI := storage.client.QueryAPI(storage.config.Organization) result, err := queryAPI.Query(context.Background(), query) if err != nil { return nil, err } var queryResults []*WeatherData for result.Next() { if result.Err() != nil { return nil, result.Err() } timestamp := result.Record().Time() sensorId, err := uuid.Parse(result.Record().ValueByKey("sensorId").(string)) if err != nil { return nil, err } data, contained := containsWeatherData(queryResults, sensorId, timestamp) data.Values[SensorValueType(result.Record().Field())] = result.Record().Value().(float64) if !contained { data.SensorId = sensorId data.TimeStamp = timestamp queryResults = append(queryResults, data) } } //if some attibutes missed in a few datapoints they are not ordered by time //influx query e.g. first all humidity, than pressure and temperature at last. if there are some datapoints with only pressore and/or temperature they are the last inserted in the array sort.Slice(queryResults, func(p, q int) bool { return queryResults[p].TimeStamp.Before(queryResults[q].TimeStamp) }) return queryResults, nil } func containsWeatherData(weatherData []*WeatherData, sensorId uuid.UUID, timestamp time.Time) (*WeatherData, bool) { for _, val := range weatherData { if val.SensorId == sensorId && val.TimeStamp == timestamp { return val, true } } var newData WeatherData newData.Values = make(map[SensorValueType]float64) return &newData, false } //Close InfluxDB connection func (storage *influxStorage) Close() error { storage.client.Close() return nil }