001/** 002 * Copyright (c) 2014-2015 Digi International Inc., 003 * All rights not expressly granted are reserved. 004 * 005 * This Source Code Form is subject to the terms of the Mozilla Public 006 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 007 * You can obtain one at http://mozilla.org/MPL/2.0/. 008 * 009 * Digi International Inc. 11001 Bren Road East, Minnetonka, MN 55343 010 * ======================================================================= 011*/ 012package com.digi.xbee.api.connection; 013 014import java.io.IOException; 015import java.util.ArrayList; 016import java.util.HashMap; 017import java.util.concurrent.Executors; 018import java.util.concurrent.ScheduledExecutorService; 019 020import org.slf4j.Logger; 021import org.slf4j.LoggerFactory; 022 023import com.digi.xbee.api.RemoteDigiMeshDevice; 024import com.digi.xbee.api.RemoteDigiPointDevice; 025import com.digi.xbee.api.RemoteRaw802Device; 026import com.digi.xbee.api.RemoteXBeeDevice; 027import com.digi.xbee.api.RemoteZigBeeDevice; 028import com.digi.xbee.api.XBeeDevice; 029import com.digi.xbee.api.XBeeNetwork; 030import com.digi.xbee.api.exceptions.InvalidPacketException; 031import com.digi.xbee.api.exceptions.XBeeException; 032import com.digi.xbee.api.io.IOSample; 033import com.digi.xbee.api.listeners.IExplicitDataReceiveListener; 034import com.digi.xbee.api.listeners.IIOSampleReceiveListener; 035import com.digi.xbee.api.listeners.IModemStatusReceiveListener; 036import com.digi.xbee.api.listeners.IPacketReceiveListener; 037import com.digi.xbee.api.listeners.IDataReceiveListener; 038import com.digi.xbee.api.models.ExplicitXBeeMessage; 039import com.digi.xbee.api.models.ModemStatusEvent; 040import com.digi.xbee.api.models.SpecialByte; 041import com.digi.xbee.api.models.OperatingMode; 042import com.digi.xbee.api.models.XBee16BitAddress; 043import com.digi.xbee.api.models.XBee64BitAddress; 044import com.digi.xbee.api.models.XBeeMessage; 045import com.digi.xbee.api.models.XBeePacketsQueue; 046import com.digi.xbee.api.packet.XBeeAPIPacket; 047import com.digi.xbee.api.packet.APIFrameType; 048import com.digi.xbee.api.packet.XBeePacket; 049import com.digi.xbee.api.packet.XBeePacketParser; 050import com.digi.xbee.api.packet.common.ExplicitRxIndicatorPacket; 051import com.digi.xbee.api.packet.common.IODataSampleRxIndicatorPacket; 052import com.digi.xbee.api.packet.common.ModemStatusPacket; 053import com.digi.xbee.api.packet.common.ReceivePacket; 054import com.digi.xbee.api.packet.raw.RX16IOPacket; 055import com.digi.xbee.api.packet.raw.RX16Packet; 056import com.digi.xbee.api.packet.raw.RX64IOPacket; 057import com.digi.xbee.api.packet.raw.RX64Packet; 058import com.digi.xbee.api.utils.HexUtils; 059 060/** 061 * Thread that constantly reads data from an input stream. 062 * 063 * <p>Depending on the XBee operating mode, read data is notified as is to the 064 * subscribed listeners or is parsed to a packet using the packet parser and 065 * then notified to subscribed listeners.</p> 066 */ 067public class DataReader extends Thread { 068 069 // Constants. 070 private final static int ALL_FRAME_IDS = 99999; 071 private final static int MAXIMUM_PARALLEL_LISTENER_THREADS = 20; 072 073 // Variables. 074 private boolean running = false; 075 076 private IConnectionInterface connectionInterface; 077 078 private volatile OperatingMode mode; 079 080 private ArrayList<IDataReceiveListener> dataReceiveListeners = new ArrayList<IDataReceiveListener>(); 081 // The packetReceiveListeners requires to be a HashMap with an associated integer. The integer is used to determine 082 // the frame ID of the packet that should be received. When it is 99999 (ALL_FRAME_IDS), all the packets will be handled. 083 private HashMap<IPacketReceiveListener, Integer> packetReceiveListeners = new HashMap<IPacketReceiveListener, Integer>(); 084 private ArrayList<IIOSampleReceiveListener> ioSampleReceiveListeners = new ArrayList<IIOSampleReceiveListener>(); 085 private ArrayList<IModemStatusReceiveListener> modemStatusListeners = new ArrayList<IModemStatusReceiveListener>(); 086 private ArrayList<IExplicitDataReceiveListener> explicitDataReceiveListeners = new ArrayList<IExplicitDataReceiveListener>(); 087 088 private Logger logger; 089 090 private XBeePacketParser parser; 091 092 private XBeePacketsQueue xbeePacketsQueue; 093 094 private XBeeDevice xbeeDevice; 095 096 /** 097 * Class constructor. Instantiates a new {@code DataReader} object for the 098 * given connection interface using the given XBee operating mode and XBee 099 * device. 100 * 101 * @param connectionInterface Connection interface to read data from. 102 * @param mode XBee operating mode. 103 * @param xbeeDevice Reference to the XBee device containing this 104 * {@code DataReader} object. 105 * 106 * @throws NullPointerException if {@code connectionInterface == null} or 107 * {@code mode == null}. 108 * 109 * @see IConnectionInterface 110 * @see com.digi.xbee.api.XBeeDevice 111 * @see com.digi.xbee.api.models.OperatingMode 112 */ 113 public DataReader(IConnectionInterface connectionInterface, OperatingMode mode, XBeeDevice xbeeDevice) { 114 if (connectionInterface == null) 115 throw new NullPointerException("Connection interface cannot be null."); 116 if (mode == null) 117 throw new NullPointerException("Operating mode cannot be null."); 118 119 this.connectionInterface = connectionInterface; 120 this.mode = mode; 121 this.xbeeDevice = xbeeDevice; 122 this.logger = LoggerFactory.getLogger(DataReader.class); 123 parser = new XBeePacketParser(); 124 xbeePacketsQueue = new XBeePacketsQueue(); 125 } 126 127 /** 128 * Sets the XBee operating mode of this data reader. 129 * 130 * @param mode New XBee operating mode. 131 * 132 * @throws NullPointerException if {@code mode == null}. 133 * 134 * @see com.digi.xbee.api.models.OperatingMode 135 */ 136 public void setXBeeReaderMode(OperatingMode mode) { 137 if (mode == null) 138 throw new NullPointerException("Operating mode cannot be null."); 139 140 this.mode = mode; 141 } 142 143 /** 144 * Adds the given data receive listener to the list of listeners that will 145 * be notified when XBee data packets are received. 146 * 147 * <p>If the listener has been already added, this method does nothing.</p> 148 * 149 * @param listener Listener to be notified when new XBee data packets are 150 * received. 151 * 152 * @see #removeDataReceiveListener(IDataReceiveListener) 153 * @see com.digi.xbee.api.listeners.IDataReceiveListener 154 */ 155 public void addDataReceiveListener(IDataReceiveListener listener) { 156 synchronized (dataReceiveListeners) { 157 if (!dataReceiveListeners.contains(listener)) 158 dataReceiveListeners.add(listener); 159 } 160 } 161 162 /** 163 * Removes the given data receive listener from the list of data receive 164 * listeners. 165 * 166 * <p>If the listener is not included in the list, this method does nothing. 167 * </p> 168 * 169 * @param listener Data receive listener to be remove from the list. 170 * 171 * @see #addDataReceiveListener(IDataReceiveListener) 172 * @see com.digi.xbee.api.listeners.IDataReceiveListener 173 */ 174 public void removeDataReceiveListener(IDataReceiveListener listener) { 175 synchronized (dataReceiveListeners) { 176 if (dataReceiveListeners.contains(listener)) 177 dataReceiveListeners.remove(listener); 178 } 179 } 180 181 /** 182 * Adds the given packet receive listener to the list of listeners that will 183 * be notified when any XBee packet is received. 184 * 185 * <p>If the listener has been already added, this method does nothing.</p> 186 * 187 * @param listener Listener to be notified when any XBee packet is received. 188 * 189 * @see #addPacketReceiveListener(IPacketReceiveListener, int) 190 * @see #removePacketReceiveListener(IPacketReceiveListener) 191 * @see com.digi.xbee.api.listeners.IPacketReceiveListener 192 */ 193 public void addPacketReceiveListener(IPacketReceiveListener listener) { 194 addPacketReceiveListener(listener, ALL_FRAME_IDS); 195 } 196 197 /** 198 * Adds the given packet receive listener to the list of listeners that will 199 * be notified when an XBee packet with the given frame ID is received. 200 * 201 * <p>If the listener has been already added, this method does nothing.</p> 202 * 203 * @param listener Listener to be notified when an XBee packet with the 204 * provided frame ID is received. 205 * @param frameID Frame ID for which this listener should be notified and 206 * removed after. 207 * Using {@link #ALL_FRAME_IDS} this listener will be 208 * notified always and will be removed only by user request. 209 * 210 * @see #addPacketReceiveListener(IPacketReceiveListener) 211 * @see #removePacketReceiveListener(IPacketReceiveListener) 212 * @see com.digi.xbee.api.listeners.IPacketReceiveListener 213 */ 214 public void addPacketReceiveListener(IPacketReceiveListener listener, int frameID) { 215 synchronized (packetReceiveListeners) { 216 if (!packetReceiveListeners.containsKey(listener)) 217 packetReceiveListeners.put(listener, frameID); 218 } 219 } 220 221 /** 222 * Removes the given packet receive listener from the list of XBee packet 223 * receive listeners. 224 * 225 * <p>If the listener is not included in the list, this method does nothing. 226 * </p> 227 * 228 * @param listener Packet receive listener to remove from the list. 229 * 230 * @see #addPacketReceiveListener(IPacketReceiveListener) 231 * @see #addPacketReceiveListener(IPacketReceiveListener, int) 232 * @see com.digi.xbee.api.listeners.IPacketReceiveListener 233 */ 234 public void removePacketReceiveListener(IPacketReceiveListener listener) { 235 synchronized (packetReceiveListeners) { 236 if (packetReceiveListeners.containsKey(listener)) 237 packetReceiveListeners.remove(listener); 238 } 239 } 240 241 /** 242 * Adds the given IO sample receive listener to the list of listeners that 243 * will be notified when an IO sample packet is received. 244 * 245 * <p>If the listener has been already added, this method does nothing.</p> 246 * 247 * @param listener Listener to be notified when new IO sample packets are 248 * received. 249 * 250 * @see #removeIOSampleReceiveListener(IIOSampleReceiveListener) 251 * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener 252 */ 253 public void addIOSampleReceiveListener(IIOSampleReceiveListener listener) { 254 synchronized (ioSampleReceiveListeners) { 255 if (!ioSampleReceiveListeners.contains(listener)) 256 ioSampleReceiveListeners.add(listener); 257 } 258 } 259 260 /** 261 * Removes the given IO sample receive listener from the list of IO sample 262 * receive listeners. 263 * 264 * <p>If the listener is not included in the list, this method does nothing. 265 * </p> 266 * 267 * @param listener IO sample receive listener to remove from the list. 268 * 269 * @see #addIOSampleReceiveListener(IIOSampleReceiveListener) 270 * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener 271 */ 272 public void removeIOSampleReceiveListener(IIOSampleReceiveListener listener) { 273 synchronized (ioSampleReceiveListeners) { 274 if (ioSampleReceiveListeners.contains(listener)) 275 ioSampleReceiveListeners.remove(listener); 276 } 277 } 278 279 /** 280 * Adds the given Modem Status receive listener to the list of listeners 281 * that will be notified when a modem status packet is received. 282 * 283 * <p>If the listener has been already added, this method does nothing.</p> 284 * 285 * @param listener Listener to be notified when new modem status packets are 286 * received. 287 * 288 * @see #removeModemStatusReceiveListener(IModemStatusReceiveListener) 289 * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener 290 */ 291 public void addModemStatusReceiveListener(IModemStatusReceiveListener listener) { 292 synchronized (modemStatusListeners) { 293 if (!modemStatusListeners.contains(listener)) 294 modemStatusListeners.add(listener); 295 } 296 } 297 298 /** 299 * Removes the given Modem Status receive listener from the list of Modem 300 * Status receive listeners. 301 * 302 * <p>If the listener is not included in the list, this method does nothing. 303 * </p> 304 * 305 * @param listener Modem Status receive listener to remove from the list. 306 * 307 * @see #addModemStatusReceiveListener(IModemStatusReceiveListener) 308 * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener 309 */ 310 public void removeModemStatusReceiveListener(IModemStatusReceiveListener listener) { 311 synchronized (modemStatusListeners) { 312 if (modemStatusListeners.contains(listener)) 313 modemStatusListeners.remove(listener); 314 } 315 } 316 317 /** 318 * Adds the given explicit data receive listener to the list of listeners 319 * that will be notified when an explicit data packet is received. 320 * 321 * <p>If the listener has been already added, this method does nothing.</p> 322 * 323 * @param listener Listener to be notified when new explicit data packets 324 * are received. 325 * 326 * @see #removeExplicitDataReceiveListener(IExplicitDataReceiveListener) 327 * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener 328 */ 329 public void addExplicitDataReceiveListener(IExplicitDataReceiveListener listener) { 330 synchronized (explicitDataReceiveListeners) { 331 if (!explicitDataReceiveListeners.contains(listener)) 332 explicitDataReceiveListeners.add(listener); 333 } 334 } 335 336 /** 337 * Removes the given explicit data receive listener from the list of 338 * explicit data receive listeners. 339 * 340 * <p>If the listener is not included in the list, this method does nothing. 341 * </p> 342 * 343 * @param listener Explicit data receive listener to remove from the list. 344 * 345 * @see #addExplicitDataReceiveListener(IExplicitDataReceiveListener) 346 * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener 347 */ 348 public void removeExplicitDataReceiveListener(IExplicitDataReceiveListener listener) { 349 synchronized (explicitDataReceiveListeners) { 350 if (explicitDataReceiveListeners.contains(listener)) 351 explicitDataReceiveListeners.remove(listener); 352 } 353 } 354 355 /* 356 * (non-Javadoc) 357 * @see java.lang.Thread#run() 358 */ 359 @Override 360 public void run() { 361 logger.debug(connectionInterface.toString() + "Data reader started."); 362 running = true; 363 // Clear the list of read packets. 364 xbeePacketsQueue.clearQueue(); 365 try { 366 synchronized (connectionInterface) { 367 connectionInterface.wait(); 368 } 369 while (running) { 370 if (!running) 371 break; 372 if (connectionInterface.getInputStream() != null) { 373 switch (mode) { 374 case AT: 375 break; 376 case API: 377 case API_ESCAPE: 378 int headerByte = connectionInterface.getInputStream().read(); 379 // If it is packet header parse the packet, if not discard this byte and continue. 380 if (headerByte == SpecialByte.HEADER_BYTE.getValue()) { 381 try { 382 XBeePacket packet = parser.parsePacket(connectionInterface.getInputStream(), mode); 383 packetReceived(packet); 384 } catch (InvalidPacketException e) { 385 logger.error("Error parsing the API packet.", e); 386 } 387 } 388 break; 389 default: 390 break; 391 } 392 } else if (connectionInterface.getInputStream() == null) 393 break; 394 if (connectionInterface.getInputStream() == null) 395 break; 396 else if (connectionInterface.getInputStream().available() > 0) 397 continue; 398 synchronized (connectionInterface) { 399 connectionInterface.wait(); 400 } 401 } 402 } catch (IOException e) { 403 logger.error("Error reading from input stream.", e); 404 } catch (InterruptedException e) { 405 logger.error(e.getMessage(), e); 406 } catch (IllegalStateException e) { 407 logger.error(e.getMessage(), e); 408 } finally { 409 if (running) { 410 running = false; 411 if (connectionInterface.isOpen()) 412 connectionInterface.close(); 413 } 414 } 415 } 416 417 /** 418 * Dispatches the received XBee packet to the corresponding listener(s). 419 * 420 * @param packet The received XBee packet to be dispatched to the 421 * corresponding listeners. 422 * 423 * @see com.digi.xbee.api.packet.XBeeAPIPacket 424 * @see com.digi.xbee.api.packet.XBeePacket 425 */ 426 private void packetReceived(XBeePacket packet) { 427 // Add the packet to the packets queue. 428 xbeePacketsQueue.addPacket(packet); 429 // Notify that a packet has been received to the corresponding listeners. 430 notifyPacketReceived(packet); 431 432 // Check if the packet is an API packet. 433 if (!(packet instanceof XBeeAPIPacket)) 434 return; 435 436 // Get the API packet type. 437 XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet; 438 APIFrameType apiType = apiPacket.getFrameType(); 439 if (apiType == null) 440 return; 441 442 try { 443 // Obtain the remote device from the packet. 444 RemoteXBeeDevice remoteDevice = getRemoteXBeeDeviceFromPacket(apiPacket); 445 byte[] data = null; 446 447 switch(apiType) { 448 case RECEIVE_PACKET: 449 ReceivePacket receivePacket = (ReceivePacket)apiPacket; 450 data = receivePacket.getRFData(); 451 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 452 break; 453 case RX_64: 454 RX64Packet rx64Packet = (RX64Packet)apiPacket; 455 data = rx64Packet.getRFData(); 456 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 457 break; 458 case RX_16: 459 RX16Packet rx16Packet = (RX16Packet)apiPacket; 460 data = rx16Packet.getRFData(); 461 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 462 break; 463 case IO_DATA_SAMPLE_RX_INDICATOR: 464 IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket; 465 notifyIOSampleReceived(remoteDevice, ioSamplePacket.getIOSample()); 466 break; 467 case RX_IO_64: 468 RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket; 469 notifyIOSampleReceived(remoteDevice, rx64IOPacket.getIOSample()); 470 break; 471 case RX_IO_16: 472 RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket; 473 notifyIOSampleReceived(remoteDevice, rx16IOPacket.getIOSample()); 474 break; 475 case MODEM_STATUS: 476 ModemStatusPacket modemStatusPacket = (ModemStatusPacket)apiPacket; 477 notifyModemStatusReceived(modemStatusPacket.getStatus()); 478 break; 479 case EXPLICIT_RX_INDICATOR: 480 ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket; 481 int sourceEndpoint = explicitDataPacket.getSourceEndpoint(); 482 int destEndpoint = explicitDataPacket.getDestinationEndpoint(); 483 int clusterID = explicitDataPacket.getClusterID(); 484 int profileID = explicitDataPacket.getProfileID(); 485 data = explicitDataPacket.getRFData(); 486 // If this is an explicit packet for data transmissions in the Digi profile, 487 // notify also the data listener and add a Receive packet to the queue. 488 if (sourceEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT && 489 destEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT && 490 clusterID == ExplicitRxIndicatorPacket.DATA_CLUSTER && 491 profileID == ExplicitRxIndicatorPacket.DIGI_PROFILE) { 492 notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast())); 493 xbeePacketsQueue.addPacket(new ReceivePacket(explicitDataPacket.get64BitSourceAddress(), 494 explicitDataPacket.get16BitSourceAddress(), 495 explicitDataPacket.getReceiveOptions(), 496 explicitDataPacket.getRFData())); 497 } 498 notifyExplicitDataReceived(new ExplicitXBeeMessage(remoteDevice, sourceEndpoint, destEndpoint, clusterID, profileID, data, explicitDataPacket.isBroadcast())); 499 break; 500 default: 501 break; 502 } 503 504 } catch (XBeeException e) { 505 logger.error(e.getMessage(), e); 506 } 507 } 508 509 /** 510 * Returns the remote XBee device from where the given package was sent 511 * from. 512 * 513 * <p><b>This is for internal use only.</b></p> 514 * 515 * <p>If the package does not contain information about the source, this 516 * method returns {@code null} (for example, {@code ModemStatusPacket}).</p> 517 * 518 * <p>First the device that sent the provided package is looked in the 519 * network of the local XBee device. If the remote device is not in the 520 * network, it is automatically added only if the packet contains 521 * information about the origin of the package.</p> 522 * 523 * @param packet The packet sent from the remote device. 524 * 525 * @return The remote XBee device that sends the given packet. It may be 526 * {@code null} if the packet is not a known frame (see 527 * {@link APIFrameType}) or if it does not contain information of 528 * the source device. 529 * 530 * @throws NullPointerException if {@code packet == null} 531 * @throws XBeeException if any error occur while adding the device to the 532 * network. 533 */ 534 public RemoteXBeeDevice getRemoteXBeeDeviceFromPacket(XBeeAPIPacket packet) throws XBeeException { 535 if (packet == null) 536 throw new NullPointerException("XBee API packet cannot be null."); 537 538 XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet; 539 APIFrameType apiType = apiPacket.getFrameType(); 540 if (apiType == null || apiType == APIFrameType.UNKNOWN) 541 return null; 542 543 RemoteXBeeDevice remoteDevice = null; 544 XBee64BitAddress addr64 = null; 545 XBee16BitAddress addr16 = null; 546 547 XBeeNetwork network = xbeeDevice.getNetwork(); 548 549 switch(apiType) { 550 case RECEIVE_PACKET: 551 ReceivePacket receivePacket = (ReceivePacket)apiPacket; 552 addr64 = receivePacket.get64bitSourceAddress(); 553 addr16 = receivePacket.get16bitSourceAddress(); 554 remoteDevice = network.getDevice(addr64); 555 break; 556 case RX_64: 557 RX64Packet rx64Packet = (RX64Packet)apiPacket; 558 addr64 = rx64Packet.get64bitSourceAddress(); 559 remoteDevice = network.getDevice(addr64); 560 break; 561 case RX_16: 562 RX16Packet rx16Packet = (RX16Packet)apiPacket; 563 addr64 = XBee64BitAddress.UNKNOWN_ADDRESS; 564 addr16 = rx16Packet.get16bitSourceAddress(); 565 remoteDevice = network.getDevice(addr16); 566 break; 567 case IO_DATA_SAMPLE_RX_INDICATOR: 568 IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket; 569 addr64 = ioSamplePacket.get64bitSourceAddress(); 570 addr16 = ioSamplePacket.get16bitSourceAddress(); 571 remoteDevice = network.getDevice(addr64); 572 break; 573 case RX_IO_64: 574 RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket; 575 addr64 = rx64IOPacket.get64bitSourceAddress(); 576 remoteDevice = network.getDevice(addr64); 577 break; 578 case RX_IO_16: 579 RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket; 580 addr64 = XBee64BitAddress.UNKNOWN_ADDRESS; 581 addr16 = rx16IOPacket.get16bitSourceAddress(); 582 remoteDevice = network.getDevice(addr16); 583 break; 584 case EXPLICIT_RX_INDICATOR: 585 ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket; 586 addr64 = explicitDataPacket.get64BitSourceAddress(); 587 addr16 = explicitDataPacket.get16BitSourceAddress(); 588 remoteDevice = network.getDevice(addr64); 589 break; 590 default: 591 // Rest of the types are considered not to contain information 592 // about the origin of the packet. 593 return remoteDevice; 594 } 595 596 // If the origin is not in the network, add it. 597 if (remoteDevice == null) { 598 remoteDevice = createRemoteXBeeDevice(addr64, addr16, null); 599 network.addRemoteDevice(remoteDevice); 600 } 601 602 return remoteDevice; 603 } 604 605 /** 606 * Creates a new remote XBee device with the provided 64-bit address, 607 * 16-bit address, node identifier and the XBee device that is using this 608 * data reader as the connection interface for the remote device. 609 * 610 * The new XBee device will be a {@code RemoteDigiMeshDevice}, 611 * a {@code RemoteDigiPointDevice}, a {@code RemoteRaw802Device} or a 612 * {@code RemoteZigBeeDevice} depending on the protocol of the local XBee 613 * device. If the protocol cannot be determined or is unknown a 614 * {@code RemoteXBeeDevice} will be created instead. 615 * 616 * @param addr64 The 64-bit address of the new remote device. It cannot be 617 * {@code null}. 618 * @param addr16 The 16-bit address of the new remote device. It may be 619 * {@code null}. 620 * @param ni The node identifier of the new remote device. It may be 621 * {@code null}. 622 * 623 * @return a new remote XBee device with the given parameters. 624 */ 625 private RemoteXBeeDevice createRemoteXBeeDevice(XBee64BitAddress addr64, 626 XBee16BitAddress addr16, String ni) { 627 RemoteXBeeDevice device = null; 628 629 switch (xbeeDevice.getXBeeProtocol()) { 630 case ZIGBEE: 631 device = new RemoteZigBeeDevice(xbeeDevice, addr64, addr16, ni); 632 break; 633 case DIGI_MESH: 634 device = new RemoteDigiMeshDevice(xbeeDevice, addr64, ni); 635 break; 636 case DIGI_POINT: 637 device = new RemoteDigiPointDevice(xbeeDevice, addr64, ni); 638 break; 639 case RAW_802_15_4: 640 device = new RemoteRaw802Device(xbeeDevice, addr64, addr16, ni); 641 break; 642 default: 643 device = new RemoteXBeeDevice(xbeeDevice, addr64, addr16, ni); 644 break; 645 } 646 647 return device; 648 } 649 650 /** 651 * Notifies subscribed data receive listeners that a new XBee data packet 652 * has been received in form of an {@code XBeeMessage}. 653 * 654 * @param xbeeMessage The XBee message to be sent to subscribed XBee data 655 * listeners. 656 * 657 * @see com.digi.xbee.api.models.XBeeMessage 658 */ 659 private void notifyDataReceived(final XBeeMessage xbeeMessage) { 660 if (xbeeMessage.isBroadcast()) 661 logger.info(connectionInterface.toString() + 662 "Broadcast data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData())); 663 else 664 logger.info(connectionInterface.toString() + 665 "Data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData())); 666 667 try { 668 synchronized (dataReceiveListeners) { 669 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 670 dataReceiveListeners.size())); 671 for (final IDataReceiveListener listener:dataReceiveListeners) { 672 executor.execute(new Runnable() { 673 /* 674 * (non-Javadoc) 675 * @see java.lang.Runnable#run() 676 */ 677 @Override 678 public void run() { 679 /* Synchronize the listener so it is not called 680 twice. That is, let the listener to finish its job. */ 681 synchronized (listener) { 682 listener.dataReceived(xbeeMessage); 683 } 684 } 685 }); 686 } 687 executor.shutdown(); 688 } 689 } catch (Exception e) { 690 logger.error(e.getMessage(), e); 691 } 692 } 693 694 /** 695 * Notifies subscribed XBee packet listeners that a new XBee packet has 696 * been received. 697 * 698 * @param packet The received XBee packet. 699 * 700 * @see com.digi.xbee.api.packet.XBeeAPIPacket 701 * @see com.digi.xbee.api.packet.XBeePacket 702 */ 703 private void notifyPacketReceived(final XBeePacket packet) { 704 logger.debug(connectionInterface.toString() + "Packet received: \n{}", packet.toPrettyString()); 705 706 try { 707 synchronized (packetReceiveListeners) { 708 final ArrayList<IPacketReceiveListener> removeListeners = new ArrayList<IPacketReceiveListener>(); 709 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 710 packetReceiveListeners.size())); 711 for (final IPacketReceiveListener listener:packetReceiveListeners.keySet()) { 712 executor.execute(new Runnable() { 713 /* 714 * (non-Javadoc) 715 * @see java.lang.Runnable#run() 716 */ 717 @Override 718 public void run() { 719 // Synchronize the listener so it is not called 720 // twice. That is, let the listener to finish its job. 721 synchronized (packetReceiveListeners) { 722 synchronized (listener) { 723 if (packetReceiveListeners.get(listener) == ALL_FRAME_IDS) 724 listener.packetReceived(packet); 725 else if (((XBeeAPIPacket)packet).needsAPIFrameID() && 726 ((XBeeAPIPacket)packet).getFrameID() == packetReceiveListeners.get(listener)) { 727 listener.packetReceived(packet); 728 removeListeners.add(listener); 729 } 730 } 731 } 732 } 733 }); 734 } 735 executor.shutdown(); 736 // Remove required listeners. 737 for (IPacketReceiveListener listener:removeListeners) 738 packetReceiveListeners.remove(listener); 739 } 740 } catch (Exception e) { 741 logger.error(e.getMessage(), e); 742 } 743 } 744 745 /** 746 * Notifies subscribed IO sample listeners that a new IO sample packet has 747 * been received. 748 * 749 * @param ioSample The received IO sample. 750 * @param remoteDevice The remote XBee device that sent the sample. 751 * 752 * @see com.digi.xbee.api.RemoteXBeeDevice 753 * @see com.digi.xbee.api.io.IOSample 754 */ 755 private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) { 756 logger.debug(connectionInterface.toString() + "IO sample received."); 757 758 try { 759 synchronized (ioSampleReceiveListeners) { 760 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 761 ioSampleReceiveListeners.size())); 762 for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) { 763 executor.execute(new Runnable() { 764 /* 765 * (non-Javadoc) 766 * @see java.lang.Runnable#run() 767 */ 768 @Override 769 public void run() { 770 // Synchronize the listener so it is not called 771 // twice. That is, let the listener to finish its job. 772 synchronized (listener) { 773 listener.ioSampleReceived(remoteDevice, ioSample); 774 } 775 } 776 }); 777 } 778 executor.shutdown(); 779 } 780 } catch (Exception e) { 781 logger.error(e.getMessage(), e); 782 } 783 } 784 785 /** 786 * Notifies subscribed Modem Status listeners that a Modem Status event 787 * packet has been received. 788 * 789 * @param modemStatusEvent The Modem Status event. 790 * 791 * @see com.digi.xbee.api.models.ModemStatusEvent 792 */ 793 private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) { 794 logger.debug(connectionInterface.toString() + "Modem Status event received."); 795 796 try { 797 synchronized (modemStatusListeners) { 798 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 799 modemStatusListeners.size())); 800 for (final IModemStatusReceiveListener listener:modemStatusListeners) { 801 executor.execute(new Runnable() { 802 /* 803 * (non-Javadoc) 804 * @see java.lang.Runnable#run() 805 */ 806 @Override 807 public void run() { 808 // Synchronize the listener so it is not called 809 // twice. That is, let the listener to finish its job. 810 synchronized (listener) { 811 listener.modemStatusEventReceived(modemStatusEvent); 812 } 813 } 814 }); 815 } 816 executor.shutdown(); 817 } 818 } catch (Exception e) { 819 logger.error(e.getMessage(), e); 820 } 821 } 822 823 /** 824 * Notifies subscribed explicit data receive listeners that a new XBee 825 * explicit data packet has been received in form of an 826 * {@code ExplicitXBeeMessage}. 827 * 828 * @param explicitXBeeMessage The XBee message to be sent to subscribed 829 * XBee data listeners. 830 * 831 * @see com.digi.xbee.api.models.ExplicitXBeeMessage 832 */ 833 private void notifyExplicitDataReceived(final ExplicitXBeeMessage explicitXBeeMessage) { 834 if (explicitXBeeMessage.isBroadcast()) 835 logger.info(connectionInterface.toString() + 836 "Broadcast explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData())); 837 else 838 logger.info(connectionInterface.toString() + 839 "Explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData())); 840 841 try { 842 synchronized (explicitDataReceiveListeners) { 843 ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 844 explicitDataReceiveListeners.size())); 845 for (final IExplicitDataReceiveListener listener:explicitDataReceiveListeners) { 846 executor.execute(new Runnable() { 847 /* 848 * (non-Javadoc) 849 * @see java.lang.Runnable#run() 850 */ 851 @Override 852 public void run() { 853 /* Synchronize the listener so it is not called 854 twice. That is, let the listener to finish its job. */ 855 synchronized (listener) { 856 listener.explicitDataReceived(explicitXBeeMessage); 857 } 858 } 859 }); 860 } 861 executor.shutdown(); 862 } 863 } catch (Exception e) { 864 logger.error(e.getMessage(), e); 865 } 866 } 867 868 /** 869 * Returns whether this Data reader is running or not. 870 * 871 * @return {@code true} if the Data reader is running, {@code false} 872 * otherwise. 873 * 874 * @see #stopReader() 875 */ 876 public boolean isRunning() { 877 return running; 878 } 879 880 /** 881 * Stops the Data reader thread. 882 * 883 * @see #isRunning() 884 */ 885 public void stopReader() { 886 running = false; 887 synchronized (connectionInterface) { 888 connectionInterface.notify(); 889 } 890 logger.debug(connectionInterface.toString() + "Data reader stopped."); 891 } 892 893 /** 894 * Returns the queue of read XBee packets. 895 * 896 * @return The queue of read XBee packets. 897 * 898 * @see com.digi.xbee.api.models.XBeePacketsQueue 899 */ 900 public XBeePacketsQueue getXBeePacketsQueue() { 901 return xbeePacketsQueue; 902 } 903}