001/*
002 * Copyright 2017-2019, Digi International Inc.
003 *
004 * This Source Code Form is subject to the terms of the Mozilla Public
005 * License, v. 2.0. If a copy of the MPL was not distributed with this
006 * file, you can obtain one at http://mozilla.org/MPL/2.0/.
007 *
008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 
009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 
011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 
014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
015 */
016package com.digi.xbee.api;
017
018import java.io.IOException;
019import java.net.Inet6Address;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.concurrent.Executors;
023import java.util.concurrent.ScheduledExecutorService;
024
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import com.digi.xbee.api.connection.IConnectionInterface;
029import com.digi.xbee.api.exceptions.XBeeException;
030import com.digi.xbee.api.io.IOSample;
031import com.digi.xbee.api.listeners.IExplicitDataReceiveListener;
032import com.digi.xbee.api.listeners.IIOSampleReceiveListener;
033import com.digi.xbee.api.listeners.IModemStatusReceiveListener;
034import com.digi.xbee.api.listeners.IIPDataReceiveListener;
035import com.digi.xbee.api.listeners.IPacketReceiveListener;
036import com.digi.xbee.api.listeners.IDataReceiveListener;
037import com.digi.xbee.api.listeners.ISMSReceiveListener;
038import com.digi.xbee.api.listeners.IUserDataRelayReceiveListener;
039import com.digi.xbee.api.listeners.relay.IBluetoothDataReceiveListener;
040import com.digi.xbee.api.listeners.relay.IMicroPythonDataReceiveListener;
041import com.digi.xbee.api.listeners.relay.ISerialDataReceiveListener;
042import com.digi.xbee.api.models.ExplicitXBeeMessage;
043import com.digi.xbee.api.models.ModemStatusEvent;
044import com.digi.xbee.api.models.IPMessage;
045import com.digi.xbee.api.models.SMSMessage;
046import com.digi.xbee.api.models.SpecialByte;
047import com.digi.xbee.api.models.OperatingMode;
048import com.digi.xbee.api.models.UserDataRelayMessage;
049import com.digi.xbee.api.models.XBee16BitAddress;
050import com.digi.xbee.api.models.XBee64BitAddress;
051import com.digi.xbee.api.models.XBeeMessage;
052import com.digi.xbee.api.models.XBeePacketsQueue;
053import com.digi.xbee.api.models.XBeeProtocol;
054import com.digi.xbee.api.packet.XBeeAPIPacket;
055import com.digi.xbee.api.packet.APIFrameType;
056import com.digi.xbee.api.packet.XBeePacket;
057import com.digi.xbee.api.packet.XBeePacketParser;
058import com.digi.xbee.api.packet.cellular.RXSMSPacket;
059import com.digi.xbee.api.packet.common.ExplicitRxIndicatorPacket;
060import com.digi.xbee.api.packet.common.IODataSampleRxIndicatorPacket;
061import com.digi.xbee.api.packet.common.ModemStatusPacket;
062import com.digi.xbee.api.packet.common.ReceivePacket;
063import com.digi.xbee.api.packet.ip.RXIPv4Packet;
064import com.digi.xbee.api.packet.raw.RX16IOPacket;
065import com.digi.xbee.api.packet.raw.RX16Packet;
066import com.digi.xbee.api.packet.raw.RX64IOPacket;
067import com.digi.xbee.api.packet.raw.RX64Packet;
068import com.digi.xbee.api.packet.relay.UserDataRelayOutputPacket;
069import com.digi.xbee.api.packet.thread.IPv6IODataSampleRxIndicator;
070import com.digi.xbee.api.packet.thread.RXIPv6Packet;
071import com.digi.xbee.api.utils.HexUtils;
072
073/**
074 * Thread that constantly reads data from an input stream.
075 * 
076 * <p>Depending on the XBee operating mode, read data is notified as is to the 
077 * subscribed listeners or is parsed to a packet using the packet parser and 
078 * then notified to subscribed listeners.</p> 
079 */
080public class DataReader extends Thread {
081        
082        // Constants.
083        private final static int ALL_FRAME_IDS = 99999;
084        private final static int MAXIMUM_PARALLEL_LISTENER_THREADS = 20;
085        
086        // Variables.
087        private boolean running = false;
088        
089        private IConnectionInterface connectionInterface;
090        
091        private volatile OperatingMode mode;
092        
093        private ArrayList<IDataReceiveListener> dataReceiveListeners = new ArrayList<>();
094        // The packetReceiveListeners requires to be a HashMap with an associated integer. The integer is used to determine 
095        // the frame ID of the packet that should be received. When it is 99999 (ALL_FRAME_IDS), all the packets will be handled.
096        private HashMap<IPacketReceiveListener, Integer> packetReceiveListeners = new HashMap<>();
097        private ArrayList<IIOSampleReceiveListener> ioSampleReceiveListeners = new ArrayList<>();
098        private ArrayList<IModemStatusReceiveListener> modemStatusListeners = new ArrayList<>();
099        private ArrayList<IExplicitDataReceiveListener> explicitDataReceiveListeners = new ArrayList<>();
100        private ArrayList<IIPDataReceiveListener> ipDataReceiveListeners = new ArrayList<>();
101        private ArrayList<ISMSReceiveListener> smsReceiveListeners = new ArrayList<>();
102        private ArrayList<IUserDataRelayReceiveListener> dataRelayReceiveListeners = new ArrayList<>();
103        private ArrayList<IBluetoothDataReceiveListener> bluetoothDataReceiveListeners = new ArrayList<>();
104        private ArrayList<IMicroPythonDataReceiveListener> microPythonDataReceiveListeners = new ArrayList<>();
105        private ArrayList<ISerialDataReceiveListener> serialDataReceiveListeners = new ArrayList<>();
106
107        private Logger logger;
108        
109        private XBeePacketParser parser;
110        
111        private XBeePacketsQueue xbeePacketsQueue;
112        
113        private AbstractXBeeDevice xbeeDevice;
114        
115        /**
116         * Class constructor. Instantiates a new {@code DataReader} object for the 
117         * given connection interface using the given XBee operating mode and XBee
118         * device.
119         * 
120         * @param connectionInterface Connection interface to read data from.
121         * @param mode XBee operating mode.
122         * @param xbeeDevice Reference to the XBee device containing this 
123         *                   {@code DataReader} object.
124         * 
125         * @throws NullPointerException if {@code connectionInterface == null} or
126         *                                 {@code mode == null}.
127         * @throws IllegalArgumentException If {@code xbeeDevice.isRemote() == true}.
128         * 
129         * @see IConnectionInterface
130         * @see com.digi.xbee.api.XBeeDevice
131         * @see com.digi.xbee.api.models.OperatingMode
132         */
133        public DataReader(IConnectionInterface connectionInterface, OperatingMode mode, AbstractXBeeDevice xbeeDevice) {
134                if (connectionInterface == null)
135                        throw new NullPointerException("Connection interface cannot be null.");
136                if (mode == null)
137                        throw new NullPointerException("Operating mode cannot be null.");
138                if (xbeeDevice.isRemote())
139                        throw new IllegalArgumentException("The given local XBee device is remote.");
140                
141                this.connectionInterface = connectionInterface;
142                this.mode = mode;
143                this.xbeeDevice = xbeeDevice;
144                this.logger = LoggerFactory.getLogger(DataReader.class);
145                parser = new XBeePacketParser();
146                xbeePacketsQueue = new XBeePacketsQueue();
147        }
148        
149        /**
150         * Sets the XBee operating mode of this data reader.
151         * 
152         * @param mode New XBee operating mode.
153         * 
154         * @throws NullPointerException if {@code mode == null}.
155         * 
156         * @see com.digi.xbee.api.models.OperatingMode
157         */
158        public void setXBeeReaderMode(OperatingMode mode) {
159                if (mode == null)
160                        throw new NullPointerException("Operating mode cannot be null.");
161                
162                this.mode = mode;
163        }
164        
165        /**
166         * Adds the given data receive listener to the list of listeners that will 
167         * be notified when XBee data packets are received.
168         * 
169         * <p>If the listener has been already added, this method does nothing.</p>
170         * 
171         * @param listener Listener to be notified when new XBee data packets are 
172         *                 received.
173         * 
174         * @throws NullPointerException if {@code listener == null}.
175         * 
176         * @see #removeDataReceiveListener(IDataReceiveListener)
177         * @see com.digi.xbee.api.listeners.IDataReceiveListener
178         */
179        public void addDataReceiveListener(IDataReceiveListener listener) {
180                if (listener == null)
181                        throw new NullPointerException("Listener cannot be null.");
182                
183                synchronized (dataReceiveListeners) {
184                        if (!dataReceiveListeners.contains(listener))
185                                dataReceiveListeners.add(listener);
186                }
187        }
188        
189        /**
190         * Removes the given data receive listener from the list of data receive 
191         * listeners.
192         * 
193         * <p>If the listener is not included in the list, this method does nothing.
194         * </p>
195         * 
196         * @param listener Data receive listener to be remove from the list.
197         * 
198         * @see #addDataReceiveListener(IDataReceiveListener)
199         * @see com.digi.xbee.api.listeners.IDataReceiveListener
200         */
201        public void removeDataReceiveListener(IDataReceiveListener listener) {
202                synchronized (dataReceiveListeners) {
203                        if (dataReceiveListeners.contains(listener))
204                                dataReceiveListeners.remove(listener);
205                }
206        }
207        
208        /**
209         * Adds the given packet receive listener to the list of listeners that will
210         * be notified when any XBee packet is received.
211         * 
212         * <p>If the listener has been already added, this method does nothing.</p>
213         * 
214         * @param listener Listener to be notified when any XBee packet is received.
215         * 
216         * @throws NullPointerException if {@code listener == null}.
217         * 
218         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
219         * @see #removePacketReceiveListener(IPacketReceiveListener)
220         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
221         */
222        public void addPacketReceiveListener(IPacketReceiveListener listener) {
223                addPacketReceiveListener(listener, ALL_FRAME_IDS);
224        }
225        
226        /**
227         * Adds the given packet receive listener to the list of listeners that will
228         * be notified when an XBee packet with the given frame ID is received.
229         * 
230         * <p>If the listener has been already added, this method does nothing.</p>
231         * 
232         * @param listener Listener to be notified when an XBee packet with the
233         *                 provided frame ID is received.
234         * @param frameID Frame ID for which this listener should be notified and 
235         *                removed after.
236         *                Using {@link #ALL_FRAME_IDS} this listener will be 
237         *                notified always and will be removed only by user request.
238         * 
239         * @throws NullPointerException if {@code listener == null}.
240         * 
241         * @see #addPacketReceiveListener(IPacketReceiveListener)
242         * @see #removePacketReceiveListener(IPacketReceiveListener)
243         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
244         */
245        public void addPacketReceiveListener(IPacketReceiveListener listener, int frameID) {
246                if (listener == null)
247                        throw new NullPointerException("Listener cannot be null.");
248                
249                synchronized (packetReceiveListeners) {
250                        if (!packetReceiveListeners.containsKey(listener))
251                                packetReceiveListeners.put(listener, frameID);
252                }
253        }
254        
255        /**
256         * Removes the given packet receive listener from the list of XBee packet 
257         * receive listeners.
258         * 
259         * <p>If the listener is not included in the list, this method does nothing.
260         * </p>
261         * 
262         * @param listener Packet receive listener to remove from the list.
263         * 
264         * @see #addPacketReceiveListener(IPacketReceiveListener)
265         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
266         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
267         */
268        public void removePacketReceiveListener(IPacketReceiveListener listener) {
269                synchronized (packetReceiveListeners) {
270                        if (packetReceiveListeners.containsKey(listener))
271                                packetReceiveListeners.remove(listener);
272                }
273        }
274        
275        /**
276         * Adds the given IO sample receive listener to the list of listeners that 
277         * will be notified when an IO sample packet is received.
278         * 
279         * <p>If the listener has been already added, this method does nothing.</p>
280         * 
281         * @param listener Listener to be notified when new IO sample packets are 
282         *                 received.
283         * 
284         * @throws NullPointerException if {@code listener == null}.
285         * 
286         * @see #removeIOSampleReceiveListener(IIOSampleReceiveListener)
287         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
288         */
289        public void addIOSampleReceiveListener(IIOSampleReceiveListener listener) {
290                if (listener == null)
291                        throw new NullPointerException("Listener cannot be null.");
292                
293                synchronized (ioSampleReceiveListeners) {
294                        if (!ioSampleReceiveListeners.contains(listener))
295                                ioSampleReceiveListeners.add(listener);
296                }
297        }
298        
299        /**
300         * Removes the given IO sample receive listener from the list of IO sample 
301         * receive listeners.
302         * 
303         * <p>If the listener is not included in the list, this method does nothing.
304         * </p>
305         * 
306         * @param listener IO sample receive listener to remove from the list.
307         * 
308         * @see #addIOSampleReceiveListener(IIOSampleReceiveListener)
309         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
310         */
311        public void removeIOSampleReceiveListener(IIOSampleReceiveListener listener) {
312                synchronized (ioSampleReceiveListeners) {
313                        if (ioSampleReceiveListeners.contains(listener))
314                                ioSampleReceiveListeners.remove(listener);
315                }
316        }
317        
318        /**
319         * Adds the given Modem Status receive listener to the list of listeners 
320         * that will be notified when a modem status packet is received.
321         * 
322         * <p>If the listener has been already added, this method does nothing.</p>
323         * 
324         * @param listener Listener to be notified when new modem status packets are
325         *                 received.
326         * 
327         * @throws NullPointerException if {@code listener == null}.
328         * 
329         * @see #removeModemStatusReceiveListener(IModemStatusReceiveListener)
330         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
331         */
332        public void addModemStatusReceiveListener(IModemStatusReceiveListener listener) {
333                if (listener == null)
334                        throw new NullPointerException("Listener cannot be null.");
335                
336                synchronized (modemStatusListeners) {
337                        if (!modemStatusListeners.contains(listener))
338                                modemStatusListeners.add(listener);
339                }
340        }
341        
342        /**
343         * Removes the given Modem Status receive listener from the list of Modem 
344         * Status receive listeners.
345         * 
346         * <p>If the listener is not included in the list, this method does nothing.
347         * </p>
348         * 
349         * @param listener Modem Status receive listener to remove from the list.
350         * 
351         * @see #addModemStatusReceiveListener(IModemStatusReceiveListener)
352         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
353         */
354        public void removeModemStatusReceiveListener(IModemStatusReceiveListener listener) {
355                synchronized (modemStatusListeners) {
356                        if (modemStatusListeners.contains(listener))
357                                modemStatusListeners.remove(listener);
358                }
359        }
360        
361        /**
362         * Adds the given explicit data receive listener to the list of listeners 
363         * that will be notified when an explicit data packet is received.
364         * 
365         * <p>If the listener has been already added, this method does nothing.</p>
366         * 
367         * @param listener Listener to be notified when new explicit data packets 
368         *                 are received.
369         * 
370         * @throws NullPointerException if {@code listener == null}.
371         * 
372         * @see #removeExplicitDataReceiveListener(IExplicitDataReceiveListener)
373         * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener
374         */
375        public void addExplicitDataReceiveListener(IExplicitDataReceiveListener listener) {
376                if (listener == null)
377                        throw new NullPointerException("Listener cannot be null.");
378                
379                synchronized (explicitDataReceiveListeners) {
380                        if (!explicitDataReceiveListeners.contains(listener))
381                                explicitDataReceiveListeners.add(listener);
382                }
383        }
384        
385        /**
386         * Removes the given explicit data receive listener from the list of 
387         * explicit data receive listeners.
388         * 
389         * <p>If the listener is not included in the list, this method does nothing.
390         * </p>
391         * 
392         * @param listener Explicit data receive listener to remove from the list.
393         * 
394         * @see #addExplicitDataReceiveListener(IExplicitDataReceiveListener)
395         * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener
396         */
397        public void removeExplicitDataReceiveListener(IExplicitDataReceiveListener listener) {
398                synchronized (explicitDataReceiveListeners) {
399                        if (explicitDataReceiveListeners.contains(listener))
400                                explicitDataReceiveListeners.remove(listener);
401                }
402        }
403        
404        /**
405         * Adds the given IP data receive listener to the list of listeners 
406         * that will be notified when a IP data packet is received.
407         * 
408         * <p>If the listener has been already added, this method does nothing.</p>
409         * 
410         * @param listener Listener to be notified when new IP data packets 
411         *                 are received.
412         * 
413         * @throws NullPointerException if {@code listener == null}.
414         * 
415         * @see #removeIPDataReceiveListener(IIPDataReceiveListener)
416         * @see com.digi.xbee.api.listeners.IIPDataReceiveListener
417         * 
418         * @since 1.2.0
419         */
420        public void addIPDataReceiveListener(IIPDataReceiveListener listener) {
421                if (listener == null)
422                        throw new NullPointerException("Listener cannot be null.");
423                
424                synchronized (ipDataReceiveListeners) {
425                        if (!ipDataReceiveListeners.contains(listener))
426                                ipDataReceiveListeners.add(listener);
427                }
428        }
429        
430        /**
431         * Removes the given IP data receive listener from the list of 
432         * IP data receive listeners.
433         * 
434         * <p>If the listener is not included in the list, this method does nothing.
435         * </p>
436         * 
437         * @param listener IP data receive listener to remove from the list.
438         * 
439         * @see #addIPDataReceiveListener(IIPDataReceiveListener)
440         * @see com.digi.xbee.api.listeners.IIPDataReceiveListener
441         * 
442         * @since 1.2.0
443         */
444        public void removeIPDataReceiveListener(IIPDataReceiveListener listener) {
445                synchronized (ipDataReceiveListeners) {
446                        if (ipDataReceiveListeners.contains(listener))
447                                ipDataReceiveListeners.remove(listener);
448                }
449        }
450        
451        /**
452         * Adds the given SMS receive listener to the list of listeners that will 
453         * be notified when an SMS packet is received.
454         * 
455         * <p>If the listener has been already added, this method does nothing.</p>
456         * 
457         * @param listener Listener to be notified when new SMS packet is received.
458         * 
459         * @throws NullPointerException if {@code listener == null}.
460         * 
461         * @see #removeSMSReceiveListener(ISMSReceiveListener)
462         * @see com.digi.xbee.api.listeners.ISMSReceiveListener
463         * 
464         * @since 1.2.0
465         */
466        public void addSMSReceiveListener(ISMSReceiveListener listener) {
467                if (listener == null)
468                        throw new NullPointerException("Listener cannot be null.");
469                
470                synchronized (smsReceiveListeners) {
471                        if (!smsReceiveListeners.contains(listener))
472                                smsReceiveListeners.add(listener);
473                }
474        }
475        
476        /**
477         * Removes the given SMS receive listener from the list of SMS receive 
478         * listeners.
479         * 
480         * <p>If the listener is not included in the list, this method does nothing.
481         * </p>
482         * 
483         * @param listener SMS receive listener to remove from the list.
484         * 
485         * @see #addSMSReceiveListener(ISMSReceiveListener)
486         * @see com.digi.xbee.api.listeners.ISMSReceiveListener
487         * 
488         * @since 1.2.0
489         */
490        public void removeSMSReceiveListener(ISMSReceiveListener listener) {
491                synchronized (smsReceiveListeners) {
492                        if (smsReceiveListeners.contains(listener))
493                                smsReceiveListeners.remove(listener);
494                }
495        }
496
497        /**
498         * Adds the given User Data Relay receive listener to the list of listeners
499         * that will be notified when a User Data Relay packet is received.
500         *
501         * <p>If the listener has been already added, this method does nothing.</p>
502         *
503         * @param listener Listener to be notified when new User Data Relay packet
504         *                 is received.
505         *
506         * @throws NullPointerException if {@code listener == null}.
507         *
508         * @see #removeUserDataRelayReceiveListener(IUserDataRelayReceiveListener)
509         * @see IUserDataRelayReceiveListener
510         *
511         * @since 1.3.0
512         */
513        public void addUserDataRelayReceiveListener(IUserDataRelayReceiveListener listener) {
514                if (listener == null)
515                        throw new NullPointerException("Listener cannot be null");
516
517                synchronized (dataRelayReceiveListeners) {
518                        if (!dataRelayReceiveListeners.contains(listener))
519                                dataRelayReceiveListeners.add(listener);
520                }
521        }
522
523        /**
524         * Removes the given User Data Relay receive listener from the list of User
525         * Data Relay receive listeners.
526         *
527         * <p>If the listener is not included in the list, this method does nothing.
528         * </p>
529         *
530         * @param listener User Data Relay receive listener to remove from the list.
531         *
532         * @see #addUserDataRelayReceiveListener(IUserDataRelayReceiveListener)
533         * @see IUserDataRelayReceiveListener
534         *
535         * @since 1.3.0
536         */
537        public void removeUserDataRelayReceiveListener(IUserDataRelayReceiveListener listener) {
538                synchronized (dataRelayReceiveListeners) {
539                        if (dataRelayReceiveListeners.contains(listener))
540                                dataRelayReceiveListeners.remove(listener);
541                }
542        }
543
544        /**
545         * Adds the given data receive listener to the list of listeners that will
546         * be notified when new data from the Bluetooth interface is received in
547         * a User Data Relay frame.
548         *
549         * <p>If the listener has been already added, this method does nothing.</p>
550         *
551         * @param listener Listener to be notified when new data from the Bluetooth
552         *                 interface is received.
553         *
554         * @throws NullPointerException if {@code listener == null}.
555         *
556         * @see #removeBluetoothDataReceiveListener(IBluetoothDataReceiveListener)
557         * @see IBluetoothDataReceiveListener
558         *
559         * @since 1.3.0
560         */
561        public void addBluetoothDataReceiveListener(IBluetoothDataReceiveListener listener) {
562                if (listener == null)
563                        throw new NullPointerException("Listener cannot be null");
564
565                synchronized (bluetoothDataReceiveListeners) {
566                        if (!bluetoothDataReceiveListeners.contains(listener))
567                                bluetoothDataReceiveListeners.add(listener);
568                }
569        }
570
571        /**
572         * Removes the given data receive listener from the list of data receive
573         * listeners for the Bluetooth interface.
574         *
575         * <p>If the listener is not included in the list, this method does nothing.
576         * </p>
577         *
578         * @param listener Data receive listener to remove from the list.
579         *
580         * @see #addBluetoothDataReceiveListener(IBluetoothDataReceiveListener)
581         * @see IBluetoothDataReceiveListener
582         *
583         * @since 1.3.0
584         */
585        public void removeBluetoothDataReceiveListener(IBluetoothDataReceiveListener listener) {
586                synchronized (bluetoothDataReceiveListeners) {
587                        if (bluetoothDataReceiveListeners.contains(listener))
588                                bluetoothDataReceiveListeners.remove(listener);
589                }
590        }
591
592        /**
593         * Adds the given data receive listener to the list of listeners that will
594         * be notified when new data from the MicroPython interface is received in
595         * a User Data Relay frame.
596         *
597         * <p>If the listener has been already added, this method does nothing.</p>
598         *
599         * @param listener Listener to be notified when new data from the
600         *                 MicroPython interface is received.
601         *
602         * @throws NullPointerException if {@code listener == null}.
603         *
604         * @see #removeMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener)
605         * @see IMicroPythonDataReceiveListener
606         *
607         * @since 1.3.0
608         */
609        public void addMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener listener) {
610                if (listener == null)
611                        throw new NullPointerException("Listener cannot be null");
612
613                synchronized (microPythonDataReceiveListeners) {
614                        if (!microPythonDataReceiveListeners.contains(listener))
615                                microPythonDataReceiveListeners.add(listener);
616                }
617        }
618
619        /**
620         * Removes the given data receive listener from the list of data receive
621         * listeners for the MicroPython interface.
622         *
623         * <p>If the listener is not included in the list, this method does nothing.
624         * </p>
625         *
626         * @param listener Data receive listener to remove from the list.
627         *
628         * @see #addMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener)
629         * @see IMicroPythonDataReceiveListener
630         *
631         * @since 1.3.0
632         */
633        public void removeMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener listener) {
634                synchronized (microPythonDataReceiveListeners) {
635                        if (microPythonDataReceiveListeners.contains(listener))
636                                microPythonDataReceiveListeners.remove(listener);
637                }
638        }
639
640        /**
641         * Adds the given data receive listener to the list of listeners that will
642         * be notified when new data from the serial interface is received in
643         * a User Data Relay frame.
644         *
645         * <p>If the listener has been already added, this method does nothing.</p>
646         *
647         * @param listener Listener to be notified when new data from the serial
648         *                 interface is received.
649         *
650         * @throws NullPointerException if {@code listener == null}.
651         *
652         * @see #removeSerialDataReceiveListener(ISerialDataReceiveListener)
653         * @see ISerialDataReceiveListener
654         *
655         * @since 1.3.0
656         */
657        public void addSerialDataReceiveListener(ISerialDataReceiveListener listener) {
658                if (listener == null)
659                        throw new NullPointerException("Listener cannot be null");
660
661                synchronized (serialDataReceiveListeners) {
662                        if (!serialDataReceiveListeners.contains(listener))
663                                serialDataReceiveListeners.add(listener);
664                }
665        }
666
667        /**
668         * Removes the given data receive listener from the list of data receive
669         * listeners for the serial interface.
670         *
671         * <p>If the listener is not included in the list, this method does nothing.
672         * </p>
673         *
674         * @param listener Data receive listener to remove from the list.
675         *
676         * @see #addSerialDataReceiveListener(ISerialDataReceiveListener)
677         * @see ISerialDataReceiveListener
678         *
679         * @since 1.3.0
680         */
681        public void removeSerialDataReceiveListener(ISerialDataReceiveListener listener) {
682                synchronized (serialDataReceiveListeners) {
683                        if (serialDataReceiveListeners.contains(listener))
684                                serialDataReceiveListeners.remove(listener);
685                }
686        }
687
688        /*
689         * (non-Javadoc)
690         * @see java.lang.Thread#run()
691         */
692        @Override
693        public void run() {
694                logger.debug(connectionInterface.toString() + "Data reader started.");
695                running = true;
696                // Clear the list of read packets.
697                xbeePacketsQueue.clearQueue();
698                try {
699                        synchronized (connectionInterface) {
700                                connectionInterface.wait();
701                        }
702                        while (running) {
703                                if (!running)
704                                        break;
705                                if (connectionInterface.getInputStream() != null) {
706                                        switch (mode) {
707                                        case AT:
708                                        default:
709                                                break;
710                                        case API:
711                                        case API_ESCAPE:
712                                                int headerByte = connectionInterface.getInputStream().read();
713                                                // If it is packet header parse the packet, if not discard this byte and continue.
714                                                if (headerByte == SpecialByte.HEADER_BYTE.getValue()) {
715                                                        try {
716                                                                XBeePacket packet = parser.parsePacket(connectionInterface.getInputStream(), mode);
717                                                                packetReceived(packet);
718                                                        } catch (Exception e) {
719                                                                logger.error("Error parsing the API packet.", e);
720                                                        }
721                                                }
722                                                break;
723                                        }
724                                } else if (connectionInterface.getInputStream() == null)
725                                        break;
726                                if (connectionInterface.getInputStream() == null)
727                                        break;
728                                else if (connectionInterface.getInputStream().available() > 0)
729                                        continue;
730                                synchronized (connectionInterface) {
731                                        connectionInterface.wait();
732                                }
733                        }
734                } catch (IOException e) {
735                        logger.error("Error reading from input stream.", e);
736                } catch (InterruptedException e) {
737                        logger.error(e.getMessage(), e);
738                } catch (IllegalStateException e) {
739                        logger.error(e.getMessage(), e);
740                } finally {
741                        if (running) {
742                                running = false;
743                                if (connectionInterface.isOpen())
744                                        connectionInterface.close();
745                        }
746                }
747        }
748        
749        /**
750         * Dispatches the received XBee packet to the corresponding listener(s).
751         * 
752         * @param packet The received XBee packet to be dispatched to the 
753         *               corresponding listeners.
754         * 
755         * @see com.digi.xbee.api.packet.XBeeAPIPacket
756         * @see com.digi.xbee.api.packet.XBeePacket
757         */
758        private void packetReceived(XBeePacket packet) {
759                // Add the packet to the packets queue.
760                xbeePacketsQueue.addPacket(packet);
761                // Notify that a packet has been received to the corresponding listeners.
762                notifyPacketReceived(packet);
763                
764                // Check if the packet is an API packet.
765                if (!(packet instanceof XBeeAPIPacket))
766                        return;
767                
768                // Get the API packet type.
769                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
770                APIFrameType apiType = apiPacket.getFrameType();
771                if (apiType == null)
772                        return;
773                
774                try {
775                        // Obtain the remote device from the packet.
776                        RemoteXBeeDevice remoteDevice = getRemoteXBeeDeviceFromPacket(apiPacket);
777                        byte[] data = null;
778                        
779                        switch(apiType) {
780                        case RECEIVE_PACKET:
781                                ReceivePacket receivePacket = (ReceivePacket)apiPacket;
782                                data = receivePacket.getRFData();
783                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
784                                break;
785                        case RX_64:
786                                RX64Packet rx64Packet = (RX64Packet)apiPacket;
787                                data = rx64Packet.getRFData();
788                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
789                                break;
790                        case RX_16:
791                                RX16Packet rx16Packet = (RX16Packet)apiPacket;
792                                data = rx16Packet.getRFData();
793                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
794                                break;
795                        case IO_DATA_SAMPLE_RX_INDICATOR:
796                                IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
797                                notifyIOSampleReceived(remoteDevice, ioSamplePacket.getIOSample());
798                                break;
799                        case RX_IO_64:
800                                RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
801                                if (rx64IOPacket.getIOSamples() != null && rx64IOPacket.getIOSamples().size() > 0) {
802                                        for (IOSample sample : rx64IOPacket.getIOSamples())
803                                                notifyIOSampleReceived(remoteDevice, sample);
804                                }
805                                break;
806                        case RX_IO_16:
807                                RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
808                                if (rx16IOPacket.getIOSamples() != null && rx16IOPacket.getIOSamples().size() > 0) {
809                                        for (IOSample sample : rx16IOPacket.getIOSamples())
810                                                notifyIOSampleReceived(remoteDevice, sample);
811                                }
812                                break;
813                        case IPV6_IO_DATA_SAMPLE_RX_INDICATOR:
814                                IPv6IODataSampleRxIndicator ioSampleIPv6Packet = (IPv6IODataSampleRxIndicator)apiPacket;
815                                notifyIOSampleReceived(remoteDevice, ioSampleIPv6Packet.getIOSample());
816                                break;
817                        case MODEM_STATUS:
818                                ModemStatusPacket modemStatusPacket = (ModemStatusPacket)apiPacket;
819                                notifyModemStatusReceived(modemStatusPacket.getStatus());
820                                break;
821                        case EXPLICIT_RX_INDICATOR:
822                                ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket;
823                                int sourceEndpoint = explicitDataPacket.getSourceEndpoint();
824                                int destEndpoint = explicitDataPacket.getDestinationEndpoint();
825                                int clusterID = explicitDataPacket.getClusterID();
826                                int profileID = explicitDataPacket.getProfileID();
827                                data = explicitDataPacket.getRFData();
828                                // If this is an explicit packet for data transmissions in the Digi profile, 
829                                // notify also the data listener and add a Receive packet to the queue.
830                                if (sourceEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT && 
831                                                destEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT &&
832                                                clusterID == ExplicitRxIndicatorPacket.DATA_CLUSTER && 
833                                                profileID == ExplicitRxIndicatorPacket.DIGI_PROFILE) {
834                                        notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
835                                        xbeePacketsQueue.addPacket(new ReceivePacket(explicitDataPacket.get64BitSourceAddress(), 
836                                                        explicitDataPacket.get16BitSourceAddress(), 
837                                                        explicitDataPacket.getReceiveOptions(), 
838                                                        explicitDataPacket.getRFData()));
839                                }
840                                notifyExplicitDataReceived(new ExplicitXBeeMessage(remoteDevice, sourceEndpoint, destEndpoint, clusterID, profileID, data, explicitDataPacket.isBroadcast()));
841                                break;
842                        case RX_IPV4:
843                                RXIPv4Packet rxIPv4Packet = (RXIPv4Packet)apiPacket;
844                                notifyIPDataReceived(new IPMessage(
845                                                rxIPv4Packet.getSourceAddress(), 
846                                                rxIPv4Packet.getSourcePort(), 
847                                                rxIPv4Packet.getDestPort(),
848                                                rxIPv4Packet.getProtocol(),
849                                                rxIPv4Packet.getData()));
850                                break;
851                        case RX_IPV6:
852                                RXIPv6Packet rxIPv6Packet = (RXIPv6Packet)apiPacket;
853                                notifyIPDataReceived(new IPMessage(
854                                                rxIPv6Packet.getSourceAddress(), 
855                                                rxIPv6Packet.getSourcePort(), 
856                                                rxIPv6Packet.getDestPort(),
857                                                rxIPv6Packet.getProtocol(),
858                                                rxIPv6Packet.getData()));
859                                break;
860                        case RX_SMS:
861                                RXSMSPacket rxSMSPacket = (RXSMSPacket)apiPacket;
862                                notifySMSReceived(new SMSMessage(rxSMSPacket.getPhoneNumber(), rxSMSPacket.getData()));
863                                break;
864                        case USER_DATA_RELAY_OUTPUT:
865                                UserDataRelayOutputPacket relayPacket = (UserDataRelayOutputPacket)apiPacket;
866                                notifyUserDataRelayReceived(new UserDataRelayMessage(relayPacket.getSourceInterface(), relayPacket.getData()));
867                                break;
868                        default:
869                                break;
870                        }
871                        
872                } catch (XBeeException e) {
873                        logger.error(e.getMessage(), e);
874                }
875        }
876        
877        /**
878         * Returns the remote XBee device from where the given package was sent 
879         * from.
880         * 
881         * <p><b>This is for internal use only.</b></p>
882         * 
883         * <p>If the package does not contain information about the source, this 
884         * method returns {@code null} (for example, {@code ModemStatusPacket}).</p>
885         * 
886         * <p>First the device that sent the provided package is looked in the 
887         * network of the local XBee device. If the remote device is not in the 
888         * network, it is automatically added only if the packet contains 
889         * information about the origin of the package.</p>
890         * 
891         * @param packet The packet sent from the remote device.
892         * 
893         * @return The remote XBee device that sends the given packet. It may be 
894         *         {@code null} if the packet is not a known frame (see 
895         *         {@link APIFrameType}) or if it does not contain information of 
896         *         the source device.
897         * 
898         * @throws NullPointerException if {@code packet == null}
899         * @throws XBeeException if any error occur while adding the device to the 
900         *                       network.
901         */
902        public RemoteXBeeDevice getRemoteXBeeDeviceFromPacket(XBeeAPIPacket packet) throws XBeeException {
903                if (packet == null)
904                        throw new NullPointerException("XBee API packet cannot be null.");
905                        
906                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
907                APIFrameType apiType = apiPacket.getFrameType();
908                if (apiType == null || apiType == APIFrameType.UNKNOWN)
909                        return null;
910                
911                RemoteXBeeDevice remoteDevice = null;
912                XBee64BitAddress addr64 = null;
913                XBee16BitAddress addr16 = null;
914                Inet6Address addrIPv6 = null;
915                
916                XBeeNetwork network = xbeeDevice.getNetwork();
917                // There are protocols that do not support the network feature.
918                if (network == null && xbeeDevice.getXBeeProtocol() != XBeeProtocol.THREAD)
919                        return null;
920                
921                switch(apiType) {
922                case RECEIVE_PACKET:
923                        ReceivePacket receivePacket = (ReceivePacket)apiPacket;
924                        addr64 = receivePacket.get64bitSourceAddress();
925                        addr16 = receivePacket.get16bitSourceAddress();
926                        if (!addr64.equals(XBee64BitAddress.UNKNOWN_ADDRESS))
927                                remoteDevice = network.getDevice(addr64);
928                        else if (!addr16.equals(XBee16BitAddress.UNKNOWN_ADDRESS))
929                                remoteDevice = network.getDevice(addr16);
930                        break;
931                case RX_64:
932                        RX64Packet rx64Packet = (RX64Packet)apiPacket;
933                        addr64 = rx64Packet.get64bitSourceAddress();
934                        remoteDevice = network.getDevice(addr64);
935                        break;
936                case RX_16:
937                        RX16Packet rx16Packet = (RX16Packet)apiPacket;
938                        addr64 = XBee64BitAddress.UNKNOWN_ADDRESS;
939                        addr16 = rx16Packet.get16bitSourceAddress();
940                        remoteDevice = network.getDevice(addr16);
941                        break;
942                case RX_IPV6:
943                        RXIPv6Packet rxIPv6Packet = (RXIPv6Packet)apiPacket;
944                        addrIPv6 = rxIPv6Packet.getSourceAddress();
945                        if (xbeeDevice.getXBeeProtocol() == XBeeProtocol.THREAD)
946                                remoteDevice = new RemoteThreadDevice(xbeeDevice, addrIPv6);
947                        else
948                                remoteDevice = new RemoteXBeeDevice(xbeeDevice, addrIPv6);
949                        break;
950                case IO_DATA_SAMPLE_RX_INDICATOR:
951                        IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
952                        addr64 = ioSamplePacket.get64bitSourceAddress();
953                        addr16 = ioSamplePacket.get16bitSourceAddress();
954                        remoteDevice = network.getDevice(addr64);
955                        break;
956                case IPV6_IO_DATA_SAMPLE_RX_INDICATOR:
957                        IPv6IODataSampleRxIndicator ioSampleIPv6Packet = (IPv6IODataSampleRxIndicator)apiPacket;
958                        addrIPv6 = ioSampleIPv6Packet.getSourceAddress();
959                        if (xbeeDevice.getXBeeProtocol() == XBeeProtocol.THREAD)
960                                remoteDevice = new RemoteThreadDevice(xbeeDevice, addrIPv6);
961                        else
962                                remoteDevice = new RemoteXBeeDevice(xbeeDevice, addrIPv6);
963                        break;
964                case RX_IO_64:
965                        RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
966                        addr64 = rx64IOPacket.get64bitSourceAddress();
967                        remoteDevice = network.getDevice(addr64);
968                        break;
969                case RX_IO_16:
970                        RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
971                        addr64 = XBee64BitAddress.UNKNOWN_ADDRESS;
972                        addr16 = rx16IOPacket.get16bitSourceAddress();
973                        remoteDevice = network.getDevice(addr16);
974                        break;
975                case EXPLICIT_RX_INDICATOR:
976                        ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket;
977                        addr64 = explicitDataPacket.get64BitSourceAddress();
978                        addr16 = explicitDataPacket.get16BitSourceAddress();
979                        remoteDevice = network.getDevice(addr64);
980                        break;
981                default:
982                        // Rest of the types are considered not to contain information 
983                        // about the origin of the packet.
984                        return remoteDevice;
985                }
986                
987                // If the origin is not in the network, add it.
988                if (remoteDevice == null) {
989                        remoteDevice = createRemoteXBeeDevice(addr64, addr16, null);
990                        if (!addr64.equals(XBee64BitAddress.UNKNOWN_ADDRESS) || !addr16.equals(XBee16BitAddress.UNKNOWN_ADDRESS))
991                                network.addRemoteDevice(remoteDevice);
992                }
993                
994                return remoteDevice;
995        }
996        
997        /**
998         * Creates a new remote XBee device with the provided 64-bit address, 
999         * 16-bit address, node identifier and the XBee device that is using this 
1000         * data reader as the connection interface for the remote device.
1001         * 
1002         * The new XBee device will be a {@code RemoteDigiMeshDevice}, 
1003         * a {@code RemoteDigiPointDevice}, a {@code RemoteRaw802Device} or a 
1004         * {@code RemoteZigBeeDevice} depending on the protocol of the local XBee 
1005         * device. If the protocol cannot be determined or is unknown a 
1006         * {@code RemoteXBeeDevice} will be created instead.
1007         * 
1008         * @param addr64 The 64-bit address of the new remote device. It cannot be 
1009         *               {@code null}.
1010         * @param addr16 The 16-bit address of the new remote device. It may be 
1011         *               {@code null}.
1012         * @param ni The node identifier of the new remote device. It may be 
1013         *           {@code null}.
1014         * 
1015         * @return a new remote XBee device with the given parameters.
1016         */
1017        private RemoteXBeeDevice createRemoteXBeeDevice(XBee64BitAddress addr64, 
1018                        XBee16BitAddress addr16, String ni) {
1019                RemoteXBeeDevice device = null;
1020                
1021                switch (xbeeDevice.getXBeeProtocol()) {
1022                case ZIGBEE:
1023                        device = new RemoteZigBeeDevice(xbeeDevice, addr64, addr16, ni);
1024                        break;
1025                case DIGI_MESH:
1026                        device = new RemoteDigiMeshDevice(xbeeDevice, addr64, ni);
1027                        break;
1028                case DIGI_POINT:
1029                        device = new RemoteDigiPointDevice(xbeeDevice, addr64, ni);
1030                        break;
1031                case RAW_802_15_4:
1032                        device = new RemoteRaw802Device(xbeeDevice, addr64, addr16, ni);
1033                        break;
1034                default:
1035                        device = new RemoteXBeeDevice(xbeeDevice, addr64, addr16, ni);
1036                        break;
1037                }
1038                
1039                return device;
1040        }
1041        
1042        /**
1043         * Notifies subscribed data receive listeners that a new XBee data packet 
1044         * has been received in form of an {@code XBeeMessage}.
1045         *
1046         * @param xbeeMessage The XBee message to be sent to subscribed XBee data
1047         *                    listeners.
1048         * 
1049         * @see com.digi.xbee.api.models.XBeeMessage
1050         */
1051        private void notifyDataReceived(final XBeeMessage xbeeMessage) {
1052                if (xbeeMessage.isBroadcast())
1053                        logger.info(connectionInterface.toString() + 
1054                                        "Broadcast data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
1055                else
1056                        logger.info(connectionInterface.toString() + 
1057                                        "Data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
1058                
1059                try {
1060                        synchronized (dataReceiveListeners) {
1061                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1062                                                dataReceiveListeners.size()));
1063                                for (final IDataReceiveListener listener:dataReceiveListeners) {
1064                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1065                                        if (!running)
1066                                                break;
1067                                        executor.execute(new Runnable() {
1068                                                /*
1069                                                 * (non-Javadoc)
1070                                                 * @see java.lang.Runnable#run()
1071                                                 */
1072                                                @Override
1073                                                public void run() {
1074                                                        /* Synchronize the listener so it is not called 
1075                                                         twice. That is, let the listener to finish its job. */
1076                                                        synchronized (listener) {
1077                                                                listener.dataReceived(xbeeMessage);
1078                                                        }
1079                                                }
1080                                        });
1081                                }
1082                                executor.shutdown();
1083                        }
1084                } catch (Exception e) {
1085                        logger.error(e.getMessage(), e);
1086                }
1087        }
1088        
1089        /**
1090         * Notifies subscribed XBee packet listeners that a new XBee packet has 
1091         * been received.
1092         *
1093         * @param packet The received XBee packet.
1094         * 
1095         * @see com.digi.xbee.api.packet.XBeeAPIPacket
1096         * @see com.digi.xbee.api.packet.XBeePacket
1097         */
1098        private void notifyPacketReceived(final XBeePacket packet) {
1099                logger.debug(connectionInterface.toString() + "Packet received: \n{}", packet.toPrettyString());
1100                
1101                try {
1102                        synchronized (packetReceiveListeners) {
1103                                final ArrayList<IPacketReceiveListener> removeListeners = new ArrayList<IPacketReceiveListener>();
1104                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1105                                                packetReceiveListeners.size()));
1106                                for (final IPacketReceiveListener listener:packetReceiveListeners.keySet()) {
1107                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1108                                        if (!running)
1109                                                break;
1110                                        executor.execute(new Runnable() {
1111                                                /*
1112                                                 * (non-Javadoc)
1113                                                 * @see java.lang.Runnable#run()
1114                                                 */
1115                                                @Override
1116                                                public void run() {
1117                                                        // Synchronize the listener so it is not called 
1118                                                        // twice. That is, let the listener to finish its job.
1119                                                        synchronized (packetReceiveListeners) {
1120                                                                synchronized (listener) {
1121                                                                        if (packetReceiveListeners.get(listener) == ALL_FRAME_IDS)
1122                                                                                listener.packetReceived(packet);
1123                                                                        else if (((XBeeAPIPacket)packet).needsAPIFrameID() && 
1124                                                                                        ((XBeeAPIPacket)packet).getFrameID() == packetReceiveListeners.get(listener)) {
1125                                                                                listener.packetReceived(packet);
1126                                                                                removeListeners.add(listener);
1127                                                                        }
1128                                                                }
1129                                                        }
1130                                                }
1131                                        });
1132                                }
1133                                executor.shutdown();
1134                                // Remove required listeners.
1135                                for (IPacketReceiveListener listener:removeListeners)
1136                                        packetReceiveListeners.remove(listener);
1137                        }
1138                } catch (Exception e) {
1139                        logger.error(e.getMessage(), e);
1140                }
1141        }
1142        
1143        /**
1144         * Notifies subscribed IO sample listeners that a new IO sample packet has
1145         * been received.
1146         *
1147         * @param ioSample The received IO sample.
1148         * @param remoteDevice The remote XBee device that sent the sample.
1149         * 
1150         * @see com.digi.xbee.api.RemoteXBeeDevice
1151         * @see com.digi.xbee.api.io.IOSample
1152         */
1153        private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) {
1154                logger.debug(connectionInterface.toString() + "IO sample received.");
1155                
1156                try {
1157                        synchronized (ioSampleReceiveListeners) {
1158                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1159                                                ioSampleReceiveListeners.size()));
1160                                for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) {
1161                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1162                                        if (!running)
1163                                                break;
1164                                        executor.execute(new Runnable() {
1165                                                /*
1166                                                 * (non-Javadoc)
1167                                                 * @see java.lang.Runnable#run()
1168                                                 */
1169                                                @Override
1170                                                public void run() {
1171                                                        // Synchronize the listener so it is not called 
1172                                                        // twice. That is, let the listener to finish its job.
1173                                                        synchronized (listener) {
1174                                                                listener.ioSampleReceived(remoteDevice, ioSample);
1175                                                        }
1176                                                }
1177                                        });
1178                                }
1179                                executor.shutdown();
1180                        }
1181                } catch (Exception e) {
1182                        logger.error(e.getMessage(), e);
1183                }
1184        }
1185        
1186        /**
1187         * Notifies subscribed Modem Status listeners that a Modem Status event 
1188         * packet has been received.
1189         *
1190         * @param modemStatusEvent The Modem Status event.
1191         * 
1192         * @see com.digi.xbee.api.models.ModemStatusEvent
1193         */
1194        private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) {
1195                logger.debug(connectionInterface.toString() + "Modem Status event received.");
1196                
1197                try {
1198                        synchronized (modemStatusListeners) {
1199                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1200                                                modemStatusListeners.size()));
1201                                for (final IModemStatusReceiveListener listener:modemStatusListeners) {
1202                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1203                                        if (!running)
1204                                                break;
1205                                        executor.execute(new Runnable() {
1206                                                /*
1207                                                 * (non-Javadoc)
1208                                                 * @see java.lang.Runnable#run()
1209                                                 */
1210                                                @Override
1211                                                public void run() {
1212                                                        // Synchronize the listener so it is not called 
1213                                                        // twice. That is, let the listener to finish its job.
1214                                                        synchronized (listener) {
1215                                                                listener.modemStatusEventReceived(modemStatusEvent);
1216                                                        }
1217                                                }
1218                                        });
1219                                }
1220                                executor.shutdown();
1221                        }
1222                } catch (Exception e) {
1223                        logger.error(e.getMessage(), e);
1224                }
1225        }
1226        
1227        /**
1228         * Notifies subscribed explicit data receive listeners that a new XBee 
1229         * explicit data packet has been received in form of an 
1230         * {@code ExplicitXBeeMessage}.
1231         *
1232         * @param explicitXBeeMessage The XBee message to be sent to subscribed 
1233         *                            XBee data listeners.
1234         * 
1235         * @see com.digi.xbee.api.models.ExplicitXBeeMessage
1236         */
1237        private void notifyExplicitDataReceived(final ExplicitXBeeMessage explicitXBeeMessage) {
1238                if (explicitXBeeMessage.isBroadcast())
1239                        logger.info(connectionInterface.toString() + 
1240                                        "Broadcast explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData()));
1241                else
1242                        logger.info(connectionInterface.toString() + 
1243                                        "Explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData()));
1244                
1245                try {
1246                        synchronized (explicitDataReceiveListeners) {
1247                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1248                                                explicitDataReceiveListeners.size()));
1249                                for (final IExplicitDataReceiveListener listener:explicitDataReceiveListeners) {
1250                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1251                                        if (!running)
1252                                                break;
1253                                        executor.execute(new Runnable() {
1254                                                /*
1255                                                 * (non-Javadoc)
1256                                                 * @see java.lang.Runnable#run()
1257                                                 */
1258                                                @Override
1259                                                public void run() {
1260                                                        /* Synchronize the listener so it is not called 
1261                                                         twice. That is, let the listener to finish its job. */
1262                                                        synchronized (listener) {
1263                                                                listener.explicitDataReceived(explicitXBeeMessage);
1264                                                        }
1265                                                }
1266                                        });
1267                                }
1268                                executor.shutdown();
1269                        }
1270                } catch (Exception e) {
1271                        logger.error(e.getMessage(), e);
1272                }
1273        }
1274        
1275        /**
1276         * Notifies subscribed IP data receive listeners that a new IP data 
1277         * packet has been received in form of a {@code ipMessage}.
1278         *
1279         * @param ipMessage The IP message to be sent to subscribed 
1280         *                  IP data listeners.
1281         * 
1282         * @see com.digi.xbee.api.models.IPMessage
1283         * 
1284         * @since 1.2.0
1285         */
1286        private void notifyIPDataReceived(final IPMessage ipMessage) {
1287                logger.info(connectionInterface.toString() + 
1288                                "IP data received from {} >> {}.", ipMessage.getHostAddress(), HexUtils.prettyHexString(ipMessage.getData()));
1289                
1290                try {
1291                        synchronized (ipDataReceiveListeners) {
1292                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1293                                                ipDataReceiveListeners.size()));
1294                                for (final IIPDataReceiveListener listener:ipDataReceiveListeners) {
1295                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1296                                        if (!running)
1297                                                break;
1298                                        executor.execute(new Runnable() {
1299                                                /*
1300                                                 * (non-Javadoc)
1301                                                 * @see java.lang.Runnable#run()
1302                                                 */
1303                                                @Override
1304                                                public void run() {
1305                                                        /* Synchronize the listener so it is not called 
1306                                                         twice. That is, let the listener to finish its job. */
1307                                                        synchronized (listener) {
1308                                                                listener.ipDataReceived(ipMessage);
1309                                                        }
1310                                                }
1311                                        });
1312                                }
1313                                executor.shutdown();
1314                        }
1315                } catch (Exception e) {
1316                        logger.error(e.getMessage(), e);
1317                }
1318        }
1319        
1320        /**
1321         * Notifies subscribed SMS receive listeners that a new SMS packet has 
1322         * been received in form of an {@code SMSMessage}.
1323         *
1324         * @param smsMessage The SMS message to be sent to subscribed SMS listeners.
1325         * 
1326         * @see com.digi.xbee.api.models.SMSMessage
1327         * 
1328         * @since 1.2.0
1329         */
1330        private void notifySMSReceived(final SMSMessage smsMessage) {
1331                logger.info(connectionInterface.toString() + 
1332                                "SMS received from {} >> {}.", smsMessage.getPhoneNumber(), smsMessage.getData());
1333                
1334                try {
1335                        synchronized (smsReceiveListeners) {
1336                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1337                                                smsReceiveListeners.size()));
1338                                for (final ISMSReceiveListener listener:smsReceiveListeners) {
1339                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1340                                        if (!running)
1341                                                break;
1342                                        executor.execute(new Runnable() {
1343                                                /*
1344                                                 * (non-Javadoc)
1345                                                 * @see java.lang.Runnable#run()
1346                                                 */
1347                                                @Override
1348                                                public void run() {
1349                                                        /* Synchronize the listener so it is not called 
1350                                                         twice. That is, let the listener to finish its job. */
1351                                                        synchronized (listener) {
1352                                                                listener.smsReceived(smsMessage);
1353                                                        }
1354                                                }
1355                                        });
1356                                }
1357                                executor.shutdown();
1358                        }
1359                } catch (Exception e) {
1360                        logger.error(e.getMessage(), e);
1361                }
1362        }
1363
1364        /**
1365         * Notifies subscribed User Data Relay receive listeners that a new User
1366         * Data Relay packet has been received in form of a
1367         * {@code UserDataRelayMessage}.
1368         *
1369         * @param relayMessage The User Data Relay message to be sent to subscribed
1370         *                     User Data Relay listeners.
1371         *
1372         * @see UserDataRelayMessage
1373         *
1374         * @since 1.3.0
1375         */
1376        private void notifyUserDataRelayReceived(UserDataRelayMessage relayMessage) {
1377                logger.info(connectionInterface.toString() +
1378                                "User Data Relay received from {} >> {}.", relayMessage.getSourceInterface().getDescription(),
1379                                relayMessage.getData() != null ? HexUtils.prettyHexString(relayMessage.getData()) : "");
1380
1381                // Notify the generic User Data Relay listeners.
1382                notifyUserDataRelayReceived(relayMessage, true);
1383
1384                // Notify the specific User Data Relay listeners.
1385                notifyUserDataRelayReceived(relayMessage, false);
1386        }
1387
1388        /**
1389         * Notifies subscribed generic or specific User Data Relay receive listeners
1390         * that a new User Data Relay packet has been received in form of a
1391         * {@code UserDataRelayMessage}.
1392         *
1393         * @param relayMessage The User Data Relay message to be sent to subscribed
1394         *                     User Data Relay listeners.
1395         * @param notifyGeneric {@code true} to notify only the generic listeners,
1396         *                      {@code false} to notify the specific ones.
1397         *
1398         * @see UserDataRelayMessage
1399         *
1400         * @since 1.3.0
1401         */
1402        private void notifyUserDataRelayReceived(final UserDataRelayMessage relayMessage, final boolean notifyGeneric) {
1403                ArrayList<?> listenerList = new ArrayList<>();
1404
1405                // Get the list of listeners that should be notified depending on the parameters.
1406                if (notifyGeneric) {
1407                        listenerList = dataRelayReceiveListeners;
1408                } else {
1409                        switch (relayMessage.getSourceInterface()) {
1410                                case SERIAL:
1411                                        listenerList = serialDataReceiveListeners;
1412                                        break;
1413                                case BLUETOOTH:
1414                                        listenerList = bluetoothDataReceiveListeners;
1415                                        break;
1416                                case MICROPYTHON:
1417                                        listenerList = microPythonDataReceiveListeners;
1418                                        break;
1419                                default:
1420                                        break;
1421                        }
1422                }
1423
1424                // Notify the appropriate listeners.
1425                try {
1426                        synchronized (listenerList) {
1427                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
1428                                                listenerList.size()));
1429                                for (final Object listener : listenerList) {
1430                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1431                                        if (!running)
1432                                                break;
1433                                        executor.execute(new Runnable() {
1434                                                @Override
1435                                                public void run() {
1436                                                        /* Synchronize the listener so it is not called
1437                                                         twice. That is, let the listener to finish its job. */
1438                                                        synchronized (listener) {
1439                                                                if (notifyGeneric) {
1440                                                                        ((IUserDataRelayReceiveListener) listener).userDataRelayReceived(relayMessage);
1441                                                                } else {
1442                                                                        switch (relayMessage.getSourceInterface()) {
1443                                                                                case SERIAL:
1444                                                                                        ((ISerialDataReceiveListener) listener).dataReceived(relayMessage.getData());
1445                                                                                        break;
1446                                                                                case BLUETOOTH:
1447                                                                                        ((IBluetoothDataReceiveListener) listener).dataReceived(relayMessage.getData());
1448                                                                                        break;
1449                                                                                case MICROPYTHON:
1450                                                                                        ((IMicroPythonDataReceiveListener) listener).dataReceived(relayMessage.getData());
1451                                                                                        break;
1452                                                                                default:
1453                                                                                        break;
1454                                                                        }
1455                                                                }
1456                                                        }
1457                                                }
1458                                        });
1459                                }
1460                                executor.shutdown();
1461                        }
1462                } catch (Exception e) {
1463                        logger.error(e.getMessage(), e);
1464                }
1465        }
1466        
1467        /**
1468         * Returns whether this Data reader is running or not.
1469         * 
1470         * @return {@code true} if the Data reader is running, {@code false} 
1471         *         otherwise.
1472         * 
1473         * @see #stopReader()
1474         */
1475        public boolean isRunning() {
1476                return running;
1477        }
1478        
1479        /**
1480         * Stops the Data reader thread.
1481         * 
1482         * @see #isRunning()
1483         */
1484        public void stopReader() {
1485                running = false;
1486                synchronized (connectionInterface) {
1487                        connectionInterface.notify();
1488                }
1489                logger.debug(connectionInterface.toString() + "Data reader stopped.");
1490        }
1491        
1492        /**
1493         * Returns the queue of read XBee packets.
1494         * 
1495         * @return The queue of read XBee packets.
1496         * 
1497         * @see com.digi.xbee.api.models.XBeePacketsQueue
1498         */
1499        public XBeePacketsQueue getXBeePacketsQueue() {
1500                return xbeePacketsQueue;
1501        }
1502}