new internal publish strategy for mqtt

This commit is contained in:
Joel Schmid 2021-03-20 18:01:11 +01:00
parent 395e4530af
commit 4f501344d5
3 changed files with 41 additions and 5 deletions

View file

@ -3,6 +3,7 @@ package config
import ( import (
"os" "os"
"strconv" "strconv"
"time"
) )
// const influx stuff // const influx stuff
@ -15,6 +16,8 @@ const influxURL = "https://influx.default-address.com"
const mqttURL = "tcp://default-address.com:1883" const mqttURL = "tcp://default-address.com:1883"
const mqttTopic = "sensor/#" const mqttTopic = "sensor/#"
const defaultLocation = "default-location" const defaultLocation = "default-location"
const mqttPublishInterval = time.Second
const mqttMinDistToLastValue = 250 * time.Millisecond
//other config stuff //other config stuff
const allowUnregisteredSensors = false const allowUnregisteredSensors = false
@ -45,6 +48,23 @@ func GetMqttTopic() string {
return getVariableWithDefault("WEATHER-API-MQTT_TOPIC", mqttTopic) return getVariableWithDefault("WEATHER-API-MQTT_TOPIC", mqttTopic)
} }
func MqttPublishInterval() time.Duration {
interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_PUBLISH_INTERVAL"), 10, 64)
if err != nil {
return mqttPublishInterval
}
return time.Millisecond * time.Duration(interval)
}
func MqttMinDistToLastValue() time.Duration {
interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE"), 10, 64)
if err != nil {
return mqttMinDistToLastValue
}
return time.Millisecond * time.Duration(interval)
}
//common config
func AllowUnregisteredSensors() bool { func AllowUnregisteredSensors() bool {
allow, err := strconv.ParseBool(os.Getenv("WEATHER-API-ALLOW_UNREGISTERED_SENSORS")) allow, err := strconv.ParseBool(os.Getenv("WEATHER-API-ALLOW_UNREGISTERED_SENSORS"))
if err != nil { if err != nil {

View file

@ -9,6 +9,8 @@ Set-Item -Path "Env:WEATHER-API-INFLUX_BUCKET" -Value "default-bucket"
Set-Item -Path "Env:WEATHER-API-MQTT_URL" -Value "tcp://default-address.com:1883" Set-Item -Path "Env:WEATHER-API-MQTT_URL" -Value "tcp://default-address.com:1883"
Set-Item -Path "Env:WEATHER-API-MQTT_TOPIC" -Value "sensor/#" Set-Item -Path "Env:WEATHER-API-MQTT_TOPIC" -Value "sensor/#"
Set-Item -Path "Env:WEATHER-API-MQTT_PUBLISH_INTERVAL" -Value "2500"
Set-Item -Path "Env:WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE" -Value "250"
Set-Item -Path "Env:WEATHER-API-ALLOW_UNREGISTERED_SENSORS" -Value "true" Set-Item -Path "Env:WEATHER-API-ALLOW_UNREGISTERED_SENSORS" -Value "true"

View file

@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"weather-data/config"
"weather-data/storage" "weather-data/storage"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
@ -53,6 +54,8 @@ func NewMqttSource(url, topic string) (*mqttWeatherSource, error) {
return nil, token.Error() return nil, token.Error()
} }
go source.publishDataValues()
return source, nil return source, nil
} }
@ -77,11 +80,6 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData) source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData)
} }
diff := time.Now().Sub(lastWeatherData.TimeStamp)
if diff >= time.Second && diff < time.Hour*6 {
source.newWeatherData(*lastWeatherData)
}
if strings.HasSuffix(msg.Topic(), "pressure") { if strings.HasSuffix(msg.Topic(), "pressure") {
lastWeatherData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64) lastWeatherData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64)
lastWeatherData.TimeStamp = time.Now() lastWeatherData.TimeStamp = time.Now()
@ -101,6 +99,22 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
} }
} }
func (source *mqttWeatherSource) publishDataValues() {
for {
for len(source.lastWeatherDataPoints) != 0 {
current := *source.lastWeatherDataPoints[0]
diff := time.Now().Sub(current.TimeStamp)
if diff >= config.MqttMinDistToLastValue() {
source.newWeatherData(current)
source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:]
}
}
time.Sleep(config.MqttPublishInterval())
}
}
func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) { func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) {
for _, data := range source.lastWeatherDataPoints { for _, data := range source.lastWeatherDataPoints {
if data.SensorId == sensorId { if data.SensorId == sensorId {