001/**
002 * Copyright (c) 2014 Digi International Inc.,
003 * All rights not expressly granted are reserved.
004 *
005 * This Source Code Form is subject to the terms of the Mozilla Public
006 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
007 * You can obtain one at http://mozilla.org/MPL/2.0/.
008 *
009 * Digi International Inc. 11001 Bren Road East, Minnetonka, MN 55343
010 * =======================================================================
011*/
012package com.digi.xbee.api.connection;
013
014import java.io.IOException;
015import java.util.ArrayList;
016import java.util.HashMap;
017import java.util.concurrent.Executors;
018import java.util.concurrent.ScheduledExecutorService;
019
020import org.slf4j.Logger;
021import org.slf4j.LoggerFactory;
022
023import com.digi.xbee.api.RemoteRaw802Device;
024import com.digi.xbee.api.RemoteXBeeDevice;
025import com.digi.xbee.api.XBeeDevice;
026import com.digi.xbee.api.XBeeNetwork;
027import com.digi.xbee.api.exceptions.InvalidPacketException;
028import com.digi.xbee.api.exceptions.XBeeException;
029import com.digi.xbee.api.io.IOSample;
030import com.digi.xbee.api.listeners.IIOSampleReceiveListener;
031import com.digi.xbee.api.listeners.IModemStatusReceiveListener;
032import com.digi.xbee.api.listeners.IPacketReceiveListener;
033import com.digi.xbee.api.listeners.IDataReceiveListener;
034import com.digi.xbee.api.models.ModemStatusEvent;
035import com.digi.xbee.api.models.SpecialByte;
036import com.digi.xbee.api.models.OperatingMode;
037import com.digi.xbee.api.models.XBeeMessage;
038import com.digi.xbee.api.models.XBeePacketsQueue;
039import com.digi.xbee.api.packet.XBeeAPIPacket;
040import com.digi.xbee.api.packet.APIFrameType;
041import com.digi.xbee.api.packet.XBeePacket;
042import com.digi.xbee.api.packet.XBeePacketParser;
043import com.digi.xbee.api.packet.common.IODataSampleRxIndicatorPacket;
044import com.digi.xbee.api.packet.common.ModemStatusPacket;
045import com.digi.xbee.api.packet.common.ReceivePacket;
046import com.digi.xbee.api.packet.raw.RX16IOPacket;
047import com.digi.xbee.api.packet.raw.RX16Packet;
048import com.digi.xbee.api.packet.raw.RX64IOPacket;
049import com.digi.xbee.api.packet.raw.RX64Packet;
050import com.digi.xbee.api.utils.HexUtils;
051
052/**
053 * Thread that constantly reads data from an input stream.
054 * 
055 * <p>Depending on the XBee operating mode, read data is notified as is to the 
056 * subscribed listeners or is parsed to a packet using the packet parser and 
057 * then notified to subscribed listeners.</p> 
058 */
059public class DataReader extends Thread {
060        
061        // Constants.
062        private final static int ALL_FRAME_IDS = 99999;
063        private final static int MAXIMUM_PARALLEL_LISTENER_THREADS = 20;
064        
065        // Variables.
066        private boolean running = false;
067        
068        private IConnectionInterface connectionInterface;
069        
070        private volatile OperatingMode mode;
071        
072        private ArrayList<IDataReceiveListener> dataReceiveListeners = new ArrayList<IDataReceiveListener>();
073        // The packetReceiveListeners requires to be a HashMap with an associated integer. The integer is used to determine 
074        // the frame ID of the packet that should be received. When it is 99999 (ALL_FRAME_IDS), all the packets will be handled.
075        private HashMap<IPacketReceiveListener, Integer> packetReceiveListeners = new HashMap<IPacketReceiveListener, Integer>();
076        private ArrayList<IIOSampleReceiveListener> ioSampleReceiveListeners = new ArrayList<IIOSampleReceiveListener>();
077        private ArrayList<IModemStatusReceiveListener> modemStatusListeners = new ArrayList<IModemStatusReceiveListener>();
078        
079        private Logger logger;
080        
081        private XBeePacketParser parser;
082        
083        private XBeePacketsQueue xbeePacketsQueue;
084        
085        private XBeeDevice xbeeDevice;
086        
087        /**
088         * Class constructor. Instantiates a new {@code DataReader} object for the 
089         * given connection interface using the given XBee operating mode and XBee
090         * device.
091         * 
092         * @param connectionInterface Connection interface to read data from.
093         * @param mode XBee operating mode.
094         * @param xbeeDevice Reference to the XBee device containing this 
095         *                   {@code DataReader} object.
096         * 
097         * @throws NullPointerException if {@code connectionInterface == null} or
098         *                                 {@code mode == null}.
099         * 
100         * @see IConnectionInterface
101         * @see com.digi.xbee.api.XBeeDevice
102         * @see com.digi.xbee.api.models.OperatingMode
103         */
104        public DataReader(IConnectionInterface connectionInterface, OperatingMode mode, XBeeDevice xbeeDevice) {
105                if (connectionInterface == null)
106                        throw new NullPointerException("Connection interface cannot be null.");
107                if (mode == null)
108                        throw new NullPointerException("Operating mode cannot be null.");
109                
110                this.connectionInterface = connectionInterface;
111                this.mode = mode;
112                this.xbeeDevice = xbeeDevice;
113                this.logger = LoggerFactory.getLogger(DataReader.class);
114                parser = new XBeePacketParser();
115                xbeePacketsQueue = new XBeePacketsQueue();
116        }
117        
118        /**
119         * Sets the XBee operating mode of this data reader.
120         * 
121         * @param mode New XBee operating mode.
122         * 
123         * @throws NullPointerException if {@code mode == null}.
124         * 
125         * @see com.digi.xbee.api.models.OperatingMode
126         */
127        public void setXBeeReaderMode(OperatingMode mode) {
128                if (mode == null)
129                        throw new NullPointerException("Operating mode cannot be null.");
130                
131                this.mode = mode;
132        }
133        
134        /**
135         * Adds the given data receive listener to the list of listeners that will 
136         * be notified when XBee data packets are received.
137         * 
138         * <p>If the listener has been already added, this method does nothing.</p>
139         * 
140         * @param listener Listener to be notified when new XBee data packets are 
141         *                 received.
142         * 
143         * @see #removeDataReceiveListener(IDataReceiveListener)
144         * @see com.digi.xbee.api.listeners.IDataReceiveListener
145         */
146        public void addDataReceiveListener(IDataReceiveListener listener) {
147                synchronized (dataReceiveListeners) {
148                        if (!dataReceiveListeners.contains(listener))
149                                dataReceiveListeners.add(listener);
150                }
151        }
152        
153        /**
154         * Removes the given data receive listener from the list of data receive 
155         * listeners.
156         * 
157         * <p>If the listener is not included in the list, this method does nothing.
158         * </p>
159         * 
160         * @param listener Data receive listener to be remove from the list.
161         * 
162         * @see #addDataReceiveListener(IDataReceiveListener)
163         * @see com.digi.xbee.api.listeners.IDataReceiveListener
164         */
165        public void removeDataReceiveListener(IDataReceiveListener listener) {
166                synchronized (dataReceiveListeners) {
167                        if (dataReceiveListeners.contains(listener))
168                                dataReceiveListeners.remove(listener);
169                }
170        }
171        
172        /**
173         * Adds the given packet receive listener to the list of listeners that will
174         * be notified when any XBee packet is received.
175         * 
176         * <p>If the listener has been already added, this method does nothing.</p>
177         * 
178         * @param listener Listener to be notified when any XBee packet is received.
179         * 
180         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
181         * @see #removePacketReceiveListener(IPacketReceiveListener)
182         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
183         */
184        public void addPacketReceiveListener(IPacketReceiveListener listener) {
185                addPacketReceiveListener(listener, ALL_FRAME_IDS);
186        }
187        
188        /**
189         * Adds the given packet receive listener to the list of listeners that will
190         * be notified when an XBee packet with the given frame ID is received.
191         * 
192         * <p>If the listener has been already added, this method does nothing.</p>
193         * 
194         * @param listener Listener to be notified when an XBee packet with the
195         *                 provided frame ID is received.
196         * @param frameID Frame ID for which this listener should be notified and 
197         *                removed after.
198         *                Using {@link #ALL_FRAME_IDS} this listener will be 
199         *                notified always and will be removed only by user request.
200         * 
201         * @see #addPacketReceiveListener(IPacketReceiveListener)
202         * @see #removePacketReceiveListener(IPacketReceiveListener)
203         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
204         */
205        public void addPacketReceiveListener(IPacketReceiveListener listener, int frameID) {
206                synchronized (packetReceiveListeners) {
207                        if (!packetReceiveListeners.containsKey(listener))
208                                packetReceiveListeners.put(listener, frameID);
209                }
210        }
211        
212        /**
213         * Removes the given packet receive listener from the list of XBee packet 
214         * receive listeners.
215         * 
216         * <p>If the listener is not included in the list, this method does nothing.
217         * </p>
218         * 
219         * @param listener Packet receive listener to remove from the list.
220         * 
221         * @see #addPacketReceiveListener(IPacketReceiveListener)
222         * @see #addPacketReceiveListener(IPacketReceiveListener, int)
223         * @see com.digi.xbee.api.listeners.IPacketReceiveListener
224         */
225        public void removePacketReceiveListener(IPacketReceiveListener listener) {
226                synchronized (packetReceiveListeners) {
227                        if (packetReceiveListeners.containsKey(listener))
228                                packetReceiveListeners.remove(listener);
229                }
230        }
231        
232        /**
233         * Adds the given IO sample receive listener to the list of listeners that 
234         * will be notified when an IO sample packet is received.
235         * 
236         * <p>If the listener has been already added, this method does nothing.</p>
237         * 
238         * @param listener Listener to be notified when new IO sample packets are 
239         *                 received.
240         * 
241         * @see #removeIOSampleReceiveListener(IIOSampleReceiveListener)
242         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
243         */
244        public void addIOSampleReceiveListener(IIOSampleReceiveListener listener) {
245                synchronized (ioSampleReceiveListeners) {
246                        if (!ioSampleReceiveListeners.contains(listener))
247                                ioSampleReceiveListeners.add(listener);
248                }
249        }
250        
251        /**
252         * Removes the given IO sample receive listener from the list of IO sample 
253         * receive listeners.
254         * 
255         * <p>If the listener is not included in the list, this method does nothing.
256         * </p>
257         * 
258         * @param listener IO sample receive listener to remove from the list.
259         * 
260         * @see #addIOSampleReceiveListener(IIOSampleReceiveListener)
261         * @see com.digi.xbee.api.listeners.IIOSampleReceiveListener
262         */
263        public void removeIOSampleReceiveListener(IIOSampleReceiveListener listener) {
264                synchronized (ioSampleReceiveListeners) {
265                        if (ioSampleReceiveListeners.contains(listener))
266                                ioSampleReceiveListeners.remove(listener);
267                }
268        }
269        
270        /**
271         * Adds the given Modem Status receive listener to the list of listeners 
272         * that will be notified when a modem status packet is received.
273         * 
274         * <p>If the listener has been already added, this method does nothing.</p>
275         * 
276         * @param listener Listener to be notified when new modem status packets are
277         *                 received.
278         * 
279         * @see #removeModemStatusReceiveListener(IModemStatusReceiveListener)
280         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
281         */
282        public void addModemStatusReceiveListener(IModemStatusReceiveListener listener) {
283                synchronized (modemStatusListeners) {
284                        if (!modemStatusListeners.contains(listener))
285                                modemStatusListeners.add(listener);
286                }
287        }
288        
289        /**
290         * Removes the given Modem Status receive listener from the list of Modem 
291         * Status receive listeners.
292         * 
293         * <p>If the listener is not included in the list, this method does nothing.
294         * </p>
295         * 
296         * @param listener Modem Status receive listener to remove from the list.
297         * 
298         * @see #addModemStatusReceiveListener(IModemStatusReceiveListener)
299         * @see com.digi.xbee.api.listeners.IModemStatusReceiveListener
300         */
301        public void removeModemStatusReceiveListener(IModemStatusReceiveListener listener) {
302                synchronized (modemStatusListeners) {
303                        if (modemStatusListeners.contains(listener))
304                                modemStatusListeners.remove(listener);
305                }
306        }
307        
308        /*
309         * (non-Javadoc)
310         * @see java.lang.Thread#run()
311         */
312        @Override
313        public void run() {
314                logger.debug(connectionInterface.toString() + "Data reader started.");
315                running = true;
316                // Clear the list of read packets.
317                xbeePacketsQueue.clearQueue();
318                try {
319                        synchronized (connectionInterface) {
320                                connectionInterface.wait();
321                        }
322                        while (running) {
323                                if (!running)
324                                        break;
325                                if (connectionInterface.getInputStream() != null) {
326                                        switch (mode) {
327                                        case AT:
328                                                break;
329                                        case API:
330                                        case API_ESCAPE:
331                                                int headerByte = connectionInterface.getInputStream().read();
332                                                // If it is packet header parse the packet, if not discard this byte and continue.
333                                                if (headerByte == SpecialByte.HEADER_BYTE.getValue()) {
334                                                        try {
335                                                                XBeePacket packet = parser.parsePacket(connectionInterface.getInputStream(), mode);
336                                                                packetReceived(packet);
337                                                        } catch (InvalidPacketException e) {
338                                                                logger.error("Error parsing the API packet.", e);
339                                                        }
340                                                }
341                                                break;
342                                        default:
343                                                break;
344                                        }
345                                } else if (connectionInterface.getInputStream() == null)
346                                        break;
347                                if (connectionInterface.getInputStream() == null)
348                                        break;
349                                else if (connectionInterface.getInputStream().available() > 0)
350                                        continue;
351                                synchronized (connectionInterface) {
352                                        connectionInterface.wait();
353                                }
354                        }
355                } catch (IOException e) {
356                        logger.error("Error reading from input stream.", e);
357                } catch (InterruptedException e) {
358                        logger.error(e.getMessage(), e);
359                } catch (IllegalStateException e) {
360                        logger.error(e.getMessage(), e);
361                } finally {
362                        if (running) {
363                                running = false;
364                                if (connectionInterface.isOpen())
365                                        connectionInterface.close();
366                        }
367                }
368        }
369        
370        /**
371         * Dispatches the received XBee packet to the corresponding listener(s).
372         * 
373         * @param packet The received XBee packet to be dispatched to the 
374         *               corresponding listeners.
375         * 
376         * @see com.digi.xbee.api.packet.XBeeAPIPacket
377         * @see com.digi.xbee.api.packet.XBeePacket
378         */
379        private void packetReceived(XBeePacket packet) {
380                // Add the packet to the packets queue.
381                xbeePacketsQueue.addPacket(packet);
382                // Notify that a packet has been received to the corresponding listeners.
383                notifyPacketReceived(packet);
384                
385                // Check if the packet is an API packet.
386                if (!(packet instanceof XBeeAPIPacket))
387                        return;
388                
389                // Get the API packet type.
390                XBeeAPIPacket apiPacket = (XBeeAPIPacket)packet;
391                APIFrameType apiType = apiPacket.getFrameType();
392                if (apiType == null)
393                        return;
394                
395                XBeeNetwork network = xbeeDevice.getNetwork();
396                RemoteXBeeDevice remoteDevice = null;
397                byte[] data = null;
398                
399                try {
400                        switch(apiType) {
401                        case RECEIVE_PACKET:
402                                ReceivePacket receivePacket = (ReceivePacket)apiPacket;
403                                remoteDevice = network.getDevice(receivePacket.get64bitSourceAddress());
404                                if (remoteDevice == null) {
405                                        remoteDevice = new RemoteXBeeDevice(xbeeDevice, receivePacket.get64bitSourceAddress());
406                                        network.addRemoteDevice(remoteDevice);
407                                }
408                                data = receivePacket.getRFData();
409                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
410                                break;
411                        case RX_64:
412                                RX64Packet rx64Packet = (RX64Packet)apiPacket;
413                                remoteDevice = network.getDevice(rx64Packet.get64bitSourceAddress());
414                                if (remoteDevice == null) {
415                                        remoteDevice = new RemoteXBeeDevice(xbeeDevice, rx64Packet.get64bitSourceAddress());
416                                        network.addRemoteDevice(remoteDevice);
417                                }
418                                data = rx64Packet.getRFData();
419                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
420                                break;
421                        case RX_16:
422                                RX16Packet rx16Packet = (RX16Packet)apiPacket;
423                                remoteDevice = network.getDevice(rx16Packet.get16bitSourceAddress());
424                                if (remoteDevice == null) {
425                                        remoteDevice = new RemoteRaw802Device(xbeeDevice, rx16Packet.get16bitSourceAddress());
426                                        network.addRemoteDevice(remoteDevice);
427                                }
428                                data = rx16Packet.getRFData();
429                                notifyDataReceived(new XBeeMessage(remoteDevice, data, apiPacket.isBroadcast()));
430                                break;
431                        case IO_DATA_SAMPLE_RX_INDICATOR:
432                                IODataSampleRxIndicatorPacket ioSamplePacket = (IODataSampleRxIndicatorPacket)apiPacket;
433                                remoteDevice = network.getDevice(ioSamplePacket.get64bitSourceAddress());
434                                if (remoteDevice == null) {
435                                        remoteDevice = new RemoteXBeeDevice(xbeeDevice, ioSamplePacket.get64bitSourceAddress());
436                                        network.addRemoteDevice(remoteDevice);
437                                }
438                                notifyIOSampleReceived(remoteDevice, ioSamplePacket.getIOSample());
439                                break;
440                        case RX_IO_64:
441                                RX64IOPacket rx64IOPacket = (RX64IOPacket)apiPacket;
442                                remoteDevice = network.getDevice(rx64IOPacket.get64bitSourceAddress());
443                                if (remoteDevice == null) {
444                                        remoteDevice = new RemoteXBeeDevice(xbeeDevice, rx64IOPacket.get64bitSourceAddress());
445                                        network.addRemoteDevice(remoteDevice);
446                                }
447                                notifyIOSampleReceived(remoteDevice, rx64IOPacket.getIOSample());
448                                break;
449                        case RX_IO_16:
450                                RX16IOPacket rx16IOPacket = (RX16IOPacket)apiPacket;
451                                remoteDevice = network.getDevice(rx16IOPacket.get16bitSourceAddress());
452                                if (remoteDevice == null) {
453                                        remoteDevice = new RemoteRaw802Device(xbeeDevice, rx16IOPacket.get16bitSourceAddress());
454                                        network.addRemoteDevice(remoteDevice);
455                                }
456                                notifyIOSampleReceived(remoteDevice, rx16IOPacket.getIOSample());
457                                break;
458                        case MODEM_STATUS:
459                                ModemStatusPacket modemStatusPacket = (ModemStatusPacket)apiPacket;
460                                notifyModemStatusReceived(modemStatusPacket.getStatus());
461                        default:
462                                break;
463                        }
464                } catch (XBeeException e) {
465                        logger.error(e.getMessage(), e);
466                }
467        }
468        
469        /**
470         * Notifies subscribed data receive listeners that a new XBee data packet 
471         * has been received in form of an {@code XBeeMessage}.
472         *
473         * @param xbeeMessage The XBee message to be sent to subscribed XBee data
474         *                    listeners.
475         * 
476         * @see com.digi.xbee.api.models.XBeeMessage
477         */
478        private void notifyDataReceived(final XBeeMessage xbeeMessage) {
479                if (xbeeMessage.isBroadcast())
480                        logger.info(connectionInterface.toString() + 
481                                        "Broadcast data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
482                else
483                        logger.info(connectionInterface.toString() + 
484                                        "Data received from {} >> {}.", xbeeMessage.getDevice().get64BitAddress(), HexUtils.prettyHexString(xbeeMessage.getData()));
485                
486                try {
487                        synchronized (dataReceiveListeners) {
488                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
489                                                dataReceiveListeners.size()));
490                                for (final IDataReceiveListener listener:dataReceiveListeners) {
491                                        executor.execute(new Runnable() {
492                                                /*
493                                                 * (non-Javadoc)
494                                                 * @see java.lang.Runnable#run()
495                                                 */
496                                                @Override
497                                                public void run() {
498                                                        /* Synchronize the listener so it is not called 
499                                                         twice. That is, let the listener to finish its job. */
500                                                        synchronized (listener) {
501                                                                listener.dataReceived(xbeeMessage);
502                                                        }
503                                                }
504                                        });
505                                }
506                                executor.shutdown();
507                        }
508                } catch (Exception e) {
509                        logger.error(e.getMessage(), e);
510                }
511        }
512        
513        /**
514         * Notifies subscribed XBee packet listeners that a new XBee packet has 
515         * been received.
516         *
517         * @param packet The received XBee packet.
518         * 
519         * @see com.digi.xbee.api.packet.XBeeAPIPacket
520         * @see com.digi.xbee.api.packet.XBeePacket
521         */
522        private void notifyPacketReceived(final XBeePacket packet) {
523                logger.debug(connectionInterface.toString() + "Packet received: \n{}", packet.toPrettyString());
524                
525                try {
526                        synchronized (packetReceiveListeners) {
527                                final ArrayList<IPacketReceiveListener> removeListeners = new ArrayList<IPacketReceiveListener>();
528                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
529                                                packetReceiveListeners.size()));
530                                for (final IPacketReceiveListener listener:packetReceiveListeners.keySet()) {
531                                        executor.execute(new Runnable() {
532                                                /*
533                                                 * (non-Javadoc)
534                                                 * @see java.lang.Runnable#run()
535                                                 */
536                                                @Override
537                                                public void run() {
538                                                        // Synchronize the listener so it is not called 
539                                                        // twice. That is, let the listener to finish its job.
540                                                        synchronized (packetReceiveListeners) {
541                                                                synchronized (listener) {
542                                                                        if (packetReceiveListeners.get(listener) == ALL_FRAME_IDS)
543                                                                                listener.packetReceived(packet);
544                                                                        else if (((XBeeAPIPacket)packet).needsAPIFrameID() && 
545                                                                                        ((XBeeAPIPacket)packet).getFrameID() == packetReceiveListeners.get(listener)) {
546                                                                                listener.packetReceived(packet);
547                                                                                removeListeners.add(listener);
548                                                                        }
549                                                                }
550                                                        }
551                                                }
552                                        });
553                                }
554                                executor.shutdown();
555                                // Remove required listeners.
556                                for (IPacketReceiveListener listener:removeListeners)
557                                        packetReceiveListeners.remove(listener);
558                        }
559                } catch (Exception e) {
560                        logger.error(e.getMessage(), e);
561                }
562        }
563        
564        /**
565         * Notifies subscribed IO sample listeners that a new IO sample packet has
566         * been received.
567         *
568         * @param ioSample The received IO sample.
569         * @param remoteDevice The remote XBee device that sent the sample.
570         * 
571         * @see com.digi.xbee.api.RemoteXBeeDevice
572         * @see com.digi.xbee.api.io.IOSample
573         */
574        private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) {
575                logger.debug(connectionInterface.toString() + "IO sample received.");
576                
577                try {
578                        synchronized (ioSampleReceiveListeners) {
579                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
580                                                ioSampleReceiveListeners.size()));
581                                for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) {
582                                        executor.execute(new Runnable() {
583                                                /*
584                                                 * (non-Javadoc)
585                                                 * @see java.lang.Runnable#run()
586                                                 */
587                                                @Override
588                                                public void run() {
589                                                        // Synchronize the listener so it is not called 
590                                                        // twice. That is, let the listener to finish its job.
591                                                        synchronized (listener) {
592                                                                listener.ioSampleReceived(remoteDevice, ioSample);
593                                                        }
594                                                }
595                                        });
596                                }
597                                executor.shutdown();
598                        }
599                } catch (Exception e) {
600                        logger.error(e.getMessage(), e);
601                }
602        }
603        
604        /**
605         * Notifies subscribed Modem Status listeners that a Modem Status event 
606         * packet has been received.
607         *
608         * @param modemStatusEvent The Modem Status event.
609         * 
610         * @see com.digi.xbee.api.models.ModemStatusEvent
611         */
612        private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) {
613                logger.debug(connectionInterface.toString() + "Modem Status event received.");
614                
615                try {
616                        synchronized (modemStatusListeners) {
617                                ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS, 
618                                                modemStatusListeners.size()));
619                                for (final IModemStatusReceiveListener listener:modemStatusListeners) {
620                                        executor.execute(new Runnable() {
621                                                /*
622                                                 * (non-Javadoc)
623                                                 * @see java.lang.Runnable#run()
624                                                 */
625                                                @Override
626                                                public void run() {
627                                                        // Synchronize the listener so it is not called 
628                                                        // twice. That is, let the listener to finish its job.
629                                                        synchronized (listener) {
630                                                                listener.modemStatusEventReceived(modemStatusEvent);
631                                                        }
632                                                }
633                                        });
634                                }
635                                executor.shutdown();
636                        }
637                } catch (Exception e) {
638                        logger.error(e.getMessage(), e);
639                }
640        }
641        
642        /**
643         * Returns whether this Data reader is running or not.
644         * 
645         * @return {@code true} if the Data reader is running, {@code false} 
646         *         otherwise.
647         * 
648         * @see #stopReader()
649         */
650        public boolean isRunning() {
651                return running;
652        }
653        
654        /**
655         * Stops the Data reader thread.
656         * 
657         * @see #isRunning()
658         */
659        public void stopReader() {
660                running = false;
661                synchronized (connectionInterface) {
662                        connectionInterface.notify();
663                }
664                logger.debug(connectionInterface.toString() + "Data reader stopped.");
665        }
666        
667        /**
668         * Returns the queue of read XBee packets.
669         * 
670         * @return The queue of read XBee packets.
671         * 
672         * @see com.digi.xbee.api.models.XBeePacketsQueue
673         */
674        public XBeePacketsQueue getXBeePacketsQueue() {
675                return xbeePacketsQueue;
676        }
677}