001/** 002 * Copyright (c) 2014 Digi International Inc., 003 * All rights not expressly granted are reserved. 004 * 005 * This Source Code Form is subject to the terms of the Mozilla Public 006 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 007 * You can obtain one at http://mozilla.org/MPL/2.0/. 008 * 009 * Digi International Inc. 11001 Bren Road East, Minnetonka, MN 55343 010 * ======================================================================= 011*/ 012package com.digi.xbee.api.connection; 013 014import java.io.IOException; 015import java.util.ArrayList; 016import java.util.HashMap; 017import java.util.concurrent.Executors; 018import java.util.concurrent.ScheduledExecutorService; 019 020import org.slf4j.Logger; 021import org.slf4j.LoggerFactory; 022 023import com.digi.xbee.api.RemoteRaw802Device; 024import com.digi.xbee.api.RemoteXBeeDevice; 025import com.digi.xbee.api.XBeeDevice; 026import com.digi.xbee.api.XBeeNetwork; 027import com.digi.xbee.api.exceptions.InvalidPacketException; 028import com.digi.xbee.api.exceptions.XBeeException; 029import com.digi.xbee.api.io.IOSample; 030import com.digi.xbee.api.listeners.IIOSampleReceiveListener; 031import com.digi.xbee.api.listeners.IModemStatusReceiveListener; 032import com.digi.xbee.api.listeners.IPacketReceiveListener; 033import com.digi.xbee.api.listeners.IDataReceiveListener; 034import com.digi.xbee.api.models.ModemStatusEvent; 035import com.digi.xbee.api.models.SpecialByte; 036import com.digi.xbee.api.models.OperatingMode; 037import com.digi.xbee.api.models.XBeeMessage; 038import com.digi.xbee.api.models.XBeePacketsQueue; 039import com.digi.xbee.api.packet.XBeeAPIPacket; 040import com.digi.xbee.api.packet.APIFrameType; 041import com.digi.xbee.api.packet.XBeePacket; 042import com.digi.xbee.api.packet.XBeePacketParser; 043import com.digi.xbee.api.packet.common.IODataSampleRxIndicatorPacket; 044import com.digi.xbee.api.packet.common.ModemStatusPacket; 045import com.digi.xbee.api.packet.common.ReceivePacket; 046import com.digi.xbee.api.packet.raw.RX16IOPacket; 047import com.digi.xbee.api.packet.raw.RX16Packet; 048import com.digi.xbee.api.packet.raw.RX64IOPacket; 049import com.digi.xbee.api.packet.raw.RX64Packet; 050import com.digi.xbee.api.utils.HexUtils; 051 052/** 053 * Thread that constantly reads data from an input stream. 054 * 055 * <p>Depending on the XBee operating mode, read data is notified as is to the 056 * subscribed listeners or is parsed to a packet using the packet parser and 057 * then notified to subscribed listeners.</p> 058 */ 059public class DataReader extends Thread { 060 061 // Constants. 062 private final static int ALL_FRAME_IDS = 99999; 063 private final static int MAXIMUM_PARALLEL_LISTENER_THREADS = 20; 064 065 // Variables. 066 private boolean running = false; 067 068 private IConnectionInterface connectionInterface; 069 070 private volatile OperatingMode mode; 071 072 private ArrayList<IDataReceiveListener> dataReceiveListeners = new ArrayList<IDataReceiveListener>(); 073 // The packetReceiveListeners requires to be a HashMap with an associated integer. The integer is used to determine 074 // the frame ID of the packet that should be received. When it is 99999 (ALL_FRAME_IDS), all the packets will be handled. 075 private HashMap<IPacketReceiveListener, Integer> packetReceiveListeners = new HashMap<IPacketReceiveListener, Integer>(); 076 private ArrayList<IIOSampleReceiveListener> ioSampleReceiveListeners = new ArrayList<IIOSampleReceiveListener>(); 077 private ArrayList<IModemStatusReceiveListener> modemStatusListeners = new ArrayList<IModemStatusReceiveListener>(); 078 079 private Logger logger; 080 081 private XBeePacketParser parser; 082 083 private XBeePacketsQueue xbeePacketsQueue; 084 085 private XBeeDevice xbeeDevice; 086 087 /** 088 * Class constructor. Instantiates a new {@code DataReader} object for the 089 * given connection interface using the given XBee operating mode and XBee 090 * device. 091 * 092 * @param connectionInterface Connection interface to read data from. 093 * @param mode XBee operating mode. 094 * @param xbeeDevice Reference to the XBee device containing this 095 * {@code DataReader} object. 096 * 097 * @throws NullPointerException if {@code connectionInterface == null} or 098 * {@code mode == null}. 099 * 100 * @see IConnectionInterface 101 * @see com.digi.xbee.api.XBeeDevice 102 * @see com.digi.xbee.api.models.OperatingMode 103 */ 104 public DataReader(IConnectionInterface connectionInterface, OperatingMode mode, XBeeDevice xbeeDevice) { 105 if (connectionInterface == null) 106 throw new NullPointerException("Connection interface cannot be null."); 107 if (mode == null) 108 throw new NullPointerException("Operating mode cannot be null."); 109 110 this.connectionInterface = connectionInterface; 111 this.mode = mode; 112 this.xbeeDevice = xbeeDevice; 113 this.logger = LoggerFactory.getLogger(DataReader.class); 114 parser = new XBeePacketParser(); 115 xbeePacketsQueue = new XBeePacketsQueue(); 116 } 117 118 /** 119 * Sets the XBee operating mode of this data reader. 120 * 121 * @param mode New XBee operating mode. 122 * 123 * @throws NullPointerException if {@code mode == null}. 124 * 125 * @see com.digi.xbee.api.models.OperatingMode 126 */ 127 public void setXBeeReaderMode(OperatingMode mode) { 128 if (mode == null) 129 throw new NullPointerException("Operating mode cannot be null."); 130 131 this.mode = mode; 132 } 133 134 /** 135 * Adds the given data receive listener to the list of listeners that will 136 * be notified when XBee data packets are received. 137 * 138 * <p>If the listener has been already added, this method does nothing.</p> 139 * 140 * @param listener Listener to be notified when new XBee data packets are 141 * received. 142 * 143 * @see #removeDataReceiveListener(IDataReceiveListener) 144 * @see com.digi.xbee.api.listeners.IDataReceiveListener 145 */ 146 public void addDataReceiveListener(IDataReceiveListener listener) { 147 synchronized (dataReceiveListeners) { 148 if (!dataReceiveListeners.contains(listener)) 149 dataReceiveListeners.add(listener); 150 } 151 } 152 153 /** 154 * Removes the given data receive listener from the list of data receive 155 * listeners. 156 * 157 * <p>If the listener is not included in the list, this method does nothing. 158 * </p> 159 * 160 * @param listener Data receive listener to be remove from the list. 161 * 162 * @see #addDataReceiveListener(IDataReceiveListener) 163 * @see com.digi.xbee.api.listeners.IDataReceiveListener 164 */ 165 public void removeDataReceiveListener(IDataReceiveListener listener) { 166 synchronized (dataReceiveListeners) { 167 if (dataReceiveListeners.contains(listener)) 168 dataReceiveListeners.remove(listener); 169 } 170 } 171 172 /** 173 * Adds the given packet receive listener to the list of listeners that will 174 * be notified when any XBee packet is received. 175 * 176 * <p>If the listener has been already added, this method does nothing.</p> 177 * 178 * @param listener Listener to be notified when any XBee packet is received. 179 * 180 * @see #addPacketReceiveListener(IPacketReceiveListener, int) 181 * @see #removePacketReceiveListener(IPacketReceiveListener) 182 * @see com.digi.xbee.api.listeners.IPacketReceiveListener 183 */ 184 public void addPacketReceiveListener(IPacketReceiveListener listener) { 185 addPacketReceiveListener(listener, ALL_FRAME_IDS); 186 } 187 188 /** 189 * Adds the given packet receive listener to the list of listeners that will 190 * be notified when an XBee packet with the given frame ID is received. 191 * 192 * <p>If the listener has been already added, this method does nothing.</p> 193 * 194 * @param listener Listener to be notified when an XBee packet with the 195 * provided frame ID is received. 196 * @param frameID Frame ID for which this listener should be notified and 197 * removed after. 198 * Using {@link #ALL_FRAME_IDS} this listener will be 199 * notified always and will be removed only by user request. 200 * 201 * @see #addPacketReceiveListener(IPacketReceiveListener) 202 * @see #removePacketReceiveListener(IPacketReceiveListener) 203 * @see com.digi.xbee.api.listeners.IPacketReceiveListener 204 */ 205 public void addPacketReceiveListener(IPacketReceiveListener listener, int frameID) { 206 synchronized (packetReceiveListeners) { 207 if (!packetReceiveListeners.containsKey(listener)) 208 packetReceiveListeners.put(listener, frameID); 209 } 210 } 211 212 /** 213 * Removes the given packet receive listener from the list of XBee packet 214 * receive listeners. 215 * 216 * <p>If the listener is not included in the list, this method does nothing. 217 * </p> 218 * 219 * @param listener Packet receive listener to remove from the list. 220 * 221 * @see #addPacketReceiveListener(IPacketReceiveListener) 222 * @see #addPacketReceiveListener(IPacketReceiveListener, int) 223 * @see com.digi.xbee.api.listeners.IPacketReceiveListener 224 */ 225 public void removePacketReceiveListener(IPacketReceiveListener listener) { 226 synchronized (packetReceiveListeners) { 227 if (packetReceiveListeners.containsKey(listener)) 228 packetReceiveListeners.remove(listener); 229 } 230 } 231 232 /** 233 * Adds the given IO sample receive listener to the list of listeners that 234 * will be notified when an IO sample packet is received. 235 * 236 * <p>If the listener has been already added, this method does nothing.</p> 237 * 238 * @param listener Listener to be notified when new IO sample packets are 239 * received. 240 * 241 * @see #removeIOSampleReceiveListener(IIOSampleReceiveListener) 242 * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener 243 */ 244 public void addIOSampleReceiveListener(IIOSampleReceiveListener listener) { 245 synchronized (ioSampleReceiveListeners) { 246 if (!ioSampleReceiveListeners.contains(listener)) 247 ioSampleReceiveListeners.add(listener); 248 } 249 } 250 251 /** 252 * Removes the given IO sample receive listener from the list of IO sample 253 * receive listeners. 254 * 255 * <p>If the listener is not included in the list, this method does nothing. 256 * </p> 257 * 258 * @param listener IO sample receive listener to remove from the list. 259 * 260 * @see #addIOSampleReceiveListener(IIOSampleReceiveListener) 261 * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener 262 */ 263 public void removeIOSampleReceiveListener(IIOSampleReceiveListener listener) { 264 synchronized (ioSampleReceiveListeners) { 265 if (ioSampleReceiveListeners.contains(listener)) 266 ioSampleReceiveListeners.remove(listener); 267 } 268 } 269 270 /** 271 * Adds the given Modem Status receive listener to the list of listeners 272 * that will be notified when a modem status packet is received. 273 * 274 * <p>If the listener has been already added, this method does nothing.</p> 275 * 276 * @param listener Listener to be notified when new modem status packets are 277 * received. 278 * 279 * @see #removeModemStatusReceiveListener(IModemStatusReceiveListener) 280 * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener 281 */ 282 public void addModemStatusReceiveListener(IModemStatusReceiveListener listener) { 283 synchronized (modemStatusListeners) { 284 if (!modemStatusListeners.contains(listener)) 285 modemStatusListeners.add(listener); 286 } 287 } 288 289 /** 290 * Removes the given Modem Status receive listener from the list of Modem 291 * Status receive listeners. 292 * 293 * <p>If the listener is not included in the list, this method does nothing. 294 * </p> 295 * 296 * @param listener Modem Status receive listener to remove from the list. 297 * 298 * @see #addModemStatusReceiveListener(IModemStatusReceiveListener) 299 * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener 300 */ 301 public void removeModemStatusReceiveListener(IModemStatusReceiveListener listener) { 302 synchronized (modemStatusListeners) { 303 if (modemStatusListeners.contains(listener)) 304 modemStatusListeners.remove(listener); 305 } 306 } 307 308 /* 309 * (non-Javadoc) 310 * @see java.lang.Thread#run() 311 */ 312 @Override 313 public void run() { 314 logger.debug(connectionInterface.toString() + "Data reader started."); 315 running = true; 316 // Clear the list of read packets. 317 xbeePacketsQueue.clearQueue(); 318 try { 319 synchronized (connectionInterface) { 320 connectionInterface.wait(); 321 } 322 while (running) { 323 if (!running) 324 break; 325 if (connectionInterface.getInputStream() != null) { 326 switch (mode) { 327 case AT: 328 break; 329 case API: 330 case API_ESCAPE: 331 int headerByte = connectionInterface.getInputStream().read(); 332 // If it is packet header parse the packet, if not discard this byte and continue. 333 if (headerByte == SpecialByte.HEADER_BYTE.getValue()) { 334 try { 335 XBeePacket packet = parser.parsePacket(connectionInterface.getInputStream(), mode); 336 packetReceived(packet); 337 } catch (InvalidPacketException e) { 338 logger.error("Error parsing the API packet.", e); 339 } 340 } 341 break; 342 default: 343 break; 344 } 345 } else if (connectionInterface.getInputStream() == null) 346 break; 347 if (connectionInterface.getInputStream() == null) 348 break; 349 else if (connectionInterface.getInputStream().available() > 0) 350 continue; 351 synchronized (connectionInterface) { 352 connectionInterface.wait(); 353 } 354 } 355 } catch (IOException e) { 356 logger.error("Error reading from input stream.", e); 357 } catch (InterruptedException e) { 358 logger.error(e.getMessage(), e); 359 } catch (IllegalStateException e) { 360 logger.error(e.getMessage(), e); 361 } finally { 362 if (running) { 363 running = false; 364 if (connectionInterface.isOpen()) 365 connectionInterface.close(); 366 } 367 } 368 } 369 370 /** 371 * Dispatches the received XBee packet to the corresponding listener(s). 372 * 373 * @param packet The received XBee packet to be dispatched to the 374 * corresponding listeners. 375 * 376 * @see com.digi.xbee.api.packet.XBeeAPIPacket 377 * @see com.digi.xbee.api.packet.XBeePacket 378 */ 379 private void packetReceived(XBeePacket packet) { 380 // Add the packet to the packets queue. 381 xbeePacketsQueue.addPacket(packet); 382 // Notify that a packet has been received to the corresponding listeners. 383 notifyPacketReceived(packet); 384 385 // Check if the packet is an API packet. 386 if (!(packet instanceof XBeeAPIPacket)) 387 return; 388 389 // Get the API packet type. 390 XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet; 391 APIFrameType apiType = apiPacket.getFrameType(); 392 if (apiType == null) 393 return; 394 395 XBeeNetwork network = xbeeDevice.getNetwork(); 396 RemoteXBeeDevice remoteDevice = null; 397 byte[] data = null; 398 399 try { 400 switch(apiType) { 401 case RECEIVE_PACKET: 402 ReceivePacket receivePacket = (ReceivePacket)apiPacket; 403 remoteDevice = network.getDevice(receivePacket.get64bitSourceAddress()); 404 if (remoteDevice == null) { 405 remoteDevice = new RemoteXBeeDevice(xbeeDevice, receivePacket.get64bitSourceAddress()); 406 network.addRemoteDevice(remoteDevice); 407 } 408 data = receivePacket.getRFData(); 409 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 410 break; 411 case RX_64: 412 RX64Packet rx64Packet = (RX64Packet)apiPacket; 413 remoteDevice = network.getDevice(rx64Packet.get64bitSourceAddress()); 414 if (remoteDevice == null) { 415 remoteDevice = new RemoteXBeeDevice(xbeeDevice, rx64Packet.get64bitSourceAddress()); 416 network.addRemoteDevice(remoteDevice); 417 } 418 data = rx64Packet.getRFData(); 419 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 420 break; 421 case RX_16: 422 RX16Packet rx16Packet = (RX16Packet)apiPacket; 423 remoteDevice = network.getDevice(rx16Packet.get16bitSourceAddress()); 424 if (remoteDevice == null) { 425 remoteDevice = new RemoteRaw802Device(xbeeDevice, rx16Packet.get16bitSourceAddress()); 426 network.addRemoteDevice(remoteDevice); 427 } 428 data = rx16Packet.getRFData(); 429 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 430 break; 431 case IO_DATA_SAMPLE_RX_INDICATOR: 432 IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket; 433 remoteDevice = network.getDevice(ioSamplePacket.get64bitSourceAddress()); 434 if (remoteDevice == null) { 435 remoteDevice = new RemoteXBeeDevice(xbeeDevice, ioSamplePacket.get64bitSourceAddress()); 436 network.addRemoteDevice(remoteDevice); 437 } 438 notifyIOSampleReceived(remoteDevice, ioSamplePacket.getIOSample()); 439 break; 440 case RX_IO_64: 441 RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket; 442 remoteDevice = network.getDevice(rx64IOPacket.get64bitSourceAddress()); 443 if (remoteDevice == null) { 444 remoteDevice = new RemoteXBeeDevice(xbeeDevice, rx64IOPacket.get64bitSourceAddress()); 445 network.addRemoteDevice(remoteDevice); 446 } 447 notifyIOSampleReceived(remoteDevice, rx64IOPacket.getIOSample()); 448 break; 449 case RX_IO_16: 450 RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket; 451 remoteDevice = network.getDevice(rx16IOPacket.get16bitSourceAddress()); 452 if (remoteDevice == null) { 453 remoteDevice = new RemoteRaw802Device(xbeeDevice, rx16IOPacket.get16bitSourceAddress()); 454 network.addRemoteDevice(remoteDevice); 455 } 456 notifyIOSampleReceived(remoteDevice, rx16IOPacket.getIOSample()); 457 break; 458 case MODEM_STATUS: 459 ModemStatusPacket modemStatusPacket = (ModemStatusPacket)apiPacket; 460 notifyModemStatusReceived(modemStatusPacket.getStatus()); 461 default: 462 break; 463 } 464 } catch (XBeeException e) { 465 logger.error(e.getMessage(), e); 466 } 467 } 468 469 /** 470 * Notifies subscribed data receive listeners that a new XBee data packet 471 * has been received in form of an {@code XBeeMessage}. 472 * 473 * @param xbeeMessage The XBee message to be sent to subscribed XBee data 474 * listeners. 475 * 476 * @see com.digi.xbee.api.models.XBeeMessage 477 */ 478 private void notifyDataReceived(final XBeeMessage xbeeMessage) { 479 if (xbeeMessage.isBroadcast()) 480 logger.info(connectionInterface.toString() + 481 "Broadcast data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData())); 482 else 483 logger.info(connectionInterface.toString() + 484 "Data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData())); 485 486 try { 487 synchronized (dataReceiveListeners) { 488 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 489 dataReceiveListeners.size())); 490 for (final IDataReceiveListener listener:dataReceiveListeners) { 491 executor.execute(new Runnable() { 492 /* 493 * (non-Javadoc) 494 * @see java.lang.Runnable#run() 495 */ 496 @Override 497 public void run() { 498 /* Synchronize the listener so it is not called 499 twice. That is, let the listener to finish its job. */ 500 synchronized (listener) { 501 listener.dataReceived(xbeeMessage); 502 } 503 } 504 }); 505 } 506 executor.shutdown(); 507 } 508 } catch (Exception e) { 509 logger.error(e.getMessage(), e); 510 } 511 } 512 513 /** 514 * Notifies subscribed XBee packet listeners that a new XBee packet has 515 * been received. 516 * 517 * @param packet The received XBee packet. 518 * 519 * @see com.digi.xbee.api.packet.XBeeAPIPacket 520 * @see com.digi.xbee.api.packet.XBeePacket 521 */ 522 private void notifyPacketReceived(final XBeePacket packet) { 523 logger.debug(connectionInterface.toString() + "Packet received: \n{}", packet.toPrettyString()); 524 525 try { 526 synchronized (packetReceiveListeners) { 527 final ArrayList<IPacketReceiveListener> removeListeners = new ArrayList<IPacketReceiveListener>(); 528 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 529 packetReceiveListeners.size())); 530 for (final IPacketReceiveListener listener:packetReceiveListeners.keySet()) { 531 executor.execute(new Runnable() { 532 /* 533 * (non-Javadoc) 534 * @see java.lang.Runnable#run() 535 */ 536 @Override 537 public void run() { 538 // Synchronize the listener so it is not called 539 // twice. That is, let the listener to finish its job. 540 synchronized (packetReceiveListeners) { 541 synchronized (listener) { 542 if (packetReceiveListeners.get(listener) == ALL_FRAME_IDS) 543 listener.packetReceived(packet); 544 else if (((XBeeAPIPacket)packet).needsAPIFrameID() && 545 ((XBeeAPIPacket)packet).getFrameID() == packetReceiveListeners.get(listener)) { 546 listener.packetReceived(packet); 547 removeListeners.add(listener); 548 } 549 } 550 } 551 } 552 }); 553 } 554 executor.shutdown(); 555 // Remove required listeners. 556 for (IPacketReceiveListener listener:removeListeners) 557 packetReceiveListeners.remove(listener); 558 } 559 } catch (Exception e) { 560 logger.error(e.getMessage(), e); 561 } 562 } 563 564 /** 565 * Notifies subscribed IO sample listeners that a new IO sample packet has 566 * been received. 567 * 568 * @param ioSample The received IO sample. 569 * @param remoteDevice The remote XBee device that sent the sample. 570 * 571 * @see com.digi.xbee.api.RemoteXBeeDevice 572 * @see com.digi.xbee.api.io.IOSample 573 */ 574 private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) { 575 logger.debug(connectionInterface.toString() + "IO sample received."); 576 577 try { 578 synchronized (ioSampleReceiveListeners) { 579 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 580 ioSampleReceiveListeners.size())); 581 for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) { 582 executor.execute(new Runnable() { 583 /* 584 * (non-Javadoc) 585 * @see java.lang.Runnable#run() 586 */ 587 @Override 588 public void run() { 589 // Synchronize the listener so it is not called 590 // twice. That is, let the listener to finish its job. 591 synchronized (listener) { 592 listener.ioSampleReceived(remoteDevice, ioSample); 593 } 594 } 595 }); 596 } 597 executor.shutdown(); 598 } 599 } catch (Exception e) { 600 logger.error(e.getMessage(), e); 601 } 602 } 603 604 /** 605 * Notifies subscribed Modem Status listeners that a Modem Status event 606 * packet has been received. 607 * 608 * @param modemStatusEvent The Modem Status event. 609 * 610 * @see com.digi.xbee.api.models.ModemStatusEvent 611 */ 612 private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) { 613 logger.debug(connectionInterface.toString() + "Modem Status event received."); 614 615 try { 616 synchronized (modemStatusListeners) { 617 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 618 modemStatusListeners.size())); 619 for (final IModemStatusReceiveListener listener:modemStatusListeners) { 620 executor.execute(new Runnable() { 621 /* 622 * (non-Javadoc) 623 * @see java.lang.Runnable#run() 624 */ 625 @Override 626 public void run() { 627 // Synchronize the listener so it is not called 628 // twice. That is, let the listener to finish its job. 629 synchronized (listener) { 630 listener.modemStatusEventReceived(modemStatusEvent); 631 } 632 } 633 }); 634 } 635 executor.shutdown(); 636 } 637 } catch (Exception e) { 638 logger.error(e.getMessage(), e); 639 } 640 } 641 642 /** 643 * Returns whether this Data reader is running or not. 644 * 645 * @return {@code true} if the Data reader is running, {@code false} 646 * otherwise. 647 * 648 * @see #stopReader() 649 */ 650 public boolean isRunning() { 651 return running; 652 } 653 654 /** 655 * Stops the Data reader thread. 656 * 657 * @see #isRunning() 658 */ 659 public void stopReader() { 660 running = false; 661 synchronized (connectionInterface) { 662 connectionInterface.notify(); 663 } 664 logger.debug(connectionInterface.toString() + "Data reader stopped."); 665 } 666 667 /** 668 * Returns the queue of read XBee packets. 669 * 670 * @return The queue of read XBee packets. 671 * 672 * @see com.digi.xbee.api.models.XBeePacketsQueue 673 */ 674 public XBeePacketsQueue getXBeePacketsQueue() { 675 return xbeePacketsQueue; 676 } 677}