107 lines
2.6 KiB
Ruby
107 lines
2.6 KiB
Ruby
require 'mqtt'
|
|
require 'timeout'
|
|
require 'json'
|
|
|
|
class MqttClient
|
|
BreakListenLoopError = Class.new(StandardError)
|
|
|
|
def initialize(server, username, password, topic_prefix)
|
|
@client = MQTT::Client.connect("mqtt://#{username}:#{password}@#{server}")
|
|
@topic_prefix = topic_prefix
|
|
@listen_threads = []
|
|
end
|
|
|
|
def disconnect
|
|
@client.disconnect
|
|
end
|
|
|
|
def reconnect
|
|
@client.disconnect
|
|
@client.connect
|
|
end
|
|
|
|
def wait_for_message(topic, timeout = 10)
|
|
on_message(topic, timeout) { |topic, message| }
|
|
wait_for_listeners
|
|
end
|
|
|
|
def id_topic_suffix(params)
|
|
if params
|
|
str_id = if params[:id_format] == 'decimal'
|
|
params[:id].to_s
|
|
else
|
|
sprintf '0x%04X', params[:id]
|
|
end
|
|
|
|
"#{str_id}/#{params[:type]}/#{params[:group_id]}"
|
|
else
|
|
"+/+/+"
|
|
end
|
|
end
|
|
|
|
def on_update(id_params = nil, timeout = 10, &block)
|
|
on_id_message('updates', id_params, timeout, &block)
|
|
end
|
|
|
|
def on_state(id_params = nil, timeout = 10, &block)
|
|
on_id_message('state', id_params, timeout, &block)
|
|
end
|
|
|
|
def on_id_message(path, id_params, timeout, &block)
|
|
sub_topic = "#{@topic_prefix}#{path}/#{id_topic_suffix(nil)}"
|
|
|
|
on_message(sub_topic, timeout) do |topic, message|
|
|
topic_parts = topic.split('/')
|
|
topic_id_params = {
|
|
id: topic_parts[2].to_i(16),
|
|
type: topic_parts[3],
|
|
group_id: topic_parts[4].to_i,
|
|
unparsed_id: topic_parts[2]
|
|
}
|
|
|
|
if !id_params || %w(id type group_id).all? { |k| k=k.to_sym; topic_id_params[k] == id_params[k] }
|
|
begin
|
|
message = JSON.parse(message)
|
|
rescue JSON::ParserError => e
|
|
end
|
|
|
|
yield( topic_id_params, message )
|
|
end
|
|
end
|
|
end
|
|
|
|
def on_message(topic, timeout = 10, raise_error = true, &block)
|
|
@listen_threads << Thread.new do
|
|
begin
|
|
Timeout.timeout(timeout) do
|
|
@client.get(topic) do |topic, message|
|
|
ret_val = yield(topic, message)
|
|
raise BreakListenLoopError if ret_val
|
|
end
|
|
end
|
|
rescue Timeout::Error => e
|
|
puts "Timed out listening for message on: #{topic}"
|
|
raise e if raise_error
|
|
rescue BreakListenLoopError
|
|
end
|
|
end
|
|
end
|
|
|
|
def publish(topic, state = {}, retain = false)
|
|
state = state.to_json unless state.is_a?(String)
|
|
|
|
@client.publish(topic, state, retain)
|
|
end
|
|
|
|
def patch_state(id_params, state = {})
|
|
@client.publish(
|
|
"#{@topic_prefix}commands/#{id_topic_suffix(id_params)}",
|
|
state.to_json
|
|
)
|
|
end
|
|
|
|
def wait_for_listeners
|
|
@listen_threads.each(&:join)
|
|
@listen_threads.clear
|
|
end
|
|
end |