initial weather-api

This commit is contained in:
Joel Schmid 2021-03-05 23:20:47 +01:00
parent 87891b840f
commit dcefbdecad
10 changed files with 428 additions and 0 deletions

86
api/rest-api.go Normal file
View file

@ -0,0 +1,86 @@
package api
import (
"encoding/json"
"fmt"
"net/http"
"weather-data/storage"
"weather-data/weathersource"
"github.com/gorilla/mux"
)
type weatherRestApi struct {
connection string
weaterStorage storage.WeatherStorage
weatherSource weathersource.WeatherSourceBase
}
//SetupAPI sets the REST-API up
func NewRestAPI(connection string, weatherStorage storage.WeatherStorage) *weatherRestApi {
api := new(weatherRestApi)
api.connection = connection
api.weaterStorage = weatherStorage
return api
}
//Start a new Rest-API instance
func (api *weatherRestApi) Start() error {
return http.ListenAndServe(api.connection, api.handleRequests())
}
func (api *weatherRestApi) Close() {
}
func (api *weatherRestApi) handleRequests() *mux.Router {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", api.homePageHandler)
router.HandleFunc("/random", api.randomWeatherHandler)
router.HandleFunc("/randomlist", api.randomWeatherListHandler)
router.HandleFunc("/addData", api.addDataHandler)
return router
}
func (api *weatherRestApi) randomWeatherHandler(w http.ResponseWriter, r *http.Request) {
datapoint := storage.NewRandomWeatherData("swablab")
w.Header().Add("content-type", "application/json")
json.NewEncoder(w).Encode(datapoint)
}
func (api *weatherRestApi) randomWeatherListHandler(w http.ResponseWriter, r *http.Request) {
var datapoints = make([]storage.WeatherData, 0)
for i := 0; i < 10; i++ {
datapoints = append(datapoints, storage.NewRandomWeatherData("swablab"))
}
w.Header().Add("content-type", "application/json")
json.NewEncoder(w).Encode(datapoints)
}
func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request) {
var data storage.WeatherData
err := json.NewDecoder(r.Body).Decode(&data)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
api.addNewWeatherData(data)
}
func (api *weatherRestApi) homePageHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Welcome to the Weather API!")
}
//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data
func (api *weatherRestApi) AddNewWeatherDataCallback(callback weathersource.NewWeatherDataCallbackFunc) {
api.weatherSource.AddNewWeatherDataCallback(callback)
}
func (api *weatherRestApi) addNewWeatherData(weatherData storage.WeatherData) {
api.weatherSource.NewWeatherData(weatherData)
api.weaterStorage.Save(weatherData)
}

10
api/weather-api.go Normal file
View file

@ -0,0 +1,10 @@
package api
import "weather-data/weathersource"
//WeatherAPI is the common interface for different apis
type WeatherAPI interface {
Start() error
Close()
weathersource.WeatherSource
}

9
go.mod Normal file
View file

@ -0,0 +1,9 @@
module weather-data
go 1.16
require (
github.com/eclipse/paho.mqtt.golang v1.3.2
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb-client-go/v2 v2.2.2
)

68
go.sum Normal file
View file

@ -0,0 +1,68 @@
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deepmap/oapi-codegen v1.3.13 h1:9HKGCsdJqE4dnrQ8VerFS0/1ZOJPmAhN+g8xgp8y3K4=
github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOCiteYKZMiIRfOo=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/eclipse/paho.mqtt.golang v1.3.2 h1:ICzfxSyrR8bOsh9l8JBBOwO1tc2C26oEyody0ml0L6E=
github.com/eclipse/paho.mqtt.golang v1.3.2/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/getkin/kin-openapi v0.13.0/go.mod h1:WGRs2ZMM1Q8LR1QBEwUxC6RJEfaBcD0s+pcEVXFuAjw=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/influxdata/influxdb-client-go/v2 v2.2.2 h1:O0CGIuIwQafvAxttAJ/VqMKfbWWn2Mt8rbOmaM2Zj4w=
github.com/influxdata/influxdb-client-go/v2 v2.2.2/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

56
main.go Normal file
View file

@ -0,0 +1,56 @@
package main
import (
"os"
"weather-data/api"
"weather-data/storage"
"weather-data/weathersource"
)
// const influx stuff
const influxToken = "Pg34RXv4QE488ayCeY6JX4p3EwcoNhLu-zPQDn9zxirFmc0og9DCgamf02jrVEAN9mS4mT05nprGUkSrKQAUjA=="
const influxWeatherBucket = "weatherdata"
const influxOrganization = "weather-org"
const influxURL = "https://influx.gamlo-cloud.de"
//const mqtt stuff
const mqttURL = "tcp://gamlo-cloud.de:1883"
const mqttTopic = "sensor/#"
const defaultLocation = "default location"
//const api stuff
const apiAddress = ":10000"
func main() {
//setup a new weatherstorage -> InfluxDB
var weatherStorage storage.WeatherStorage
weatherStorage, err := storage.NewInfluxStorage(influxToken, influxWeatherBucket, influxOrganization, influxURL)
if err != nil {
os.Exit(1)
}
defer weatherStorage.Close()
var newWeatherDataHandler weathersource.NewWeatherDataCallbackFunc
newWeatherDataHandler = func(wd storage.WeatherData) {
weatherStorage.Save(wd)
}
//add a new weatherData source -> mqtt
var weatherSource weathersource.WeatherSource
weatherSource, err = weathersource.NewMqttSource(mqttURL, mqttTopic, defaultLocation)
if err != nil {
os.Exit(1)
}
defer weatherSource.Close()
weatherSource.AddNewWeatherDataCallback(newWeatherDataHandler)
//setup a API -> REST
var weatherAPI api.WeatherAPI
weatherAPI = api.NewRestAPI(apiAddress, weatherStorage)
err = weatherAPI.Start()
if err != nil {
os.Exit(1)
}
defer weatherAPI.Close()
}

