From 4f501344d581c4b7fa9fe168a6b77ede283d5787 Mon Sep 17 00:00:00 2001 From: Joel Schmid <joelschmidwitt@gmail.com> Date: Sat, 20 Mar 2021 18:01:11 +0100 Subject: [PATCH] new internal publish strategy for mqtt --- config/config.go | 20 ++++++++++++++++++++ run_default.ps1 | 2 ++ weathersource/mqtt-source.go | 24 +++++++++++++++++++----- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index 78e39d9..31ae553 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "os" "strconv" + "time" ) // const influx stuff @@ -15,6 +16,8 @@ const influxURL = "https://influx.default-address.com" const mqttURL = "tcp://default-address.com:1883" const mqttTopic = "sensor/#" const defaultLocation = "default-location" +const mqttPublishInterval = time.Second +const mqttMinDistToLastValue = 250 * time.Millisecond //other config stuff const allowUnregisteredSensors = false @@ -45,6 +48,23 @@ func GetMqttTopic() string { return getVariableWithDefault("WEATHER-API-MQTT_TOPIC", mqttTopic) } +func MqttPublishInterval() time.Duration { + interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_PUBLISH_INTERVAL"), 10, 64) + if err != nil { + return mqttPublishInterval + } + return time.Millisecond * time.Duration(interval) +} + +func MqttMinDistToLastValue() time.Duration { + interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE"), 10, 64) + if err != nil { + return mqttMinDistToLastValue + } + return time.Millisecond * time.Duration(interval) +} + +//common config func AllowUnregisteredSensors() bool { allow, err := strconv.ParseBool(os.Getenv("WEATHER-API-ALLOW_UNREGISTERED_SENSORS")) if err != nil { diff --git a/run_default.ps1 b/run_default.ps1 index 492013a..184314b 100644 --- a/run_default.ps1 +++ b/run_default.ps1 @@ -9,6 +9,8 @@ Set-Item -Path "Env:WEATHER-API-INFLUX_BUCKET" -Value "default-bucket" Set-Item -Path "Env:WEATHER-API-MQTT_URL" -Value "tcp://default-address.com:1883" Set-Item -Path "Env:WEATHER-API-MQTT_TOPIC" -Value "sensor/#" +Set-Item -Path "Env:WEATHER-API-MQTT_PUBLISH_INTERVAL" -Value "2500" +Set-Item -Path "Env:WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE" -Value "250" Set-Item -Path "Env:WEATHER-API-ALLOW_UNREGISTERED_SENSORS" -Value "true" diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index fe2b774..0c1f763 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "time" + "weather-data/config" "weather-data/storage" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -53,6 +54,8 @@ func NewMqttSource(url, topic string) (*mqttWeatherSource, error) { return nil, token.Error() } + go source.publishDataValues() + return source, nil } @@ -77,11 +80,6 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData) } - diff := time.Now().Sub(lastWeatherData.TimeStamp) - if diff >= time.Second && diff < time.Hour*6 { - source.newWeatherData(*lastWeatherData) - } - if strings.HasSuffix(msg.Topic(), "pressure") { lastWeatherData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64) lastWeatherData.TimeStamp = time.Now() @@ -101,6 +99,22 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { } } +func (source *mqttWeatherSource) publishDataValues() { + for { + for len(source.lastWeatherDataPoints) != 0 { + current := *source.lastWeatherDataPoints[0] + diff := time.Now().Sub(current.TimeStamp) + if diff >= config.MqttMinDistToLastValue() { + source.newWeatherData(current) + source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:] + } + + } + time.Sleep(config.MqttPublishInterval()) + } + +} + func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) { for _, data := range source.lastWeatherDataPoints { if data.SensorId == sensorId {