weather-api/weathersource/mqtt-source.go
2021-04-05 11:39:55 +02:00

131 lines
3.7 KiB
Go

package weathersource
import (
"log"
"regexp"
"strconv"
"time"
"weather-data/config"
"weather-data/storage"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
var mqttTopicRegexPattern = "(^sensor)/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})/(.*)"
var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern)
type mqttWeatherSource struct {
config config.MqttConfig
mqttClient mqtt.Client
lastWeatherDataPoints []*storage.WeatherData
weatherSource WeatherSourceBase
}
//Close mqtt client
func (source *mqttWeatherSource) Close() {
source.mqttClient.Disconnect(2)
}
//NewMqttSource Factory function for mqttWeatherSource with authentication
func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) {
source := new(mqttWeatherSource)
source.config = cfg
opts := mqtt.NewClientOptions().AddBroker(cfg.Host)
//mqtt
opts.SetKeepAlive(60 * time.Second)
opts.SetDefaultPublishHandler(source.mqttMessageHandler())
opts.SetPingTimeout(1 * time.Second)
if !cfg.AllowAnonymousAuthentication {
opts.Username = cfg.Username
opts.Password = cfg.Password
}
source.mqttClient = mqtt.NewClient(opts)
if token := source.mqttClient.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
if token := source.mqttClient.Subscribe(cfg.Topic, 2, nil); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
go source.publishDataValues()
log.Print("successfully connected to mqtt-broker")
return source, nil
}
//mqttMessageHandler returns a function that handles incoming mqtt-messages
func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
return func(client mqtt.Client, msg mqtt.Message) {
if !regexTopic.MatchString(msg.Topic()) {
return
}
sensorId, err := uuid.Parse(regexTopic.FindStringSubmatch(msg.Topic())[2])
if err != nil {
return
}
lastWeatherData, found := source.getUnwrittenDatapoints(sensorId)
if !found {
lastWeatherData = storage.NewWeatherData()
lastWeatherData.SensorId = sensorId
source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData)
}
value, err := strconv.ParseFloat(string(msg.Payload()), 64)
if err != nil {
return
}
sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3])
lastWeatherData.Values[sensorValueType] = value
lastWeatherData.TimeStamp = time.Now()
}
}
func (source *mqttWeatherSource) publishDataValues() {
for {
for len(source.lastWeatherDataPoints) != 0 {
current := *source.lastWeatherDataPoints[0]
diff := time.Since(current.TimeStamp)
if diff >= source.config.MinDistToLastValue {
if err := source.newWeatherData(current); err != nil {
log.Fatal(err)
//if error than put the dataPoint to the end of the slice and try again later
dataPoint := source.lastWeatherDataPoints[0]
source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, dataPoint)
}
source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:]
}
}
time.Sleep(source.config.PublishInterval)
}
}
func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) {
for _, data := range source.lastWeatherDataPoints {
if data.SensorId == sensorId {
return data, true
}
}
return nil, false
}
//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data
func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) {
source.weatherSource.AddNewWeatherDataCallback(callback)
}
func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) error {
return source.weatherSource.NewWeatherData(datapoint)
}