MQTT_WeatherGateway/src/server.rb

126 lines
3.3 KiB
Ruby
Raw Permalink Normal View History

2022-07-24 16:50:42 +00:00
# frozen_string_literal: true
2022-10-05 02:39:31 +00:00
require 'logger'
2022-07-24 16:50:42 +00:00
require 'bundler'
Bundler.require
Dotenv.load
2022-07-25 15:46:21 +00:00
Dotenv.require_keys('MOJI_APPCODE', 'CACHE_TTL', 'IOT_MQTT_HOST', 'IOT_MQTT_PORT', 'IOT_MQTT_SSL')
2022-07-24 16:50:42 +00:00
2022-10-05 02:39:31 +00:00
@logger = Logger.new($stdout)
@wxapi = MojiWeather::Api::RestClient.new(app_code: ENV['MOJI_APPCODE'])
@wxapi_simple = nil
unless ENV['MOJI_HAS_BASIC'].nil?
@wxapi_simple = MojiWeather::Api::RestClient.new(app_code: ENV['MOJI_APPCODE'], cityid_base: 'http://aliv13.data.moji.com/whapi/json/alicityweather')
end
2022-07-25 15:46:21 +00:00
cache = LruRedux::TTL::Cache.new(100, ENV['CACHE_TTL'].to_i)
2022-07-24 16:50:42 +00:00
def extract_device_id(topic)
devid = %r{iot/weather/(.*)/request}.match(topic)
if devid.nil?
nil
else
devid[1]
end
end
2022-07-25 15:46:21 +00:00
def extract_req_type(type)
wxapi = @wxapi_simple.nil? ? @wxapi : @wxapi_simple
2022-07-25 15:46:21 +00:00
case type
when 'condition'
[MojiWeather::Api::ApiType::CONDITION, wxapi]
2022-07-25 15:46:21 +00:00
when 'aqi'
[MojiWeather::Api::ApiType::AQI, wxapi]
when 'forecast24h'
[MojiWeather::Api::ApiType::FORECAST_24HRS, @wxapi]
when 'forecast6d'
[MojiWeather::Api::ApiType::FORECAST_6DAYS, wxapi]
when 'forecast15d'
[MojiWeather::Api::ApiType::FORECAST_15DAYS, @wxapi]
else
[nil, nil]
2022-07-25 15:46:21 +00:00
end
end
client = MQTT::Client.new
2022-07-24 16:50:42 +00:00
begin
client.host = ENV['IOT_MQTT_HOST']
client.port = ENV['IOT_MQTT_PORT'].to_i
unless ENV['IOT_MQTT_SSL'].nil? || ENV['IOT_MQTT_SSL'] == 'false' || ENV['IOT_MQTT_SSL'] == 'no'
client.ssl = true
client.cert_file = ENV['IOT_MQTT_SSL_CERT_FILE']
client.key_file = ENV['IOT_MQTT_SSL_KEY_FILE']
client.ca_file = ENV['IOT_MQTT_SSL_CA_FILE']
end
client.connect
2022-10-05 02:39:31 +00:00
@logger.info "client [#{client}] connected..."
2022-07-25 15:46:21 +00:00
2022-07-24 16:50:42 +00:00
client.subscribe('iot/weather/#')
client.get do |topic, payload|
# this method also checks if this is from a request topic.
dev_id = extract_device_id(topic)
unless dev_id.nil?
2022-10-05 02:39:31 +00:00
@logger.info "[#{dev_id}] <- #{payload.length}B"
2022-07-24 16:50:42 +00:00
# decode CBOR object, retrieve request.
dev_req = CBOR.decode(payload)
wx_cond, wxapi = extract_req_type(dev_req['type'])
2022-07-25 15:46:21 +00:00
# Not a valid type
next if wx_cond.nil?
2022-07-24 16:50:42 +00:00
req_params = if !dev_req['city_id'].nil?
{ city_id: dev_req['city_id'] }
elsif !dev_req['location'].nil?
{ location: { lat: dev_req['location']['lat'], lon: dev_req['location']['lon'] } }
else
2022-10-05 02:39:31 +00:00
@logger.warn "[#{dev_id}] not a valid request"
2022-07-24 16:50:42 +00:00
next
end
2022-07-31 08:35:02 +00:00
req_params[:type] = wx_cond
2022-07-25 15:46:21 +00:00
# Check cache
api_resp = cache["cache_#{req_params}"]
# Cache missed..
if api_resp.nil?
2022-10-05 02:39:31 +00:00
@logger.info "[#{dev_id}] cache missed"
2022-07-25 15:46:21 +00:00
# Request external service for weather information
api_resp = wxapi.query(wx_cond, req_params)
# Update cache entry
cache["cache_#{req_params}"] = api_resp
else
2022-10-05 02:39:31 +00:00
@logger.info "[#{dev_id}] cache hit"
2022-07-25 15:46:21 +00:00
end
2022-07-24 16:50:42 +00:00
# Encode CBOR object
2022-07-25 15:46:21 +00:00
resp = api_resp.to_cbor
2022-07-24 16:50:42 +00:00
# Publish to response topic.
2022-10-05 02:39:31 +00:00
@logger.info "[#{dev_id}] -> #{resp.length}B"
2022-07-24 16:50:42 +00:00
2022-07-25 15:46:21 +00:00
# Send response
2022-07-24 16:50:42 +00:00
client.publish("iot/weather/#{dev_id}/response", resp)
end
end
rescue SystemExit, Interrupt
2022-10-05 02:39:31 +00:00
@logger.warn "Interrupt caught, client [#{client}] disconnect."
2022-07-24 16:50:42 +00:00
client.disconnect
rescue StandardError => e
p e
end