Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt reconnect #751

Merged
merged 2 commits into from
Feb 19, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 44 additions & 126 deletions lib/mqtt.pm
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ Notes:
reconnect we need to resubscribe. There is no way to do that now
(we'll need to resubscribe all the same socket related subscriptions)
@FIXME: We're really not checking for ConnAck or SubAck.
@FIXME: there is no reconnect logic
@FIXME: No SSL
@FIXME: Lots of error checking needs to be done
@FIXME: Use of uninitialized value
Expand Down Expand Up @@ -197,7 +196,7 @@ my $msg_id = 1;

my %MQTT_Data;

$main::Debug{mqtt} = 1;
#$main::Debug{mqtt} = 1;

# ------------------------------------------------------------------------------
sub dump() {
Expand All @@ -206,105 +205,13 @@ sub dump() {

# ------------------------------------------------------------------------------

=item <mqtt_reconnect()>

Okay, I can see this is going to get complicated and require I do a rewrite of
the subscription handling. When we reconnect we want to also resubscribe.
Currently we can't do that.

=cut

sub mqtt_reconnect() {
my ($self) = @_;

###
### Do we need to do a clean up on the existing socket before we reconnect?
### Will a close do that for us ?
###
$$self{socket}->close();

&main::print_log("*** mqtt $$self{instance} mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if ( $main::Debug{mqtt} || 1 );

### 1) open a socket (host, port and keepalive
my $socket = IO::Socket::INET->new(
PeerAddr => $self->{host} . ':' . $self->{port},
Timeout => $self->{keep_alive_timer},
);

# Can't use this at this time
# $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance);

&main::print_log( "*** mqtt $$self{instance} Socket check #1 ($$self{keep_alive_timer}) [ $! ]: " . ( $self->isConnected() ? "Connected" : "Failed" ) )
if ( $main::Debug{mqtt} );
return if ( !defined($socket) );

$self->{socket} = $socket;
$self->{got_ping_response} = 1;
$self->{next_ping} = $self->{keep_alive_timer};

# --------------------------------------------------------------------------
### 2) Send MQTT_CONNECT
$self->send_mqtt_msg(
message_type => MQTT_CONNECT,
keep_alive_timer => $self->{keep_alive_timer},
user_name => $self->{user_name},
password => $self->{password}
);

### 3) Check for ACK or fail
&main::print_log( "*** mqtt $$self{instance} Socket check #2 ($$self{keep_alive_timer}) [ $! ]: " . ( $self->isConnected() ? "Connected" : "Failed" ) )
if ( $main::Debug{mqtt} );

my $msg = read_mqtt_msg_timeout( $self, $buf );
if ( !$msg ) {
&main::print_log("XXX mqtt $$self{instance} No ConnAck ");

#exit 1;
return;
}

# We should actually get a SubAck but who is checking (yes, I know I should)
&main::print_log( "*** mqtt $$self{instance} Received: " . $msg->string )
if ( $main::Debug{mqtt} );

### ------------------------------------------------------------------------

###
### Here is where we need to make the changes to support multiple
### subscriptions.
###

### 4) Send a subscribe '#' (we'll have many of these, one for each device)
### I don't know if this is a good idea or not but that's what I intend to do for now
$self->send_mqtt_msg(
message_type => MQTT_SUBSCRIBE,
message_id => $msg_id++,
topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } $self->{topic} ]
);

### 5) Check for ACK or fail
### we really should check for a SubAck and that it's the correct SubAck
$msg = $self->read_mqtt_msg_timeout($buf)
or &main::print_log( "*** mqtt $$self{instance} Received: " . "No SubAck" );
&main::print_log( "*** mqtt $$self{instance} Sub 1 Received: " . "$$msg{string}" )
if ( $main::Debug{mqtt} );

### ------------------------------------------------------------------------

### 6) check for data
&main::print_log("*** mqtt $$self{instance} Initializing MQTT re_connection ...")
if ( $main::Debug{mqtt} );
}

# ------------------------------------------------------------------------------

=item <mqtt_connect()>
=cut

sub mqtt_connect() {
my ($self) = @_;

&main::print_log("*** mqtt mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if ( $main::Debug{mqtt} || 1 );
&main::print_log("*** mqtt mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) ") if ( $main::Debug{mqtt} );

### 1) open a socket (host, port and keepalive
my $socket = IO::Socket::INET->new(
Expand All @@ -315,7 +222,16 @@ sub mqtt_connect() {
# Can't use this at this time
# $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance);

return if ( !defined($socket) );
if ( !defined($socket) ) {
if ($$self{recon_timer}->inactive) {
::print_log("*** mqtt connection for $$self{instance} failed, I will try to reconnect in 20 seconds");
my $inst = $$self{instance};
$$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
return;
}
}



$self->{socket} = $socket;
$self->{got_ping_response} = 1;
Expand Down Expand Up @@ -381,7 +297,7 @@ sub mqtt_connect() {

sub isConnected {
my ($self) = @_;

unless( defined($$self{socket}) ) { return 0 }
return $$self{socket}->connected;
}

Expand All @@ -392,7 +308,7 @@ sub isConnected {

sub isNotConnected {
my ($self) = @_;

unless( defined($$self{socket}) ) { return 1 }
return !$$self{socket}->connected;
}

Expand Down Expand Up @@ -464,7 +380,7 @@ sub new {
@{ $$self{command_stack} } = ();

$$self{instance} = $instance;

$$self{recon_timer} = ::Timer::new();
$$self{host} = "$host" || "127.0.0.1";
$$self{port} = $port || 1883;

Expand Down Expand Up @@ -500,13 +416,13 @@ sub new {
### ------------------------------------------------------------------------
$self->mqtt_connect();

&main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n")
unless $self;
return unless $self;
unless ($self) {
&main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n");
return;
}

# Hey what happens when we fail ?
#$MQTT_Data{$instance}{self} = $self;

if ( 1 == scalar( keys %MQTT_Data ) ) { # Add hooks on first call only
&main::print_log("*** mqtt added MQTT check_for_data ...");
&::MainLoop_pre_add_hook( \&mqtt::check_for_data, 1 );
Expand Down Expand Up @@ -546,10 +462,10 @@ sub check_for_data {
### @FIXME: failed connection
if ( 'off' ne $self->{state} ) {

# First say something
&main::print_log("*** mqtt $inst failed ($$self{host}/$$self{port}/$$self{topic})");

# Then do something (reconnect)
if ($$self{recon_timer}->inactive) {
::print_log("*** mqtt $inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds");
$$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
}

# check the state to see if it's off already

Expand Down Expand Up @@ -709,12 +625,17 @@ sub read_mqtt_msg {

# We get no bytes if there is an error or the socket has closed
unless ($bytes) {
&main::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) );
my $inst = $$self{instance};
if ($$self{recon_timer}->inactive) {
::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) );
::print_log( "*** mqtt instance $$self{instance} will try to reconnect in 20 seconds");
$$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
}

# Not a permanent solution just a way to keep debugging
&main::print_log( "*** mqtt deleting $$self{instance}\n" . Dumper( \$self ) )
if ( $main::Debug{mqtt} );
delete( $MQTT_Data{ $$self{instance} } );
#&main::print_log( "*** mqtt deleting $$self{instance}\n" . Dumper( \$self ) )
# if ( $main::Debug{mqtt} );
#delete( $MQTT_Data{ $$self{instance} } );

return;
}
Expand Down Expand Up @@ -770,13 +691,10 @@ sub read_mqtt_msg_timeout {
sub set {
my ( $self, $msg, $set_by ) = @_;

if ( $main::Debug{mqtt} || 1 ) {
if ( $main::Debug{mqtt} ) {
my $xStr = defined($msg) ? "($msg)" : "undefined message";
$xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by";
$xStr .=
", Obj: " . defined( $$self{object_name} )
? ", $$self{object_name}"
: ", undefined object_name"; # @FIXME: Use of uninitialized value
$xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by, Obj: ";
$xStr .= defined($$self{object_name}) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value

&main::print_log("*** mqtt mqtt set $$self{instance}: [$xStr]");
&main::print_log(
Expand Down Expand Up @@ -960,11 +878,11 @@ sub parse_data_to_obj {

#
for my $obj ( @{ $$self{objects} } ) {
if ( "$$obj{topic}" eq "$$msg{topic}" ) {
if ( "$$obj{topic}" eq "$$msg{topic}" || "$$obj{topic}" eq "$$msg{topic}/set" ) {
$obj->set( $$msg{message}, $self, );
}
}
else {
#&main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})");
&main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})") if ( $main::Debug{mqtt} );
}
}

Expand All @@ -991,21 +909,21 @@ use Data::Dumper;

=over

=item name: the 'friendly' name of the squeezebox in squeezecenter. This parameter is used to link this object to the correct status messages in the CLI interface of squeezecenter
=item name: the name of the object seen in Misterhouse

=item interface: the object that is the CLI interface to assign this player to.
=item interface: the parent (mqtt) object that holds the connection info.

=item interface: the topic that is used to update the object state and/or control a mqtt device

=back

The following parameters are optional

=over

=item amplifier: the object that needs to be enabled and disabled together with the squeezebox

=item auto_off_time: the time (in minutes) the squeezebox and the optional attached amplifier should be turned off after a playlist has ended
=item retain

=item preheat_time: the time (in seconds) the amplifier should be turned on before a notification is played if the amplifier is off. This enables the amplifier to turn on and enable the speakers before the notification is played.
=item qos

=back

Expand Down