001/*
002 * Copyright 2017-2019, Digi International Inc.
003 *
004 * This Source Code Form is subject to the terms of the Mozilla Public
005 * License, v. 2.0. If a copy of the MPL was not distributed with this
006 * file, you can obtain one at http://mozilla.org/MPL/2.0/.
007 *
008 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 
009 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
010 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 
011 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
012 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
013 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 
014 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
015 */
016package com.digi.xbee.api;
017
018import java.io.IOException;
019import java.net.Inet6Address;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.concurrent.Executors;
023import java.util.concurrent.ScheduledExecutorService;
024
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import com.digi.xbee.api.connection.IConnectionInterface;
029import com.digi.xbee.api.exceptions.XBeeException;
030import com.digi.xbee.api.io.IOSample;
031import com.digi.xbee.api.listeners.IExplicitDataReceiveListener;
032import com.digi.xbee.api.listeners.IIOSampleReceiveListener;
033import com.digi.xbee.api.listeners.IModemStatusReceiveListener;
034import com.digi.xbee.api.listeners.IIPDataReceiveListener;
035import com.digi.xbee.api.listeners.IPacketReceiveListener;
036import com.digi.xbee.api.listeners.IDataReceiveListener;
037import com.digi.xbee.api.listeners.ISMSReceiveListener;
038import com.digi.xbee.api.listeners.IUserDataRelayReceiveListener;
039import com.digi.xbee.api.listeners.relay.IBluetoothDataReceiveListener;
040import com.digi.xbee.api.listeners.relay.IMicroPythonDataReceiveListener;
041import com.digi.xbee.api.listeners.relay.ISerialDataReceiveListener;
042import com.digi.xbee.api.models.ExplicitXBeeMessage;
043import com.digi.xbee.api.models.ModemStatusEvent;
044import com.digi.xbee.api.models.IPMessage;
045import com.digi.xbee.api.models.SMSMessage;
046import com.digi.xbee.api.models.SpecialByte;
047import com.digi.xbee.api.models.OperatingMode;
048import com.digi.xbee.api.models.UserDataRelayMessage;
049import com.digi.xbee.api.models.XBee16BitAddress;
050import com.digi.xbee.api.models.XBee64BitAddress;
051import com.digi.xbee.api.models.XBeeMessage;
052import com.digi.xbee.api.models.XBeePacketsQueue;
053import com.digi.xbee.api.models.XBeeProtocol;
054import com.digi.xbee.api.packet.XBeeAPIPacket;
055import com.digi.xbee.api.packet.APIFrameType;
056import com.digi.xbee.api.packet.XBeePacket;
057import com.digi.xbee.api.packet.XBeePacketParser;
058import com.digi.xbee.api.packet.cellular.RXSMSPacket;
059import com.digi.xbee.api.packet.common.ExplicitRxIndicatorPacket;
060import com.digi.xbee.api.packet.common.IODataSampleRxIndicatorPacket;
061import com.digi.xbee.api.packet.common.ModemStatusPacket;
062import com.digi.xbee.api.packet.common.ReceivePacket;
063import com.digi.xbee.api.packet.ip.RXIPv4Packet;
064import com.digi.xbee.api.packet.raw.RX16IOPacket;
065import com.digi.xbee.api.packet.raw.RX16Packet;
066import com.digi.xbee.api.packet.raw.RX64IOPacket;
067import com.digi.xbee.api.packet.raw.RX64Packet;
068import com.digi.xbee.api.packet.relay.UserDataRelayOutputPacket;
069import com.digi.xbee.api.packet.thread.IPv6IODataSampleRxIndicator;
070import com.digi.xbee.api.packet.thread.RXIPv6Packet;
071import com.digi.xbee.api.utils.HexUtils;
072
073/**
074 * Thread that constantly reads data from an input stream.
075 * 
076 * <p>Depending on the XBee operating mode, read data is notified as is to the 
077 * subscribed listeners or is parsed to a packet using the packet parser and 
078 * then notified to subscribed listeners.</p> 
079 */
080public class DataReader extends Thread {
081        
082        // Constants.
083        private final static int ALL_FRAME_IDS = 99999;
084        private final static int MAXIMUM_PARALLEL_LISTENER_THREADS = 20;
085        
086        // Variables.
087        private boolean running = false;
088        
089        private IConnectionInterface connectionInterface;
090        
091        private volatile OperatingMode mode;
092        
093        private ArrayList<IDataReceiveListener> dataReceiveListeners = new ArrayList<>();
094        // The packetReceiveListeners requires to be a HashMap with an associated integer. The integer is used to determine 
095        // the frame ID of the packet that should be received. When it is 99999 (ALL_FRAME_IDS), all the packets will be handled.
096        private HashMap<IPacketReceiveListener, Integer> packetReceiveListeners = new HashMap<>();
097        private ArrayList<IIOSampleReceiveListener> ioSampleReceiveListeners = new ArrayList<>();
098        private ArrayList<IModemStatusReceiveListener> modemStatusListeners = new ArrayList<>();
099        private ArrayList<IExplicitDataReceiveListener> explicitDataReceiveListeners = new ArrayList<>();
100        private ArrayList<IIPDataReceiveListener> ipDataReceiveListeners = new ArrayList<>();
101        private ArrayList<ISMSReceiveListener> smsReceiveListeners = new ArrayList<>();
102        private ArrayList<IUserDataRelayReceiveListener> dataRelayReceiveListeners = new ArrayList<>();
103        private ArrayList<IBluetoothDataReceiveListener> bluetoothDataReceiveListeners = new ArrayList<>();
104        private ArrayList<IMicroPythonDataReceiveListener> microPythonDataReceiveListeners = new ArrayList<>();
105        private ArrayList<ISerialDataReceiveListener> serialDataReceiveListeners = new ArrayList<>();
106
107        private Logger logger;
108        
109        private XBeePacketParser parser;
110        
111        private XBeePacketsQueue xbeePacketsQueue;
112        
113        private AbstractXBeeDevice xbeeDevice;
114        
115        /**
116         * Class constructor. Instantiates a new {@code DataReader} object for the 
117         * given connection interface using the given XBee operating mode and XBee
118         * device.
119         * 
120         * @param connectionInterface Connection interface to read data from.
121         * @param mode XBee operating mode.
122         * @param xbeeDevice Reference to the XBee device containing this 
123         *                   {@code DataReader} object.
124         * 
125         * @throws NullPointerException if {@code connectionInterface == null} or
126         *                                 {@code mode == null}.
127         * @throws IllegalArgumentException If {@code xbeeDevice.isRemote() == true}.
128         * 
129         * @see IConnectionInterface
130         * @see com.digi.xbee.api.XBeeDevice
131         * @see com.digi.xbee.api.models.OperatingMode
132         */
133        public DataReader(IConnectionInterface connectionInterface, OperatingMode mode, AbstractXBeeDevice xbeeDevice) {
134                if (connectionInterface == null)
135                        throw new NullPointerException("Connection interface cannot be null.");
136                if (mode == null)
137                        throw new NullPointerException("Operating mode cannot be null.");
138                if (xbeeDevice.isRemote())
139                        throw new IllegalArgumentException("The given local XBee device is remote.");
140                
141                this.connectionInterface = connectionInterface;
142                this.mode = mode;
143                this.xbeeDevice = xbeeDevice;
144                this.logger = LoggerFactory.getLogger(DataReader.class);
145                parser = new XBeePacketParser();
146                xbeePacketsQueue = new XBeePacketsQueue();
147        }
148        
149        /**
150         * Sets the XBee operating mode of this data reader.
151         * 
152         * @param mode New XBee operating mode.
153         * 
154         * @throws NullPointerException if {@code mode == null}.
155         * 
156         * @see com.digi.xbee.api.models.OperatingMode
157         */
158        public void setXBeeReaderMode(OperatingMode mode) {
159                if (mode == null)
160                        throw new NullPointerException("Operating mode cannot be null.");
161                
162                this.mode = mode;
163        }
164        
165        /**
166         * Adds the given data receive listener to the list of listeners that will 
167         * be notified when XBee data packets are received.
168         * 
169         * <p>If the listener has been already added, this method does nothing.</p>
170         * 
171         * @param listener Listener to be notified when new XBee data packets are 
172         *                 received.
173         * 
174         * @throws NullPointerException if {@code listener == null}.
175         * 
176         * @see #removeDataReceiveListener(IDataReceiveListener)
177         * @see com.digi.xbee.api.listeners.IDataReceiveListener
178         */
179        public void addDataReceiveListener(IDataReceiveListener listener) {
180                if (listener == null)
181                        throw new NullPointerException("Listener cannot be null.");
182                
183                synchronized (dataReceiveListeners) {
184                        if (!dataReceiveListeners.contains(listener))
185                                dataReceiveListeners.add(listener);
186                }
187        }
188        
189        /**
190         * Removes the given data receive listener from the list of data receive 
191         * listeners.
192         * 
193         * <p>If the listener is not included in the list, this method does nothing.
194         * </p>
195         * 
196         * @param listener Data receive listener to be remove from the list.
197         * 
198         * @see #addDataReceiveListener(IDataReceiveListener)
199         * @see com.digi.xbee.api.listeners.IDataReceiveListener
200         */
201        public void removeDataReceiveListener(IDataReceiveListener listener) {
202                synchronized (dataReceiveListeners) {
203                        if (dataReceiveListeners.contains(listener))
204                                dataReceiveListeners.remove(listener);
205                }
206        }
207        
208        /**
209         * Adds the given packet receive listener to the list of listeners that will
210         * be notified when any XBee packet is received.
211         * 
212         * <p>If the listener has been already added, this method does nothing.</p>
213         * 
214         * @param listener Listener to be notified when any XBee packet is received.
215         * 
216         * @throws NullPointerException if {@code listener == null}.
217         * 
218         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
219         * @see #removePacketReceiveListener(IPacketReceiveListener)
220         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
221         */
222        public void addPacketReceiveListener(IPacketReceiveListener listener) {
223                addPacketReceiveListener(listener, ALL_FRAME_IDS);
224        }
225        
226        /**
227         * Adds the given packet receive listener to the list of listeners that will
228         * be notified when an XBee packet with the given frame ID is received.
229         * 
230         * <p>If the listener has been already added, this method does nothing.</p>
231         * 
232         * @param listener Listener to be notified when an XBee packet with the
233         *                 provided frame ID is received.
234         * @param frameID Frame ID for which this listener should be notified and 
235         *                removed after.
236         *                Using {@link #ALL_FRAME_IDS} this listener will be 
237         *                notified always and will be removed only by user request.
238         * 
239         * @throws NullPointerException if {@code listener == null}.
240         * 
241         * @see #addPacketReceiveListener(IPacketReceiveListener)
242         * @see #removePacketReceiveListener(IPacketReceiveListener)
243         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
244         */
245        public void addPacketReceiveListener(IPacketReceiveListener listener, int frameID) {
246                if (listener == null)
247                        throw new NullPointerException("Listener cannot be null.");
248                
249                synchronized (packetReceiveListeners) {
250                        if (!packetReceiveListeners.containsKey(listener))
251                                packetReceiveListeners.put(listener, frameID);
252                }
253        }
254        
255        /**
256         * Removes the given packet receive listener from the list of XBee packet 
257         * receive listeners.
258         * 
259         * <p>If the listener is not included in the list, this method does nothing.
260         * </p>
261         * 
262         * @param listener Packet receive listener to remove from the list.
263         * 
264         * @see #addPacketReceiveListener(IPacketReceiveListener)
265         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
266         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
267         */
268        public void removePacketReceiveListener(IPacketReceiveListener listener) {
269                synchronized (packetReceiveListeners) {
270                        if (packetReceiveListeners.containsKey(listener))
271                                packetReceiveListeners.remove(listener);
272                }
273        }
274        
275        /**
276         * Adds the given IO sample receive listener to the list of listeners that 
277         * will be notified when an IO sample packet is received.
278         * 
279         * <p>If the listener has been already added, this method does nothing.</p>
280         * 
281         * @param listener Listener to be notified when new IO sample packets are 
282         *                 received.
283         * 
284         * @throws NullPointerException if {@code listener == null}.
285         * 
286         * @see #removeIOSampleReceiveListener(IIOSampleReceiveListener)
287         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
288         */
289        public void addIOSampleReceiveListener(IIOSampleReceiveListener listener) {
290                if (listener == null)
291                        throw new NullPointerException("Listener cannot be null.");
292                
293                synchronized (ioSampleReceiveListeners) {
294                        if (!ioSampleReceiveListeners.contains(listener))
295                                ioSampleReceiveListeners.add(listener);
296                }
297        }
298        
299        /**
300         * Removes the given IO sample receive listener from the list of IO sample 
301         * receive listeners.
302         * 
303         * <p>If the listener is not included in the list, this method does nothing.
304         * </p>
305         * 
306         * @param listener IO sample receive listener to remove from the list.
307         * 
308         * @see #addIOSampleReceiveListener(IIOSampleReceiveListener)
309         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
310         */
311        public void removeIOSampleReceiveListener(IIOSampleReceiveListener listener) {
312                synchronized (ioSampleReceiveListeners) {
313                        if (ioSampleReceiveListeners.contains(listener))
314                                ioSampleReceiveListeners.remove(listener);
315                }
316        }
317        
318        /**
319         * Adds the given Modem Status receive listener to the list of listeners 
320         * that will be notified when a modem status packet is received.
321         * 
322         * <p>If the listener has been already added, this method does nothing.</p>
323         * 
324         * @param listener Listener to be notified when new modem status packets are
325         *                 received.
326         * 
327         * @throws NullPointerException if {@code listener == null}.
328         * 
329         * @see #removeModemStatusReceiveListener(IModemStatusReceiveListener)
330         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
331         */
332        public void addModemStatusReceiveListener(IModemStatusReceiveListener listener) {
333                if (listener == null)
334                        throw new NullPointerException("Listener cannot be null.");
335                
336                synchronized (modemStatusListeners) {
337                        if (!modemStatusListeners.contains(listener))
338                                modemStatusListeners.add(listener);
339                }
340        }
341        
342        /**
343         * Removes the given Modem Status receive listener from the list of Modem 
344         * Status receive listeners.
345         * 
346         * <p>If the listener is not included in the list, this method does nothing.
347         * </p>
348         * 
349         * @param listener Modem Status receive listener to remove from the list.
350         * 
351         * @see #addModemStatusReceiveListener(IModemStatusReceiveListener)
352         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
353         */
354        public void removeModemStatusReceiveListener(IModemStatusReceiveListener listener) {
355                synchronized (modemStatusListeners) {
356                        if (modemStatusListeners.contains(listener))
357                                modemStatusListeners.remove(listener);
358                }
359        }
360        
361        /**
362         * Adds the given explicit data receive listener to the list of listeners 
363         * that will be notified when an explicit data packet is received.
364         * 
365         * <p>If the listener has been already added, this method does nothing.</p>
366         * 
367         * @param listener Listener to be notified when new explicit data packets 
368         *                 are received.
369         * 
370         * @throws NullPointerException if {@code listener == null}.
371         * 
372         * @see #removeExplicitDataReceiveListener(IExplicitDataReceiveListener)
373         * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener
374         */
375        public void addExplicitDataReceiveListener(IExplicitDataReceiveListener listener) {
376                if (listener == null)
377                        throw new NullPointerException("Listener cannot be null.");
378                
379                synchronized (explicitDataReceiveListeners) {
380                        if (!explicitDataReceiveListeners.contains(listener))
381                                explicitDataReceiveListeners.add(listener);
382                }
383        }
384        
385        /**
386         * Removes the given explicit data receive listener from the list of 
387         * explicit data receive listeners.
388         * 
389         * <p>If the listener is not included in the list, this method does nothing.
390         * </p>
391         * 
392         * @param listener Explicit data receive listener to remove from the list.
393         * 
394         * @see #addExplicitDataReceiveListener(IExplicitDataReceiveListener)
395         * @see com.digi.xbee.api.listeners.IExplicitDataReceiveListener
396         */
397        public void removeExplicitDataReceiveListener(IExplicitDataReceiveListener listener) {
398                synchronized (explicitDataReceiveListeners) {
399                        if (explicitDataReceiveListeners.contains(listener))
400                                explicitDataReceiveListeners.remove(listener);
401                }
402        }
403        
404        /**
405         * Adds the given IP data receive listener to the list of listeners 
406         * that will be notified when a IP data packet is received.
407         * 
408         * <p>If the listener has been already added, this method does nothing.</p>
409         * 
410         * @param listener Listener to be notified when new IP data packets 
411         *                 are received.
412         * 
413         * @throws NullPointerException if {@code listener == null}.
414         * 
415         * @see #removeIPDataReceiveListener(IIPDataReceiveListener)
416         * @see com.digi.xbee.api.listeners.IIPDataReceiveListener
417         * 
418         * @since 1.2.0
419         */
420        public void addIPDataReceiveListener(IIPDataReceiveListener listener) {
421                if (listener == null)
422                        throw new NullPointerException("Listener cannot be null.");
423                
424                synchronized (ipDataReceiveListeners) {
425                        if (!ipDataReceiveListeners.contains(listener))
426                                ipDataReceiveListeners.add(listener);
427                }
428        }
429        
430        /**
431         * Removes the given IP data receive listener from the list of 
432         * IP data receive listeners.
433         * 
434         * <p>If the listener is not included in the list, this method does nothing.
435         * </p>
436         * 
437         * @param listener IP data receive listener to remove from the list.
438         * 
439         * @see #addIPDataReceiveListener(IIPDataReceiveListener)
440         * @see com.digi.xbee.api.listeners.IIPDataReceiveListener
441         * 
442         * @since 1.2.0
443         */
444        public void removeIPDataReceiveListener(IIPDataReceiveListener listener) {
445                synchronized (ipDataReceiveListeners) {
446                        if (ipDataReceiveListeners.contains(listener))
447                                ipDataReceiveListeners.remove(listener);
448                }
449        }
450        
451        /**
452         * Adds the given SMS receive listener to the list of listeners that will 
453         * be notified when an SMS packet is received.
454         * 
455         * <p>If the listener has been already added, this method does nothing.</p>
456         * 
457         * @param listener Listener to be notified when new SMS packet is received.
458         * 
459         * @throws NullPointerException if {@code listener == null}.
460         * 
461         * @see #removeSMSReceiveListener(ISMSReceiveListener)
462         * @see com.digi.xbee.api.listeners.ISMSReceiveListener
463         * 
464         * @since 1.2.0
465         */
466        public void addSMSReceiveListener(ISMSReceiveListener listener) {
467                if (listener == null)
468                        throw new NullPointerException("Listener cannot be null.");
469                
470                synchronized (smsReceiveListeners) {
471                        if (!smsReceiveListeners.contains(listener))
472                                smsReceiveListeners.add(listener);
473                }
474        }
475        
476        /**
477         * Removes the given SMS receive listener from the list of SMS receive 
478         * listeners.
479         * 
480         * <p>If the listener is not included in the list, this method does nothing.
481         * </p>
482         * 
483         * @param listener SMS receive listener to remove from the list.
484         * 
485         * @see #addSMSReceiveListener(ISMSReceiveListener)
486         * @see com.digi.xbee.api.listeners.ISMSReceiveListener
487         * 
488         * @since 1.2.0
489         */
490        public void removeSMSReceiveListener(ISMSReceiveListener listener) {
491                synchronized (smsReceiveListeners) {
492                        if (smsReceiveListeners.contains(listener))
493                                smsReceiveListeners.remove(listener);
494                }
495        }
496
497        /**
498         * Adds the given User Data Relay receive listener to the list of listeners
499         * that will be notified when a User Data Relay packet is received.
500         *
501         * <p>If the listener has been already added, this method does nothing.</p>
502         *
503         * @param listener Listener to be notified when new User Data Relay packet
504         *                 is received.
505         *
506         * @throws NullPointerException if {@code listener == null}.
507         *
508         * @see #removeUserDataRelayReceiveListener(IUserDataRelayReceiveListener)
509         * @see IUserDataRelayReceiveListener
510         *
511         * @since 1.3.0
512         */
513        public void addUserDataRelayReceiveListener(IUserDataRelayReceiveListener listener) {
514                if (listener == null)
515                        throw new NullPointerException("Listener cannot be null");
516
517                synchronized (dataRelayReceiveListeners) {
518                        if (!dataRelayReceiveListeners.contains(listener))
519                                dataRelayReceiveListeners.add(listener);
520                }
521        }
522
523        /**
524         * Removes the given User Data Relay receive listener from the list of User
525         * Data Relay receive listeners.
526         *
527         * <p>If the listener is not included in the list, this method does nothing.
528         * </p>
529         *
530         * @param listener User Data Relay receive listener to remove from the list.
531         *
532         * @see #addUserDataRelayReceiveListener(IUserDataRelayReceiveListener)
533         * @see IUserDataRelayReceiveListener
534         *
535         * @since 1.3.0
536         */
537        public void removeUserDataRelayReceiveListener(IUserDataRelayReceiveListener listener) {
538                synchronized (dataRelayReceiveListeners) {
539                        if (dataRelayReceiveListeners.contains(listener))
540                                dataRelayReceiveListeners.remove(listener);
541                }
542        }
543
544        /**
545         * Adds the given data receive listener to the list of listeners that will
546         * be notified when new data from the Bluetooth interface is received in
547         * a User Data Relay frame.
548         *
549         * <p>If the listener has been already added, this method does nothing.</p>
550         *
551         * @param listener Listener to be notified when new data from the Bluetooth
552         *                 interface is received.
553         *
554         * @throws NullPointerException if {@code listener == null}.
555         *
556         * @see #removeBluetoothDataReceiveListener(IBluetoothDataReceiveListener)
557         * @see IBluetoothDataReceiveListener
558         *
559         * @since 1.3.0
560         */
561        public void addBluetoothDataReceiveListener(IBluetoothDataReceiveListener listener) {
562                if (listener == null)
563                        throw new NullPointerException("Listener cannot be null");
564
565                synchronized (bluetoothDataReceiveListeners) {
566                        if (!bluetoothDataReceiveListeners.contains(listener))
567                                bluetoothDataReceiveListeners.add(listener);
568                }
569        }
570
571        /**
572         * Removes the given data receive listener from the list of data receive
573         * listeners for the Bluetooth interface.
574         *
575         * <p>If the listener is not included in the list, this method does nothing.
576         * </p>
577         *
578         * @param listener Data receive listener to remove from the list.
579         *
580         * @see #addBluetoothDataReceiveListener(IBluetoothDataReceiveListener)
581         * @see IBluetoothDataReceiveListener
582         *
583         * @since 1.3.0
584         */
585        public void removeBluetoothDataReceiveListener(IBluetoothDataReceiveListener listener) {
586                synchronized (bluetoothDataReceiveListeners) {
587                        if (bluetoothDataReceiveListeners.contains(listener))
588                                bluetoothDataReceiveListeners.remove(listener);
589                }
590        }
591
592        /**
593         * Adds the given data receive listener to the list of listeners that will
594         * be notified when new data from the MicroPython interface is received in
595         * a User Data Relay frame.
596         *
597         * <p>If the listener has been already added, this method does nothing.</p>
598         *
599         * @param listener Listener to be notified when new data from the
600         *                 MicroPython interface is received.
601         *
602         * @throws NullPointerException if {@code listener == null}.
603         *
604         * @see #removeMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener)
605         * @see IMicroPythonDataReceiveListener
606         *
607         * @since 1.3.0
608         */
609        public void addMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener listener) {
610                if (listener == null)
611                        throw new NullPointerException("Listener cannot be null");
612
613                synchronized (microPythonDataReceiveListeners) {
614                        if (!microPythonDataReceiveListeners.contains(listener))
615                                microPythonDataReceiveListeners.add(listener);
616                }
617        }
618
619        /**
620         * Removes the given data receive listener from the list of data receive
621         * listeners for the MicroPython interface.
622         *
623         * <p>If the listener is not included in the list, this method does nothing.
624         * </p>
625         *
626         * @param listener Data receive listener to remove from the list.
627         *
628         * @see #addMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener)
629         * @see IMicroPythonDataReceiveListener
630         *
631         * @since 1.3.0
632         */
633        public void removeMicroPythonDataReceiveListener(IMicroPythonDataReceiveListener listener) {
634                synchronized (microPythonDataReceiveListeners) {
635                        if (microPythonDataReceiveListeners.contains(listener))
636                                microPythonDataReceiveListeners.remove(listener);
637                }
638        }
639
640        /**
641         * Adds the given data receive listener to the list of listeners that will
642         * be notified when new data from the serial interface is received in
643         * a User Data Relay frame.
644         *
645         * <p>If the listener has been already added, this method does nothing.</p>
646         *
647         * @param listener Listener to be notified when new data from the serial
648         *                 interface is received.
649         *
650         * @throws NullPointerException if {@code listener == null}.
651         *
652         * @see #removeSerialDataReceiveListener(ISerialDataReceiveListener)
653         * @see ISerialDataReceiveListener
654         *
655         * @since 1.3.0
656         */
657        public void addSerialDataReceiveListener(ISerialDataReceiveListener listener) {
658                if (listener == null)
659                        throw new NullPointerException("Listener cannot be null");
660
661                synchronized (serialDataReceiveListeners) {
662                        if (!serialDataReceiveListeners.contains(listener))
663                                serialDataReceiveListeners.add(listener);
664                }
665        }
666
667        /**
668         * Removes the given data receive listener from the list of data receive
669         * listeners for the serial interface.
670         *
671         * <p>If the listener is not included in the list, this method does nothing.
672         * </p>
673         *
674         * @param listener Data receive listener to remove from the list.
675         *
676         * @see #addSerialDataReceiveListener(ISerialDataReceiveListener)
677         * @see ISerialDataReceiveListener
678         *
679         * @since 1.3.0
680         */
681        public void removeSerialDataReceiveListener(ISerialDataReceiveListener listener) {
682                synchronized (serialDataReceiveListeners) {
683                        if (serialDataReceiveListeners.contains(listener))
684                                serialDataReceiveListeners.remove(listener);
685                }
686        }
687
688        /*
689         * (non-Javadoc)
690         * @see java.lang.Thread#run()
691         */
692        @Override
693        public void run() {
694                logger.debug(connectionInterface.toString() + "Data reader started.");
695                running = true;
696                // Clear the list of read packets.
697                xbeePacketsQueue.clearQueue();
698                try {
699                        synchronized (connectionInterface) {
700                                connectionInterface.wait();
701                        }
702                        while (running) {
703                                if (!running)
704                                        break;
705                                if (connectionInterface.getInputStream() != null) {
706                                        switch (mode) {
707                                        case AT:
708                                                break;
709                                        case API:
710                                        case API_ESCAPE:
711                                                int headerByte = connectionInterface.getInputStream().read();
712                                                // If it is packet header parse the packet, if not discard this byte and continue.
713                                                if (headerByte == SpecialByte.HEADER_BYTE.getValue()) {
714                                                        try {
715                                                                XBeePacket packet = parser.parsePacket(connectionInterface.getInputStream(), mode);
716                                                                packetReceived(packet);
717                                                        } catch (Exception e) {
718                                                                logger.error("Error parsing the API packet.", e);
719                                                        }
720                                                }
721                                                break;
722                                        default:
723                                                break;
724                                        }
725                                } else if (connectionInterface.getInputStream() == null)
726                                        break;
727                                if (connectionInterface.getInputStream() == null)
728                                        break;
729                                else if (connectionInterface.getInputStream().available() > 0)
730                                        continue;
731                                synchronized (connectionInterface) {
732                                        connectionInterface.wait();
733                                }
734                        }
735                } catch (IOException e) {
736                        logger.error("Error reading from input stream.", e);
737                } catch (InterruptedException e) {
738                        logger.error(e.getMessage(), e);
739                } catch (IllegalStateException e) {
740                        logger.error(e.getMessage(), e);
741                } finally {
742                        if (running) {
743                                running = false;
744                                if (connectionInterface.isOpen())
745                                        connectionInterface.close();
746                        }
747                }
748        }
749        
750        /**
751         * Dispatches the received XBee packet to the corresponding listener(s).
752         * 
753         * @param packet The received XBee packet to be dispatched to the 
754         *               corresponding listeners.
755         * 
756         * @see com.digi.xbee.api.packet.XBeeAPIPacket
757         * @see com.digi.xbee.api.packet.XBeePacket
758         */
759        private void packetReceived(XBeePacket packet) {
760                // Add the packet to the packets queue.
761                xbeePacketsQueue.addPacket(packet);
762                // Notify that a packet has been received to the corresponding listeners.
763                notifyPacketReceived(packet);
764                
765                // Check if the packet is an API packet.
766                if (!(packet instanceof XBeeAPIPacket))
767                        return;
768                
769                // Get the API packet type.
770                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
771                APIFrameType apiType = apiPacket.getFrameType();
772                if (apiType == null)
773                        return;
774                
775                try {
776                        // Obtain the remote device from the packet.
777                        RemoteXBeeDevice remoteDevice = getRemoteXBeeDeviceFromPacket(apiPacket);
778                        byte[] data = null;
779                        
780                        switch(apiType) {
781                        case RECEIVE_PACKET:
782                                ReceivePacket receivePacket = (ReceivePacket)apiPacket;
783                                data = receivePacket.getRFData();
784                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
785                                break;
786                        case RX_64:
787                                RX64Packet rx64Packet = (RX64Packet)apiPacket;
788                                data = rx64Packet.getRFData();
789                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
790                                break;
791                        case RX_16:
792                                RX16Packet rx16Packet = (RX16Packet)apiPacket;
793                                data = rx16Packet.getRFData();
794                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
795                                break;
796                        case IO_DATA_SAMPLE_RX_INDICATOR:
797                                IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
798                                notifyIOSampleReceived(remoteDevice, ioSamplePacket.getIOSample());
799                                break;
800                        case RX_IO_64:
801                                RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
802                                if (rx64IOPacket.getIOSamples() != null && rx64IOPacket.getIOSamples().size() > 0) {
803                                        for (IOSample sample : rx64IOPacket.getIOSamples())
804                                                notifyIOSampleReceived(remoteDevice, sample);
805                                }
806                                break;
807                        case RX_IO_16:
808                                RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
809                                if (rx16IOPacket.getIOSamples() != null && rx16IOPacket.getIOSamples().size() > 0) {
810                                        for (IOSample sample : rx16IOPacket.getIOSamples())
811                                                notifyIOSampleReceived(remoteDevice, sample);
812                                }
813                                break;
814                        case IPV6_IO_DATA_SAMPLE_RX_INDICATOR:
815                                IPv6IODataSampleRxIndicator ioSampleIPv6Packet = (IPv6IODataSampleRxIndicator)apiPacket;
816                                notifyIOSampleReceived(remoteDevice, ioSampleIPv6Packet.getIOSample());
817                                break;
818                        case MODEM_STATUS:
819                                ModemStatusPacket modemStatusPacket = (ModemStatusPacket)apiPacket;
820                                notifyModemStatusReceived(modemStatusPacket.getStatus());
821                                break;
822                        case EXPLICIT_RX_INDICATOR:
823                                ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket;
824                                int sourceEndpoint = explicitDataPacket.getSourceEndpoint();
825                                int destEndpoint = explicitDataPacket.getDestinationEndpoint();
826                                int clusterID = explicitDataPacket.getClusterID();
827                                int profileID = explicitDataPacket.getProfileID();
828                                data = explicitDataPacket.getRFData();
829                                // If this is an explicit packet for data transmissions in the Digi profile, 
830                                // notify also the data listener and add a Receive packet to the queue.
831                                if (sourceEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT && 
832                                                destEndpoint == ExplicitRxIndicatorPacket.DATA_ENDPOINT &&
833                                                clusterID == ExplicitRxIndicatorPacket.DATA_CLUSTER && 
834                                                profileID == ExplicitRxIndicatorPacket.DIGI_PROFILE) {
835                                        notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
836                                        xbeePacketsQueue.addPacket(new ReceivePacket(explicitDataPacket.get64BitSourceAddress(), 
837                                                        explicitDataPacket.get16BitSourceAddress(), 
838                                                        explicitDataPacket.getReceiveOptions(), 
839                                                        explicitDataPacket.getRFData()));
840                                }
841                                notifyExplicitDataReceived(new ExplicitXBeeMessage(remoteDevice, sourceEndpoint, destEndpoint, clusterID, profileID, data, explicitDataPacket.isBroadcast()));
842                                break;
843                        case RX_IPV4:
844                                RXIPv4Packet rxIPv4Packet = (RXIPv4Packet)apiPacket;
845                                notifyIPDataReceived(new IPMessage(
846                                                rxIPv4Packet.getSourceAddress(), 
847                                                rxIPv4Packet.getSourcePort(), 
848                                                rxIPv4Packet.getDestPort(),
849                                                rxIPv4Packet.getProtocol(),
850                                                rxIPv4Packet.getData()));
851                                break;
852                        case RX_IPV6:
853                                RXIPv6Packet rxIPv6Packet = (RXIPv6Packet)apiPacket;
854                                notifyIPDataReceived(new IPMessage(
855                                                rxIPv6Packet.getSourceAddress(), 
856                                                rxIPv6Packet.getSourcePort(), 
857                                                rxIPv6Packet.getDestPort(),
858                                                rxIPv6Packet.getProtocol(),
859                                                rxIPv6Packet.getData()));
860                                break;
861                        case RX_SMS:
862                                RXSMSPacket rxSMSPacket = (RXSMSPacket)apiPacket;
863                                notifySMSReceived(new SMSMessage(rxSMSPacket.getPhoneNumber(), rxSMSPacket.getData()));
864                                break;
865                        case USER_DATA_RELAY_OUTPUT:
866                                UserDataRelayOutputPacket relayPacket = (UserDataRelayOutputPacket)apiPacket;
867                                notifyUserDataRelayReceived(new UserDataRelayMessage(relayPacket.getSourceInterface(), relayPacket.getData()));
868                                break;
869                        default:
870                                break;
871                        }
872                        
873                } catch (XBeeException e) {
874                        logger.error(e.getMessage(), e);
875                }
876        }
877        
878        /**
879         * Returns the remote XBee device from where the given package was sent 
880         * from.
881         * 
882         * <p><b>This is for internal use only.</b></p>
883         * 
884         * <p>If the package does not contain information about the source, this 
885         * method returns {@code null} (for example, {@code ModemStatusPacket}).</p>
886         * 
887         * <p>First the device that sent the provided package is looked in the 
888         * network of the local XBee device. If the remote device is not in the 
889         * network, it is automatically added only if the packet contains 
890         * information about the origin of the package.</p>
891         * 
892         * @param packet The packet sent from the remote device.
893         * 
894         * @return The remote XBee device that sends the given packet. It may be 
895         *         {@code null} if the packet is not a known frame (see 
896         *         {@link APIFrameType}) or if it does not contain information of 
897         *         the source device.
898         * 
899         * @throws NullPointerException if {@code packet == null}
900         * @throws XBeeException if any error occur while adding the device to the 
901         *                       network.
902         */
903        public RemoteXBeeDevice getRemoteXBeeDeviceFromPacket(XBeeAPIPacket packet) throws XBeeException {
904                if (packet == null)
905                        throw new NullPointerException("XBee API packet cannot be null.");
906                        
907                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
908                APIFrameType apiType = apiPacket.getFrameType();
909                if (apiType == null || apiType == APIFrameType.UNKNOWN)
910                        return null;
911                
912                RemoteXBeeDevice remoteDevice = null;
913                XBee64BitAddress addr64 = null;
914                XBee16BitAddress addr16 = null;
915                Inet6Address addrIPv6 = null;
916                
917                XBeeNetwork network = xbeeDevice.getNetwork();
918                // There are protocols that do not support the network feature.
919                if (network == null && xbeeDevice.getXBeeProtocol() != XBeeProtocol.THREAD)
920                        return null;
921                
922                switch(apiType) {
923                case RECEIVE_PACKET:
924                        ReceivePacket receivePacket = (ReceivePacket)apiPacket;
925                        addr64 = receivePacket.get64bitSourceAddress();
926                        addr16 = receivePacket.get16bitSourceAddress();
927                        if (!addr64.equals(XBee64BitAddress.UNKNOWN_ADDRESS))
928                                remoteDevice = network.getDevice(addr64);
929                        else if (!addr16.equals(XBee16BitAddress.UNKNOWN_ADDRESS))
930                                remoteDevice = network.getDevice(addr16);
931                        break;
932                case RX_64:
933                        RX64Packet rx64Packet = (RX64Packet)apiPacket;
934                        addr64 = rx64Packet.get64bitSourceAddress();
935                        remoteDevice = network.getDevice(addr64);
936                        break;
937                case RX_16:
938                        RX16Packet rx16Packet = (RX16Packet)apiPacket;
939                        addr64 = XBee64BitAddress.UNKNOWN_ADDRESS;
940                        addr16 = rx16Packet.get16bitSourceAddress();
941                        remoteDevice = network.getDevice(addr16);
942                        break;
943                case RX_IPV6:
944                        RXIPv6Packet rxIPv6Packet = (RXIPv6Packet)apiPacket;
945                        addrIPv6 = rxIPv6Packet.getSourceAddress();
946                        if (xbeeDevice.getXBeeProtocol() == XBeeProtocol.THREAD)
947                                remoteDevice = new RemoteThreadDevice(xbeeDevice, addrIPv6);
948                        else
949                                remoteDevice = new RemoteXBeeDevice(xbeeDevice, addrIPv6);
950                        break;
951                case IO_DATA_SAMPLE_RX_INDICATOR:
952                        IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
953                        addr64 = ioSamplePacket.get64bitSourceAddress();
954                        addr16 = ioSamplePacket.get16bitSourceAddress();
955                        remoteDevice = network.getDevice(addr64);
956                        break;
957                case IPV6_IO_DATA_SAMPLE_RX_INDICATOR:
958                        IPv6IODataSampleRxIndicator ioSampleIPv6Packet = (IPv6IODataSampleRxIndicator)apiPacket;
959                        addrIPv6 = ioSampleIPv6Packet.getSourceAddress();
960                        if (xbeeDevice.getXBeeProtocol() == XBeeProtocol.THREAD)
961                                remoteDevice = new RemoteThreadDevice(xbeeDevice, addrIPv6);
962                        else
963                                remoteDevice = new RemoteXBeeDevice(xbeeDevice, addrIPv6);
964                        break;
965                case RX_IO_64:
966                        RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
967                        addr64 = rx64IOPacket.get64bitSourceAddress();
968                        remoteDevice = network.getDevice(addr64);
969                        break;
970                case RX_IO_16:
971                        RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
972                        addr64 = XBee64BitAddress.UNKNOWN_ADDRESS;
973                        addr16 = rx16IOPacket.get16bitSourceAddress();
974                        remoteDevice = network.getDevice(addr16);
975                        break;
976                case EXPLICIT_RX_INDICATOR:
977                        ExplicitRxIndicatorPacket explicitDataPacket = (ExplicitRxIndicatorPacket)apiPacket;
978                        addr64 = explicitDataPacket.get64BitSourceAddress();
979                        addr16 = explicitDataPacket.get16BitSourceAddress();
980                        remoteDevice = network.getDevice(addr64);
981                        break;
982                default:
983                        // Rest of the types are considered not to contain information 
984                        // about the origin of the packet.
985                        return remoteDevice;
986                }
987                
988                // If the origin is not in the network, add it.
989                if (remoteDevice == null) {
990                        remoteDevice = createRemoteXBeeDevice(addr64, addr16, null);
991                        if (!addr64.equals(XBee64BitAddress.UNKNOWN_ADDRESS) || !addr16.equals(XBee16BitAddress.UNKNOWN_ADDRESS))
992                                network.addRemoteDevice(remoteDevice);
993                }
994                
995                return remoteDevice;
996        }
997        
998        /**
999         * Creates a new remote XBee device with the provided 64-bit address, 
1000         * 16-bit address, node identifier and the XBee device that is using this 
1001         * data reader as the connection interface for the remote device.
1002         * 
1003         * The new XBee device will be a {@code RemoteDigiMeshDevice}, 
1004         * a {@code RemoteDigiPointDevice}, a {@code RemoteRaw802Device} or a 
1005         * {@code RemoteZigBeeDevice} depending on the protocol of the local XBee 
1006         * device. If the protocol cannot be determined or is unknown a 
1007         * {@code RemoteXBeeDevice} will be created instead.
1008         * 
1009         * @param addr64 The 64-bit address of the new remote device. It cannot be 
1010         *               {@code null}.
1011         * @param addr16 The 16-bit address of the new remote device. It may be 
1012         *               {@code null}.
1013         * @param ni The node identifier of the new remote device. It may be 
1014         *           {@code null}.
1015         * 
1016         * @return a new remote XBee device with the given parameters.
1017         */
1018        private RemoteXBeeDevice createRemoteXBeeDevice(XBee64BitAddress addr64, 
1019                        XBee16BitAddress addr16, String ni) {
1020                RemoteXBeeDevice device = null;
1021                
1022                switch (xbeeDevice.getXBeeProtocol()) {
1023                case ZIGBEE:
1024                        device = new RemoteZigBeeDevice(xbeeDevice, addr64, addr16, ni);
1025                        break;
1026                case DIGI_MESH:
1027                        device = new RemoteDigiMeshDevice(xbeeDevice, addr64, ni);
1028                        break;
1029                case DIGI_POINT:
1030                        device = new RemoteDigiPointDevice(xbeeDevice, addr64, ni);
1031                        break;
1032                case RAW_802_15_4:
1033                        device = new RemoteRaw802Device(xbeeDevice, addr64, addr16, ni);
1034                        break;
1035                default:
1036                        device = new RemoteXBeeDevice(xbeeDevice, addr64, addr16, ni);
1037                        break;
1038                }
1039                
1040                return device;
1041        }
1042        
1043        /**
1044         * Notifies subscribed data receive listeners that a new XBee data packet 
1045         * has been received in form of an {@code XBeeMessage}.
1046         *
1047         * @param xbeeMessage The XBee message to be sent to subscribed XBee data
1048         *                    listeners.
1049         * 
1050         * @see com.digi.xbee.api.models.XBeeMessage
1051         */
1052        private void notifyDataReceived(final XBeeMessage xbeeMessage) {
1053                if (xbeeMessage.isBroadcast())
1054                        logger.info(connectionInterface.toString() + 
1055                                        "Broadcast data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
1056                else
1057                        logger.info(connectionInterface.toString() + 
1058                                        "Data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
1059                
1060                try {
1061                        synchronized (dataReceiveListeners) {
1062                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1063                                                dataReceiveListeners.size()));
1064                                for (final IDataReceiveListener listener:dataReceiveListeners) {
1065                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1066                                        if (!running)
1067                                                break;
1068                                        executor.execute(new Runnable() {
1069                                                /*
1070                                                 * (non-Javadoc)
1071                                                 * @see java.lang.Runnable#run()
1072                                                 */
1073                                                @Override
1074                                                public void run() {
1075                                                        /* Synchronize the listener so it is not called 
1076                                                         twice. That is, let the listener to finish its job. */
1077                                                        synchronized (listener) {
1078                                                                listener.dataReceived(xbeeMessage);
1079                                                        }
1080                                                }
1081                                        });
1082                                }
1083                                executor.shutdown();
1084                        }
1085                } catch (Exception e) {
1086                        logger.error(e.getMessage(), e);
1087                }
1088        }
1089        
1090        /**
1091         * Notifies subscribed XBee packet listeners that a new XBee packet has 
1092         * been received.
1093         *
1094         * @param packet The received XBee packet.
1095         * 
1096         * @see com.digi.xbee.api.packet.XBeeAPIPacket
1097         * @see com.digi.xbee.api.packet.XBeePacket
1098         */
1099        private void notifyPacketReceived(final XBeePacket packet) {
1100                logger.debug(connectionInterface.toString() + "Packet received: \n{}", packet.toPrettyString());
1101                
1102                try {
1103                        synchronized (packetReceiveListeners) {
1104                                final ArrayList<IPacketReceiveListener> removeListeners = new ArrayList<IPacketReceiveListener>();
1105                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1106                                                packetReceiveListeners.size()));
1107                                for (final IPacketReceiveListener listener:packetReceiveListeners.keySet()) {
1108                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1109                                        if (!running)
1110                                                break;
1111                                        executor.execute(new Runnable() {
1112                                                /*
1113                                                 * (non-Javadoc)
1114                                                 * @see java.lang.Runnable#run()
1115                                                 */
1116                                                @Override
1117                                                public void run() {
1118                                                        // Synchronize the listener so it is not called 
1119                                                        // twice. That is, let the listener to finish its job.
1120                                                        synchronized (packetReceiveListeners) {
1121                                                                synchronized (listener) {
1122                                                                        if (packetReceiveListeners.get(listener) == ALL_FRAME_IDS)
1123                                                                                listener.packetReceived(packet);
1124                                                                        else if (((XBeeAPIPacket)packet).needsAPIFrameID() && 
1125                                                                                        ((XBeeAPIPacket)packet).getFrameID() == packetReceiveListeners.get(listener)) {
1126                                                                                listener.packetReceived(packet);
1127                                                                                removeListeners.add(listener);
1128                                                                        }
1129                                                                }
1130                                                        }
1131                                                }
1132                                        });
1133                                }
1134                                executor.shutdown();
1135                                // Remove required listeners.
1136                                for (IPacketReceiveListener listener:removeListeners)
1137                                        packetReceiveListeners.remove(listener);
1138                        }
1139                } catch (Exception e) {
1140                        logger.error(e.getMessage(), e);
1141                }
1142        }
1143        
1144        /**
1145         * Notifies subscribed IO sample listeners that a new IO sample packet has
1146         * been received.
1147         *
1148         * @param ioSample The received IO sample.
1149         * @param remoteDevice The remote XBee device that sent the sample.
1150         * 
1151         * @see com.digi.xbee.api.RemoteXBeeDevice
1152         * @see com.digi.xbee.api.io.IOSample
1153         */
1154        private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) {
1155                logger.debug(connectionInterface.toString() + "IO sample received.");
1156                
1157                try {
1158                        synchronized (ioSampleReceiveListeners) {
1159                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1160                                                ioSampleReceiveListeners.size()));
1161                                for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) {
1162                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1163                                        if (!running)
1164                                                break;
1165                                        executor.execute(new Runnable() {
1166                                                /*
1167                                                 * (non-Javadoc)
1168                                                 * @see java.lang.Runnable#run()
1169                                                 */
1170                                                @Override
1171                                                public void run() {
1172                                                        // Synchronize the listener so it is not called 
1173                                                        // twice. That is, let the listener to finish its job.
1174                                                        synchronized (listener) {
1175                                                                listener.ioSampleReceived(remoteDevice, ioSample);
1176                                                        }
1177                                                }
1178                                        });
1179                                }
1180                                executor.shutdown();
1181                        }
1182                } catch (Exception e) {
1183                        logger.error(e.getMessage(), e);
1184                }
1185        }
1186        
1187        /**
1188         * Notifies subscribed Modem Status listeners that a Modem Status event 
1189         * packet has been received.
1190         *
1191         * @param modemStatusEvent The Modem Status event.
1192         * 
1193         * @see com.digi.xbee.api.models.ModemStatusEvent
1194         */
1195        private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) {
1196                logger.debug(connectionInterface.toString() + "Modem Status event received.");
1197                
1198                try {
1199                        synchronized (modemStatusListeners) {
1200                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1201                                                modemStatusListeners.size()));
1202                                for (final IModemStatusReceiveListener listener:modemStatusListeners) {
1203                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1204                                        if (!running)
1205                                                break;
1206                                        executor.execute(new Runnable() {
1207                                                /*
1208                                                 * (non-Javadoc)
1209                                                 * @see java.lang.Runnable#run()
1210                                                 */
1211                                                @Override
1212                                                public void run() {
1213                                                        // Synchronize the listener so it is not called 
1214                                                        // twice. That is, let the listener to finish its job.
1215                                                        synchronized (listener) {
1216                                                                listener.modemStatusEventReceived(modemStatusEvent);
1217                                                        }
1218                                                }
1219                                        });
1220                                }
1221                                executor.shutdown();
1222                        }
1223                } catch (Exception e) {
1224                        logger.error(e.getMessage(), e);
1225                }
1226        }
1227        
1228        /**
1229         * Notifies subscribed explicit data receive listeners that a new XBee 
1230         * explicit data packet has been received in form of an 
1231         * {@code ExplicitXBeeMessage}.
1232         *
1233         * @param explicitXBeeMessage The XBee message to be sent to subscribed 
1234         *                            XBee data listeners.
1235         * 
1236         * @see com.digi.xbee.api.models.ExplicitXBeeMessage
1237         */
1238        private void notifyExplicitDataReceived(final ExplicitXBeeMessage explicitXBeeMessage) {
1239                if (explicitXBeeMessage.isBroadcast())
1240                        logger.info(connectionInterface.toString() + 
1241                                        "Broadcast explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData()));
1242                else
1243                        logger.info(connectionInterface.toString() + 
1244                                        "Explicit data received from {} >> {}.", explicitXBeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(explicitXBeeMessage.getData()));
1245                
1246                try {
1247                        synchronized (explicitDataReceiveListeners) {
1248                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1249                                                explicitDataReceiveListeners.size()));
1250                                for (final IExplicitDataReceiveListener listener:explicitDataReceiveListeners) {
1251                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1252                                        if (!running)
1253                                                break;
1254                                        executor.execute(new Runnable() {
1255                                                /*
1256                                                 * (non-Javadoc)
1257                                                 * @see java.lang.Runnable#run()
1258                                                 */
1259                                                @Override
1260                                                public void run() {
1261                                                        /* Synchronize the listener so it is not called 
1262                                                         twice. That is, let the listener to finish its job. */
1263                                                        synchronized (listener) {
1264                                                                listener.explicitDataReceived(explicitXBeeMessage);
1265                                                        }
1266                                                }
1267                                        });
1268                                }
1269                                executor.shutdown();
1270                        }
1271                } catch (Exception e) {
1272                        logger.error(e.getMessage(), e);
1273                }
1274        }
1275        
1276        /**
1277         * Notifies subscribed IP data receive listeners that a new IP data 
1278         * packet has been received in form of a {@code ipMessage}.
1279         *
1280         * @param ipMessage The IP message to be sent to subscribed 
1281         *                  IP data listeners.
1282         * 
1283         * @see com.digi.xbee.api.models.IPMessage
1284         * 
1285         * @since 1.2.0
1286         */
1287        private void notifyIPDataReceived(final IPMessage ipMessage) {
1288                logger.info(connectionInterface.toString() + 
1289                                "IP data received from {} >> {}.", ipMessage.getHostAddress(), HexUtils.prettyHexString(ipMessage.getData()));
1290                
1291                try {
1292                        synchronized (ipDataReceiveListeners) {
1293                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1294                                                ipDataReceiveListeners.size()));
1295                                for (final IIPDataReceiveListener listener:ipDataReceiveListeners) {
1296                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1297                                        if (!running)
1298                                                break;
1299                                        executor.execute(new Runnable() {
1300                                                /*
1301                                                 * (non-Javadoc)
1302                                                 * @see java.lang.Runnable#run()
1303                                                 */
1304                                                @Override
1305                                                public void run() {
1306                                                        /* Synchronize the listener so it is not called 
1307                                                         twice. That is, let the listener to finish its job. */
1308                                                        synchronized (listener) {
1309                                                                listener.ipDataReceived(ipMessage);
1310                                                        }
1311                                                }
1312                                        });
1313                                }
1314                                executor.shutdown();
1315                        }
1316                } catch (Exception e) {
1317                        logger.error(e.getMessage(), e);
1318                }
1319        }
1320        
1321        /**
1322         * Notifies subscribed SMS receive listeners that a new SMS packet has 
1323         * been received in form of an {@code SMSMessage}.
1324         *
1325         * @param smsMessage The SMS message to be sent to subscribed SMS listeners.
1326         * 
1327         * @see com.digi.xbee.api.models.SMSMessage
1328         * 
1329         * @since 1.2.0
1330         */
1331        private void notifySMSReceived(final SMSMessage smsMessage) {
1332                logger.info(connectionInterface.toString() + 
1333                                "SMS received from {} >> {}.", smsMessage.getPhoneNumber(), smsMessage.getData());
1334                
1335                try {
1336                        synchronized (smsReceiveListeners) {
1337                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
1338                                                smsReceiveListeners.size()));
1339                                for (final ISMSReceiveListener listener:smsReceiveListeners) {
1340                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1341                                        if (!running)
1342                                                break;
1343                                        executor.execute(new Runnable() {
1344                                                /*
1345                                                 * (non-Javadoc)
1346                                                 * @see java.lang.Runnable#run()
1347                                                 */
1348                                                @Override
1349                                                public void run() {
1350                                                        /* Synchronize the listener so it is not called 
1351                                                         twice. That is, let the listener to finish its job. */
1352                                                        synchronized (listener) {
1353                                                                listener.smsReceived(smsMessage);
1354                                                        }
1355                                                }
1356                                        });
1357                                }
1358                                executor.shutdown();
1359                        }
1360                } catch (Exception e) {
1361                        logger.error(e.getMessage(), e);
1362                }
1363        }
1364
1365        /**
1366         * Notifies subscribed User Data Relay receive listeners that a new User
1367         * Data Relay packet has been received in form of a
1368         * {@code UserDataRelayMessage}.
1369         *
1370         * @param relayMessage The User Data Relay message to be sent to subscribed
1371         *                     User Data Relay listeners.
1372         *
1373         * @see UserDataRelayMessage
1374         *
1375         * @since 1.3.0
1376         */
1377        private void notifyUserDataRelayReceived(UserDataRelayMessage relayMessage) {
1378                logger.info(connectionInterface.toString() +
1379                                "User Data Relay received from {} >> {}.", relayMessage.getSourceInterface().getDescription(),
1380                                relayMessage.getData() != null ? HexUtils.prettyHexString(relayMessage.getData()) : "");
1381
1382                // Notify the generic User Data Relay listeners.
1383                notifyUserDataRelayReceived(relayMessage, true);
1384
1385                // Notify the specific User Data Relay listeners.
1386                notifyUserDataRelayReceived(relayMessage, false);
1387        }
1388
1389        /**
1390         * Notifies subscribed generic or specific User Data Relay receive listeners
1391         * that a new User Data Relay packet has been received in form of a
1392         * {@code UserDataRelayMessage}.
1393         *
1394         * @param relayMessage The User Data Relay message to be sent to subscribed
1395         *                     User Data Relay listeners.
1396         * @param notifyGeneric {@code true} to notify only the generic listeners,
1397         *                      {@code false} to notify the specific ones.
1398         *
1399         * @see UserDataRelayMessage
1400         *
1401         * @since 1.3.0
1402         */
1403        private void notifyUserDataRelayReceived(final UserDataRelayMessage relayMessage, final boolean notifyGeneric) {
1404                ArrayList<?> listenerList = new ArrayList<>();
1405
1406                // Get the list of listeners that should be notified depending on the parameters.
1407                if (notifyGeneric) {
1408                        listenerList = dataRelayReceiveListeners;
1409                } else {
1410                        switch (relayMessage.getSourceInterface()) {
1411                                case SERIAL:
1412                                        listenerList = serialDataReceiveListeners;
1413                                        break;
1414                                case BLUETOOTH:
1415                                        listenerList = bluetoothDataReceiveListeners;
1416                                        break;
1417                                case MICROPYTHON:
1418                                        listenerList = microPythonDataReceiveListeners;
1419                                        break;
1420                                default:
1421                                        break;
1422                        }
1423                }
1424
1425                // Notify the appropriate listeners.
1426                try {
1427                        synchronized (listenerList) {
1428                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
1429                                                listenerList.size()));
1430                                for (final Object listener : listenerList) {
1431                                        // Ensure that the reader is running to avoid a RejectedExecutionException.
1432                                        if (!running)
1433                                                break;
1434                                        executor.execute(new Runnable() {
1435                                                @Override
1436                                                public void run() {
1437                                                        /* Synchronize the listener so it is not called
1438                                                         twice. That is, let the listener to finish its job. */
1439                                                        synchronized (listener) {
1440                                                                if (notifyGeneric) {
1441                                                                        ((IUserDataRelayReceiveListener) listener).userDataRelayReceived(relayMessage);
1442                                                                } else {
1443                                                                        switch (relayMessage.getSourceInterface()) {
1444                                                                                case SERIAL:
1445                                                                                        ((ISerialDataReceiveListener) listener).dataReceived(relayMessage.getData());
1446                                                                                        break;
1447                                                                                case BLUETOOTH:
1448                                                                                        ((IBluetoothDataReceiveListener) listener).dataReceived(relayMessage.getData());
1449                                                                                        break;
1450                                                                                case MICROPYTHON:
1451                                                                                        ((IMicroPythonDataReceiveListener) listener).dataReceived(relayMessage.getData());
1452                                                                                        break;
1453                                                                                default:
1454                                                                                        break;
1455                                                                        }
1456                                                                }
1457                                                        }
1458                                                }
1459                                        });
1460                                }
1461                                executor.shutdown();
1462                        }
1463                } catch (Exception e) {
1464                        logger.error(e.getMessage(), e);
1465                }
1466        }
1467        
1468        /**
1469         * Returns whether this Data reader is running or not.
1470         * 
1471         * @return {@code true} if the Data reader is running, {@code false} 
1472         *         otherwise.
1473         * 
1474         * @see #stopReader()
1475         */
1476        public boolean isRunning() {
1477                return running;
1478        }
1479        
1480        /**
1481         * Stops the Data reader thread.
1482         * 
1483         * @see #isRunning()
1484         */
1485        public void stopReader() {
1486                running = false;
1487                synchronized (connectionInterface) {
1488                        connectionInterface.notify();
1489                }
1490                logger.debug(connectionInterface.toString() + "Data reader stopped.");
1491        }
1492        
1493        /**
1494         * Returns the queue of read XBee packets.
1495         * 
1496         * @return The queue of read XBee packets.
1497         * 
1498         * @see com.digi.xbee.api.models.XBeePacketsQueue
1499         */
1500        public XBeePacketsQueue getXBeePacketsQueue() {
1501                return xbeePacketsQueue;
1502        }
1503}