refactoring & generic weatherdata
This commit is contained in:
parent
7fb0ebff20
commit
641c05afc6
4 changed files with 91 additions and 110 deletions
|
@ -107,6 +107,8 @@ func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request
|
|||
return
|
||||
}
|
||||
|
||||
fmt.Println(r.Body)
|
||||
|
||||
var data storage.WeatherData
|
||||
err := json.NewDecoder(r.Body).Decode(&data)
|
||||
if err != nil {
|
||||
|
@ -114,6 +116,8 @@ func (api *weatherRestApi) addDataHandler(w http.ResponseWriter, r *http.Request
|
|||
return
|
||||
}
|
||||
|
||||
fmt.Println(data)
|
||||
|
||||
err = api.addNewWeatherData(data)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
|
|
|
@ -33,11 +33,11 @@ func (storage *influxStorage) Save(data WeatherData) error {
|
|||
tags := map[string]string{
|
||||
"sensorId": data.SensorId.String()}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"temperature": data.Temperature,
|
||||
"humidity": data.Humidity,
|
||||
"pressure": data.Pressure,
|
||||
"co2level": data.CO2Level}
|
||||
fields := make(map[string]interface{})
|
||||
|
||||
for k, v := range data.Values {
|
||||
fields[string(k)] = v
|
||||
}
|
||||
|
||||
datapoint := influxdb2.NewPoint(storage.measurement,
|
||||
tags,
|
||||
|
@ -46,7 +46,6 @@ func (storage *influxStorage) Save(data WeatherData) error {
|
|||
|
||||
writeAPI := storage.client.WriteAPI(storage.config.Organization, storage.config.Bucket)
|
||||
writeAPI.WritePoint(datapoint)
|
||||
log.Print("Written weather data point to influx-db")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -61,24 +60,11 @@ func (storage *influxStorage) createFluxQuery(query *WeatherQuery) string {
|
|||
fields := ""
|
||||
concat := ""
|
||||
|
||||
if query.Temperature {
|
||||
fields = fmt.Sprintf("%v %v r._field == \"temperature\"", fields, concat)
|
||||
concat = "or"
|
||||
}
|
||||
|
||||
if query.Humidity {
|
||||
fields = fmt.Sprintf("%v %v r._field == \"humidity\"", fields, concat)
|
||||
concat = "or"
|
||||
}
|
||||
|
||||
if query.Pressure {
|
||||
fields = fmt.Sprintf("%v %v r._field == \"pressure\"", fields, concat)
|
||||
concat = "or"
|
||||
}
|
||||
|
||||
if query.Co2Level {
|
||||
fields = fmt.Sprintf("%v %v r._field == \"co2level\"", fields, concat)
|
||||
concat = "or"
|
||||
for _, sensorValueType := range GetSensorValueTypes() {
|
||||
if query.Values[sensorValueType] {
|
||||
fields = fmt.Sprintf("%v %v r._field == \"%v\"", fields, concat, string(sensorValueType))
|
||||
concat = "or"
|
||||
}
|
||||
}
|
||||
|
||||
fields = fmt.Sprintf(" and ( %v )", fields)
|
||||
|
@ -112,17 +98,10 @@ func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, er
|
|||
|
||||
data, contained := containsWeatherData(queryResults, sensorId, timestamp)
|
||||
|
||||
if result.Record().Field() == "temperature" {
|
||||
data.Temperature = result.Record().Value().(float64)
|
||||
}
|
||||
if result.Record().Field() == "pressure" {
|
||||
data.Pressure = result.Record().Value().(float64)
|
||||
}
|
||||
if result.Record().Field() == "humidity" {
|
||||
data.Humidity = result.Record().Value().(float64)
|
||||
}
|
||||
if result.Record().Field() == "co2level" {
|
||||
data.CO2Level = result.Record().Value().(float64)
|
||||
for _, sensorValueType := range GetSensorValueTypes() {
|
||||
if result.Record().Field() == string(sensorValueType) {
|
||||
data.Values[sensorValueType] = result.Record().Value().(float64)
|
||||
}
|
||||
}
|
||||
|
||||
if !contained {
|
||||
|
@ -142,6 +121,7 @@ func containsWeatherData(weatherData []*WeatherData, sensorId uuid.UUID, timesta
|
|||
}
|
||||
}
|
||||
var newData WeatherData
|
||||
newData.Values = make(map[SensorValueType]float64)
|
||||
return &newData, false
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,19 @@ import (
|
|||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type SensorValueType string
|
||||
|
||||
const (
|
||||
Temperature SensorValueType = "temperature"
|
||||
Pressure SensorValueType = "pressure"
|
||||
Humidity SensorValueType = "humidity"
|
||||
Co2Level SensorValueType = "co2level"
|
||||
)
|
||||
|
||||
func GetSensorValueTypes() []SensorValueType {
|
||||
return []SensorValueType{Temperature, Pressure, Humidity, Co2Level}
|
||||
}
|
||||
|
||||
//WeatherStorage interface for different storage-implementations of weather data
|
||||
type WeatherStorage interface {
|
||||
Save(WeatherData) error
|
||||
|
@ -27,38 +40,37 @@ type SensorRegistry interface {
|
|||
|
||||
//WeatherData type
|
||||
type WeatherData struct {
|
||||
Humidity float64 `json:"humidity"`
|
||||
Pressure float64 `json:"pressure"`
|
||||
Temperature float64 `json:"temperature"`
|
||||
CO2Level float64 `json:"co2level"`
|
||||
SensorId uuid.UUID `json:"sensorId"`
|
||||
TimeStamp time.Time `json:"timestamp"`
|
||||
Values map[SensorValueType]float64
|
||||
SensorId uuid.UUID `json:"sensorId"`
|
||||
TimeStamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func (data *WeatherData) GetQueriedValues(query *WeatherQuery) map[string]string {
|
||||
result := map[string]string{
|
||||
func (data *WeatherData) OnlyQueriedValues(query *WeatherQuery) *WeatherData {
|
||||
for _, sensorValueType := range GetSensorValueTypes() {
|
||||
if !query.Values[sensorValueType] {
|
||||
delete(data.Values, sensorValueType)
|
||||
}
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func (data *WeatherData) ToStringMap() map[string]string {
|
||||
mappedData := map[string]string{
|
||||
"sensorId": data.SensorId.String(),
|
||||
"timestamp": data.TimeStamp.String(),
|
||||
"timeStamp": data.TimeStamp.String(),
|
||||
}
|
||||
if query.Temperature {
|
||||
result["temperature"] = strconv.FormatFloat(data.Temperature, 'f', -1, 32)
|
||||
|
||||
for sensorValueType, value := range data.Values {
|
||||
mappedData[string(sensorValueType)] = strconv.FormatFloat(value, 'f', -1, 64)
|
||||
}
|
||||
if query.Pressure {
|
||||
result["pressure"] = strconv.FormatFloat(data.Pressure, 'f', -1, 32)
|
||||
}
|
||||
if query.Co2Level {
|
||||
result["co2level"] = strconv.FormatFloat(data.CO2Level, 'f', -1, 32)
|
||||
}
|
||||
if query.Humidity {
|
||||
result["humidity"] = strconv.FormatFloat(data.Humidity, 'f', -1, 32)
|
||||
}
|
||||
return result
|
||||
|
||||
return mappedData
|
||||
}
|
||||
|
||||
func GetOnlyQueriedFields(dataPoints []*WeatherData, query *WeatherQuery) []map[string]string {
|
||||
var result []map[string]string
|
||||
for _, data := range dataPoints {
|
||||
result = append(result, data.GetQueriedValues(query))
|
||||
result = append(result, data.OnlyQueriedValues(query).ToStringMap())
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
@ -73,23 +85,20 @@ type WeatherSensor struct {
|
|||
}
|
||||
|
||||
type WeatherQuery struct {
|
||||
Start time.Time
|
||||
End time.Time
|
||||
SensorId uuid.UUID
|
||||
Temperature bool
|
||||
Humidity bool
|
||||
Pressure bool
|
||||
Co2Level bool
|
||||
Start time.Time
|
||||
End time.Time
|
||||
SensorId uuid.UUID
|
||||
Values map[SensorValueType]bool
|
||||
}
|
||||
|
||||
func (data *WeatherQuery) Init() {
|
||||
data.Start = time.Now().Add(-1 * time.Hour * 24 * 14)
|
||||
data.End = time.Now()
|
||||
data.SensorId = uuid.Nil
|
||||
data.Temperature = true
|
||||
data.Humidity = true
|
||||
data.Pressure = true
|
||||
data.Co2Level = true
|
||||
func (query *WeatherQuery) Init() {
|
||||
query.Start = time.Now().Add(-1 * time.Hour * 24 * 14)
|
||||
query.End = time.Now()
|
||||
query.SensorId = uuid.Nil
|
||||
query.Values = make(map[SensorValueType]bool)
|
||||
for _, sensorValueType := range GetSensorValueTypes() {
|
||||
query.Values[sensorValueType] = true
|
||||
}
|
||||
}
|
||||
|
||||
func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) {
|
||||
|
@ -98,10 +107,6 @@ func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) {
|
|||
|
||||
start := query.Get("start")
|
||||
end := query.Get("end")
|
||||
temperature := query.Get("temperature")
|
||||
humidity := query.Get("humidity")
|
||||
pressure := query.Get("pressure")
|
||||
co2level := query.Get("co2level")
|
||||
|
||||
if len(start) != 0 {
|
||||
if tval, err := time.Parse(time.RFC3339, start); err == nil {
|
||||
|
@ -121,20 +126,11 @@ func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) {
|
|||
}
|
||||
}
|
||||
|
||||
if bval, err := strconv.ParseBool(temperature); err == nil {
|
||||
result.Temperature = bval
|
||||
}
|
||||
|
||||
if bval, err := strconv.ParseBool(humidity); err == nil {
|
||||
result.Humidity = bval
|
||||
}
|
||||
|
||||
if bval, err := strconv.ParseBool(pressure); err == nil {
|
||||
result.Pressure = bval
|
||||
}
|
||||
|
||||
if bval, err := strconv.ParseBool(co2level); err == nil {
|
||||
result.Co2Level = bval
|
||||
for _, sensorValueType := range GetSensorValueTypes() {
|
||||
queryParam := query.Get(string(sensorValueType))
|
||||
if bval, err := strconv.ParseBool(queryParam); err == nil {
|
||||
result.Values[sensorValueType] = bval
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
@ -144,10 +140,10 @@ func ParseFromUrlQuery(query url.Values) (*WeatherQuery, error) {
|
|||
func NewRandomWeatherData(sensorId uuid.UUID) WeatherData {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
var data WeatherData
|
||||
data.Humidity = rand.Float64() * 100
|
||||
data.Pressure = rand.Float64()*80 + 960
|
||||
data.Temperature = rand.Float64()*40 - 5
|
||||
data.CO2Level = rand.Float64()*50 + 375
|
||||
data.Values[Humidity] = rand.Float64() * 100
|
||||
data.Values[Pressure] = rand.Float64()*80 + 960
|
||||
data.Values[Temperature] = rand.Float64()*40 - 5
|
||||
data.Values[Co2Level] = rand.Float64()*50 + 375
|
||||
data.SensorId = sensorId
|
||||
data.TimeStamp = time.Now()
|
||||
return data
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"log"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"weather-data/config"
|
||||
"weather-data/storage"
|
||||
|
@ -13,7 +12,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
var mqttTopicRegexPattern = "(^sensor/)([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})(/(temp|pressure|humidity|co2level)$)"
|
||||
var mqttTopicRegexPattern = "(^sensor)/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})/(.*)"
|
||||
|
||||
var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern)
|
||||
|
||||
|
@ -79,26 +78,28 @@ func (source *mqttWeatherSource) mqttMessageHandler() mqtt.MessageHandler {
|
|||
|
||||
if !found {
|
||||
lastWeatherData = new(storage.WeatherData)
|
||||
lastWeatherData.Values = make(map[storage.SensorValueType]float64)
|
||||
lastWeatherData.SensorId = sensorId
|
||||
source.lastWeatherDataPoints = append(source.lastWeatherDataPoints, lastWeatherData)
|
||||
}
|
||||
|
||||
if strings.HasSuffix(msg.Topic(), "pressure") {
|
||||
lastWeatherData.Pressure, _ = strconv.ParseFloat(string(msg.Payload()), 64)
|
||||
lastWeatherData.TimeStamp = time.Now()
|
||||
value, err := strconv.ParseFloat(string(msg.Payload()), 64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(msg.Topic(), "temp") {
|
||||
lastWeatherData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64)
|
||||
lastWeatherData.TimeStamp = time.Now()
|
||||
}
|
||||
if strings.HasSuffix(msg.Topic(), "humidity") {
|
||||
lastWeatherData.Temperature, _ = strconv.ParseFloat(string(msg.Payload()), 64)
|
||||
lastWeatherData.TimeStamp = time.Now()
|
||||
}
|
||||
if strings.HasSuffix(msg.Topic(), "co2level") {
|
||||
lastWeatherData.CO2Level, _ = strconv.ParseFloat(string(msg.Payload()), 64)
|
||||
lastWeatherData.TimeStamp = time.Now()
|
||||
|
||||
sensorValueType := storage.SensorValueType(regexTopic.FindStringSubmatch(msg.Topic())[3])
|
||||
lastWeatherData.Values[sensorValueType] = value
|
||||
lastWeatherData.TimeStamp = time.Now()
|
||||
|
||||
/* only use predefined sensorValueTypes
|
||||
for _, sensorValueType := range storage.GetSensorValueTypes() {
|
||||
if strings.HasSuffix(msg.Topic(), string(sensorValueType)) {
|
||||
lastWeatherData.Values[sensorValueType], _ = strconv.ParseFloat(string(msg.Payload()), 64)
|
||||
lastWeatherData.TimeStamp = time.Now()
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue