001/**
002 * Copyright (c) 2014-2015 Digi International Inc.,
003 * All rights not expressly granted are reserved.
004 *
005 * This Source Code Form is subject to the terms of the Mozilla Public
006 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
007 * You can obtain one at http://mozilla.org/MPL/2.0/.
008 *
009 * Digi International Inc. 11001 Bren Road East, Minnetonka, MN 55343
010 * =======================================================================
011*/
012package com.digi.xbee.api.connection;
013
014import java.io.IOException;
015import java.util.ArrayList;
016import java.util.HashMap;
017import java.util.concurrent.Executors;
018import java.util.concurrent.ScheduledExecutorService;
019
020import org.slf4j.Logger;
021import org.slf4j.LoggerFactory;
022
023import com.digi.xbee.api.RemoteDigiMeshDevice;
024import com.digi.xbee.api.RemoteDigiPointDevice;
025import com.digi.xbee.api.RemoteRaw802Device;
026import com.digi.xbee.api.RemoteXBeeDevice;
027import com.digi.xbee.api.RemoteZigBeeDevice;
028import com.digi.xbee.api.XBeeDevice;
029import com.digi.xbee.api.XBeeNetwork;
030import com.digi.xbee.api.exceptions.InvalidPacketException;
031import com.digi.xbee.api.exceptions.XBeeException;
032import com.digi.xbee.api.io.IOSample;
033import com.digi.xbee.api.listeners.IExplicitDataReceiveListener;
034import com.digi.xbee.api.listeners.IIOSampleReceiveListener;
035import com.digi.xbee.api.listeners.IModemStatusReceiveListener;
036import com.digi.xbee.api.listeners.IPacketReceiveListener;
037import com.digi.xbee.api.listeners.IDataReceiveListener;
038import com.digi.xbee.api.models.ExplicitXBeeMessage;
039import com.digi.xbee.api.models.ModemStatusEvent;
040import com.digi.xbee.api.models.SpecialByte;
041import com.digi.xbee.api.models.OperatingMode;
042import com.digi.xbee.api.models.XBee16BitAddress;
043import com.digi.xbee.api.models.XBee64BitAddress;
044import com.digi.xbee.api.models.XBeeMessage;
045import com.digi.xbee.api.models.XBeePacketsQueue;
046import com.digi.xbee.api.packet.XBeeAPIPacket;
047import com.digi.xbee.api.packet.APIFrameType;
048import com.digi.xbee.api.packet.XBeePacket;
049import com.digi.xbee.api.packet.XBeePacketParser;
050import com.digi.xbee.api.packet.common.ExplicitRxIndicatorPacket;
051import com.digi.xbee.api.packet.common.IODataSampleRxIndicatorPacket;
052import com.digi.xbee.api.packet.common.ModemStatusPacket;
053import com.digi.xbee.api.packet.common.ReceivePacket;
054import com.digi.xbee.api.packet.raw.RX16IOPacket;
055import com.digi.xbee.api.packet.raw.RX16Packet;
056import com.digi.xbee.api.packet.raw.RX64IOPacket;
057import com.digi.xbee.api.packet.raw.RX64Packet;
058import com.digi.xbee.api.utils.HexUtils;
059
060/**
061 * Thread that constantly reads data from an input stream.
062 * 
063 * <p>Depending on the XBee operating mode, read data is notified as is to the 
064 * subscribed listeners or is parsed to a packet using the packet parser and 
065 * then notified to subscribed listeners.</p> 
066 */
067public class DataReader extends Thread {
068        
069        // Constants.
070        private final static int ALL_FRAME_IDS = 99999;
071        private final static int MAXIMUM_PARALLEL_LISTENER_THREADS = 20;
072        
073        // Variables.
074        private boolean running = false;
075        
076        private IConnectionInterface connectionInterface;
077        
078        private volatile OperatingMode mode;
079        
080        private ArrayList<IDataReceiveListener> dataReceiveListeners = new ArrayList<IDataReceiveListener>();
081        // The packetReceiveListeners requires to be a HashMap with an associated integer. The integer is used to determine 
082        // the frame ID of the packet that should be received. When it is 99999 (ALL_FRAME_IDS), all the packets will be handled.
083        private HashMap<IPacketReceiveListener, Integer> packetReceiveListeners = new HashMap<IPacketReceiveListener, Integer>();
084        private ArrayList<IIOSampleReceiveListener> ioSampleReceiveListeners = new ArrayList<IIOSampleReceiveListener>();
085        private ArrayList<IModemStatusReceiveListener> modemStatusListeners = new ArrayList<IModemStatusReceiveListener>();
086        private ArrayList<IExplicitDataReceiveListener> explicitDataReceiveListeners = new ArrayList<IExplicitDataReceiveListener>();
087        
088        private Logger logger;
089        
090        private XBeePacketParser parser;
091        
092        private XBeePacketsQueue xbeePacketsQueue;
093        
094        private XBeeDevice xbeeDevice;
095        
/**
 * Creates a {@code DataReader} that reads from the given connection
 * interface on behalf of the given XBee device, using the given operating
 * mode.
 *
 * <p>Besides storing its arguments, the constructor obtains the logger of
 * this class and creates a new packet parser and a new packets queue.</p>
 *
 * @param connectionInterface Connection interface to read data from.
 * @param mode XBee operating mode.
 * @param xbeeDevice Reference to the XBee device containing this
 *                   {@code DataReader} object. It is stored as given,
 *                   without any validation.
 *
 * @throws NullPointerException if {@code connectionInterface == null} or
 *                              {@code mode == null}.
 *
 * @see IConnectionInterface
 * @see com.digi.xbee.api.XBeeDevice
 * @see com.digi.xbee.api.models.OperatingMode
 */
public DataReader(IConnectionInterface connectionInterface, OperatingMode mode, XBeeDevice xbeeDevice) {
        if (connectionInterface == null) {
                throw new NullPointerException("Connection interface cannot be null.");
        }
        if (mode == null) {
                throw new NullPointerException("Operating mode cannot be null.");
        }

        // Collaborators owned by this reader.
        this.logger = LoggerFactory.getLogger(DataReader.class);
        this.parser = new XBeePacketParser();
        this.xbeePacketsQueue = new XBeePacketsQueue();

        // Values handed in by the caller.
        this.connectionInterface = connectionInterface;
        this.mode = mode;
        this.xbeeDevice = xbeeDevice;
}
126        
127        /**
128         * Sets the XBee operating mode of this data reader.
129         * 
130         * @param mode New XBee operating mode.
131         * 
132         * @throws NullPointerException if {@code mode == null}.
133         * 
134         * @see com.digi.xbee.api.models.OperatingMode
135         */
136        public void setXBeeReaderMode(OperatingMode mode) {
137                if (mode == null)
138                        throw new NullPointerException("Operating mode cannot be null.");
139                
140                this.mode = mode;
141        }
142        
143        /**
144         * Adds the given data receive listener to the list of listeners that will 
145         * be notified when XBee data packets are received.
146         * 
147         * <p>If the listener has been already added, this method does nothing.</p>
148         * 
149         * @param listener Listener to be notified when new XBee data packets are 
150         *                 received.
151         * 
152         * @see #removeDataReceiveListener(IDataReceiveListener)
153         * @see com.digi.xbee.api.listeners.IDataReceiveListener
154         */
155        public void addDataReceiveListener(IDataReceiveListener listener) {
156                synchronized (dataReceiveListeners) {
157                        if (!dataReceiveListeners.contains(listener))
158                                dataReceiveListeners.add(listener);
159                }
160        }
161        
162        /**
163         * Removes the given data receive listener from the list of data receive 
164         * listeners.
165         * 
166         * <p>If the listener is not included in the list, this method does nothing.
167         * </p>
168         * 
169         * @param listener Data receive listener to be remove from the list.
170         * 
171         * @see #addDataReceiveListener(IDataReceiveListener)
172         * @see com.digi.xbee.api.listeners.IDataReceiveListener
173         */
174        public void removeDataReceiveListener(IDataReceiveListener listener) {
175                synchronized (dataReceiveListeners) {
176                        if (dataReceiveListeners.contains(listener))
177                                dataReceiveListeners.remove(listener);
178                }
179        }
180        
181        /**
182         * Adds the given packet receive listener to the list of listeners that will
183         * be notified when any XBee packet is received.
184         * 
185         * <p>If the listener has been already added, this method does nothing.</p>
186         * 
187         * @param listener Listener to be notified when any XBee packet is received.
188         * 
189         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
190         * @see #removePacketReceiveListener(IPacketReceiveListener)
191         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
192         */
193        public void addPacketReceiveListener(IPacketReceiveListener listener) {
194                addPacketReceiveListener(listener, ALL_FRAME_IDS);
195        }
196        
197        /**
198         * Adds the given packet receive listener to the list of listeners that will
199         * be notified when an XBee packet with the given frame ID is received.
200         * 
201         * <p>If the listener has been already added, this method does nothing.</p>
202         * 
203         * @param listener Listener to be notified when an XBee packet with the
204         *                 provided frame ID is received.
205         * @param frameID Frame ID for which this listener should be notified and 
206         *                removed after.
207         *                Using {@link #ALL_FRAME_IDS} this listener will be 
208         *                notified always and will be removed only by user request.
209         * 
210         * @see #addPacketReceiveListener(IPacketReceiveListener)
211         * @see #removePacketReceiveListener(IPacketReceiveListener)
212         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
213         */
214        public void addPacketReceiveListener(IPacketReceiveListener listener, int frameID) {
215                synchronized (packetReceiveListeners) {
216                        if (!packetReceiveListeners.containsKey(listener))
217                                packetReceiveListeners.put(listener, frameID);
218                }
219        }
220        
221        /**
222         * Removes the given packet receive listener from the list of XBee packet 
223         * receive listeners.
224         * 
225         * <p>If the listener is not included in the list, this method does nothing.
226         * </p>
227         * 
228         * @param listener Packet receive listener to remove from the list.
229         * 
230         * @see #addPacketReceiveListener(IPacketReceiveListener)
231         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
232         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
233         */
234        public void removePacketReceiveListener(IPacketReceiveListener listener) {
235                synchronized (packetReceiveListeners) {
236                        if (packetReceiveListeners.containsKey(listener))
237                                packetReceiveListeners.remove(listener);
238                }
239        }
240        
241        /**
242         * Adds the given IO sample receive listener to the list of listeners that 
243         * will be notified when an IO sample packet is received.
244         * 
245         * <p>If the listener has been already added, this method does nothing.</p>
246         * 
247         * @param listener Listener to be notified when new IO sample packets are 
248         *                 received.
249         * 
250         * @see #removeIOSampleReceiveListener(IIOSampleReceiveListener)
251         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
252         */
253        public void addIOSampleReceiveListener(IIOSampleReceiveListener listener) {
254                synchronized (ioSampleReceiveListeners) {
255                        if (!ioSampleReceiveListeners.contains(listener))
256                                ioSampleReceiveListeners.add(listener);
257                }
258        }
259        
260        /**
261         * Removes the given IO sample receive listener from the list of IO sample 
262         * receive listeners.
263         * 
264         * <p>If the listener is not included in the list, this method does nothing.
265         * </p>
266         * 
267         * @param listener IO sample receive listener to remove from the list.
268         * 
269         * @see #addIOSampleReceiveListener(IIOSampleReceiveListener)
270         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
271         */
272        public void removeIOSampleReceiveListener(IIOSampleReceiveListener listener) {
273                synchronized (ioSampleReceiveListeners) {
274                        if (ioSampleReceiveListeners.contains(listener))
275                                ioSampleReceiveListeners.remove(listener);
276                }
277        }
278        
279        /**
280         * Adds the given Modem Status receive listener to the list of listeners 
281         * that will be notified when a modem status packet is received.
282         * 
283         * <p>If the listener has been already added, this method does nothing.</p>
284         * 
285         * @param listener Listener to be notified when new modem status packets are
286         *                 received.
287         * 
288         * @see #removeModemStatusReceiveListener(IModemStatusReceiveListener)
289         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
290         */
291        public void addModemStatusReceiveListener(IModemStatusReceiveListener listener) {
292                synchronized (modemStatusListeners) {
293                        if (!modemStatusListeners.contains(listener))
294                                modemStatusListeners.add(listener);
295                }
296        }
297        
298        /**
299         * Removes the given Modem Status receive listener from the list of Modem 
300         * Status receive listeners.
301         * 
302         * <p>If the listener is not included in the list, this method does nothing.
303         * </p>
304         * 
305         * @param listener Modem Status receive listener to remove from the list.
306         * 
307         * @see #addModemStatusReceiveListener(IModemStatusReceiveListener)
308         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
309         */
310        public void removeModemStatusReceiveListener(IModemStatusReceiveListener listener) {
311                synchronized (modemStatusListeners) {
312                        if (modemStatusListeners.contains(listener))
313                                modemStatusListeners.remove(listener);
314                }
315        }
316        
317        /**
318         * Adds the given explicit data receive listener to the list of listeners 
319         * that will be notified when an explicit data packet is received.
320         * 
321         * <p>If the listener has been already added, this method does nothing.</p>
322         * 
323         * @param listener Listener to be notified when new explicit data packets 
324         *                 are received.
325         * 
326         * @see #removeExplicitDataReceiveListener(IExplicitDataReceiveListener)
327         * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener
328         */
329        public void addExplicitDataReceiveListener(IExplicitDataReceiveListener listener) {
330                synchronized (explicitDataReceiveListeners) {
331                        if (!explicitDataReceiveListeners.contains(listener))
332                                explicitDataReceiveListeners.add(listener);
333                }
334        }
335        
336        /**
337         * Removes the given explicit data receive listener from the list of 
338         * explicit data receive listeners.
339         * 
340         * <p>If the listener is not included in the list, this method does nothing.
341         * </p>
342         * 
343         * @param listener Explicit data receive listener to remove from the list.
344         * 
345         * @see #addExplicitDataReceiveListener(IExplicitDataReceiveListener)
346         * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener
347         */
348        public void removeExplicitDataReceiveListener(IExplicitDataReceiveListener listener) {
349                synchronized (explicitDataReceiveListeners) {
350                        if (explicitDataReceiveListeners.contains(listener))
351                                explicitDataReceiveListeners.remove(listener);
352                }
353        }
354        
355        /*
356         * (non-Javadoc)
357         * @see java.lang.Thread#run()
358         */
359        @Override
360        public void run() {
361                logger.debug(connectionInterface.toString() + "Data reader started.");
362                running = true;
363                // Clear the list of read packets.
364                xbeePacketsQueue.clearQueue();
365                try {
366                        synchronized (connectionInterface) {
367                                connectionInterface.wait();
368                        }
369                        while (running) {
370                                if (!running)
371                                        break;
372                                if (connectionInterface.getInputStream() != null) {
373                                        switch (mode) {
374                                        case AT:
375                                                break;
376                                        case API:
377                                        case API_ESCAPE:
378                                                int headerByte = connectionInterface.getInputStream().read();
379                                                // If it is packet header parse the packet, if not discard this byte and continue.
380                                                if (headerByte == SpecialByte.HEADER_BYTE.getValue()) {
381                                                        try {
382                                                                XBeePacket packet = parser.parsePacket(connectionInterface.getInputStream(), mode);
383                                                                packetReceived(packet);
384                                                        } catch (InvalidPacketException e) {
385                                                                logger.error("Error parsing the API packet.", e);
386                                                        }
387                                                }
388                                                break;
389                                        default:
390                                                break;
391                                        }
392                                } else if (connectionInterface.getInputStream() == null)
393                                        break;
394                                if (connectionInterface.getInputStream() == null)
395                                        break;
396                                else if (connectionInterface.getInputStream().available() > 0)
397                                        continue;
398                                synchronized (connectionInterface) {
399                                        connectionInterface.wait();
400                                }
401                        }
402                } catch (IOException e) {
403                        logger.error("Error reading from input stream.", e);
404                } catch (InterruptedException e) {
405                        logger.error(e.getMessage(), e);
406                } catch (IllegalStateException e) {
407                        logger.error(e.getMessage(), e);
408                } finally {
409                        if (running) {
410                                running = false;
411                                if (connectionInterface.isOpen())
412                                        connectionInterface.close();
413                        }
414                }
415        }
416        
417        /**
418         * Dispatches the received XBee packet to the corresponding listener(s).
419         * 
420         * @param packet The received XBee packet to be dispatched to the 
421         *               corresponding listeners.
422         * 
423         * @see com.digi.xbee.api.packet.XBeeAPIPacket
424         * @see com.digi.xbee.api.packet.XBeePacket
425         */
426        private void packetReceived(XBeePacket packet) {
427                // Add the packet to the packets queue.
428                xbeePacketsQueue.addPacket(packet);
429                // Notify that a packet has been received to the corresponding listeners.
430                notifyPacketReceived(packet);
431                
432                // Check if the packet is an API packet.
433                if (!(packet instanceof XBeeAPIPacket))
434                        return;
435                
436                // Get the API packet type.
437                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
438                APIFrameType apiType = apiPacket.getFrameType();
439                if (apiType == null)
440                        return;
441                
442                try {
443                        // Obtain the remote device from the packet.
444                        RemoteXBeeDevice remoteDevice = getRemoteXBeeDeviceFromPacket(apiPacket);
445                        byte[] data = null;
446                        
447                        switch(apiType) {
448                        case RECEIVE_PACKET:
449                                ReceivePacket receivePacket = (ReceivePacket)apiPacket;
450                                data = receivePacket.getRFData();
451                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
452                                break;
453                        case RX_64:
454                                RX64Packet rx64Packet = (RX64Packet)apiPacket;
455                                data = rx64Packet.getRFData();
456                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
457                                break;
458                        case RX_16:
459                                RX16Packet rx16Packet = (RX16Packet)apiPacket;
460                                data = rx16Packet.getRFData();
461                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
462                                break;
463                        case IO_DATA_SAMPLE_RX_INDICATOR:
464                                IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
465                                notifyIOSampleReceived(remoteDevice, ioSamplePacket.getIOSample());
466                                break;
467                        case RX_IO_64:
468                                RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
469                                notifyIOSampleReceived(remoteDevice, rx64IOPacket.getIOSample());
470                                break;
471                        case RX_IO_16:
472                                RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
473                                notifyIOSampleReceived(remoteDevice, rx16IOPacket.getIOSample());
474                                break;
475                        case MODEM_STATUS:
476                                ModemStatusPacket modemStatusPacket = (ModemStatusPacket)apiPacket;
477                                notifyModemStatusReceived(modemStatusPacket.getStatus());
478                                break;
479                        case EXPLICIT_RX_INDICATOR:
480                                ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket;
481                                int sourceEndpoint = explicitDataPacket.getSourceEndpoint();
482                                int destEndpoint = explicitDataPacket.getDestinationEndpoint();
483                                int clusterID = explicitDataPacket.getClusterID();
484                                int profileID = explicitDataPacket.getProfileID();
485                                data = explicitDataPacket.getRFData();
486                                // If this is an explicit packet for data transmissions in the Digi profile, 
487                                // notify also the data listener and add a Receive packet to the queue.
488                                if (sourceEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT && 
489                                                destEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT &&
490                                                clusterID == ExplicitRxIndicatorPacket.DATA_CLUSTER && 
491                                                profileID == ExplicitRxIndicatorPacket.DIGI_PROFILE) {
492                                        notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
493                                        xbeePacketsQueue.addPacket(new ReceivePacket(explicitDataPacket.get64BitSourceAddress(), 
494                                                        explicitDataPacket.get16BitSourceAddress(), 
495                                                        explicitDataPacket.getReceiveOptions(), 
496                                                        explicitDataPacket.getRFData()));
497                                }
498                                notifyExplicitDataReceived(new ExplicitXBeeMessage(remoteDevice, sourceEndpoint, destEndpoint, clusterID, profileID, data, explicitDataPacket.isBroadcast()));
499                                break;
500                        default:
501                                break;
502                        }
503                        
504                } catch (XBeeException e) {
505                        logger.error(e.getMessage(), e);
506                }
507        }
508        
509        /**
510         * Returns the remote XBee device from where the given package was sent 
511         * from.
512         * 
513         * <p><b>This is for internal use only.</b></p>
514         * 
515         * <p>If the package does not contain information about the source, this 
516         * method returns {@code null} (for example, {@code ModemStatusPacket}).</p>
517         * 
518         * <p>First the device that sent the provided package is looked in the 
519         * network of the local XBee device. If the remote device is not in the 
520         * network, it is automatically added only if the packet contains 
521         * information about the origin of the package.</p>
522         * 
523         * @param packet The packet sent from the remote device.
524         * 
525         * @return The remote XBee device that sends the given packet. It may be 
526         *         {@code null} if the packet is not a known frame (see 
527         *         {@link APIFrameType}) or if it does not contain information of 
528         *         the source device.
529         * 
530         * @throws NullPointerException if {@code packet == null}
531         * @throws XBeeException if any error occur while adding the device to the 
532         *                       network.
533         */
534        public RemoteXBeeDevice getRemoteXBeeDeviceFromPacket(XBeeAPIPacket packet) throws XBeeException {
535                if (packet == null)
536                        throw new NullPointerException("XBee API packet cannot be null.");
537                        
538                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
539                APIFrameType apiType = apiPacket.getFrameType();
540                if (apiType == null || apiType == APIFrameType.UNKNOWN)
541                        return null;
542                
543                RemoteXBeeDevice remoteDevice = null;
544                XBee64BitAddress addr64 = null;
545                XBee16BitAddress addr16 = null;
546                
547                XBeeNetwork network = xbeeDevice.getNetwork();
548                
549                switch(apiType) {
550                case RECEIVE_PACKET:
551                        ReceivePacket receivePacket = (ReceivePacket)apiPacket;
552                        addr64 = receivePacket.get64bitSourceAddress();
553                        addr16 = receivePacket.get16bitSourceAddress();
554                        remoteDevice = network.getDevice(addr64);
555                        break;
556                case RX_64:
557                        RX64Packet rx64Packet = (RX64Packet)apiPacket;
558                        addr64 = rx64Packet.get64bitSourceAddress();
559                        remoteDevice = network.getDevice(addr64);
560                        break;
561                case RX_16:
562                        RX16Packet rx16Packet = (RX16Packet)apiPacket;
563                        addr64 = XBee64BitAddress.UNKNOWN_ADDRESS;
564                        addr16 = rx16Packet.get16bitSourceAddress();
565                        remoteDevice = network.getDevice(addr16);
566                        break;
567                case IO_DATA_SAMPLE_RX_INDICATOR:
568                        IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
569                        addr64 = ioSamplePacket.get64bitSourceAddress();
570                        addr16 = ioSamplePacket.get16bitSourceAddress();
571                        remoteDevice = network.getDevice(addr64);
572                        break;
573                case RX_IO_64:
574                        RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
575                        addr64 = rx64IOPacket.get64bitSourceAddress();
576                        remoteDevice = network.getDevice(addr64);
577                        break;
578                case RX_IO_16:
579                        RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
580                        addr64 = XBee64BitAddress.UNKNOWN_ADDRESS;
581                        addr16 = rx16IOPacket.get16bitSourceAddress();
582                        remoteDevice = network.getDevice(addr16);
583                        break;
584                case EXPLICIT_RX_INDICATOR:
585                        ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket;
586                        addr64 = explicitDataPacket.get64BitSourceAddress();
587                        addr16 = explicitDataPacket.get16BitSourceAddress();
588                        remoteDevice = network.getDevice(addr64);
589                        break;
590                default:
591                        // Rest of the types are considered not to contain information 
592                        // about the origin of the packet.
593                        return remoteDevice;
594                }
595                
596                // If the origin is not in the network, add it.
597                if (remoteDevice == null) {
598                        remoteDevice = createRemoteXBeeDevice(addr64, addr16, null);
599                        network.addRemoteDevice(remoteDevice);
600                }
601                
602                return remoteDevice;
603        }
604        
605        /**
606         * Creates a new remote XBee device with the provided 64-bit address, 
607         * 16-bit address, node identifier and the XBee device that is using this 
608         * data reader as the connection interface for the remote device.
609         * 
610         * The new XBee device will be a {@code RemoteDigiMeshDevice}, 
611         * a {@code RemoteDigiPointDevice}, a {@code RemoteRaw802Device} or a 
612         * {@code RemoteZigBeeDevice} depending on the protocol of the local XBee 
613         * device. If the protocol cannot be determined or is unknown a 
614         * {@code RemoteXBeeDevice} will be created instead.
615         * 
616         * @param addr64 The 64-bit address of the new remote device. It cannot be 
617         *               {@code null}.
618         * @param addr16 The 16-bit address of the new remote device. It may be 
619         *               {@code null}.
620         * @param ni The node identifier of the new remote device. It may be 
621         *           {@code null}.
622         * 
623         * @return a new remote XBee device with the given parameters.
624         */
625        private RemoteXBeeDevice createRemoteXBeeDevice(XBee64BitAddress addr64, 
626                        XBee16BitAddress addr16, String ni) {
627                RemoteXBeeDevice device = null;
628                
629                switch (xbeeDevice.getXBeeProtocol()) {
630                case ZIGBEE:
631                        device = new RemoteZigBeeDevice(xbeeDevice, addr64, addr16, ni);
632                        break;
633                case DIGI_MESH:
634                        device = new RemoteDigiMeshDevice(xbeeDevice, addr64, ni);
635                        break;
636                case DIGI_POINT:
637                        device = new RemoteDigiPointDevice(xbeeDevice, addr64, ni);
638                        break;
639                case RAW_802_15_4:
640                        device = new RemoteRaw802Device(xbeeDevice, addr64, addr16, ni);
641                        break;
642                default:
643                        device = new RemoteXBeeDevice(xbeeDevice, addr64, addr16, ni);
644                        break;
645                }
646                
647                return device;
648        }
649        
650        /**
651         * Notifies subscribed data receive listeners that a new XBee data packet 
652         * has been received in form of an {@code XBeeMessage}.
653         *
654         * @param xbeeMessage The XBee message to be sent to subscribed XBee data
655         *                    listeners.
656         * 
657         * @see com.digi.xbee.api.models.XBeeMessage
658         */
659        private void notifyDataReceived(final XBeeMessage xbeeMessage) {
660                if (xbeeMessage.isBroadcast())
661                        logger.info(connectionInterface.toString() + 
662                                        "Broadcast data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
663                else
664                        logger.info(connectionInterface.toString() + 
665                                        "Data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
666                
667                try {
668                        synchronized (dataReceiveListeners) {
669                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
670                                                dataReceiveListeners.size()));
671                                for (final IDataReceiveListener listener:dataReceiveListeners) {
672                                        executor.execute(new Runnable() {
673                                                /*
674                                                 * (non-Javadoc)
675                                                 * @see java.lang.Runnable#run()
676                                                 */
677                                                @Override
678                                                public void run() {
679                                                        /* Synchronize the listener so it is not called 
680                                                         twice. That is, let the listener to finish its job. */
681                                                        synchronized (listener) {
682                                                                listener.dataReceived(xbeeMessage);
683                                                        }
684                                                }
685                                        });
686                                }
687                                executor.shutdown();
688                        }
689                } catch (Exception e) {
690                        logger.error(e.getMessage(), e);
691                }
692        }
693        
694        /**
695         * Notifies subscribed XBee packet listeners that a new XBee packet has 
696         * been received.
697         *
698         * @param packet The received XBee packet.
699         * 
700         * @see com.digi.xbee.api.packet.XBeeAPIPacket
701         * @see com.digi.xbee.api.packet.XBeePacket
702         */
703        private void notifyPacketReceived(final XBeePacket packet) {
704                logger.debug(connectionInterface.toString() + "Packet received: \n{}", packet.toPrettyString());
705                
706                try {
707                        synchronized (packetReceiveListeners) {
708                                final ArrayList<IPacketReceiveListener> removeListeners = new ArrayList<IPacketReceiveListener>();
709                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
710                                                packetReceiveListeners.size()));
711                                for (final IPacketReceiveListener listener:packetReceiveListeners.keySet()) {
712                                        executor.execute(new Runnable() {
713                                                /*
714                                                 * (non-Javadoc)
715                                                 * @see java.lang.Runnable#run()
716                                                 */
717                                                @Override
718                                                public void run() {
719                                                        // Synchronize the listener so it is not called 
720                                                        // twice. That is, let the listener to finish its job.
721                                                        synchronized (packetReceiveListeners) {
722                                                                synchronized (listener) {
723                                                                        if (packetReceiveListeners.get(listener) == ALL_FRAME_IDS)
724                                                                                listener.packetReceived(packet);
725                                                                        else if (((XBeeAPIPacket)packet).needsAPIFrameID() && 
726                                                                                        ((XBeeAPIPacket)packet).getFrameID() == packetReceiveListeners.get(listener)) {
727                                                                                listener.packetReceived(packet);
728                                                                                removeListeners.add(listener);
729                                                                        }
730                                                                }
731                                                        }
732                                                }
733                                        });
734                                }
735                                executor.shutdown();
736                                // Remove required listeners.
737                                for (IPacketReceiveListener listener:removeListeners)
738                                        packetReceiveListeners.remove(listener);
739                        }
740                } catch (Exception e) {
741                        logger.error(e.getMessage(), e);
742                }
743        }
744        
745        /**
746         * Notifies subscribed IO sample listeners that a new IO sample packet has
747         * been received.
748         *
749         * @param ioSample The received IO sample.
750         * @param remoteDevice The remote XBee device that sent the sample.
751         * 
752         * @see com.digi.xbee.api.RemoteXBeeDevice
753         * @see com.digi.xbee.api.io.IOSample
754         */
755        private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) {
756                logger.debug(connectionInterface.toString() + "IO sample received.");
757                
758                try {
759                        synchronized (ioSampleReceiveListeners) {
760                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
761                                                ioSampleReceiveListeners.size()));
762                                for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) {
763                                        executor.execute(new Runnable() {
764                                                /*
765                                                 * (non-Javadoc)
766                                                 * @see java.lang.Runnable#run()
767                                                 */
768                                                @Override
769                                                public void run() {
770                                                        // Synchronize the listener so it is not called 
771                                                        // twice. That is, let the listener to finish its job.
772                                                        synchronized (listener) {
773                                                                listener.ioSampleReceived(remoteDevice, ioSample);
774                                                        }
775                                                }
776                                        });
777                                }
778                                executor.shutdown();
779                        }
780                } catch (Exception e) {
781                        logger.error(e.getMessage(), e);
782                }
783        }
784        
785        /**
786         * Notifies subscribed Modem Status listeners that a Modem Status event 
787         * packet has been received.
788         *
789         * @param modemStatusEvent The Modem Status event.
790         * 
791         * @see com.digi.xbee.api.models.ModemStatusEvent
792         */
793        private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) {
794                logger.debug(connectionInterface.toString() + "Modem Status event received.");
795                
796                try {
797                        synchronized (modemStatusListeners) {
798                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
799                                                modemStatusListeners.size()));
800                                for (final IModemStatusReceiveListener listener:modemStatusListeners) {
801                                        executor.execute(new Runnable() {
802                                                /*
803                                                 * (non-Javadoc)
804                                                 * @see java.lang.Runnable#run()
805                                                 */
806                                                @Override
807                                                public void run() {
808                                                        // Synchronize the listener so it is not called 
809                                                        // twice. That is, let the listener to finish its job.
810                                                        synchronized (listener) {
811                                                                listener.modemStatusEventReceived(modemStatusEvent);
812                                                        }
813                                                }
814                                        });
815                                }
816                                executor.shutdown();
817                        }
818                } catch (Exception e) {
819                        logger.error(e.getMessage(), e);
820                }
821        }
822        
823        /**
824         * Notifies subscribed explicit data receive listeners that a new XBee 
825         * explicit data packet has been received in form of an 
826         * {@code ExplicitXBeeMessage}.
827         *
828         * @param explicitXBeeMessage The XBee message to be sent to subscribed 
829         *                            XBee data listeners.
830         * 
831         * @see com.digi.xbee.api.models.ExplicitXBeeMessage
832         */
833        private void notifyExplicitDataReceived(final ExplicitXBeeMessage explicitXBeeMessage) {
834                if (explicitXBeeMessage.isBroadcast())
835                        logger.info(connectionInterface.toString() + 
836                                        "Broadcast explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData()));
837                else
838                        logger.info(connectionInterface.toString() + 
839                                        "Explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData()));
840                
841                try {
842                        synchronized (explicitDataReceiveListeners) {
843                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
844                                                explicitDataReceiveListeners.size()));
845                                for (final IExplicitDataReceiveListener listener:explicitDataReceiveListeners) {
846                                        executor.execute(new Runnable() {
847                                                /*
848                                                 * (non-Javadoc)
849                                                 * @see java.lang.Runnable#run()
850                                                 */
851                                                @Override
852                                                public void run() {
853                                                        /* Synchronize the listener so it is not called 
854                                                         twice. That is, let the listener to finish its job. */
855                                                        synchronized (listener) {
856                                                                listener.explicitDataReceived(explicitXBeeMessage);
857                                                        }
858                                                }
859                                        });
860                                }
861                                executor.shutdown();
862                        }
863                } catch (Exception e) {
864                        logger.error(e.getMessage(), e);
865                }
866        }
867        
868        /**
869         * Returns whether this Data reader is running or not.
870         * 
871         * @return {@code true} if the Data reader is running, {@code false} 
872         *         otherwise.
873         * 
874         * @see #stopReader()
875         */
876        public boolean isRunning() {
877                return running;
878        }
879        
880        /**
881         * Stops the Data reader thread.
882         * 
883         * @see #isRunning()
884         */
885        public void stopReader() {
886                running = false;
887                synchronized (connectionInterface) {
888                        connectionInterface.notify();
889                }
890                logger.debug(connectionInterface.toString() + "Data reader stopped.");
891        }
892        
893        /**
894         * Returns the queue of read XBee packets.
895         * 
896         * @return The queue of read XBee packets.
897         * 
898         * @see com.digi.xbee.api.models.XBeePacketsQueue
899         */
900        public XBeePacketsQueue getXBeePacketsQueue() {
901                return xbeePacketsQueue;
902        }
903}