diff --git a/api/rest-api.go b/api/rest-api.go new file mode 100644 index 0000000..9e2329a --- /dev/null +++ b/api/rest-api.go @@ -0,0 +1,86 @@ +package api + +import ( + "encoding/json" + "fmt" + "net/http" + "weather-data/storage" + "weather-data/weathersource" + + "github.com/gorilla/mux" +) + +type weatherRestApi struct { + connection string + weaterStorage storage.WeatherStorage + weatherSource weathersource.WeatherSourceBase +} + +//SetupAPI sets the REST-API up +func NewRestAPI(connection string, weatherStorage storage.WeatherStorage) *weatherRestApi { + api := new(weatherRestApi) + api.connection = connection + api.weaterStorage = weatherStorage + return api +} + +//Start a new Rest-API instance +func (api *weatherRestApi) Start() error { + return http.ListenAndServe(api.connection, api.handleRequests()) +} + +func (api *weatherRestApi) Close() { +} + +func (api *weatherRestApi) handleRequests() *mux.Router { + + router := mux.NewRouter().StrictSlash(true) + + router.HandleFunc("/", api.homePageHandler) + router.HandleFunc("/random", api.randomWeatherHandler) + router.HandleFunc("/randomlist", api.randomWeatherListHandler) + router.HandleFunc("/addData", api.addDataHandler) + + return router +} + +func (api *weatherRestApi) randomWeatherHandler(w http.ResponseWriter, r *http.Request) { + datapoint := storage.NewRandomWeatherData("swablab") + + w.Header().Add("content-type", "application/json") + json.NewEncoder(w).Encode(datapoint) +} + +func (api *weatherRestApi) randomWeatherListHandler(w http.ResponseWriter, r *http.Request) { + var datapoints = make([]storage.WeatherData, 0) + for i := 0; i < 10; i++ { + datapoints = append(datapoints, storage.NewRandomWeatherData("swablab")) + } + + w.Header().Add("content-type", "application/json") + json.NewEncoder(w).Encode(datapoints) +} + +func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request) { + var data storage.WeatherData + err := json.NewDecoder(r.Body).Decode(&data) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + api.addNewWeatherData(data) +} + +func (api *weatherRestApi) homePageHandler(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Welcome to the Weather API!") +} + +//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data +func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewWeatherDataCallbackFunc) { + api.weatherSource.AddNewWeatherDataCallback(callback) +} + +func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) { + api.weatherSource.NewWeatherData(weatherData) + api.weaterStorage.Save(weatherData) +} diff --git a/api/weather-api.go b/api/weather-api.go new file mode 100644 index 0000000..a472e4a --- /dev/null +++ b/api/weather-api.go @@ -0,0 +1,10 @@ +package api + +import "weather-data/weathersource" + +//WeatherAPI is the common interface for different apis +type WeatherAPI interface { + Start() error + Close() + weathersource.WeatherSource +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2285679 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module weather-data + +go 1.16 + +require ( + github.com/eclipse/paho.mqtt.golang v1.3.2 + github.com/gorilla/mux v1.8.0 + github.com/influxdata/influxdb-client-go/v2 v2.2.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d036c07 --- /dev/null +++ b/go.sum @@ -0,0 +1,68 @@ +github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deepmap/oapi-codegen v1.3.13 h1:9HKGCsdJqE4dnrQ8VerFS0/1ZOJPmAhN+g8xgp8y3K4= +github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOCiteYKZMiIRfOo= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/eclipse/paho.mqtt.golang v1.3.2 h1:ICzfxSyrR8bOsh9l8JBBOwO1tc2C26oEyody0ml0L6E= +github.com/eclipse/paho.mqtt.golang v1.3.2/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= +github.com/getkin/kin-openapi v0.13.0/go.mod h1:WGRs2ZMM1Q8LR1QBEwUxC6RJEfaBcD0s+pcEVXFuAjw= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= +github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/influxdata/influxdb-client-go/v2 v2.2.2 h1:O0CGIuIwQafvAxttAJ/VqMKfbWWn2Mt8rbOmaM2Zj4w= +github.com/influxdata/influxdb-client-go/v2 v2.2.2/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= +github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= +github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI= +golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..a7606e6 --- /dev/null +++ b/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "os" + "weather-data/api" + "weather-data/storage" + "weather-data/weathersource" +) + +// const influx stuff +const influxToken = "Pg34RXv4QE488ayCeY6JX4p3EwcoNhLu-zPQDn9zxirFmc0og9DCgamf02jrVEAN9mS4mT05nprGUkSrKQAUjA==" +const influxWeatherBucket = "weatherdata" +const influxOrganization = "weather-org" +const influxURL = "https://influx.gamlo-cloud.de" + +//const mqtt stuff +const mqttURL = "tcp://gamlo-cloud.de:1883" +const mqttTopic = "sensor/#" +const defaultLocation = "default location" + +//const api stuff +const apiAddress = ":10000" + +func main() { + //setup a new weatherstorage -> InfluxDB + var weatherStorage storage.WeatherStorage + weatherStorage, err := storage.NewInfluxStorage(influxToken, influxWeatherBucket, influxOrganization, influxURL) + if err != nil { + os.Exit(1) + } + defer weatherStorage.Close() + + var newWeatherDataHandler weathersource.NewWeatherDataCallbackFunc + newWeatherDataHandler = func(wd storage.WeatherData) { + weatherStorage.Save(wd) + } + + //add a new weatherData source -> mqtt + var weatherSource weathersource.WeatherSource + weatherSource, err = weathersource.NewMqttSource(mqttURL, mqttTopic, defaultLocation) + if err != nil { + os.Exit(1) + } + defer weatherSource.Close() + weatherSource.AddNewWeatherDataCallback(newWeatherDataHandler) + + //setup a API -> REST + var weatherAPI api.WeatherAPI + weatherAPI = api.NewRestAPI(apiAddress, weatherStorage) + + err = weatherAPI.Start() + if err != nil { + os.Exit(1) + } + defer weatherAPI.Close() +} diff --git a/storage/influxdb-storage.go b/storage/influxdb-storage.go new file mode 100644 index 0000000..b61173a --- /dev/null +++ b/storage/influxdb-storage.go @@ -0,0 +1,51 @@ +package storage + +import ( + influxdb2 "github.com/influxdata/influxdb-client-go/v2" +) + +//influxStorage is the Storage implementation for InfluxDB +type influxStorage struct { + token string + bucket string + organization string + url string + client influxdb2.Client +} + +//NewInfluxStorage Factory +func NewInfluxStorage(token, bucket, organization, url string) (*influxStorage, error) { + influx := new(influxStorage) + influx.bucket = bucket + influx.token = token + influx.organization = organization + influx.url = url + influx.client = influxdb2.NewClient(url, token) + return influx, nil +} + +//Save WeatherData to InfluxDB +func (storage *influxStorage) Save(data WeatherData) error { + tags := map[string]string{ + "location": data.Location} + + fields := map[string]interface{}{ + "temperature": data.Temperature, + "humidity": data.Humidity, + "preasure": data.Preasure} + + datapoint := influxdb2.NewPoint("new2", + tags, + fields, + data.TimeStamp) + + writeAPI := storage.client.WriteAPI(storage.organization, storage.bucket) + writeAPI.WritePoint(datapoint) + return nil +} + +//Close InfluxDB connection +func (storage *influxStorage) Close() error { + storage.client.Close() + return nil +} diff --git a/storage/inmemory-storage.go b/storage/inmemory-storage.go new file mode 100644 index 0000000..82be054 --- /dev/null +++ b/storage/inmemory-storage.go @@ -0,0 +1 @@ +package storage diff --git a/storage/weather-data.go b/storage/weather-data.go new file mode 100644 index 0000000..d6e2cf5 --- /dev/null +++ b/storage/weather-data.go @@ -0,0 +1,33 @@ +package storage + +import ( + "math/rand" + "time" +) + +//WeatherStorage interface for different storage-implementations of weather data +type WeatherStorage interface { + Save(WeatherData) error + Close() error +} + +//WeatherData type +type WeatherData struct { + Humidity float64 `json:"humidity"` + Preasure float64 `json:"airPreasure"` + Temperature float64 `json:"temperature"` + Location string `json:"location"` + TimeStamp time.Time `json:"timestamp"` +} + +//NewRandomWeatherData creates random WeatherData with given Location +func NewRandomWeatherData(location string) WeatherData { + rand.Seed(time.Now().UnixNano()) + var data WeatherData + data.Humidity = rand.Float64() * 100 + data.Preasure = rand.Float64()*80 + 960 + data.Temperature = rand.Float64()*40 - 5 + data.Location = location + data.TimeStamp = time.Now() + return data +} diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go new file mode 100644 index 0000000..bc03dfa --- /dev/null +++ b/weathersource/mqtt-source.go @@ -0,0 +1,85 @@ +package weathersource + +import ( + "strconv" + "strings" + "time" + "weather-data/storage" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type mqttWeatherSource struct { + url string + topic string + mqttClient mqtt.Client + lastData storage.WeatherData + weatherSource WeatherSourceBase +} + +//Close mqtt client +func (source *mqttWeatherSource) Close() { + source.mqttClient.Disconnect(2) +} + +//NewMqttSource Factory function for mqttWeatherSource +func NewMqttSource(url, topic, defaultLocation string) (*mqttWeatherSource, error) { + source := new(mqttWeatherSource) + source.url = url + + opts := mqtt.NewClientOptions().AddBroker(url) + + //mqtt + opts.SetKeepAlive(60 * time.Second) + opts.SetDefaultPublishHandler(source.mqttMessageHandler()) + opts.SetPingTimeout(1 * time.Second) + + source.mqttClient = mqtt.NewClient(opts) + source.lastData.Location = defaultLocation + + if token := source.mqttClient.Connect(); token.Wait() && token.Error() != nil { + return nil, token.Error() + } + + if token := source.mqttClient.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil { + return nil, token.Error() + } + + return source, nil +} + +//mqttMessageHandler returns a function that handles incoming mqtt-messages +func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler { + + return func(client mqtt.Client, msg mqtt.Message) { + + diff := time.Now().Sub(source.lastData.TimeStamp) + if diff >= time.Second && diff < time.Hour*6 { + source.newWeatherData(source.lastData) + } + + if strings.HasSuffix(msg.Topic(), "pressure") { + source.lastData.Preasure, _ = strconv.ParseFloat(string(msg.Payload()), 64) + source.lastData.TimeStamp = time.Now() + } + if strings.HasSuffix(msg.Topic(), "temp") { + source.lastData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) + source.lastData.TimeStamp = time.Now() + } + if strings.HasSuffix(msg.Topic(), "humidity") { + source.lastData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64) + source.lastData.TimeStamp = time.Now() + } + } +} + +//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data +func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) { + source.weatherSource.AddNewWeatherDataCallback(callback) +} + +func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) { + for _, callback := range source.weatherSource.newWeatherDataCallbackFuncs { + callback(datapoint) + } +} diff --git a/weathersource/weather-source.go b/weathersource/weather-source.go new file mode 100644 index 0000000..f5ddd00 --- /dev/null +++ b/weathersource/weather-source.go @@ -0,0 +1,29 @@ +package weathersource + +import "weather-data/storage" + +//NewWeatherDataCallbackFunc Function-Signature for new weather data callback function +type NewWeatherDataCallbackFunc func(storage.WeatherData) + +//WeatherSource is the interface for different weather-source implementations +type WeatherSource interface { + AddNewWeatherDataCallback(NewWeatherDataCallbackFunc) + Close() +} + +//WeatherSourceBase is the lowlevel-implementation of the WeatherSource interface, intended to used by highlevel-implementations +type WeatherSourceBase struct { + newWeatherDataCallbackFuncs []NewWeatherDataCallbackFunc +} + +//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data +func (source *WeatherSourceBase) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) { + source.newWeatherDataCallbackFuncs = append(source.newWeatherDataCallbackFuncs, callback) +} + +//NewWeatherData executes all newWeatherDataCallbackFuncs for this datapoint +func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) { + for _, callback := range source.newWeatherDataCallbackFuncs { + callback(datapoint) + } +}