diff --git a/config/config.go b/config/config.go index 355932c..71c9f8d 100644 --- a/config/config.go +++ b/config/config.go @@ -6,125 +6,94 @@ import ( "time" ) -// const influx stuff -const influxToken = "default-token" -const influxWeatherBucket = "default-bucket" -const influxOrganization = "default-org" -const influxURL = "https://influx.default-address.com" - -//const mqtt stuff -const mqttURL = "tcp://default-address.com:1883" -const mqttTopic = "sensor/#" -const defaultLocation = "default-location" -const mqttUser = "weather-api" -const mqttPassword = "weather-api" -const useAnonymousMqttAuthentication = false -const mqttPublishInterval = time.Second -const mqttMinDistToLastValue = 250 * time.Millisecond - -//const mongodb stuff -const mongodbURL = "mongodb://default-address.com:27017" -const mongodbName = "weathersensors" -const mongodbCollection = "sensordata" -const mongodbUser = "mongoUser" -const mongdbPassword = "mongoPassword" - -//other config stuff -const allowUnregisteredSensors = false - -//influx config -func GetInfluxUrl() string { - return getVariableWithDefault("WEATHER-API-INFLUX_URL", influxURL) +type MongoConfig struct { + Host string + Database string + Username string + Password string + Collection string } -func GetInfluxToken() string { - return getVariableWithDefault("WEATHER-API-INFLUX_TOKEN", influxToken) +type InfluxConfig struct { + Host string + Token string + Organization string + Bucket string } -func GetInfluxOrganization() string { - return getVariableWithDefault("WEATHER-API-INFLUX_ORG", influxOrganization) +type MqttConfig struct { + Host string + Topic string + Username string + Password string + PublishInterval time.Duration + MinDistToLastValue time.Duration + AllowAnonymousAuthentication bool } -func GetInfluxBucket() string { - return getVariableWithDefault("WEATHER-API-INFLUX_BUCKET", influxWeatherBucket) +var MongoConfiguration = MongoConfig{ + Host: getEnv("MONGO_HOST", "localhost:27017"), + Database: getEnv("MONGO_DB", "weathersensors"), + Username: getEnv("MONGO_USER", "admin"), + Password: getEnv("MONGO_PASS", "admin"), + Collection: getEnv("MONGO_COLLECTION", "sensors"), } -//mqtt config -func GetMqttUrl() string { - return getVariableWithDefault("WEATHER-API-MQTT_URL", mqttURL) +var InfluxConfiguration = InfluxConfig{ + Host: getEnv("INFLUX_HOST", "localhost:8086"), + Token: getEnv("INFLUX_TOKEN", "token"), + Organization: getEnv("INFLUX_ORG", "org_name"), + Bucket: getEnv("INFLUX_BUCKET", "bucket_name"), } -func GetMqttTopic() string { - return getVariableWithDefault("WEATHER-API-MQTT_TOPIC", mqttTopic) +var MqttConfiguration = MqttConfig{ + Host: getEnv("MQTT_HOST", "localhost:1883"), + Topic: getEnv("MQTT_TOPIC", "sensor/#"), + Username: getEnv("MQTT_USER", "mqtt"), + Password: getEnv("MQTT_PASS", "mqtt"), + PublishInterval: getEnvDuration("MQTT_PUBLISH_INTERVALL", time.Millisecond*2500), + MinDistToLastValue: getEnvDuration("MQTT_MIN_DIST_LAST_VALUE", time.Millisecond*250), + AllowAnonymousAuthentication: getEnvBool("MQTT_ANONYMOUS", false), } -func GetMqttUser() string { - return getVariableWithDefault("WEATHER-API-MQTT_USER", mqttTopic) -} - -func GetMqttPassword() string { - return getVariableWithDefault("WEATHER-API-MQTT_PASSWORD", mqttTopic) -} - -func UseAnonymousMqttAuthentication() bool { - return getVariableWithDefaultBool("WEATHER-API-ANONYMOUS_MQTT_AUTHENTICATION", useAnonymousMqttAuthentication) -} - -func MqttPublishInterval() time.Duration { - interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_PUBLISH_INTERVAL"), 10, 64) - if err != nil { - return mqttPublishInterval - } - return time.Millisecond * time.Duration(interval) -} - -func MqttMinDistToLastValue() time.Duration { - interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE"), 10, 64) - if err != nil { - return mqttMinDistToLastValue - } - return time.Millisecond * time.Duration(interval) -} - -//mongodb config -func GetMongodbURL() string { - return getVariableWithDefault("WEATHER-API-MONGODB_URL", mongodbURL) -} - -func GetMongodbName() string { - return getVariableWithDefault("WEATHER-API-MONGODB_NAME", mongodbName) -} - -func GetMongodbCollection() string { - return getVariableWithDefault("WEATHER-API-MONGODB_COLLECTION", mongodbCollection) -} - -func GetMongodbUserName() string { - return getVariableWithDefault("WEATHER-API-MONGODB_USER", mongodbUser) -} - -func GetMongodbPassword() string { - return getVariableWithDefault("WEATHER-API-MONGODB_PASSWORD", mongdbPassword) -} - -//common config -func AllowUnregisteredSensors() bool { - return getVariableWithDefaultBool("WEATHER-API-ALLOW_UNREGISTERED_SENSORS", allowUnregisteredSensors) -} +var AllowUnregisteredSensors = getEnvBool("ALLOW_UNREGISTERED_SENSORS", false) //helper -func getVariableWithDefault(variableKey, defaultValue string) string { - variable := os.Getenv(variableKey) - if len(variable) == 0 { - return defaultValue +func getEnv(key, fallback string) string { + if value, ok := os.LookupEnv(key); ok { + return value } - return variable + + return fallback } -func getVariableWithDefaultBool(variableKey string, defaultValue bool) bool { - ok, err := strconv.ParseBool(os.Getenv(variableKey)) - if err != nil { - return defaultValue +func getEnvBool(key string, fallback bool) bool { + + if value, ok := os.LookupEnv(key); ok { + if bValue, err := strconv.ParseBool(value); err == nil { + return bValue + } } - return ok + + return fallback +} + +func getEnvInt(key string, fallback int64) int64 { + if value, ok := os.LookupEnv(key); ok { + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + return iValue + } + } + + return fallback +} + +func getEnvDuration(key string, fallback time.Duration) time.Duration { + if value, ok := os.LookupEnv(key); ok { + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + return time.Millisecond * time.Duration(iValue) + } + } + + return fallback } diff --git a/main.go b/main.go index 861637f..c2323d0 100644 --- a/main.go +++ b/main.go @@ -18,47 +18,22 @@ var weatherAPI api.WeatherAPI func main() { //setup new sensorRegistry -> MongodbSensorRegistry var err error - sensorRegistry, err = storage.NewMongodbSensorRegistry( - config.GetMongodbURL(), - config.GetMongodbName(), - config.GetMongodbCollection(), - config.GetMongodbUserName(), - config.GetMongodbPassword()) - - if err != nil { + if sensorRegistry, err = storage.NewMongodbSensorRegistry(config.MongoConfiguration); err != nil { os.Exit(1) } defer sensorRegistry.Close() //setup a new weatherstorage -> InfluxDB - weatherStorage, err = storage.NewInfluxStorage( - config.GetInfluxToken(), - config.GetInfluxBucket(), - config.GetInfluxOrganization(), - config.GetInfluxUrl()) - - if err != nil { + if weatherStorage, err = storage.NewInfluxStorage(config.InfluxConfiguration); err != nil { os.Exit(1) } defer weatherStorage.Close() //setup new weatherData source -> mqtt - if config.UseAnonymousMqttAuthentication() { - weatherSource, err = weathersource.NewAnonymousMqttSource( - config.GetMqttUrl(), - config.GetMqttTopic()) - } else { - weatherSource, err = weathersource.NewMqttSource( - config.GetMqttUrl(), - config.GetMqttTopic(), - config.GetMqttUser(), - config.GetMqttPassword()) - } - if err != nil { + if weatherSource, err = weathersource.NewMqttSource(config.MqttConfiguration); err != nil { fmt.Println("Could not connect to mqtt:", err.Error()) os.Exit(1) } - defer weatherSource.Close() weatherSource.AddNewWeatherDataCallback(handleNewWeatherData) @@ -75,7 +50,7 @@ func main() { func handleNewWeatherData(wd storage.WeatherData) error { _, couldResolve := sensorRegistry.ResolveSensorById(wd.SensorId) - if !config.AllowUnregisteredSensors() && !couldResolve { + if !config.AllowUnregisteredSensors && !couldResolve { return errors.New("sensor have to be registered") } weatherStorage.Save(wd) diff --git a/run_default.ps1 b/run_default.ps1 index 18f0f55..430f793 100644 --- a/run_default.ps1 +++ b/run_default.ps1 @@ -2,26 +2,26 @@ go build main.go #set environment variables for weather-api configuration -Set-Item -Path "Env:WEATHER-API-INFLUX_URL" -Value "https://influx.default-address.com" -Set-Item -Path "Env:WEATHER-API-INFLUX_TOKEN" -Value "default-token" -Set-Item -Path "Env:WEATHER-API-INFLUX_ORG" -Value "default-org" -Set-Item -Path "Env:WEATHER-API-INFLUX_BUCKET" -Value "default-bucket" +Set-Item -Path "Env:INFLUX_HOST" -Value "localhost:8086" +Set-Item -Path "Env:INFLUX_TOKEN" -Value "token" +Set-Item -Path "Env:INFLUX_ORG" -Value "org-name" +Set-Item -Path "Env:INFLUX_BUCKET" -Value "bucket-name" -Set-Item -Path "Env:WEATHER-API-MQTT_URL" -Value "tcp://default-address.com:1883" -Set-Item -Path "Env:WEATHER-API-MQTT_TOPIC" -Value "sensor/#" -Set-Item -Path "Env:WEATHER-API-MQTT_USER" -Value "weather-api" -Set-Item -Path "Env:WEATHER-API-MQTT_PASSWORD" -Value "weather-api" -Set-Item -Path "Env:WEATHER-API-MQTT_PUBLISH_INTERVAL" -Value "2500" -Set-Item -Path "Env:WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE" -Value "250" +Set-Item -Path "Env:MQTT_HOST" -Value "localhost:1883" +Set-Item -Path "Env:MQTT_TOPIC" -Value "sensor/#" +Set-Item -Path "Env:MQTT_USER" -Value "mqtt" +Set-Item -Path "Env:MQTT_PASS" -Value "mqtt" +Set-Item -Path "Env:MQTT_PUBLISH_INTERVALL" -Value "2500" +Set-Item -Path "Env:MQTT_MIN_DIST_LAST_VALUE" -Value "250" +Set-Item -Path "Env:MQTT_ANONYMOUS" -Value "false" -Set-Item -Path "Env:WEATHER-API-MONGODB_URL" -Value "mongodb://default-address.com:27017" -Set-Item -Path "Env:WEATHER-API-MONGODB_NAME" -Value "weathersensors" -Set-Item -Path "Env:WEATHER-API-MONGODB_COLLECTION" -Value "sensordata" -Set-Item -Path "Env:WEATHER-API-MONGODB_USER" -Value "mongoUser" -Set-Item -Path "Env:WEATHER-API-MONGODB_PASSWORD" -Value "mongoPassword" +Set-Item -Path "Env:MONGO_HOST" -Value "localhost:27017" +Set-Item -Path "Env:MONGO_DB" -Value "weathersensors" +Set-Item -Path "Env:MONGO_COLLECTION" -Value "sensors" +Set-Item -Path "Env:MONGO_USER" -Value "admin" +Set-Item -Path "Env:MONGO_PASS" -Value "admin" -Set-Item -Path "Env:WEATHER-API-ANONYMOUS_MQTT_AUTHENTICATION" -Value "false" -Set-Item -Path "Env:WEATHER-API-ALLOW_UNREGISTERED_SENSORS" -Value "true" +Set-Item -Path "Env:ALLOW_UNREGISTERED_SENSORS" -Value "false" #start application Start-Process "main.exe" -Wait -NoNewWindow diff --git a/storage/influxdb-storage.go b/storage/influxdb-storage.go index 99eb9a5..3ff53c6 100644 --- a/storage/influxdb-storage.go +++ b/storage/influxdb-storage.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "time" + "weather-data/config" "github.com/google/uuid" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -11,22 +12,16 @@ import ( //influxStorage is the Storage implementation for InfluxDB type influxStorage struct { - token string - bucket string - organization string - url string - measurement string - client influxdb2.Client + config config.InfluxConfig + measurement string + client influxdb2.Client } //NewInfluxStorage Factory -func NewInfluxStorage(token, bucket, organization, url string) (*influxStorage, error) { +func NewInfluxStorage(cfg config.InfluxConfig) (*influxStorage, error) { influx := new(influxStorage) - influx.bucket = bucket - influx.token = token - influx.organization = organization - influx.url = url - influx.client = influxdb2.NewClient(url, token) + influx.config = cfg + influx.client = influxdb2.NewClient(cfg.Host, cfg.Token) influx.measurement = "data" return influx, nil } @@ -47,7 +42,7 @@ func (storage *influxStorage) Save(data WeatherData) error { fields, data.TimeStamp) - writeAPI := storage.client.WriteAPI(storage.organization, storage.bucket) + writeAPI := storage.client.WriteAPI(storage.config.Organization, storage.config.Bucket) writeAPI.WritePoint(datapoint) return nil } @@ -85,13 +80,13 @@ func (storage *influxStorage) createFluxQuery(query *WeatherQuery) string { fields = fmt.Sprintf(" and ( %v )", fields) - fluxQuery := fmt.Sprintf("from(bucket:\"%v\")|> range(start: %v, stop: %v) |> filter(fn: (r) => r._measurement == \"%v\" and r.sensorId == \"%v\" %v)", storage.bucket, query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339), storage.measurement, query.SensorId, fields) + fluxQuery := fmt.Sprintf("from(bucket:\"%v\")|> range(start: %v, stop: %v) |> filter(fn: (r) => r._measurement == \"%v\" and r.sensorId == \"%v\" %v)", storage.config.Bucket, query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339), storage.measurement, query.SensorId, fields) return fluxQuery } func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, error) { - queryAPI := storage.client.QueryAPI(storage.organization) + queryAPI := storage.client.QueryAPI(storage.config.Organization) result, err := queryAPI.Query(context.Background(), query) if err != nil { diff --git a/storage/mongodb-storage.go b/storage/mongodb-storage.go index 3851531..4857235 100644 --- a/storage/mongodb-storage.go +++ b/storage/mongodb-storage.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "time" + "weather-data/config" "github.com/google/uuid" "go.mongodb.org/mongo-driver/bson" @@ -19,10 +20,10 @@ type mongodbSensorRegistry struct { client *mongo.Client } -func NewMongodbSensorRegistry(connection, database, collection, user, password string) (*mongodbSensorRegistry, error) { +func NewMongodbSensorRegistry(mongoCfg config.MongoConfig) (*mongodbSensorRegistry, error) { sensorRegistry := new(mongodbSensorRegistry) - options := options.Client().ApplyURI(connection).SetAuth(options.Credential{Username: user, Password: password}) + options := options.Client().ApplyURI(mongoCfg.Host).SetAuth(options.Credential{Username: mongoCfg.Username, Password: mongoCfg.Password}) client, err := mongo.NewClient(options) if err != nil { @@ -44,8 +45,8 @@ func NewMongodbSensorRegistry(connection, database, collection, user, password s return nil, err } - weathersensorsDB := client.Database(database) - sensorRegistry.sensorCollection = weathersensorsDB.Collection(collection) + weathersensorsDB := client.Database(mongoCfg.Database) + sensorRegistry.sensorCollection = weathersensorsDB.Collection(mongoCfg.Collection) return sensorRegistry, nil } diff --git a/weathersource/mqtt-source.go b/weathersource/mqtt-source.go index 477a7df..714b092 100644 --- a/weathersource/mqtt-source.go +++ b/weathersource/mqtt-source.go @@ -17,8 +17,7 @@ var mqttTopicRegexPattern = "(^sensor/)([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern) type mqttWeatherSource struct { - url string - topic string + config config.MqttConfig mqttClient mqtt.Client lastWeatherDataPoints []*storage.WeatherData weatherSource WeatherSourceBase @@ -29,31 +28,21 @@ func (source *mqttWeatherSource) Close() { source.mqttClient.Disconnect(2) } -//NewAnonymousMqttSource Factory function for mqttWeatherSource with anonymous authentication -func NewAnonymousMqttSource(url, topic string) (*mqttWeatherSource, error) { - return newMqttSource(url, topic, "", "", true) -} - //NewMqttSource Factory function for mqttWeatherSource with authentication -func NewMqttSource(url, topic, user, password string) (*mqttWeatherSource, error) { - return newMqttSource(url, topic, user, password, false) -} - -//NewMqttSource Factory function for mqttWeatherSource -func newMqttSource(url, topic, user, password string, anonymous bool) (*mqttWeatherSource, error) { +func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) { source := new(mqttWeatherSource) - source.url = url + source.config = cfg - opts := mqtt.NewClientOptions().AddBroker(url) + opts := mqtt.NewClientOptions().AddBroker(cfg.Host) //mqtt opts.SetKeepAlive(60 * time.Second) opts.SetDefaultPublishHandler(source.mqttMessageHandler()) opts.SetPingTimeout(1 * time.Second) - if !anonymous { - opts.Username = user - opts.Password = password + if !cfg.AllowAnonymousAuthentication { + opts.Username = cfg.Username + opts.Password = cfg.Password } source.mqttClient = mqtt.NewClient(opts) @@ -62,7 +51,7 @@ func newMqttSource(url, topic, user, password string, anonymous bool) (*mqttWeat return nil, token.Error() } - if token := source.mqttClient.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil { + if token := source.mqttClient.Subscribe(cfg.Topic, 2, nil); token.Wait() && token.Error() != nil { return nil, token.Error() } @@ -116,13 +105,13 @@ func (source *mqttWeatherSource) publishDataValues() { for len(source.lastWeatherDataPoints) != 0 { current := *source.lastWeatherDataPoints[0] diff := time.Now().Sub(current.TimeStamp) - if diff >= config.MqttMinDistToLastValue() { + if diff >= source.config.MinDistToLastValue { source.newWeatherData(current) source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:] } } - time.Sleep(config.MqttPublishInterval()) + time.Sleep(source.config.PublishInterval) } }