diff --git a/api/rest-api.go b/api/rest-api.go index 1ec6a48..d7012b9 100644 --- a/api/rest-api.go +++ b/api/rest-api.go @@ -57,7 +57,7 @@ func (api *weatherRestApi) getData(w http.ResponseWriter, r *http.Request) { } func (api *weatherRestApi) randomWeatherHandler(w http.ResponseWriter, r *http.Request) { - datapoint := storage.NewRandomWeatherData("swablab") + datapoint := storage.NewRandomWeatherData(uuid.Nil) w.Header().Add("content-type", "application/json") json.NewEncoder(w).Encode(datapoint) @@ -66,7 +66,7 @@ func (api *weatherRestApi) randomWeatherHandler(w http.ResponseWriter, r *http.R func (api *weatherRestApi) randomWeatherListHandler(w http.ResponseWriter, r *http.Request) { var datapoints = make([]storage.WeatherData, 0) for i := 0; i < 10; i++ { - datapoints = append(datapoints, storage.NewRandomWeatherData("swablab")) + datapoints = append(datapoints, storage.NewRandomWeatherData(uuid.Nil)) } w.Header().Add("content-type", "application/json") diff --git a/config/config.go b/config/config.go index 64ebf8a..aa5ab69 100644 --- a/config/config.go +++ b/config/config.go @@ -41,10 +41,6 @@ func GetMqttTopic() string { return getVariableWithDefault("WEATHER-API-MQTT_TOPIC", mqttTopic) } -func GetMqttLocation() string { - return getVariableWithDefault("WEATHER-API-MQTT_LOCATION", defaultLocation) -} - //helper func getVariableWithDefault(variableKey, defaultValue string) string { variable := os.Getenv(variableKey) diff --git a/main.go b/main.go index 27f2a85..0880989 100644 --- a/main.go +++ b/main.go @@ -31,8 +31,7 @@ func main() { var weatherSource weathersource.WeatherSource weatherSource, err = weathersource.NewMqttSource( config.GetMqttUrl(), - config.GetMqttTopic(), - config.GetMqttLocation()) + config.GetMqttTopic()) if err != nil { os.Exit(1) diff --git a/run_default.ps1 b/run_default.ps1 index 866a815..6ef9ae9 100644 --- a/run_default.ps1 +++ b/run_default.ps1 @@ -9,7 +9,6 @@ Set-Item -Path "Env:WEATHER-API-INFLUX_BUCKET" -Value "default-bucket" Set-Item -Path "Env:WEATHER-API-MQTT_URL" -Value "tcp://default-address.com:1883" Set-Item -Path "Env:WEATHER-API-MQTT_TOPIC" -Value "sensor/#" -Set-Item -Path "Env:WEATHER-API-MQTT_LOCATION" -Value "default-location" #start application Start-Process "main.exe" -Wait -NoNewWindow diff --git a/storage/influxdb-storage.go b/storage/influxdb-storage.go index 54f3144..8b670f0 100644 --- a/storage/influxdb-storage.go +++ b/storage/influxdb-storage.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/google/uuid" influxdb2 "github.com/influxdata/influxdb-client-go/v2" ) @@ -33,7 +34,7 @@ func NewInfluxStorage(token, bucket, organization, url string) (*influxStorage, //Save WeatherData to InfluxDB func (storage *influxStorage) Save(data WeatherData) error { tags := map[string]string{ - "location": data.Location} + "sensorId": data.SensorId.String()} fields := map[string]interface{}{ "temperature": data.Temperature, @@ -73,12 +74,17 @@ func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, er for result.Next() { if result.Err() != nil { + return nil, result.Err() + } + + timestamp := result.Record().Time() + sensorId, err := uuid.Parse(result.Record().ValueByKey("sensorId").(string)) + + if err != nil { return nil, err } - location := result.Record().ValueByKey("location").(string) - timestamp := result.Record().Time() - data, contained := containsWeatherData(queryResults, location, timestamp) + data, contained := containsWeatherData(queryResults, sensorId, timestamp) if result.Record().Field() == "temperature" { data.Temperature = result.Record().Value().(float64) @@ -94,7 +100,7 @@ func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, er } if !contained { - data.Location = location + data.SensorId = sensorId data.TimeStamp = timestamp queryResults = append(queryResults, data) } @@ -103,9 +109,9 @@ func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, er return queryResults, nil } -func containsWeatherData(weatherData []*WeatherData, location string, timestamp time.Time) (*WeatherData, bool) { +func containsWeatherData(weatherData []*WeatherData, sensorId uuid.UUID, timestamp time.Time) (*WeatherData, bool) { for _, val := range weatherData { - if val.Location == location && val.TimeStamp == timestamp { + if val.SensorId == sensorId && val.TimeStamp == timestamp { return val, true } } diff --git a/storage/weather-data.go b/storage/weather-data.go index 8b1d2c5..f5e896f 100644 --- a/storage/weather-data.go +++ b/storage/weather-data.go @@ -3,6 +3,8 @@ package storage import ( "math/rand" "time" + + "github.com/google/uuid" ) //WeatherStorage interface for different storage-implementations of weather data @@ -18,19 +20,19 @@ type WeatherData struct { Pressure float64 `json:"airPressure"` Temperature float64 `json:"temperature"` CO2Level float64 `json:"co2level"` - Location string `json:"location"` + SensorId uuid.UUID `json:"SensorId"` TimeStamp time.Time `json:"timestamp"` } //NewRandomWeatherData creates random WeatherData with given Location -func NewRandomWeatherData(location string) WeatherData { +func NewRandomWeatherData(sensorId uuid.UUID) WeatherData { rand.Seed(time.Now().UnixNano()) var data WeatherData data.Humidity = rand.Float64() * 100 data.Pressure = rand.Float64()*80 + 960 data.Temperature = rand.Float64()*40 - 5 data.CO2Level = rand.Float64()*50 + 375 - data.Location = location + data.SensorId = sensorId data.TimeStamp = time.Now() return data } diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index e8d5f47..a6bf798 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -1,20 +1,28 @@ package weathersource import ( + "regexp" "strconv" "strings" "time" "weather-data/storage" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" ) +var uuidRegexPattern = "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}" +var mqttTopicRegexPattern = "^sensor/[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}/(temp|pressure|humidity|co2level)$" + +var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern) +var regexUuid *regexp.Regexp = regexp.MustCompile(uuidRegexPattern) + type mqttWeatherSource struct { - url string - topic string - mqttClient mqtt.Client - lastData storage.WeatherData - weatherSource WeatherSourceBase + url string + topic string + mqttClient mqtt.Client + lastWeatherDataPoints []*storage.WeatherData + weatherSource WeatherSourceBase } //Close mqtt client @@ -23,7 +31,7 @@ func (source *mqttWeatherSource) Close() { } //NewMqttSource Factory function for mqttWeatherSource -func NewMqttSource(url, topic, defaultLocation string) (*mqttWeatherSource, error) { +func NewMqttSource(url, topic string) (*mqttWeatherSource, error) { source := new(mqttWeatherSource) source.url = url @@ -35,7 +43,6 @@ func NewMqttSource(url, topic, defaultLocation string) (*mqttWeatherSource, erro opts.SetPingTimeout(1 * time.Second) source.mqttClient = mqtt.NewClient(opts) - source.lastData.Location = defaultLocation if token := source.mqttClient.Connect(); token.Wait() && token.Error() != nil { return nil, token.Error() @@ -52,31 +59,55 @@ func NewMqttSource(url, topic, defaultLocation string) (*mqttWeatherSource, erro func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { return func(client mqtt.Client, msg mqtt.Message) { + if !regexTopic.MatchString(msg.Topic()) { + return + } - diff := time.Now().Sub(source.lastData.TimeStamp) + sensorId, err := uuid.Parse(regexUuid.FindAllString(msg.Topic(), 1)[0]) + if err != nil { + return + } + lastWeatherData, found := source.getLastWeatherData(sensorId) + + if !found { + lastWeatherData = new(storage.WeatherData) + lastWeatherData.SensorId = sensorId + source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData) + } + + diff := time.Now().Sub(lastWeatherData.TimeStamp) if diff >= time.Second && diff < time.Hour*6 { - source.newWeatherData(source.lastData) + source.newWeatherData(*lastWeatherData) } if strings.HasSuffix(msg.Topic(), "pressure") { - source.lastData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64) - source.lastData.TimeStamp = time.Now() + lastWeatherData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64) + lastWeatherData.TimeStamp = time.Now() } if strings.HasSuffix(msg.Topic(), "temp") { - source.lastData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) - source.lastData.TimeStamp = time.Now() + lastWeatherData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) + lastWeatherData.TimeStamp = time.Now() } if strings.HasSuffix(msg.Topic(), "humidity") { - source.lastData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) - source.lastData.TimeStamp = time.Now() + lastWeatherData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) + lastWeatherData.TimeStamp = time.Now() } if strings.HasSuffix(msg.Topic(), "co2level") { - source.lastData.CO2Level, _ = strconv.ParseFloat(string(msg.Payload()), 64) - source.lastData.TimeStamp = time.Now() + lastWeatherData.CO2Level, _ = strconv.ParseFloat(string(msg.Payload()), 64) + lastWeatherData.TimeStamp = time.Now() } } } +func (source *mqttWeatherSource) getLastWeatherData(sensorId uuid.UUID) (*storage.WeatherData, bool) { + for _, data := range source.lastWeatherDataPoints { + if data.SensorId == sensorId { + return data, true + } + } + return nil, false +} + //AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) { source.weatherSource.AddNewWeatherDataCallback(callback)