diff --git a/api/rest-api.go b/api/rest-api.go index 7702bb1..2112116 100644 --- a/api/rest-api.go +++ b/api/rest-api.go @@ -32,10 +32,10 @@ type UserClaims struct { } type weatherRestApi struct { + weathersource.WeatherSourceBase connection string config config.RestConfig weaterStorage storage.WeatherStorage - weatherSource weathersource.WeatherSourceBase sensorRegistry storage.SensorRegistry } @@ -157,7 +157,7 @@ func (api *weatherRestApi) addWeatherDataHandler(w http.ResponseWriter, r *http. return } - api.addNewWeatherData(*weatherData) + api.NewWeatherData(weatherData) w.Header().Add("content-type", "application/json") w.WriteHeader(http.StatusCreated) @@ -336,12 +336,3 @@ func (api *weatherRestApi) parseToken(header http.Header) (*UserClaims, error) { ) return claims, err } - -//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data -func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewWeatherDataCallbackFunc) { - api.weatherSource.AddNewWeatherDataCallback(callback) -} - -func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) { - api.weatherSource.NewWeatherData(weatherData) -} diff --git a/main.go b/main.go index 7259589..ffd0c54 100644 --- a/main.go +++ b/main.go @@ -35,23 +35,24 @@ func main() { log.Fatal(err) } defer weatherSource.Close() - weatherSource.AddNewWeatherDataCallback(handleNewWeatherData) + weatherSource.OnNewWeatherData(handleNewWeatherData) //setup a API -> REST weatherAPI = api.NewRestAPI(":10000", weatherStorage, sensorRegistry, config.RestConfiguration) defer weatherAPI.Close() - weatherAPI.AddNewWeatherDataCallback(handleNewWeatherData) + weatherAPI.OnNewWeatherData(handleNewWeatherData) + log.Print("Application is running") err = weatherAPI.Start() if err != nil { log.Fatal(err) } - log.Print("Application is running") } -func handleNewWeatherData(wd storage.WeatherData) { - _, err := sensorRegistry.GetSensor(wd.SensorId) - if config.AllowUnregisteredSensors || err == nil { +func handleNewWeatherData(wd *storage.WeatherData) { + if config.AllowUnregisteredSensors { + weatherStorage.Save(wd) + } else if exist, err := sensorRegistry.ExistSensor(wd.SensorId); err == nil && exist { weatherStorage.Save(wd) } } diff --git a/storage/influxdb-storage.go b/storage/influxdb-storage.go index 85ecfe1..4b9780e 100644 --- a/storage/influxdb-storage.go +++ b/storage/influxdb-storage.go @@ -31,7 +31,7 @@ func NewInfluxStorage(cfg config.InfluxConfig) (*influxStorage, error) { } //Save WeatherData to InfluxDB -func (storage *influxStorage) Save(data WeatherData) error { +func (storage *influxStorage) Save(data *WeatherData) error { tags := map[string]string{ "sensorId": data.SensorId.String()} diff --git a/storage/weather-storage.go b/storage/weather-storage.go index c274fa2..5da8198 100644 --- a/storage/weather-storage.go +++ b/storage/weather-storage.go @@ -2,7 +2,7 @@ package storage //WeatherStorage interface for different storage-implementations of weather data type WeatherStorage interface { - Save(WeatherData) error + Save(*WeatherData) error GetData(*WeatherQuery) ([]*WeatherData, error) Close() error } diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index 8ed79a6..e02d91e 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -20,9 +20,9 @@ var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern) var channelBufferSize = 10 type mqttWeatherSource struct { + WeatherSourceBase config config.MqttConfig mqttClient mqtt.Client - weatherSource WeatherSourceBase activeSensorMeasurements map[uuid.UUID](chan map[storage.SensorValueType]float64) sensorMutex sync.RWMutex } @@ -127,14 +127,5 @@ func (source *mqttWeatherSource) publishSensorMeasurement(sensorId uuid.UUID, ch } } - source.newWeatherData(*weatherData) -} - -//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data -func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) { - source.weatherSource.AddNewWeatherDataCallback(callback) -} - -func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) { - source.weatherSource.NewWeatherData(datapoint) + source.NewWeatherData(weatherData) } diff --git a/weathersource/weather-source-base.go b/weathersource/weather-source-base.go deleted file mode 100644 index 347b8b4..0000000 --- a/weathersource/weather-source-base.go +++ /dev/null @@ -1,20 +0,0 @@ -package weathersource - -import "weather-data/storage" - -//WeatherSourceBase is the lowlevel-implementation of the WeatherSource interface, intended to used by highlevel-implementations -type WeatherSourceBase struct { - newWeatherDataCallbackFuncs []NewWeatherDataCallbackFunc -} - -//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data -func (source *WeatherSourceBase) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) { - source.newWeatherDataCallbackFuncs = append(source.newWeatherDataCallbackFuncs, callback) -} - -//NewWeatherData executes all newWeatherDataCallbackFuncs for this datapoint -func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) { - for _, callback := range source.newWeatherDataCallbackFuncs { - callback(datapoint) - } -} diff --git a/weathersource/weather-source.go b/weathersource/weather-source.go index 8930434..aed75dc 100644 --- a/weathersource/weather-source.go +++ b/weathersource/weather-source.go @@ -2,11 +2,28 @@ package weathersource import "weather-data/storage" -//NewWeatherDataCallbackFunc Function-Signature for new weather data callback function -type NewWeatherDataCallbackFunc func(storage.WeatherData) +//NewWeatherDataFunc Function-Signature for new weather data +type NewWeatherDataFunc func(*storage.WeatherData) //WeatherSource is the interface for different weather-source implementations type WeatherSource interface { - AddNewWeatherDataCallback(NewWeatherDataCallbackFunc) + OnNewWeatherData(callback NewWeatherDataFunc) Close() } + +//WeatherSourceBase is the lowlevel-implementation of the WeatherSource interface, intended to used by highlevel-implementations +type WeatherSourceBase struct { + onNewWeatherDataFunctions []NewWeatherDataFunc +} + +//OnNewWeatherData add a function executed on NewWeatherData called +func (source *WeatherSourceBase) OnNewWeatherData(callback NewWeatherDataFunc) { + source.onNewWeatherDataFunctions = append(source.onNewWeatherDataFunctions, callback) +} + +//NewWeatherData executes all NewWeatherDataFunc for the weatherData +func (source *WeatherSourceBase) NewWeatherData(weatherData *storage.WeatherData) { + for _, function := range source.onNewWeatherDataFunctions { + function(weatherData) + } +}