From 084af788416ae2bd743c029f67944262102683e9 Mon Sep 17 00:00:00 2001 From: Joel Schmid Date: Fri, 30 Apr 2021 19:33:58 +0200 Subject: [PATCH] imporved mqtt message handling --- README.md | 3 +- config/config.go | 6 +- run_default.ps1 | 3 +- weathersource/mqtt-source.go | 109 +++++++++++++++++++---------------- 4 files changed, 63 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index c264abc..733e672 100644 --- a/README.md +++ b/README.md @@ -32,8 +32,7 @@ MQTT_HOST | localhost:1883 | Hostadresse MQTT-Broker MQTT_TOPIC | sensor/# | MQTT-Topic, in welchem nach Wetterdaten geschaut wird MQTT_USER | mqtt | Username für MQTT MQTT_PASS | mqtt | Passwort für MQTT -MQTT_PUBLISH_INTERVALL | 2500 | Intervall, nachdem über MQTT empfangene Wetterdaten in die DB geschrieben werden (in Millisekunden) -MQTT_MIN_DIST_LAST_VALUE | 250 | Zeit, die Wetterdaten mindestens zurückgehalten werden, bevor diese in die DB geschrieben werden -> Innerhalb dieser Zeitspanne kann ein Wetterdatensatz noch durch andere Werte ergänzt werden(in Millisekunden) +MQTT_PUBLISH_DELAY | 1000 | Innerhalb dieser Zeitspanne wird ein Wetterdatensatz noch durch weiter eintreffende Werte ergänzt. Danach wird der Datensatz veröffentlicht (in Millisekunden) MQTT_ANONYMOUS | false | Anonyme Anmeldung am MQTT-Broker verwenden (ohne Username und Passwort) ALLOW_UNREGISTERED_SENSORS | false | Wetterdaten nicht registrierter Sensoren erlauben diff --git a/config/config.go b/config/config.go index a4b6186..68803b4 100644 --- a/config/config.go +++ b/config/config.go @@ -26,8 +26,7 @@ type MqttConfig struct { Topic string Username string Password string - PublishInterval time.Duration - MinDistToLastValue time.Duration + PublishDelay time.Duration AllowAnonymousAuthentication bool } @@ -55,8 +54,7 @@ var MqttConfiguration = MqttConfig{ Topic: getEnv("MQTT_TOPIC", "sensor/#"), Username: getEnv("MQTT_USER", "mqtt"), Password: getEnv("MQTT_PASS", "mqtt"), - PublishInterval: getEnvDuration("MQTT_PUBLISH_INTERVALL", time.Millisecond*2500), - MinDistToLastValue: getEnvDuration("MQTT_MIN_DIST_LAST_VALUE", time.Millisecond*250), + PublishDelay: getEnvDuration("MQTT_PUBLISH_DELAY", time.Second), AllowAnonymousAuthentication: getEnvBool("MQTT_ANONYMOUS", false), } diff --git a/run_default.ps1 b/run_default.ps1 index 430f793..d734f7c 100644 --- a/run_default.ps1 +++ b/run_default.ps1 @@ -11,8 +11,7 @@ Set-Item -Path "Env:MQTT_HOST" -Value "localhost:1883" Set-Item -Path "Env:MQTT_TOPIC" -Value "sensor/#" Set-Item -Path "Env:MQTT_USER" -Value "mqtt" Set-Item -Path "Env:MQTT_PASS" -Value "mqtt" -Set-Item -Path "Env:MQTT_PUBLISH_INTERVALL" -Value "2500" -Set-Item -Path "Env:MQTT_MIN_DIST_LAST_VALUE" -Value "250" +Set-Item -Path "Env:MQTT_PUBLISH_DELAY" -Value "1000" Set-Item -Path "Env:MQTT_ANONYMOUS" -Value "false" Set-Item -Path "Env:MONGO_HOST" -Value "localhost:27017" diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index e941598..4e9a7cc 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -4,6 +4,7 @@ import ( "log" "regexp" "strconv" + "sync" "time" "weather-data/config" "weather-data/storage" @@ -16,11 +17,14 @@ var mqttTopicRegexPattern = "(^sensor)/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern) +var channelBufferSize = 10 + type mqttWeatherSource struct { - config config.MqttConfig - mqttClient mqtt.Client - lastWeatherDataPoints []*storage.WeatherData - weatherSource WeatherSourceBase + config config.MqttConfig + mqttClient mqtt.Client + weatherSource WeatherSourceBase + activeSensorMeasurements map[uuid.UUID](chan map[storage.SensorValueType]float64) + sensorMutex sync.RWMutex } //Close mqtt client @@ -37,7 +41,7 @@ func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) { //mqtt opts.SetKeepAlive(60 * time.Second) - opts.SetDefaultPublishHandler(source.mqttMessageHandler()) + opts.SetDefaultPublishHandler(source.mqttMessageHandler) opts.SetPingTimeout(1 * time.Second) if !cfg.AllowAnonymousAuthentication { @@ -55,70 +59,75 @@ func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) { return nil, token.Error() } - go source.publishDataValues() + source.activeSensorMeasurements = make(map[uuid.UUID]chan map[storage.SensorValueType]float64) + source.sensorMutex = sync.RWMutex{} log.Print("successfully connected to mqtt-broker") return source, nil } //mqttMessageHandler returns a function that handles incoming mqtt-messages -func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { +func (source *mqttWeatherSource) mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { + if !regexTopic.MatchString(msg.Topic()) { + return + } - return func(client mqtt.Client, msg mqtt.Message) { - if !regexTopic.MatchString(msg.Topic()) { - return - } + sensorId, err := uuid.Parse(regexTopic.FindStringSubmatch(msg.Topic())[2]) + if err != nil { + return + } - sensorId, err := uuid.Parse(regexTopic.FindStringSubmatch(msg.Topic())[2]) - if err != nil { - return - } + value, err := strconv.ParseFloat(string(msg.Payload()), 64) + if err != nil { + return + } - lastWeatherData, found := source.getUnwrittenDatapoints(sensorId) + sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3]) - if !found { - lastWeatherData = storage.NewWeatherData() - lastWeatherData.SensorId = sensorId - source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData) - } + dataValue := map[storage.SensorValueType]float64{ + sensorValueType: value, + } - value, err := strconv.ParseFloat(string(msg.Payload()), 64) - if err != nil { - return - } + source.sensorMutex.RLock() + dataChannel, exists := source.activeSensorMeasurements[sensorId] + if !exists { + dataChannel = make(chan map[storage.SensorValueType]float64, channelBufferSize) + } + dataChannel <- dataValue + source.sensorMutex.RUnlock() - sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3]) - lastWeatherData.Values[sensorValueType] = value - lastWeatherData.TimeStamp = time.Now() + if !exists { + go source.publishSensorMeasurement(sensorId, dataChannel) + go source.cleanupSensorMeasurement(sensorId, dataChannel) + + source.sensorMutex.Lock() + source.activeSensorMeasurements[sensorId] = dataChannel + source.sensorMutex.Unlock() } } -func (source *mqttWeatherSource) publishDataValues() { - for { - for len(source.lastWeatherDataPoints) != 0 { - current := *source.lastWeatherDataPoints[0] - diff := time.Since(current.TimeStamp) - if diff >= source.config.MinDistToLastValue { - if err := source.newWeatherData(current); err != nil { - log.Fatal(err) - //if error than put the dataPoint to the end of the slice and try again later - dataPoint := source.lastWeatherDataPoints[0] - source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, dataPoint) - } - source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:] - } - } - time.Sleep(source.config.PublishInterval) - } +func (source *mqttWeatherSource) cleanupSensorMeasurement(sensorId uuid.UUID, channel chan<- map[storage.SensorValueType]float64) { + time.Sleep(source.config.PublishDelay) + + source.sensorMutex.Lock() + delete(source.activeSensorMeasurements, sensorId) + source.sensorMutex.Unlock() + + close(channel) } -func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) { - for _, data := range source.lastWeatherDataPoints { - if data.SensorId == sensorId { - return data, true +func (source *mqttWeatherSource) publishSensorMeasurement(sensorId uuid.UUID, channel <-chan map[storage.SensorValueType]float64) { + weatherData := storage.NewWeatherData() + weatherData.TimeStamp = time.Now() + weatherData.SensorId = sensorId + + for values := range channel { + for k, v := range values { + weatherData.Values[k] = v } } - return nil, false + + source.newWeatherData(*weatherData) } //AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data