Logo Search packages:      
Sourcecode: launchtool version File versions  Download package

MultiReader.cc

#include "MultiReader.h"

#include <sys/poll.h>   // poll
#include <unistd.h>           // fcntl
#include <fcntl.h>            // fcntl
#include <errno.h>

#include <Logger.h>

#undef LOGTAG
#define LOGTAG "MultiReader"

//#define FPRINTF_DEBUG

#ifdef FPRINTF_DEBUG
#include <stdio.h>
#endif

using namespace std;
using namespace stringf;

/*
static int set_nonblocking(int fd) throw (SystemException)
{
      int oldfl;
      if (fcntl(fd, F_GETFL, &oldfl) == -1)
            throw SystemException(errno, "Reading file descriptor flags");

      int newfl = oldfl | O_NONBLOCK;
      if (fcntl(fd, F_SETFL, &newfl) == -1)
            throw SystemException(errno, "Setting O_NONBLOCK on file descriptor flags");
            
      return oldfl;
}


static void restore_file_flags(int fd, int oldfl) throw (SystemException)
{
      if (fcntl(fd, F_SETFL, &oldfl) == -1)
            throw SystemException(errno, "Restoring old file descriptor flags");
}
*/

00044 void MultiReader::addFD(int fd) throw (SystemException)
{
      fds.push_back(fddata(fd, 0/*set_nonblocking(fd)*/));
#ifdef FPRINTF_DEBUG
      fprintf(stderr, "fd %d: %d\n", fds.size() - 1, fd);
#endif
}

00052 void MultiReader::readLoop() throw (SystemException, InterruptedException)
{
      const unsigned int bufsize = 512;
      char buf[bufsize];

#ifdef FPRINTF_DEBUG
      fprintf(stderr, " *** Entering readLoop\n");
#endif

      // While there is output, read it
      while (fds.size())
      {
#ifdef FPRINTF_DEBUG
            fprintf(stderr, "Before select: %d fds to watch\n", fds.size());
#endif
            fd_set fdset;
            FD_ZERO(&fdset);
            int maxfd = 0;
            for (unsigned int i = 0; i < fds.size(); i++)
            {
#ifdef FPRINTF_DEBUG
                  fprintf(stderr, "Adding fd %d\n", fds[i].fd);
#endif
                  FD_SET(fds[i].fd, &fdset);
                  if (fds[i].fd > maxfd)
                        maxfd = fds[i].fd;
            }
            ++maxfd;
            
            int res = select(maxfd, &fdset, 0, 0, 0);
#ifdef FPRINTF_DEBUG
            fprintf(stderr, "select() (maxfd=%d) returned %d\n", maxfd, res);
#endif
            if (res == -1)
            {
                  if (errno != EINTR)
                        throw SystemException(errno, "Checking for output on " +
                                    fmt(fds.size()) + " file descriptors");
                  else
                        throw InterruptedException("Checking for output on " + fmt(fds.size()) + " file descriptors");
            }
            else if (res == 0)
                  ;// log_warn("select returned 0 even with an infinite timeout");
            else
            {
                  unsigned int cur_fdcount = fds.size();
                  for (unsigned int i = 0; i < cur_fdcount; i++)
                  {
#ifdef FPRINTF_DEBUG
                        if (FD_ISSET(fds[i].fd, &fdset))
                              fprintf(stderr, "fd %d: %d has data\n", i, fds[i].fd);
                        else
                              fprintf(stderr, "fd %d: %d does not have data\n", i, fds[i].fd);
#endif
                        if (FD_ISSET(fds[i].fd, &fdset))
                        {
                              // Read all pending data
                              int sz;
                              sz = read(fds[i].fd, buf, bufsize);
#ifdef FPRINTF_DEBUG
                              fprintf(stderr, "read returned %d; errno: %d (%s)\n", sz, errno, strerror(errno));
#endif

                              if (sz > 0)
                              {
#ifdef FPRINTF_DEBUG
                                    fprintf(stderr, "Data was: %.*s\n", sz, buf);
#endif
                                    // Notify reception of new data
                                    handleData(fds[i].fd, buf, sz);
                              }

                              if (sz == -1)
                                    throw SystemException(errno, "reading data from ");

                              if (sz == 0)
                              {
                                    // Notify EOF on this handle
                                    handleEOF(fds[i].fd);

                                    // Delete the EOF-ed handle from the poll list
                                    //restore_file_flags(fds[i].fd, fds[i].old_flags);
                                    if (i != fds.size() - 1)
                                    {
                                          fds[i] = fds[fds.size() - 1];
                                    }
                                    fds.pop_back();

#ifdef FPRINTF_DEBUG
                                    fprintf(stderr, "fd %d: %d reported EOF; fdcount: %d\n", i, fds[i].fd, fds.size());
#endif
                              }
#ifdef FPRINTF_DEBUG
                              fprintf(stderr, "fd %d: %d end reading data\n", i, fds[i].fd);
#endif
                        }
                  }
            }
      }
#ifdef FPRINTF_DEBUG
      fprintf(stderr, " *** read loop ended\n");
#endif
}


void LineMultiReader::handleData(int fd, char* buf, unsigned int size) throw ()
{
      for (unsigned int i = 0; i < size; i++)
            if (buf[i] == '\n')
            {
                  listeners[fd]->haveLine(fd, linebuf);
                  linebuf = string();
            } else
                  linebuf += buf[i];
}

void LineMultiReader::handleEOF(int fd) throw ()
{
      if (linebuf.size())
      {
            listeners[fd]->haveLine(fd, linebuf);
            listeners[fd]->haveEOF(fd);
      }
}

// vim:set ts=4 sw=4:

Generated by  Doxygen 1.6.0   Back to index