From 641c05afc6431bef2ffcd338099ba4392417558b Mon Sep 17 00:00:00 2001 From: Joel Schmid Date: Sat, 3 Apr 2021 23:32:37 +0200 Subject: [PATCH] refactoring & generic weatherdata --- api/rest-api.go | 4 ++ storage/influxdb-storage.go | 50 +++++---------- storage/weather-data.go | 114 +++++++++++++++++------------------ weathersource/mqtt-source.go | 33 +++++----- 4 files changed, 91 insertions(+), 110 deletions(-) diff --git a/api/rest-api.go b/api/rest-api.go index 53596e9..2870267 100644 --- a/api/rest-api.go +++ b/api/rest-api.go @@ -107,6 +107,8 @@ func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request return } + fmt.Println(r.Body) + var data storage.WeatherData err := json.NewDecoder(r.Body).Decode(&data) if err != nil { @@ -114,6 +116,8 @@ func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request return } + fmt.Println(data) + err = api.addNewWeatherData(data) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/storage/influxdb-storage.go b/storage/influxdb-storage.go index b98b9b0..3a9a3e6 100644 --- a/storage/influxdb-storage.go +++ b/storage/influxdb-storage.go @@ -33,11 +33,11 @@ func (storage *influxStorage) Save(data WeatherData) error { tags := map[string]string{ "sensorId": data.SensorId.String()} - fields := map[string]interface{}{ - "temperature": data.Temperature, - "humidity": data.Humidity, - "pressure": data.Pressure, - "co2level": data.CO2Level} + fields := make(map[string]interface{}) + + for k, v := range data.Values { + fields[string(k)] = v + } datapoint := influxdb2.NewPoint(storage.measurement, tags, @@ -46,7 +46,6 @@ func (storage *influxStorage) Save(data WeatherData) error { writeAPI := storage.client.WriteAPI(storage.config.Organization, storage.config.Bucket) writeAPI.WritePoint(datapoint) - log.Print("Written weather data point to influx-db") return nil } @@ -61,24 +60,11 @@ func (storage *influxStorage) createFluxQuery(query *WeatherQuery) string { fields := "" concat := "" - if query.Temperature { - fields = fmt.Sprintf("%v %v r._field == \"temperature\"", fields, concat) - concat = "or" - } - - if query.Humidity { - fields = fmt.Sprintf("%v %v r._field == \"humidity\"", fields, concat) - concat = "or" - } - - if query.Pressure { - fields = fmt.Sprintf("%v %v r._field == \"pressure\"", fields, concat) - concat = "or" - } - - if query.Co2Level { - fields = fmt.Sprintf("%v %v r._field == \"co2level\"", fields, concat) - concat = "or" + for _, sensorValueType := range GetSensorValueTypes() { + if query.Values[sensorValueType] { + fields = fmt.Sprintf("%v %v r._field == \"%v\"", fields, concat, string(sensorValueType)) + concat = "or" + } } fields = fmt.Sprintf(" and ( %v )", fields) @@ -112,17 +98,10 @@ func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, er data, contained := containsWeatherData(queryResults, sensorId, timestamp) - if result.Record().Field() == "temperature" { - data.Temperature = result.Record().Value().(float64) - } - if result.Record().Field() == "pressure" { - data.Pressure = result.Record().Value().(float64) - } - if result.Record().Field() == "humidity" { - data.Humidity = result.Record().Value().(float64) - } - if result.Record().Field() == "co2level" { - data.CO2Level = result.Record().Value().(float64) + for _, sensorValueType := range GetSensorValueTypes() { + if result.Record().Field() == string(sensorValueType) { + data.Values[sensorValueType] = result.Record().Value().(float64) + } } if !contained { @@ -142,6 +121,7 @@ func containsWeatherData(weatherData []*WeatherData, sensorId uuid.UUID, timesta } } var newData WeatherData + newData.Values = make(map[SensorValueType]float64) return &newData, false } diff --git a/storage/weather-data.go b/storage/weather-data.go index 21ec527..60bdb6a 100644 --- a/storage/weather-data.go +++ b/storage/weather-data.go @@ -10,6 +10,19 @@ import ( "github.com/google/uuid" ) +type SensorValueType string + +const ( + Temperature SensorValueType = "temperature" + Pressure SensorValueType = "pressure" + Humidity SensorValueType = "humidity" + Co2Level SensorValueType = "co2level" +) + +func GetSensorValueTypes() []SensorValueType { + return []SensorValueType{Temperature, Pressure, Humidity, Co2Level} +} + //WeatherStorage interface for different storage-implementations of weather data type WeatherStorage interface { Save(WeatherData) error @@ -27,38 +40,37 @@ type SensorRegistry interface { //WeatherData type type WeatherData struct { - Humidity float64 `json:"humidity"` - Pressure float64 `json:"pressure"` - Temperature float64 `json:"temperature"` - CO2Level float64 `json:"co2level"` - SensorId uuid.UUID `json:"sensorId"` - TimeStamp time.Time `json:"timestamp"` + Values map[SensorValueType]float64 + SensorId uuid.UUID `json:"sensorId"` + TimeStamp time.Time `json:"timestamp"` } -func (data *WeatherData) GetQueriedValues(query *WeatherQuery) map[string]string { - result := map[string]string{ +func (data *WeatherData) OnlyQueriedValues(query *WeatherQuery) *WeatherData { + for _, sensorValueType := range GetSensorValueTypes() { + if !query.Values[sensorValueType] { + delete(data.Values, sensorValueType) + } + } + return data +} + +func (data *WeatherData) ToStringMap() map[string]string { + mappedData := map[string]string{ "sensorId": data.SensorId.String(), - "timestamp": data.TimeStamp.String(), + "timeStamp": data.TimeStamp.String(), } - if query.Temperature { - result["temperature"] = strconv.FormatFloat(data.Temperature, 'f', -1, 32) + + for sensorValueType, value := range data.Values { + mappedData[string(sensorValueType)] = strconv.FormatFloat(value, 'f', -1, 64) } - if query.Pressure { - result["pressure"] = strconv.FormatFloat(data.Pressure, 'f', -1, 32) - } - if query.Co2Level { - result["co2level"] = strconv.FormatFloat(data.CO2Level, 'f', -1, 32) - } - if query.Humidity { - result["humidity"] = strconv.FormatFloat(data.Humidity, 'f', -1, 32) - } - return result + + return mappedData } func GetOnlyQueriedFields(dataPoints []*WeatherData, query *WeatherQuery) []map[string]string { var result []map[string]string for _, data := range dataPoints { - result = append(result, data.GetQueriedValues(query)) + result = append(result, data.OnlyQueriedValues(query).ToStringMap()) } return result } @@ -73,23 +85,20 @@ type WeatherSensor struct { } type WeatherQuery struct { - Start time.Time - End time.Time - SensorId uuid.UUID - Temperature bool - Humidity bool - Pressure bool - Co2Level bool + Start time.Time + End time.Time + SensorId uuid.UUID + Values map[SensorValueType]bool } -func (data *WeatherQuery) Init() { - data.Start = time.Now().Add(-1 * time.Hour * 24 * 14) - data.End = time.Now() - data.SensorId = uuid.Nil - data.Temperature = true - data.Humidity = true - data.Pressure = true - data.Co2Level = true +func (query *WeatherQuery) Init() { + query.Start = time.Now().Add(-1 * time.Hour * 24 * 14) + query.End = time.Now() + query.SensorId = uuid.Nil + query.Values = make(map[SensorValueType]bool) + for _, sensorValueType := range GetSensorValueTypes() { + query.Values[sensorValueType] = true + } } func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) { @@ -98,10 +107,6 @@ func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) { start := query.Get("start") end := query.Get("end") - temperature := query.Get("temperature") - humidity := query.Get("humidity") - pressure := query.Get("pressure") - co2level := query.Get("co2level") if len(start) != 0 { if tval, err := time.Parse(time.RFC3339, start); err == nil { @@ -121,20 +126,11 @@ func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) { } } - if bval, err := strconv.ParseBool(temperature); err == nil { - result.Temperature = bval - } - - if bval, err := strconv.ParseBool(humidity); err == nil { - result.Humidity = bval - } - - if bval, err := strconv.ParseBool(pressure); err == nil { - result.Pressure = bval - } - - if bval, err := strconv.ParseBool(co2level); err == nil { - result.Co2Level = bval + for _, sensorValueType := range GetSensorValueTypes() { + queryParam := query.Get(string(sensorValueType)) + if bval, err := strconv.ParseBool(queryParam); err == nil { + result.Values[sensorValueType] = bval + } } return result, nil @@ -144,10 +140,10 @@ func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) { func NewRandomWeatherData(sensorId uuid.UUID) WeatherData { rand.Seed(time.Now().UnixNano()) var data WeatherData - data.Humidity = rand.Float64() * 100 - data.Pressure = rand.Float64()*80 + 960 - data.Temperature = rand.Float64()*40 - 5 - data.CO2Level = rand.Float64()*50 + 375 + data.Values[Humidity] = rand.Float64() * 100 + data.Values[Pressure] = rand.Float64()*80 + 960 + data.Values[Temperature] = rand.Float64()*40 - 5 + data.Values[Co2Level] = rand.Float64()*50 + 375 data.SensorId = sensorId data.TimeStamp = time.Now() return data diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index d669742..79ef072 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -4,7 +4,6 @@ import ( "log" "regexp" "strconv" - "strings" "time" "weather-data/config" "weather-data/storage" @@ -13,7 +12,7 @@ import ( "github.com/google/uuid" ) -var mqttTopicRegexPattern = "(^sensor/)([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})(/(temp|pressure|humidity|co2level)$)" +var mqttTopicRegexPattern = "(^sensor)/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})/(.*)" var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern) @@ -79,26 +78,28 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { if !found { lastWeatherData = new(storage.WeatherData) + lastWeatherData.Values = make(map[storage.SensorValueType]float64) lastWeatherData.SensorId = sensorId source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData) } - if strings.HasSuffix(msg.Topic(), "pressure") { - lastWeatherData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64) - lastWeatherData.TimeStamp = time.Now() + value, err := strconv.ParseFloat(string(msg.Payload()), 64) + if err != nil { + return } - if strings.HasSuffix(msg.Topic(), "temp") { - lastWeatherData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) - lastWeatherData.TimeStamp = time.Now() - } - if strings.HasSuffix(msg.Topic(), "humidity") { - lastWeatherData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) - lastWeatherData.TimeStamp = time.Now() - } - if strings.HasSuffix(msg.Topic(), "co2level") { - lastWeatherData.CO2Level, _ = strconv.ParseFloat(string(msg.Payload()), 64) - lastWeatherData.TimeStamp = time.Now() + + sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3]) + lastWeatherData.Values[sensorValueType] = value + lastWeatherData.TimeStamp = time.Now() + + /* only use predefined sensorValueTypes + for _, sensorValueType := range storage.GetSensorValueTypes() { + if strings.HasSuffix(msg.Topic(), string(sensorValueType)) { + lastWeatherData.Values[sensorValueType], _ = strconv.ParseFloat(string(msg.Payload()), 64) + lastWeatherData.TimeStamp = time.Now() + } } + */ } }