imporved mqtt message handling

This commit is contained in:
Joel Schmid 2021-04-30 19:33:58 +02:00
parent 059340a058
commit 084af78841
4 changed files with 63 additions and 58 deletions

View file

@ -32,8 +32,7 @@ MQTT_HOST | localhost:1883 | Hostadresse MQTT-Broker
MQTT_TOPIC | sensor/# | MQTT-Topic, in welchem nach Wetterdaten geschaut wird
MQTT_USER | mqtt | Username für MQTT
MQTT_PASS | mqtt | Passwort für MQTT
MQTT_PUBLISH_INTERVALL | 2500 | Intervall, nachdem über MQTT empfangene Wetterdaten in die DB geschrieben werden (in Millisekunden)
MQTT_MIN_DIST_LAST_VALUE | 250 | Zeit, die Wetterdaten mindestens zurückgehalten werden, bevor diese in die DB geschrieben werden -> Innerhalb dieser Zeitspanne kann ein Wetterdatensatz noch durch andere Werte ergänzt werden(in Millisekunden)
MQTT_PUBLISH_DELAY | 1000 | Innerhalb dieser Zeitspanne wird ein Wetterdatensatz noch durch weiter eintreffende Werte ergänzt. Danach wird der Datensatz veröffentlicht (in Millisekunden)
MQTT_ANONYMOUS | false | Anonyme Anmeldung am MQTT-Broker verwenden (ohne Username und Passwort)
ALLOW_UNREGISTERED_SENSORS | false | Wetterdaten nicht registrierter Sensoren erlauben

View file

@ -26,8 +26,7 @@ type MqttConfig struct {
Topic string
Username string
Password string
PublishInterval time.Duration
MinDistToLastValue time.Duration
PublishDelay time.Duration
AllowAnonymousAuthentication bool
}
@ -55,8 +54,7 @@ var MqttConfiguration = MqttConfig{
Topic: getEnv("MQTT_TOPIC", "sensor/#"),
Username: getEnv("MQTT_USER", "mqtt"),
Password: getEnv("MQTT_PASS", "mqtt"),
PublishInterval: getEnvDuration("MQTT_PUBLISH_INTERVALL", time.Millisecond*2500),
MinDistToLastValue: getEnvDuration("MQTT_MIN_DIST_LAST_VALUE", time.Millisecond*250),
PublishDelay: getEnvDuration("MQTT_PUBLISH_DELAY", time.Second),
AllowAnonymousAuthentication: getEnvBool("MQTT_ANONYMOUS", false),
}

View file

@ -11,8 +11,7 @@ Set-Item -Path "Env:MQTT_HOST" -Value "localhost:1883"
Set-Item -Path "Env:MQTT_TOPIC" -Value "sensor/#"
Set-Item -Path "Env:MQTT_USER" -Value "mqtt"
Set-Item -Path "Env:MQTT_PASS" -Value "mqtt"
Set-Item -Path "Env:MQTT_PUBLISH_INTERVALL" -Value "2500"
Set-Item -Path "Env:MQTT_MIN_DIST_LAST_VALUE" -Value "250"
Set-Item -Path "Env:MQTT_PUBLISH_DELAY" -Value "1000"
Set-Item -Path "Env:MQTT_ANONYMOUS" -Value "false"
Set-Item -Path "Env:MONGO_HOST" -Value "localhost:27017"

View file

