diff --git a/api/rest-api.go b/api/rest-api.go index bd95b1e..85b43c8 100644 --- a/api/rest-api.go +++ b/api/rest-api.go @@ -87,7 +87,11 @@ func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request http.Error(w, err.Error(), http.StatusBadRequest) return } - api.addNewWeatherData(data) + + err = api.addNewWeatherData(data) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } } func (api *weatherRestApi) homePageHandler(w http.ResponseWriter, r *http.Request) { @@ -119,6 +123,6 @@ func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewW api.weatherSource.AddNewWeatherDataCallback(callback) } -func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) { - api.weatherSource.NewWeatherData(weatherData) +func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) error { + return api.weatherSource.NewWeatherData(weatherData) } diff --git a/main.go b/main.go index fc0a7c1..9f9559a 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "os" "weather-data/api" "weather-data/config" @@ -34,8 +35,7 @@ func main() { //setup new weatherData source -> mqtt weatherSource, err = weathersource.NewMqttSource( config.GetMqttUrl(), - config.GetMqttTopic(), - sensorRegistry) + config.GetMqttTopic()) if err != nil { os.Exit(1) @@ -54,6 +54,10 @@ func main() { } } -func handleNewWeatherData(wd storage.WeatherData) { +func handleNewWeatherData(wd storage.WeatherData) error { + if !config.AllowUnregisteredSensors() && !sensorRegistry.ExistSensorId(wd.SensorId) { + return errors.New("sensor have to be registered") + } weatherStorage.Save(wd) + return nil } diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index 829744e..fe2b774 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" "time" - "weather-data/config" "weather-data/storage" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -23,7 +22,6 @@ type mqttWeatherSource struct { url string topic string mqttClient mqtt.Client - sensorRegistry storage.SensorRegistry lastWeatherDataPoints []*storage.WeatherData weatherSource WeatherSourceBase } @@ -34,10 +32,9 @@ func (source *mqttWeatherSource) Close() { } //NewMqttSource Factory function for mqttWeatherSource -func NewMqttSource(url, topic string, sensorRegistry storage.SensorRegistry) (*mqttWeatherSource, error) { +func NewMqttSource(url, topic string) (*mqttWeatherSource, error) { source := new(mqttWeatherSource) source.url = url - source.sensorRegistry = sensorRegistry opts := mqtt.NewClientOptions().AddBroker(url) @@ -72,11 +69,6 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { return } - if !config.AllowUnregisteredSensors() && !source.sensorRegistry.ExistSensorId(sensorId) { - fmt.Println("sensor have to be registered:", sensorId) - return - } - lastWeatherData, found := source.getUnwrittenDatapoints(sensorId) if !found { @@ -123,6 +115,6 @@ func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDa source.weatherSource.AddNewWeatherDataCallback(callback) } -func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) { - source.weatherSource.NewWeatherData(datapoint) +func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) error { + return source.weatherSource.NewWeatherData(datapoint) } diff --git a/weathersource/weather-source.go b/weathersource/weather-source.go index f5ddd00..18a6772 100644 --- a/weathersource/weather-source.go +++ b/weathersource/weather-source.go @@ -3,7 +3,7 @@ package weathersource import "weather-data/storage" //NewWeatherDataCallbackFunc Function-Signature for new weather data callback function -type NewWeatherDataCallbackFunc func(storage.WeatherData) +type NewWeatherDataCallbackFunc func(storage.WeatherData) error //WeatherSource is the interface for different weather-source implementations type WeatherSource interface { @@ -22,8 +22,12 @@ func (source *WeatherSourceBase) AddNewWeatherDataCallback(callback NewWeatherDa } //NewWeatherData executes all newWeatherDataCallbackFuncs for this datapoint -func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) { +func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) error { for _, callback := range source.newWeatherDataCallbackFuncs { - callback(datapoint) + err := callback(datapoint) + if err != nil { + return err + } } + return nil }