diff --git a/api/rest-api.go b/api/rest-api.go index d7012b9..bd95b1e 100644 --- a/api/rest-api.go +++ b/api/rest-api.go @@ -12,16 +12,18 @@ import ( ) type weatherRestApi struct { - connection string - weaterStorage storage.WeatherStorage - weatherSource weathersource.WeatherSourceBase + connection string + weaterStorage storage.WeatherStorage + weatherSource weathersource.WeatherSourceBase + sensorRegistry storage.SensorRegistry } //SetupAPI sets the REST-API up -func NewRestAPI(connection string, weatherStorage storage.WeatherStorage) *weatherRestApi { +func NewRestAPI(connection string, weatherStorage storage.WeatherStorage, sensorRegistry storage.SensorRegistry) *weatherRestApi { api := new(weatherRestApi) api.connection = connection api.weaterStorage = weatherStorage + api.sensorRegistry = sensorRegistry return api } @@ -101,14 +103,15 @@ func (api *weatherRestApi) registerWeatherSensor(w http.ResponseWriter, r *http. w.Header().Add("content-type", "application/json") vars := mux.Vars(r) - key := vars["name"] + name := vars["name"] - registration := SensorRegistration{ - Name: key, - Id: uuid.New(), + sensor, err := api.sensorRegistry.RegisterSensorByName(name) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - json.NewEncoder(w).Encode(registration) + json.NewEncoder(w).Encode(sensor) } //AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data @@ -118,5 +121,4 @@ func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewW func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) { api.weatherSource.NewWeatherData(weatherData) - api.weaterStorage.Save(weatherData) } diff --git a/api/weather-api.go b/api/weather-api.go index 37c1600..242ac5e 100644 --- a/api/weather-api.go +++ b/api/weather-api.go @@ -2,8 +2,6 @@ package api import ( "weather-data/weathersource" - - "github.com/google/uuid" ) //WeatherAPI is the common interface for different apis @@ -12,10 +10,3 @@ type WeatherAPI interface { Close() weathersource.WeatherSource } - -//SensorRegistration is the data for a new Sensorregistration -type SensorRegistration struct { - Name string - Id uuid.UUID - Location string -} diff --git a/go.mod b/go.mod index 17f72f4..24e201d 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,8 @@ go 1.16 require ( github.com/eclipse/paho.mqtt.golang v1.3.2 - github.com/google/uuid v1.2.0 // indirect + github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 github.com/influxdata/influxdb-client-go/v2 v2.2.2 + github.com/pkg/errors v0.9.1 ) diff --git a/go.sum b/go.sum index 797ebcd..7024863 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deepmap/oapi-codegen v1.3.13 h1:9HKGCsdJqE4dnrQ8VerFS0/1ZOJPmAhN+g8xgp8y3K4= github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOCiteYKZMiIRfOo= @@ -20,8 +21,10 @@ github.com/influxdata/influxdb-client-go/v2 v2.2.2 h1:O0CGIuIwQafvAxttAJ/VqMKfbW github.com/influxdata/influxdb-client-go/v2 v2.2.2/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= @@ -34,9 +37,11 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= @@ -46,7 +51,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI= golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= @@ -64,6 +68,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= diff --git a/main.go b/main.go index 0880989..fc0a7c1 100644 --- a/main.go +++ b/main.go @@ -8,10 +8,19 @@ import ( "weather-data/weathersource" ) +var sensorRegistry storage.SensorRegistry +var weatherStorage storage.WeatherStorage +var weatherSource weathersource.WeatherSource +var weatherAPI api.WeatherAPI + func main() { + //setup new sensorRegistry -> InmemorySensorRegistry + sensorRegistry = storage.NewInmemorySensorRegistry() + defer sensorRegistry.Close() + //setup a new weatherstorage -> InfluxDB - var weatherStorage storage.WeatherStorage - weatherStorage, err := storage.NewInfluxStorage( + var err error + weatherStorage, err = storage.NewInfluxStorage( config.GetInfluxToken(), config.GetInfluxBucket(), config.GetInfluxOrganization(), @@ -22,32 +31,29 @@ func main() { } defer weatherStorage.Close() - var newWeatherDataHandler weathersource.NewWeatherDataCallbackFunc - newWeatherDataHandler = func(wd storage.WeatherData) { - weatherStorage.Save(wd) - } - - //add a new weatherData source -> mqtt - var weatherSource weathersource.WeatherSource + //setup new weatherData source -> mqtt weatherSource, err = weathersource.NewMqttSource( config.GetMqttUrl(), - config.GetMqttTopic()) + config.GetMqttTopic(), + sensorRegistry) if err != nil { os.Exit(1) } defer weatherSource.Close() - - weatherSource.AddNewWeatherDataCallback(newWeatherDataHandler) + weatherSource.AddNewWeatherDataCallback(handleNewWeatherData) //setup a API -> REST - var weatherAPI api.WeatherAPI - weatherAPI = api.NewRestAPI(":10000", weatherStorage) + weatherAPI = api.NewRestAPI(":10000", weatherStorage, sensorRegistry) defer weatherAPI.Close() + weatherAPI.AddNewWeatherDataCallback(handleNewWeatherData) err = weatherAPI.Start() if err != nil { os.Exit(1) } - +} + +func handleNewWeatherData(wd storage.WeatherData) { + weatherStorage.Save(wd) } diff --git a/storage/inmemory-storage.go b/storage/inmemory-storage.go index 82be054..a297087 100644 --- a/storage/inmemory-storage.go +++ b/storage/inmemory-storage.go @@ -1 +1,61 @@ package storage + +import ( + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type inmemorySensorRegistry struct { + weatherSensors []*WeatherSensor +} + +func NewInmemorySensorRegistry() *inmemorySensorRegistry { + sensorRegistry := new(inmemorySensorRegistry) + return sensorRegistry +} + +func (registry *inmemorySensorRegistry) RegisterSensorByName(name string) (*WeatherSensor, error) { + if registry.ExistSensorName(name) { + return nil, errors.Errorf("Sensorname already exists") + } + sensor := new(WeatherSensor) + sensor.Name = name + sensor.Id = uuid.New() + registry.weatherSensors = append(registry.weatherSensors, sensor) + return sensor, nil +} + +func (registry *inmemorySensorRegistry) ExistSensorName(name string) bool { + for _, s := range registry.weatherSensors { + if s.Name == name { + return true + } + } + return false +} + +func (registry *inmemorySensorRegistry) ExistSensorId(sensorId uuid.UUID) bool { + for _, s := range registry.weatherSensors { + if s.Id == sensorId { + return true + } + } + return false +} + +func (registry *inmemorySensorRegistry) ExistSensor(sensor *WeatherSensor) bool { + for _, s := range registry.weatherSensors { + if s.Id == sensor.Id { + return true + } + } + return false +} + +func (registry *inmemorySensorRegistry) GetSensors() []*WeatherSensor { + return registry.weatherSensors +} + +func (registry *inmemorySensorRegistry) Close() error { + return nil +} diff --git a/storage/weather-data.go b/storage/weather-data.go index f5e896f..e6d3728 100644 --- a/storage/weather-data.go +++ b/storage/weather-data.go @@ -14,6 +14,14 @@ type WeatherStorage interface { Close() error } +type SensorRegistry interface { + RegisterSensorByName(string) (*WeatherSensor, error) + ExistSensor(*WeatherSensor) bool + ExistSensorId(uuid.UUID) bool + GetSensors() []*WeatherSensor + Close() error +} + //WeatherData type type WeatherData struct { Humidity float64 `json:"humidity"` @@ -24,6 +32,15 @@ type WeatherData struct { TimeStamp time.Time `json:"timestamp"` } +//WeatherSensor is the data for a new Sensorregistration +type WeatherSensor struct { + Name string + Id uuid.UUID + Location string + Longitude float64 + Lattitude float64 +} + //NewRandomWeatherData creates random WeatherData with given Location func NewRandomWeatherData(sensorId uuid.UUID) WeatherData { rand.Seed(time.Now().UnixNano()) diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index 5848d40..fc022ab 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -22,6 +22,7 @@ type mqttWeatherSource struct { url string topic string mqttClient mqtt.Client + sensorRegistry storage.SensorRegistry lastWeatherDataPoints []*storage.WeatherData weatherSource WeatherSourceBase } @@ -32,9 +33,10 @@ func (source *mqttWeatherSource) Close() { } //NewMqttSource Factory function for mqttWeatherSource -func NewMqttSource(url, topic string) (*mqttWeatherSource, error) { +func NewMqttSource(url, topic string, sensorRegistry storage.SensorRegistry) (*mqttWeatherSource, error) { source := new(mqttWeatherSource) source.url = url + source.sensorRegistry = sensorRegistry opts := mqtt.NewClientOptions().AddBroker(url) @@ -68,7 +70,13 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { if err != nil { return } - lastWeatherData, found := source.getLastWeatherData(sensorId) + + if !source.sensorRegistry.ExistSensorId(sensorId) { + fmt.Println("sensor not registered") + return + } + + lastWeatherData, found := source.getUnwrittenDatapoints(sensorId) if !found { lastWeatherData = new(storage.WeatherData) @@ -100,7 +108,7 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { } } -func (source *mqttWeatherSource) getLastWeatherData(sensorId uuid.UUID) (*storage.WeatherData, bool) { +func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) { for _, data := range source.lastWeatherDataPoints { if data.SensorId == sensorId { return data, true @@ -115,7 +123,5 @@ func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDa } func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) { - for _, callback := range source.weatherSource.newWeatherDataCallbackFuncs { - callback(datapoint) - } + source.weatherSource.NewWeatherData(datapoint) }