SharedListener.h

Go to the documentation of this file.
00001 // ----------------------------------------------------------------------------
00002 //
00003 // $Id$
00004 //
00005 // Copyright 2008, 2009, 2010, 2011, 2012  Antonio Franchi and Paolo Stegagno    
00006 //
00007 // This file is part of MIP.
00008 //
00009 // MIP is free software: you can redistribute it and/or modify
00010 // it under the terms of the GNU General Public License as published by
00011 // the Free Software Foundation, either version 3 of the License, or
00012 // (at your option) any later version.
00013 //
00014 // MIP is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with MIP. If not, see <http://www.gnu.org/licenses/>.
00021 //
00022 // Contact info: antonio.franchi@tuebingen.mpg.de stegagno@diag.uniroma1.it
00023 //
00024 // ----------------------------------------------------------------------------
00025 
00026 
00032 
00033 
00034 #ifndef __SHARED_LISTENER_INCLUDED_
00035 #define __SHARED_LISTENER_INCLUDED_
00036 
00037 #ifdef MIP_HOST_APPLE
00038 #include <applePatch.h>
00039 #endif
00040 
00041 #include <list>
00042 
00043 #include <Time.h>
00044 #include <Message.h>
00045 
00046 #include <pthread.h>
00047 
00048 using namespace std;
00049 using namespace MipBaselib;
00050 
00051 namespace MipBaselib{
00052 
00056  
00058  /* @{ */
00059  
00060 template <class T> void* sharedListenerThread(void* p) { 
00061  T* sl = (T*) p;
00062 
00063  while(true){
00064   string msg,ip;
00065  
00066   sl->_receive(msg,ip);
00067   Time currTime;
00068   Message message(msg);
00069  
00070   pthread_mutex_lock(&(sl->_mtx));
00071  
00072   /*First loop for the received messages*/
00073   bool handlerFound = false;
00074   for(int i=0; i < sl->_handlersNum;i++){
00075    if( message.checkId( sl->_handlerIds.at(i) ) ){
00076     if( sl->_toBeDelayed[i]){
00077      sl->_delayedMsgs[i]->push_back(msg);
00078      sl->_delayedMsgIps[i]->push_back(ip);
00079      sl->_delayedMsgTimeStamps[i]->push_back(currTime);
00080     }else{
00081      sl->_handlers.at(i)->handleMessage(msg,ip);
00082     }
00083     handlerFound = true;
00084    }
00085   }
00086   /*Second loop for the delayed messages*/
00087   for(int i=0; i < sl->_handlersNum;i++){
00088  //    cout << "TEST " << i << endl;
00089    int delayedMsgNum = sl->_delayedMsgs[i]->size();
00090    bool loop = true;
00091    Time handlerDelay = sl->_handlerDelays[i];
00092    while(delayedMsgNum > 0 && loop){
00093     list<Time>::iterator timeStampIt = sl->_delayedMsgTimeStamps[i]->begin();
00094     if( currTime - *timeStampIt  > handlerDelay){
00095      list<string>::iterator msgIt = sl->_delayedMsgs[i]->begin();
00096      list<string>::iterator ipIt  = sl->_delayedMsgIps[i]->begin();
00097      sl->_handlers.at(i)->handleMessage(*msgIt,*ipIt);
00098      sl->_delayedMsgs[i]->pop_front();
00099      sl->_delayedMsgIps[i]->pop_front();
00100      sl->_delayedMsgTimeStamps[i]->pop_front();
00101      delayedMsgNum --;
00102     }else{
00103      loop = false;
00104     }
00105    }
00106   }
00107  
00108   pthread_mutex_unlock(&(sl->_mtx));
00109  
00110   if(!handlerFound){
00111    cout << "Warning: handler not found for the message." << endl;
00112    cout << msg  << endl;
00113   }
00114  }
00115 
00116  return 0;
00117 }
00118  
00122 class SharedListener {
00123   
00124   vector<MessageHandler*> _handlers;
00125   vector<string>      _handlerIds;
00126   vector<bool>       _toBeDelayed;
00127   vector<Time>       _handlerDelays;
00128   vector< list < string > * > _delayedMsgs;
00129   vector< list < string > * > _delayedMsgIps;
00130   vector< list < Time   > * > _delayedMsgTimeStamps;
00131   int _handlersNum;
00132   
00133   pthread_mutex_t _mtx;
00134 //   ACE_thread_t _listThreadID;
00135   
00136   template <class T> friend void* sharedListenerThread(void* p);
00137 
00138   virtual void _receive(string& message, string& address) = 0;
00139   
00140  protected:
00141   
00142   template <class T> void _runThread(void* p){
00143 //    ACE_Thread::spawn((ACE_THR_FUNC)sharedListenerThread<T>,(T*) p, THR_NEW_LWP | THR_JOINABLE, &_listThreadID, &_listThreadHandle);
00144    pthread_t st;
00145    pthread_create(&st, NULL,sharedListenerThread<T>,(T*) p);
00146   }
00147   
00148  public:
00149   
00150   SharedListener();
00151   
00153   ~SharedListener();
00154   
00160   bool registerHandler(MessageHandler *handler,string id,Time addedDelay = Time(0,0));
00161  
00162 };
00163 
00164 
00165 
00166 /* @} */
00167  
00168 };// end namespace MipBaselib
00169 
00170 
00171 #endif
00172 
00173 
00174 
00175 
00176 
00177 

Generated on Mon Feb 20 07:01:07 2017 for MIP by  doxygen 1.5.6