check allow unregistered sensors in handleNewData
This commit is contained in:
parent
c0d315a409
commit
15db885db8
4 changed files with 24 additions and 20 deletions
|
@ -87,7 +87,11 @@ func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
api.addNewWeatherData(data)
|
|
||||||
|
err = api.addNewWeatherData(data)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *weatherRestApi) homePageHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *weatherRestApi) homePageHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -119,6 +123,6 @@ func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewW
|
||||||
api.weatherSource.AddNewWeatherDataCallback(callback)
|
api.weatherSource.AddNewWeatherDataCallback(callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) {
|
func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) error {
|
||||||
api.weatherSource.NewWeatherData(weatherData)
|
return api.weatherSource.NewWeatherData(weatherData)
|
||||||
}
|
}
|
||||||
|
|
12
main.go
12
main.go
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"weather-data/api"
|
"weather-data/api"
|
||||||
"weather-data/config"
|
"weather-data/config"
|
||||||
|
@ -34,8 +35,7 @@ func main() {
|
||||||
//setup new weatherData source -> mqtt
|
//setup new weatherData source -> mqtt
|
||||||
weatherSource, err = weathersource.NewMqttSource(
|
weatherSource, err = weathersource.NewMqttSource(
|
||||||
config.GetMqttUrl(),
|
config.GetMqttUrl(),
|
||||||
config.GetMqttTopic(),
|
config.GetMqttTopic())
|
||||||
sensorRegistry)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@ -54,6 +54,10 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleNewWeatherData(wd storage.WeatherData) {
|
func handleNewWeatherData(wd storage.WeatherData) error {
|
||||||
weatherStorage.Save(wd)
|
if !config.AllowUnregisteredSensors() && !sensorRegistry.ExistSensorId(wd.SensorId) {
|
||||||
|
return errors.New("sensor have to be registered")
|
||||||
|
}
|
||||||
|
weatherStorage.Save(wd)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
"weather-data/config"
|
|
||||||
"weather-data/storage"
|
"weather-data/storage"
|
||||||
|
|
||||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
@ -23,7 +22,6 @@ type mqttWeatherSource struct {
|
||||||
url string
|
url string
|
||||||
topic string
|
topic string
|
||||||
mqttClient mqtt.Client
|
mqttClient mqtt.Client
|
||||||
sensorRegistry storage.SensorRegistry
|
|
||||||
lastWeatherDataPoints []*storage.WeatherData
|
lastWeatherDataPoints []*storage.WeatherData
|
||||||
weatherSource WeatherSourceBase
|
weatherSource WeatherSourceBase
|
||||||
}
|
}
|
||||||
|
@ -34,10 +32,9 @@ func (source *mqttWeatherSource) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewMqttSource Factory function for mqttWeatherSource
|
//NewMqttSource Factory function for mqttWeatherSource
|
||||||
func NewMqttSource(url, topic string, sensorRegistry storage.SensorRegistry) (*mqttWeatherSource, error) {
|
func NewMqttSource(url, topic string) (*mqttWeatherSource, error) {
|
||||||
source := new(mqttWeatherSource)
|
source := new(mqttWeatherSource)
|
||||||
source.url = url
|
source.url = url
|
||||||
source.sensorRegistry = sensorRegistry
|
|
||||||
|
|
||||||
opts := mqtt.NewClientOptions().AddBroker(url)
|
opts := mqtt.NewClientOptions().AddBroker(url)
|
||||||
|
|
||||||
|
@ -72,11 +69,6 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !config.AllowUnregisteredSensors() && !source.sensorRegistry.ExistSensorId(sensorId) {
|
|
||||||
fmt.Println("sensor have to be registered:", sensorId)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
lastWeatherData, found := source.getUnwrittenDatapoints(sensorId)
|
lastWeatherData, found := source.getUnwrittenDatapoints(sensorId)
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
|
@ -123,6 +115,6 @@ func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDa
|
||||||
source.weatherSource.AddNewWeatherDataCallback(callback)
|
source.weatherSource.AddNewWeatherDataCallback(callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) {
|
func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) error {
|
||||||
source.weatherSource.NewWeatherData(datapoint)
|
return source.weatherSource.NewWeatherData(datapoint)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ package weathersource
|
||||||
import "weather-data/storage"
|
import "weather-data/storage"
|
||||||
|
|
||||||
//NewWeatherDataCallbackFunc Function-Signature for new weather data callback function
|
//NewWeatherDataCallbackFunc Function-Signature for new weather data callback function
|
||||||
type NewWeatherDataCallbackFunc func(storage.WeatherData)
|
type NewWeatherDataCallbackFunc func(storage.WeatherData) error
|
||||||
|
|
||||||
//WeatherSource is the interface for different weather-source implementations
|
//WeatherSource is the interface for different weather-source implementations
|
||||||
type WeatherSource interface {
|
type WeatherSource interface {
|
||||||
|
@ -22,8 +22,12 @@ func (source *WeatherSourceBase) AddNewWeatherDataCallback(callback NewWeatherDa
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewWeatherData executes all newWeatherDataCallbackFuncs for this datapoint
|
//NewWeatherData executes all newWeatherDataCallbackFuncs for this datapoint
|
||||||
func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) {
|
func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) error {
|
||||||
for _, callback := range source.newWeatherDataCallbackFuncs {
|
for _, callback := range source.newWeatherDataCallbackFuncs {
|
||||||
callback(datapoint)
|
err := callback(datapoint)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue