refactoring of config
This commit is contained in:
parent
5bb48d6ddc
commit
69f8c4e9dc
6 changed files with 118 additions and 189 deletions
175
config/config.go
175
config/config.go
|
@ -6,125 +6,94 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// const influx stuff
|
type MongoConfig struct {
|
||||||
const influxToken = "default-token"
|
Host string
|
||||||
const influxWeatherBucket = "default-bucket"
|
Database string
|
||||||
const influxOrganization = "default-org"
|
Username string
|
||||||
const influxURL = "https://influx.default-address.com"
|
Password string
|
||||||
|
Collection string
|
||||||
//const mqtt stuff
|
|
||||||
const mqttURL = "tcp://default-address.com:1883"
|
|
||||||
const mqttTopic = "sensor/#"
|
|
||||||
const defaultLocation = "default-location"
|
|
||||||
const mqttUser = "weather-api"
|
|
||||||
const mqttPassword = "weather-api"
|
|
||||||
const useAnonymousMqttAuthentication = false
|
|
||||||
const mqttPublishInterval = time.Second
|
|
||||||
const mqttMinDistToLastValue = 250 * time.Millisecond
|
|
||||||
|
|
||||||
//const mongodb stuff
|
|
||||||
const mongodbURL = "mongodb://default-address.com:27017"
|
|
||||||
const mongodbName = "weathersensors"
|
|
||||||
const mongodbCollection = "sensordata"
|
|
||||||
const mongodbUser = "mongoUser"
|
|
||||||
const mongdbPassword = "mongoPassword"
|
|
||||||
|
|
||||||
//other config stuff
|
|
||||||
const allowUnregisteredSensors = false
|
|
||||||
|
|
||||||
//influx config
|
|
||||||
func GetInfluxUrl() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-INFLUX_URL", influxURL)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetInfluxToken() string {
|
type InfluxConfig struct {
|
||||||
return getVariableWithDefault("WEATHER-API-INFLUX_TOKEN", influxToken)
|
Host string
|
||||||
|
Token string
|
||||||
|
Organization string
|
||||||
|
Bucket string
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetInfluxOrganization() string {
|
type MqttConfig struct {
|
||||||
return getVariableWithDefault("WEATHER-API-INFLUX_ORG", influxOrganization)
|
Host string
|
||||||
|
Topic string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
PublishInterval time.Duration
|
||||||
|
MinDistToLastValue time.Duration
|
||||||
|
AllowAnonymousAuthentication bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetInfluxBucket() string {
|
var MongoConfiguration = MongoConfig{
|
||||||
return getVariableWithDefault("WEATHER-API-INFLUX_BUCKET", influxWeatherBucket)
|
Host: getEnv("MONGO_HOST", "localhost:27017"),
|
||||||
|
Database: getEnv("MONGO_DB", "weathersensors"),
|
||||||
|
Username: getEnv("MONGO_USER", "admin"),
|
||||||
|
Password: getEnv("MONGO_PASS", "admin"),
|
||||||
|
Collection: getEnv("MONGO_COLLECTION", "sensors"),
|
||||||
}
|
}
|
||||||
|
|
||||||
//mqtt config
|
var InfluxConfiguration = InfluxConfig{
|
||||||
func GetMqttUrl() string {
|
Host: getEnv("INFLUX_HOST", "localhost:8086"),
|
||||||
return getVariableWithDefault("WEATHER-API-MQTT_URL", mqttURL)
|
Token: getEnv("INFLUX_TOKEN", "token"),
|
||||||
|
Organization: getEnv("INFLUX_ORG", "org_name"),
|
||||||
|
Bucket: getEnv("INFLUX_BUCKET", "bucket_name"),
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetMqttTopic() string {
|
var MqttConfiguration = MqttConfig{
|
||||||
return getVariableWithDefault("WEATHER-API-MQTT_TOPIC", mqttTopic)
|
Host: getEnv("MQTT_HOST", "localhost:1883"),
|
||||||
|
Topic: getEnv("MQTT_TOPIC", "sensor/#"),
|
||||||
|
Username: getEnv("MQTT_USER", "mqtt"),
|
||||||
|
Password: getEnv("MQTT_PASS", "mqtt"),
|
||||||
|
PublishInterval: getEnvDuration("MQTT_PUBLISH_INTERVALL", time.Millisecond*2500),
|
||||||
|
MinDistToLastValue: getEnvDuration("MQTT_MIN_DIST_LAST_VALUE", time.Millisecond*250),
|
||||||
|
AllowAnonymousAuthentication: getEnvBool("MQTT_ANONYMOUS", false),
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetMqttUser() string {
|
var AllowUnregisteredSensors = getEnvBool("ALLOW_UNREGISTERED_SENSORS", false)
|
||||||
return getVariableWithDefault("WEATHER-API-MQTT_USER", mqttTopic)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetMqttPassword() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-MQTT_PASSWORD", mqttTopic)
|
|
||||||
}
|
|
||||||
|
|
||||||
func UseAnonymousMqttAuthentication() bool {
|
|
||||||
return getVariableWithDefaultBool("WEATHER-API-ANONYMOUS_MQTT_AUTHENTICATION", useAnonymousMqttAuthentication)
|
|
||||||
}
|
|
||||||
|
|
||||||
func MqttPublishInterval() time.Duration {
|
|
||||||
interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_PUBLISH_INTERVAL"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return mqttPublishInterval
|
|
||||||
}
|
|
||||||
return time.Millisecond * time.Duration(interval)
|
|
||||||
}
|
|
||||||
|
|
||||||
func MqttMinDistToLastValue() time.Duration {
|
|
||||||
interval, err := strconv.ParseInt(os.Getenv("WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return mqttMinDistToLastValue
|
|
||||||
}
|
|
||||||
return time.Millisecond * time.Duration(interval)
|
|
||||||
}
|
|
||||||
|
|
||||||
//mongodb config
|
|
||||||
func GetMongodbURL() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-MONGODB_URL", mongodbURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetMongodbName() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-MONGODB_NAME", mongodbName)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetMongodbCollection() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-MONGODB_COLLECTION", mongodbCollection)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetMongodbUserName() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-MONGODB_USER", mongodbUser)
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetMongodbPassword() string {
|
|
||||||
return getVariableWithDefault("WEATHER-API-MONGODB_PASSWORD", mongdbPassword)
|
|
||||||
}
|
|
||||||
|
|
||||||
//common config
|
|
||||||
func AllowUnregisteredSensors() bool {
|
|
||||||
return getVariableWithDefaultBool("WEATHER-API-ALLOW_UNREGISTERED_SENSORS", allowUnregisteredSensors)
|
|
||||||
}
|
|
||||||
|
|
||||||
//helper
|
//helper
|
||||||
func getVariableWithDefault(variableKey, defaultValue string) string {
|
func getEnv(key, fallback string) string {
|
||||||
variable := os.Getenv(variableKey)
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
if len(variable) == 0 {
|
return value
|
||||||
return defaultValue
|
|
||||||
}
|
}
|
||||||
return variable
|
|
||||||
|
return fallback
|
||||||
}
|
}
|
||||||
|
|
||||||
func getVariableWithDefaultBool(variableKey string, defaultValue bool) bool {
|
func getEnvBool(key string, fallback bool) bool {
|
||||||
ok, err := strconv.ParseBool(os.Getenv(variableKey))
|
|
||||||
if err != nil {
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
return defaultValue
|
if bValue, err := strconv.ParseBool(value); err == nil {
|
||||||
|
return bValue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ok
|
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEnvInt(key string, fallback int64) int64 {
|
||||||
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
|
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||||||
|
return iValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEnvDuration(key string, fallback time.Duration) time.Duration {
|
||||||
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
|
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||||||
|
return time.Millisecond * time.Duration(iValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fallback
|
||||||
}
|
}
|
||||||
|
|
33
main.go
33
main.go
|
@ -18,47 +18,22 @@ var weatherAPI api.WeatherAPI
|
||||||
func main() {
|
func main() {
|
||||||
//setup new sensorRegistry -> MongodbSensorRegistry
|
//setup new sensorRegistry -> MongodbSensorRegistry
|
||||||
var err error
|
var err error
|
||||||
sensorRegistry, err = storage.NewMongodbSensorRegistry(
|
if sensorRegistry, err = storage.NewMongodbSensorRegistry(config.MongoConfiguration); err != nil {
|
||||||
config.GetMongodbURL(),
|
|
||||||
config.GetMongodbName(),
|
|
||||||
config.GetMongodbCollection(),
|
|
||||||
config.GetMongodbUserName(),
|
|
||||||
config.GetMongodbPassword())
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer sensorRegistry.Close()
|
defer sensorRegistry.Close()
|
||||||
|
|
||||||
//setup a new weatherstorage -> InfluxDB
|
//setup a new weatherstorage -> InfluxDB
|
||||||
weatherStorage, err = storage.NewInfluxStorage(
|
if weatherStorage, err = storage.NewInfluxStorage(config.InfluxConfiguration); err != nil {
|
||||||
config.GetInfluxToken(),
|
|
||||||
config.GetInfluxBucket(),
|
|
||||||
config.GetInfluxOrganization(),
|
|
||||||
config.GetInfluxUrl())
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer weatherStorage.Close()
|
defer weatherStorage.Close()
|
||||||
|
|
||||||
//setup new weatherData source -> mqtt
|
//setup new weatherData source -> mqtt
|
||||||
if config.UseAnonymousMqttAuthentication() {
|
if weatherSource, err = weathersource.NewMqttSource(config.MqttConfiguration); err != nil {
|
||||||
weatherSource, err = weathersource.NewAnonymousMqttSource(
|
|
||||||
config.GetMqttUrl(),
|
|
||||||
config.GetMqttTopic())
|
|
||||||
} else {
|
|
||||||
weatherSource, err = weathersource.NewMqttSource(
|
|
||||||
config.GetMqttUrl(),
|
|
||||||
config.GetMqttTopic(),
|
|
||||||
config.GetMqttUser(),
|
|
||||||
config.GetMqttPassword())
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Could not connect to mqtt:", err.Error())
|
fmt.Println("Could not connect to mqtt:", err.Error())
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer weatherSource.Close()
|
defer weatherSource.Close()
|
||||||
weatherSource.AddNewWeatherDataCallback(handleNewWeatherData)
|
weatherSource.AddNewWeatherDataCallback(handleNewWeatherData)
|
||||||
|
|
||||||
|
@ -75,7 +50,7 @@ func main() {
|
||||||
|
|
||||||
func handleNewWeatherData(wd storage.WeatherData) error {
|
func handleNewWeatherData(wd storage.WeatherData) error {
|
||||||
_, couldResolve := sensorRegistry.ResolveSensorById(wd.SensorId)
|
_, couldResolve := sensorRegistry.ResolveSensorById(wd.SensorId)
|
||||||
if !config.AllowUnregisteredSensors() && !couldResolve {
|
if !config.AllowUnregisteredSensors && !couldResolve {
|
||||||
return errors.New("sensor have to be registered")
|
return errors.New("sensor have to be registered")
|
||||||
}
|
}
|
||||||
weatherStorage.Save(wd)
|
weatherStorage.Save(wd)
|
||||||
|
|
|
@ -2,26 +2,26 @@
|
||||||
go build main.go
|
go build main.go
|
||||||
|
|
||||||
#set environment variables for weather-api configuration
|
#set environment variables for weather-api configuration
|
||||||
Set-Item -Path "Env:WEATHER-API-INFLUX_URL" -Value "https://influx.default-address.com"
|
Set-Item -Path "Env:INFLUX_HOST" -Value "localhost:8086"
|
||||||
Set-Item -Path "Env:WEATHER-API-INFLUX_TOKEN" -Value "default-token"
|
Set-Item -Path "Env:INFLUX_TOKEN" -Value "token"
|
||||||
Set-Item -Path "Env:WEATHER-API-INFLUX_ORG" -Value "default-org"
|
Set-Item -Path "Env:INFLUX_ORG" -Value "org-name"
|
||||||
Set-Item -Path "Env:WEATHER-API-INFLUX_BUCKET" -Value "default-bucket"
|
Set-Item -Path "Env:INFLUX_BUCKET" -Value "bucket-name"
|
||||||
|
|
||||||
Set-Item -Path "Env:WEATHER-API-MQTT_URL" -Value "tcp://default-address.com:1883"
|
Set-Item -Path "Env:MQTT_HOST" -Value "localhost:1883"
|
||||||
Set-Item -Path "Env:WEATHER-API-MQTT_TOPIC" -Value "sensor/#"
|
Set-Item -Path "Env:MQTT_TOPIC" -Value "sensor/#"
|
||||||
Set-Item -Path "Env:WEATHER-API-MQTT_USER" -Value "weather-api"
|
Set-Item -Path "Env:MQTT_USER" -Value "mqtt"
|
||||||
Set-Item -Path "Env:WEATHER-API-MQTT_PASSWORD" -Value "weather-api"
|
Set-Item -Path "Env:MQTT_PASS" -Value "mqtt"
|
||||||
Set-Item -Path "Env:WEATHER-API-MQTT_PUBLISH_INTERVAL" -Value "2500"
|
Set-Item -Path "Env:MQTT_PUBLISH_INTERVALL" -Value "2500"
|
||||||
Set-Item -Path "Env:WEATHER-API-MQTT_MIN_DIST_TO_LAST_VALUE" -Value "250"
|
Set-Item -Path "Env:MQTT_MIN_DIST_LAST_VALUE" -Value "250"
|
||||||
|
Set-Item -Path "Env:MQTT_ANONYMOUS" -Value "false"
|
||||||
|
|
||||||
Set-Item -Path "Env:WEATHER-API-MONGODB_URL" -Value "mongodb://default-address.com:27017"
|
Set-Item -Path "Env:MONGO_HOST" -Value "localhost:27017"
|
||||||
Set-Item -Path "Env:WEATHER-API-MONGODB_NAME" -Value "weathersensors"
|
Set-Item -Path "Env:MONGO_DB" -Value "weathersensors"
|
||||||
Set-Item -Path "Env:WEATHER-API-MONGODB_COLLECTION" -Value "sensordata"
|
Set-Item -Path "Env:MONGO_COLLECTION" -Value "sensors"
|
||||||
Set-Item -Path "Env:WEATHER-API-MONGODB_USER" -Value "mongoUser"
|
Set-Item -Path "Env:MONGO_USER" -Value "admin"
|
||||||
Set-Item -Path "Env:WEATHER-API-MONGODB_PASSWORD" -Value "mongoPassword"
|
Set-Item -Path "Env:MONGO_PASS" -Value "admin"
|
||||||
|
|
||||||
Set-Item -Path "Env:WEATHER-API-ANONYMOUS_MQTT_AUTHENTICATION" -Value "false"
|
Set-Item -Path "Env:ALLOW_UNREGISTERED_SENSORS" -Value "false"
|
||||||
Set-Item -Path "Env:WEATHER-API-ALLOW_UNREGISTERED_SENSORS" -Value "true"
|
|
||||||
|
|
||||||
#start application
|
#start application
|
||||||
Start-Process "main.exe" -Wait -NoNewWindow
|
Start-Process "main.exe" -Wait -NoNewWindow
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
"weather-data/config"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
|
@ -11,22 +12,16 @@ import (
|
||||||
|
|
||||||
//influxStorage is the Storage implementation for InfluxDB
|
//influxStorage is the Storage implementation for InfluxDB
|
||||||
type influxStorage struct {
|
type influxStorage struct {
|
||||||
token string
|
config config.InfluxConfig
|
||||||
bucket string
|
measurement string
|
||||||
organization string
|
client influxdb2.Client
|
||||||
url string
|
|
||||||
measurement string
|
|
||||||
client influxdb2.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewInfluxStorage Factory
|
//NewInfluxStorage Factory
|
||||||
func NewInfluxStorage(token, bucket, organization, url string) (*influxStorage, error) {
|
func NewInfluxStorage(cfg config.InfluxConfig) (*influxStorage, error) {
|
||||||
influx := new(influxStorage)
|
influx := new(influxStorage)
|
||||||
influx.bucket = bucket
|
influx.config = cfg
|
||||||
influx.token = token
|
influx.client = influxdb2.NewClient(cfg.Host, cfg.Token)
|
||||||
influx.organization = organization
|
|
||||||
influx.url = url
|
|
||||||
influx.client = influxdb2.NewClient(url, token)
|
|
||||||
influx.measurement = "data"
|
influx.measurement = "data"
|
||||||
return influx, nil
|
return influx, nil
|
||||||
}
|
}
|
||||||
|
@ -47,7 +42,7 @@ func (storage *influxStorage) Save(data WeatherData) error {
|
||||||
fields,
|
fields,
|
||||||
data.TimeStamp)
|
data.TimeStamp)
|
||||||
|
|
||||||
writeAPI := storage.client.WriteAPI(storage.organization, storage.bucket)
|
writeAPI := storage.client.WriteAPI(storage.config.Organization, storage.config.Bucket)
|
||||||
writeAPI.WritePoint(datapoint)
|
writeAPI.WritePoint(datapoint)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -85,13 +80,13 @@ func (storage *influxStorage) createFluxQuery(query *WeatherQuery) string {
|
||||||
|
|
||||||
fields = fmt.Sprintf(" and ( %v )", fields)
|
fields = fmt.Sprintf(" and ( %v )", fields)
|
||||||
|
|
||||||
fluxQuery := fmt.Sprintf("from(bucket:\"%v\")|> range(start: %v, stop: %v) |> filter(fn: (r) => r._measurement == \"%v\" and r.sensorId == \"%v\" %v)", storage.bucket, query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339), storage.measurement, query.SensorId, fields)
|
fluxQuery := fmt.Sprintf("from(bucket:\"%v\")|> range(start: %v, stop: %v) |> filter(fn: (r) => r._measurement == \"%v\" and r.sensorId == \"%v\" %v)", storage.config.Bucket, query.Start.Format(time.RFC3339), query.End.Format(time.RFC3339), storage.measurement, query.SensorId, fields)
|
||||||
return fluxQuery
|
return fluxQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, error) {
|
func (storage *influxStorage) executeFluxQuery(query string) ([]*WeatherData, error) {
|
||||||
|
|
||||||
queryAPI := storage.client.QueryAPI(storage.organization)
|
queryAPI := storage.client.QueryAPI(storage.config.Organization)
|
||||||
result, err := queryAPI.Query(context.Background(), query)
|
result, err := queryAPI.Query(context.Background(), query)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
"weather-data/config"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
@ -19,10 +20,10 @@ type mongodbSensorRegistry struct {
|
||||||
client *mongo.Client
|
client *mongo.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMongodbSensorRegistry(connection, database, collection, user, password string) (*mongodbSensorRegistry, error) {
|
func NewMongodbSensorRegistry(mongoCfg config.MongoConfig) (*mongodbSensorRegistry, error) {
|
||||||
sensorRegistry := new(mongodbSensorRegistry)
|
sensorRegistry := new(mongodbSensorRegistry)
|
||||||
|
|
||||||
options := options.Client().ApplyURI(connection).SetAuth(options.Credential{Username: user, Password: password})
|
options := options.Client().ApplyURI(mongoCfg.Host).SetAuth(options.Credential{Username: mongoCfg.Username, Password: mongoCfg.Password})
|
||||||
|
|
||||||
client, err := mongo.NewClient(options)
|
client, err := mongo.NewClient(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,8 +45,8 @@ func NewMongodbSensorRegistry(connection, database, collection, user, password s
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
weathersensorsDB := client.Database(database)
|
weathersensorsDB := client.Database(mongoCfg.Database)
|
||||||
sensorRegistry.sensorCollection = weathersensorsDB.Collection(collection)
|
sensorRegistry.sensorCollection = weathersensorsDB.Collection(mongoCfg.Collection)
|
||||||
|
|
||||||
return sensorRegistry, nil
|
return sensorRegistry, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,7 @@ var mqttTopicRegexPattern = "(^sensor/)([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F
|
||||||
var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern)
|
var regexTopic *regexp.Regexp = regexp.MustCompile(mqttTopicRegexPattern)
|
||||||
|
|
||||||
type mqttWeatherSource struct {
|
type mqttWeatherSource struct {
|
||||||
url string
|
config config.MqttConfig
|
||||||
topic string
|
|
||||||
mqttClient mqtt.Client
|
mqttClient mqtt.Client
|
||||||
lastWeatherDataPoints []*storage.WeatherData
|
lastWeatherDataPoints []*storage.WeatherData
|
||||||
weatherSource WeatherSourceBase
|
weatherSource WeatherSourceBase
|
||||||
|
@ -29,31 +28,21 @@ func (source *mqttWeatherSource) Close() {
|
||||||
source.mqttClient.Disconnect(2)
|
source.mqttClient.Disconnect(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewAnonymousMqttSource Factory function for mqttWeatherSource with anonymous authentication
|
|
||||||
func NewAnonymousMqttSource(url, topic string) (*mqttWeatherSource, error) {
|
|
||||||
return newMqttSource(url, topic, "", "", true)
|
|
||||||
}
|
|
||||||
|
|
||||||
//NewMqttSource Factory function for mqttWeatherSource with authentication
|
//NewMqttSource Factory function for mqttWeatherSource with authentication
|
||||||
func NewMqttSource(url, topic, user, password string) (*mqttWeatherSource, error) {
|
func NewMqttSource(cfg config.MqttConfig) (*mqttWeatherSource, error) {
|
||||||
return newMqttSource(url, topic, user, password, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
//NewMqttSource Factory function for mqttWeatherSource
|
|
||||||
func newMqttSource(url, topic, user, password string, anonymous bool) (*mqttWeatherSource, error) {
|
|
||||||
source := new(mqttWeatherSource)
|
source := new(mqttWeatherSource)
|
||||||
source.url = url
|
source.config = cfg
|
||||||
|
|
||||||
opts := mqtt.NewClientOptions().AddBroker(url)
|
opts := mqtt.NewClientOptions().AddBroker(cfg.Host)
|
||||||
|
|
||||||
//mqtt
|
//mqtt
|
||||||
opts.SetKeepAlive(60 * time.Second)
|
opts.SetKeepAlive(60 * time.Second)
|
||||||
opts.SetDefaultPublishHandler(source.mqttMessageHandler())
|
opts.SetDefaultPublishHandler(source.mqttMessageHandler())
|
||||||
opts.SetPingTimeout(1 * time.Second)
|
opts.SetPingTimeout(1 * time.Second)
|
||||||
|
|
||||||
if !anonymous {
|
if !cfg.AllowAnonymousAuthentication {
|
||||||
opts.Username = user
|
opts.Username = cfg.Username
|
||||||
opts.Password = password
|
opts.Password = cfg.Password
|
||||||
}
|
}
|
||||||
|
|
||||||
source.mqttClient = mqtt.NewClient(opts)
|
source.mqttClient = mqtt.NewClient(opts)
|
||||||
|
@ -62,7 +51,7 @@ func newMqttSource(url, topic, user, password string, anonymous bool) (*mqttWeat
|
||||||
return nil, token.Error()
|
return nil, token.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
if token := source.mqttClient.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
|
if token := source.mqttClient.Subscribe(cfg.Topic, 2, nil); token.Wait() && token.Error() != nil {
|
||||||
return nil, token.Error()
|
return nil, token.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,13 +105,13 @@ func (source *mqttWeatherSource) publishDataValues() {
|
||||||
for len(source.lastWeatherDataPoints) != 0 {
|
for len(source.lastWeatherDataPoints) != 0 {
|
||||||
current := *source.lastWeatherDataPoints[0]
|
current := *source.lastWeatherDataPoints[0]
|
||||||
diff := time.Now().Sub(current.TimeStamp)
|
diff := time.Now().Sub(current.TimeStamp)
|
||||||
if diff >= config.MqttMinDistToLastValue() {
|
if diff >= source.config.MinDistToLastValue {
|
||||||
source.newWeatherData(current)
|
source.newWeatherData(current)
|
||||||
source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:]
|
source.lastWeatherDataPoints = source.lastWeatherDataPoints[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
time.Sleep(config.MqttPublishInterval())
|
time.Sleep(source.config.PublishInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue