131 lines
3.5 KiB
Go
131 lines
3.5 KiB
Go
package weathersource
|
|
|
|
import (
|
|
"log"
|
|
"regexp"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
"weather-data/config"
|
|
"weather-data/storage"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
var mqttTopicRegexPattern = "(^sensor)/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})/(.*)"
|
|
|
|
var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern)
|
|
|
|
var channelBufferSize = 10
|
|
|
|
type mqttWeatherSource struct {
|
|
WeatherSourceBase
|
|
config config.MqttConfig
|
|
mqttClient mqtt.Client
|
|
activeSensorMeasurements map[uuid.UUID](chan map[storage.SensorValueType]float64)
|
|
sensorMutex sync.RWMutex
|
|
}
|
|
|
|
//Close mqtt client
|
|
func (source *mqttWeatherSource) Close() {
|
|
source.mqttClient.Disconnect(2)
|
|
}
|
|
|
|
//NewMqttSource Factory function for mqttWeatherSource with authentication
|
|
func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) {
|
|
source := new(mqttWeatherSource)
|
|
source.config = cfg
|
|
|
|
opts := mqtt.NewClientOptions().AddBroker(cfg.Host)
|
|
|
|
//mqtt
|
|
opts.SetKeepAlive(60 * time.Second)
|
|
opts.SetDefaultPublishHandler(source.mqttMessageHandler)
|
|
opts.SetPingTimeout(1 * time.Second)
|
|
|
|
if !cfg.AllowAnonymousAuthentication {
|
|
opts.Username = cfg.Username
|
|
opts.Password = cfg.Password
|
|
}
|
|
|
|
source.mqttClient = mqtt.NewClient(opts)
|
|
|
|
if token := source.mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
|
return nil, token.Error()
|
|
}
|
|
|
|
if token := source.mqttClient.Subscribe(cfg.Topic, 2, nil); token.Wait() && token.Error() != nil {
|
|
return nil, token.Error()
|
|
}
|
|
|
|
source.activeSensorMeasurements = make(map[uuid.UUID]chan map[storage.SensorValueType]float64)
|
|
source.sensorMutex = sync.RWMutex{}
|
|
|
|
log.Print("successfully connected to mqtt-broker")
|
|
return source, nil
|
|
}
|
|
|
|
//mqttMessageHandler returns a function that handles incoming mqtt-messages
|
|
func (source *mqttWeatherSource) mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
|
|
if !regexTopic.MatchString(msg.Topic()) {
|
|
return
|
|
}
|
|
|
|
sensorId, err := uuid.Parse(regexTopic.FindStringSubmatch(msg.Topic())[2])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
value, err := strconv.ParseFloat(string(msg.Payload()), 64)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3])
|
|
|
|
dataValue := map[storage.SensorValueType]float64{
|
|
sensorValueType: value,
|
|
}
|
|
|
|
source.sensorMutex.RLock()
|
|
dataChannel, exists := source.activeSensorMeasurements[sensorId]
|
|
if !exists {
|
|
dataChannel = make(chan map[storage.SensorValueType]float64, channelBufferSize)
|
|
}
|
|
dataChannel <- dataValue
|
|
source.sensorMutex.RUnlock()
|
|
|
|
if !exists {
|
|
go source.publishSensorMeasurement(sensorId, dataChannel)
|
|
go source.cleanupSensorMeasurement(sensorId, dataChannel)
|
|
|
|
source.sensorMutex.Lock()
|
|
source.activeSensorMeasurements[sensorId] = dataChannel
|
|
source.sensorMutex.Unlock()
|
|
}
|
|
}
|
|
|
|
func (source *mqttWeatherSource) cleanupSensorMeasurement(sensorId uuid.UUID, channel chan<- map[storage.SensorValueType]float64) {
|
|
time.Sleep(source.config.PublishDelay)
|
|
|
|
source.sensorMutex.Lock()
|
|
delete(source.activeSensorMeasurements, sensorId)
|
|
source.sensorMutex.Unlock()
|
|
|
|
close(channel)
|
|
}
|
|
|
|
func (source *mqttWeatherSource) publishSensorMeasurement(sensorId uuid.UUID, channel <-chan map[storage.SensorValueType]float64) {
|
|
weatherData := storage.NewWeatherData()
|
|
weatherData.TimeStamp = time.Now()
|
|
weatherData.SensorId = sensorId
|
|
|
|
for values := range channel {
|
|
for k, v := range values {
|
|
weatherData.Values[k] = v
|
|
}
|
|
}
|
|
|
|
source.NewWeatherData(weatherData)
|
|
}
|