View file

@ -0,0 +1,51 @@
package storage
import (
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
//influxStorage is the Storage implementation for InfluxDB
type influxStorage struct {
token string
bucket string
organization string
url string
client influxdb2.Client
}
//NewInfluxStorage Factory
func NewInfluxStorage(token, bucket, organization, url string) (*influxStorage, error) {
influx := new(influxStorage)
influx.bucket = bucket
influx.token = token
influx.organization = organization
influx.url = url
influx.client = influxdb2.NewClient(url, token)
return influx, nil
}
//Save WeatherData to InfluxDB
func (storage *influxStorage) Save(data WeatherData) error {
tags := map[string]string{
"location": data.Location}
fields := map[string]interface{}{
"temperature": data.Temperature,
"humidity": data.Humidity,
"preasure": data.Preasure}
datapoint := influxdb2.NewPoint("new2",
tags,
fields,
data.TimeStamp)
writeAPI := storage.client.WriteAPI(storage.organization, storage.bucket)
writeAPI.WritePoint(datapoint)
return nil
}
//Close InfluxDB connection
func (storage *influxStorage) Close() error {
storage.client.Close()
return nil
}

View file

@ -0,0 +1 @@
package storage

33
storage/weather-data.go Normal file
View file

@ -0,0 +1,33 @@
package storage
import (
"math/rand"
"time"
)
//WeatherStorage interface for different storage-implementations of weather data
type WeatherStorage interface {
Save(WeatherData) error
Close() error
}
//WeatherData type
type WeatherData struct {
Humidity float64 `json:"humidity"`
Preasure float64 `json:"airPreasure"`
Temperature float64 `json:"temperature"`
Location string `json:"location"`
TimeStamp time.Time `json:"timestamp"`
}
//NewRandomWeatherData creates random WeatherData with given Location
func NewRandomWeatherData(location string) WeatherData {
rand.Seed(time.Now().UnixNano())
var data WeatherData
data.Humidity = rand.Float64() * 100
data.Preasure = rand.Float64()*80 + 960
data.Temperature = rand.Float64()*40 - 5
data.Location = location
data.TimeStamp = time.Now()
return data
}

View file

@ -0,0 +1,85 @@
package weathersource
import (
"strconv"
"strings"
"time"
"weather-data/storage"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type mqttWeatherSource struct {
url string
topic string
mqttClient mqtt.Client
lastData storage.WeatherData
weatherSource WeatherSourceBase
}
//Close mqtt client
func (source *mqttWeatherSource) Close() {
source.mqttClient.Disconnect(2)
}
//NewMqttSource Factory function for mqttWeatherSource
func NewMqttSource(url, topic, defaultLocation string) (*mqttWeatherSource, error) {
source := new(mqttWeatherSource)
source.url = url
opts := mqtt.NewClientOptions().AddBroker(url)
//mqtt
opts.SetKeepAlive(60 * time.Second)
opts.SetDefaultPublishHandler(source.mqttMessageHandler())
opts.SetPingTimeout(1 * time.Second)
source.mqttClient = mqtt.NewClient(opts)
source.lastData.Location = defaultLocation
if token := source.mqttClient.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
if token := source.mqttClient.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return source, nil
}
//mqttMessageHandler returns a function that handles incoming mqtt-messages
func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
return func(client mqtt.Client, msg mqtt.Message) {
diff := time.Now().Sub(source.lastData.TimeStamp)
if diff >= time.Second && diff < time.Hour*6 {
source.newWeatherData(source.lastData)
}
if strings.HasSuffix(msg.Topic(), "pressure") {
source.lastData.Preasure, _ = strconv.ParseFloat(string(msg.Payload()), 64)
source.lastData.TimeStamp = time.Now()
}
if strings.HasSuffix(msg.Topic(), "temp") {
source.lastData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64)
source.lastData.TimeStamp = time.Now()
}
if strings.HasSuffix(msg.Topic(), "humidity") {
source.lastData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64)
source.lastData.TimeStamp = time.Now()
}
}
}
//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data
func (source *mqttWeatherSource) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) {
source.weatherSource.AddNewWeatherDataCallback(callback)
}
func (source *mqttWeatherSource) newWeatherData(datapoint storage.WeatherData) {
for _, callback := range source.weatherSource.newWeatherDataCallbackFuncs {
callback(datapoint)
}
}

View file

@ -0,0 +1,29 @@
package weathersource
import "weather-data/storage"
//NewWeatherDataCallbackFunc Function-Signature for new weather data callback function
type NewWeatherDataCallbackFunc func(storage.WeatherData)
//WeatherSource is the interface for different weather-source implementations
type WeatherSource interface {
AddNewWeatherDataCallback(NewWeatherDataCallbackFunc)
Close()
}
//WeatherSourceBase is the lowlevel-implementation of the WeatherSource interface, intended to used by highlevel-implementations
type WeatherSourceBase struct {
newWeatherDataCallbackFuncs []NewWeatherDataCallbackFunc
}
//AddNewWeatherDataCallback adds a new callbackMethod for incoming weather data
func (source *WeatherSourceBase) AddNewWeatherDataCallback(callback NewWeatherDataCallbackFunc) {
source.newWeatherDataCallbackFuncs = append(source.newWeatherDataCallbackFuncs, callback)
}
//NewWeatherData executes all newWeatherDataCallbackFuncs for this datapoint
func (source *WeatherSourceBase) NewWeatherData(datapoint storage.WeatherData) {
for _, callback := range source.newWeatherDataCallbackFuncs {
callback(datapoint)
}
}