@ -4,6 +4,7 @@ import (
"log"
"regexp"
"strconv"
"sync"
"time"
"weather-data/config"
"weather-data/storage"
@ -16,11 +17,14 @@ var mqttTopicRegexPattern = "(^sensor)/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F
var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern)
var channelBufferSize = 10
type mqttWeatherSource struct {
config config.MqttConfig
mqttClient mqtt.Client
lastWeatherDataPoints []*storage.WeatherData
weatherSource WeatherSourceBase
config config.MqttConfig
mqttClient mqtt.Client
weatherSource WeatherSourceBase
activeSensorMeasurements map[uuid.UUID](chan map[storage.SensorValueType]float64)
sensorMutex sync.RWMutex
}
//Close mqtt client
@ -37,7 +41,7 @@ func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) {
//mqtt
opts.SetKeepAlive(60 * time.Second)
opts.SetDefaultPublishHandler(source.mqttMessageHandler())
opts.SetDefaultPublishHandler(source.mqttMessageHandler)
opts.SetPingTimeout(1 * time.Second)
if !cfg.AllowAnonymousAuthentication {
@ -55,70 +59,75 @@ func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) {
return nil, token.Error()
}
go source.publishDataValues()
source.activeSensorMeasurements = make(map[uuid.UUID]chan map[storage.SensorValueType]float64)
source.sensorMutex = sync.RWMutex{}
log.Print("successfully connected to mqtt-broker")
return source, nil
}
//mqttMessageHandler returns a function that handles incoming mqtt-messages
func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
func (source *mqttWeatherSource) mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
if !regexTopic.MatchString(msg.Topic()) {
return
}
return func(client mqtt.Client, msg mqtt.Message) {
if !regexTopic.MatchString(msg.Topic()) {
return
}
sensorId, err := uuid.Parse(regexTopic.FindStringSubmatch(msg.Topic())[2])
if err != nil {
return
}
sensorId, err := uuid.Parse(regexTopic.FindStringSubmatch(msg.Topic())[2])
if err != nil {
return
}
value, err := strconv.ParseFloat(string(msg.Payload()), 64)
if err != nil {
return
}
lastWeatherData, found := source.getUnwrittenDatapoints(sensorId)
sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3])
if !found {
lastWeatherData = storage.NewWeatherData()
lastWeatherData.SensorId = sensorId
source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData)
}
dataValue := map[storage.SensorValueType]float64{
sensorValueType: value,
}
value, err := strconv.ParseFloat(string(msg.Payload()), 64)
if err != nil {
return
}
source.sensorMutex.RLock()
dataChannel, exists := source.activeSensorMeasurements[sensorId]
if !exists {
dataChannel = make(chan map[storage.SensorValueType]float64, channelBufferSize)
}
dataChannel <- dataValue
source.sensorMutex.RUnlock()
sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3])
lastWeatherData.Values[sensorValueType] = value
lastWeatherData.TimeStamp = time.Now()
if !exists {
go source.publishSensorMeasurement(sensorId, dataChannel)
go source.cleanupSensorMeasurement(sensorId, dataChannel)
source.sensorMutex.Lock()
source.activeSensorMeasurements[sensorId] = dataChannel
source.sensorMutex.Unlock()
}
}
func (source *mqttWeatherSource) publishDataValues() {
for {
for len(source.lastWeatherDataPoints) != 0 {
current := *source.lastWeatherDataPoints[0]
diff := time.Since(current.TimeStamp)
if diff >= source.config.MinDistToLastValue {
if err := source.newWeatherData(current); err != nil {
log.Fatal(err)
//if error than put the dataPoint to the end of the slice and try again later
dataPoint := source.lastWeatherDataPoints[0]
source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, dataPoint)
}
source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:]
}
}
time.Sleep(source.config.PublishInterval)
}
func (source *mqttWeatherSource) cleanupSensorMeasurement(sensorId uuid.UUID, channel chan<- map[storage.SensorValueType]float64) {
time.Sleep(source.config.PublishDelay)
source.sensorMutex.Lock()
delete(source.activeSensorMeasurements, sensorId)
source.sensorMutex.Unlock()
close(channel)
}
func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) {
for _, data := range source.lastWeatherDataPoints {
if data.SensorId == sensorId {
return data, true
func (source *mqttWeatherSource) publishSensorMeasurement(sensorId uuid.UUID, channel <-chan map[storage.SensorValueType]float64) {
weatherData := storage.NewWeatherData()
weatherData.TimeStamp = time.Now()
weatherData.SensorId = sensorId
for values := range channel {
for k, v := range values {
weatherData.Values[k] = v
}
}
return nil, false
source.newWeatherData(*weatherData)
}
//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data