add sensor registration (inmemory)
This commit is contained in:
parent
356e502ed8
commit
9cc656e937
8 changed files with 130 additions and 42 deletions
|
@ -12,16 +12,18 @@ import (
|
|||
)
|
||||
|
||||
type weatherRestApi struct {
|
||||
connection string
|
||||
weaterStorage storage.WeatherStorage
|
||||
weatherSource weathersource.WeatherSourceBase
|
||||
connection string
|
||||
weaterStorage storage.WeatherStorage
|
||||
weatherSource weathersource.WeatherSourceBase
|
||||
sensorRegistry storage.SensorRegistry
|
||||
}
|
||||
|
||||
//SetupAPI sets the REST-API up
|
||||
func NewRestAPI(connection string, weatherStorage storage.WeatherStorage) *weatherRestApi {
|
||||
func NewRestAPI(connection string, weatherStorage storage.WeatherStorage, sensorRegistry storage.SensorRegistry) *weatherRestApi {
|
||||
api := new(weatherRestApi)
|
||||
api.connection = connection
|
||||
api.weaterStorage = weatherStorage
|
||||
api.sensorRegistry = sensorRegistry
|
||||
return api
|
||||
}
|
||||
|
||||
|
@ -101,14 +103,15 @@ func (api *weatherRestApi) registerWeatherSensor(w http.ResponseWriter, r *http.
|
|||
w.Header().Add("content-type", "application/json")
|
||||
|
||||
vars := mux.Vars(r)
|
||||
key := vars["name"]
|
||||
name := vars["name"]
|
||||
|
||||
registration := SensorRegistration{
|
||||
Name: key,
|
||||
Id: uuid.New(),
|
||||
sensor, err := api.sensorRegistry.RegisterSensorByName(name)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(registration)
|
||||
json.NewEncoder(w).Encode(sensor)
|
||||
}
|
||||
|
||||
//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data
|
||||
|
@ -118,5 +121,4 @@ func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewW
|
|||
|
||||
func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) {
|
||||
api.weatherSource.NewWeatherData(weatherData)
|
||||
api.weaterStorage.Save(weatherData)
|
||||
}
|
||||
|
|
|
@ -2,8 +2,6 @@ package api
|
|||
|
||||
import (
|
||||
"weather-data/weathersource"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
//WeatherAPI is the common interface for different apis
|
||||
|
@ -12,10 +10,3 @@ type WeatherAPI interface {
|
|||
Close()
|
||||
weathersource.WeatherSource
|
||||
}
|
||||
|
||||
//SensorRegistration is the data for a new Sensorregistration
|
||||
type SensorRegistration struct {
|
||||
Name string
|
||||
Id uuid.UUID
|
||||
Location string
|
||||
}
|
||||
|
|
3
go.mod
3
go.mod
|
@ -4,7 +4,8 @@ go 1.16
|
|||
|
||||
require (
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.2
|
||||
github.com/google/uuid v1.2.0 // indirect
|
||||
github.com/google/uuid v1.2.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.2.2
|
||||
github.com/pkg/errors v0.9.1
|
||||
)
|
||||
|
|
7
go.sum
7
go.sum
|
@ -1,5 +1,6 @@
|
|||
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/deepmap/oapi-codegen v1.3.13 h1:9HKGCsdJqE4dnrQ8VerFS0/1ZOJPmAhN+g8xgp8y3K4=
|
||||
github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOCiteYKZMiIRfOo=
|
||||
|
@ -20,8 +21,10 @@ github.com/influxdata/influxdb-client-go/v2 v2.2.2 h1:O0CGIuIwQafvAxttAJ/VqMKfbW
|
|||
github.com/influxdata/influxdb-client-go/v2 v2.2.2/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0=
|
||||
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
|
||||
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
|
||||
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
|
||||
|
@ -34,9 +37,11 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
|
|||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
||||
|
@ -46,7 +51,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI=
|
||||
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
|
@ -64,6 +68,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
|
|||
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
|
|
36
main.go
36
main.go
|
@ -8,10 +8,19 @@ import (
|
|||
"weather-data/weathersource"
|
||||
)
|
||||
|
||||
var sensorRegistry storage.SensorRegistry
|
||||
var weatherStorage storage.WeatherStorage
|
||||
var weatherSource weathersource.WeatherSource
|
||||
var weatherAPI api.WeatherAPI
|
||||
|
||||
func main() {
|
||||
//setup new sensorRegistry -> InmemorySensorRegistry
|
||||
sensorRegistry = storage.NewInmemorySensorRegistry()
|
||||
defer sensorRegistry.Close()
|
||||
|
||||
//setup a new weatherstorage -> InfluxDB
|
||||
var weatherStorage storage.WeatherStorage
|
||||
weatherStorage, err := storage.NewInfluxStorage(
|
||||
var err error
|
||||
weatherStorage, err = storage.NewInfluxStorage(
|
||||
config.GetInfluxToken(),
|
||||
config.GetInfluxBucket(),
|
||||
config.GetInfluxOrganization(),
|
||||
|
@ -22,32 +31,29 @@ func main() {
|
|||
}
|
||||
defer weatherStorage.Close()
|
||||
|
||||
var newWeatherDataHandler weathersource.NewWeatherDataCallbackFunc
|
||||
newWeatherDataHandler = func(wd storage.WeatherData) {
|
||||
weatherStorage.Save(wd)
|
||||
}
|
||||
|
||||
//add a new weatherData source -> mqtt
|
||||
var weatherSource weathersource.WeatherSource
|
||||
//setup new weatherData source -> mqtt
|
||||
weatherSource, err = weathersource.NewMqttSource(
|
||||
config.GetMqttUrl(),
|
||||
config.GetMqttTopic())
|
||||
config.GetMqttTopic(),
|
||||
sensorRegistry)
|
||||
|
||||
if err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
defer weatherSource.Close()
|
||||
|
||||
weatherSource.AddNewWeatherDataCallback(newWeatherDataHandler)
|
||||
weatherSource.AddNewWeatherDataCallback(handleNewWeatherData)
|
||||
|
||||
//setup a API -> REST
|
||||
var weatherAPI api.WeatherAPI
|
||||
weatherAPI = api.NewRestAPI(":10000", weatherStorage)
|
||||
weatherAPI = api.NewRestAPI(":10000", weatherStorage, sensorRegistry)
|
||||
defer weatherAPI.Close()
|
||||
weatherAPI.AddNewWeatherDataCallback(handleNewWeatherData)
|
||||
|
||||
err = weatherAPI.Start()
|
||||
if err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func handleNewWeatherData(wd storage.WeatherData) {
|
||||
weatherStorage.Save(wd)
|
||||
}
|
||||
|
|
|
@ -1 +1,61 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type inmemorySensorRegistry struct {
|
||||
weatherSensors []*WeatherSensor
|
||||
}
|
||||
|
||||
func NewInmemorySensorRegistry() *inmemorySensorRegistry {
|
||||
sensorRegistry := new(inmemorySensorRegistry)
|
||||
return sensorRegistry
|
||||
}
|
||||
|
||||
func (registry *inmemorySensorRegistry) RegisterSensorByName(name string) (*WeatherSensor, error) {
|
||||
if registry.ExistSensorName(name) {
|
||||
return nil, errors.Errorf("Sensorname already exists")
|
||||
}
|
||||
sensor := new(WeatherSensor)
|
||||
sensor.Name = name
|
||||
sensor.Id = uuid.New()
|
||||
registry.weatherSensors = append(registry.weatherSensors, sensor)
|
||||
return sensor, nil
|
||||
}
|
||||
|
||||
func (registry *inmemorySensorRegistry) ExistSensorName(name string) bool {
|
||||
for _, s := range registry.weatherSensors {
|
||||
if s.Name == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (registry *inmemorySensorRegistry) ExistSensorId(sensorId uuid.UUID) bool {
|
||||
for _, s := range registry.weatherSensors {
|
||||
if s.Id == sensorId {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (registry *inmemorySensorRegistry) ExistSensor(sensor *WeatherSensor) bool {
|
||||
for _, s := range registry.weatherSensors {
|
||||
if s.Id == sensor.Id {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (registry *inmemorySensorRegistry) GetSensors() []*WeatherSensor {
|
||||
return registry.weatherSensors
|
||||
}
|
||||
|
||||
func (registry *inmemorySensorRegistry) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -14,6 +14,14 @@ type WeatherStorage interface {
|
|||
Close() error
|
||||
}
|
||||
|
||||
type SensorRegistry interface {
|
||||
RegisterSensorByName(string) (*WeatherSensor, error)
|
||||
ExistSensor(*WeatherSensor) bool
|
||||
ExistSensorId(uuid.UUID) bool
|
||||
GetSensors() []*WeatherSensor
|
||||
Close() error
|
||||
}
|
||||
|
||||
//WeatherData type
|
||||
type WeatherData struct {
|
||||
Humidity float64 `json:"humidity"`
|
||||
|
@ -24,6 +32,15 @@ type WeatherData struct {
|
|||
TimeStamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
//WeatherSensor is the data for a new Sensorregistration
|
||||
type WeatherSensor struct {
|
||||
Name string
|
||||
Id uuid.UUID
|
||||
Location string
|
||||
Longitude float64
|
||||
Lattitude float64
|
||||
}
|
||||
|
||||
//NewRandomWeatherData creates random WeatherData with given Location
|
||||
func NewRandomWeatherData(sensorId uuid.UUID) WeatherData {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
|
|
@ -22,6 +22,7 @@ type mqttWeatherSource struct {
|
|||
url string
|
||||
topic string
|
||||
mqttClient mqtt.Client
|
||||
sensorRegistry storage.SensorRegistry
|
||||
lastWeatherDataPoints []*storage.WeatherData
|
||||
weatherSource WeatherSourceBase
|
||||
}
|
||||
|
@ -32,9 +33,10 @@ func (source *mqttWeatherSource) Close() {
|
|||
}
|
||||
|
||||
//NewMqttSource Factory function for mqttWeatherSource
|
||||
func NewMqttSource(url, topic string) (*mqttWeatherSource, error) {
|
||||
func NewMqttSource(url, topic string, sensorRegistry storage.SensorRegistry) (*mqttWeatherSource, error) {
|
||||
source := new(mqttWeatherSource)
|
||||
source.url = url
|
||||
source.sensorRegistry = sensorRegistry
|
||||
|
||||
opts := mqtt.NewClientOptions().AddBroker(url)
|
||||
|
||||
|
@ -68,7 +70,13 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
lastWeatherData, found := source.getLastWeatherData(sensorId)
|
||||
|
||||
if !source.sensorRegistry.ExistSensorId(sensorId) {
|
||||
fmt.Println("sensor not registered")
|
||||
return
|
||||
}
|
||||
|
||||
lastWeatherData, found := source.getUnwrittenDatapoints(sensorId)
|
||||
|
||||
if !found {
|
||||
lastWeatherData = new(storage.WeatherData)
|
||||
|
@ -100,7 +108,7 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
|
|||
}
|
||||
}
|
||||
|
||||
func (source *mqttWeatherSource) getLastWeatherData(sensorId uuid.UUID) (*storage.WeatherData, bool) {
|
||||
func (source *mqttWeatherSource) getUnwrittenDatapoints(sensorId uuid.UUID) (*storage.WeatherData, bool) {
|
||||
for _, data := range source.lastWeatherDataPoints {
|
||||
if data.SensorId == sensorId {
|
||||
return data, true
|
||||
|
@ -115,7 +123,5 @@ func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDa
|
|||
}
|
||||
|
||||
func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) {
|
||||
for _, callback := range source.weatherSource.newWeatherDataCallbackFuncs {
|
||||
callback(datapoint)
|
||||
}
|
||||
source.weatherSource.NewWeatherData(datapoint)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue