SharedListener.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00032
00033
00034 #ifndef __SHARED_LISTENER_INCLUDED_
00035 #define __SHARED_LISTENER_INCLUDED_
00036
00037 #ifdef MIP_HOST_APPLE
00038 #include <applePatch.h>
00039 #endif
00040
00041 #include <list>
00042
00043 #include <Time.h>
00044 #include <Message.h>
00045
00046 #include <pthread.h>
00047
00048 using namespace std;
00049 using namespace MipBaselib;
00050
00051 namespace MipBaselib{
00052
00056
00058
00059
00060 template <class T> void* sharedListenerThread(void* p) {
00061 T* sl = (T*) p;
00062
00063 while(true){
00064 string msg,ip;
00065
00066 sl->_receive(msg,ip);
00067 Time currTime;
00068 Message message(msg);
00069
00070 pthread_mutex_lock(&(sl->_mtx));
00071
00072
00073 bool handlerFound = false;
00074 for(int i=0; i < sl->_handlersNum;i++){
00075 if( message.checkId( sl->_handlerIds.at(i) ) ){
00076 if( sl->_toBeDelayed[i]){
00077 sl->_delayedMsgs[i]->push_back(msg);
00078 sl->_delayedMsgIps[i]->push_back(ip);
00079 sl->_delayedMsgTimeStamps[i]->push_back(currTime);
00080 }else{
00081 sl->_handlers.at(i)->handleMessage(msg,ip);
00082 }
00083 handlerFound = true;
00084 }
00085 }
00086
00087 for(int i=0; i < sl->_handlersNum;i++){
00088
00089 int delayedMsgNum = sl->_delayedMsgs[i]->size();
00090 bool loop = true;
00091 Time handlerDelay = sl->_handlerDelays[i];
00092 while(delayedMsgNum > 0 && loop){
00093 list<Time>::iterator timeStampIt = sl->_delayedMsgTimeStamps[i]->begin();
00094 if( currTime - *timeStampIt > handlerDelay){
00095 list<string>::iterator msgIt = sl->_delayedMsgs[i]->begin();
00096 list<string>::iterator ipIt = sl->_delayedMsgIps[i]->begin();
00097 sl->_handlers.at(i)->handleMessage(*msgIt,*ipIt);
00098 sl->_delayedMsgs[i]->pop_front();
00099 sl->_delayedMsgIps[i]->pop_front();
00100 sl->_delayedMsgTimeStamps[i]->pop_front();
00101 delayedMsgNum --;
00102 }else{
00103 loop = false;
00104 }
00105 }
00106 }
00107
00108 pthread_mutex_unlock(&(sl->_mtx));
00109
00110 if(!handlerFound){
00111 cout << "Warning: handler not found for the message." << endl;
00112 cout << msg << endl;
00113 }
00114 }
00115
00116 return 0;
00117 }
00118
00122 class SharedListener {
00123
00124 vector<MessageHandler*> _handlers;
00125 vector<string> _handlerIds;
00126 vector<bool> _toBeDelayed;
00127 vector<Time> _handlerDelays;
00128 vector< list < string > * > _delayedMsgs;
00129 vector< list < string > * > _delayedMsgIps;
00130 vector< list < Time > * > _delayedMsgTimeStamps;
00131 int _handlersNum;
00132
00133 pthread_mutex_t _mtx;
00134
00135
00136 template <class T> friend void* sharedListenerThread(void* p);
00137
00138 virtual void _receive(string& message, string& address) = 0;
00139
00140 protected:
00141
00142 template <class T> void _runThread(void* p){
00143
00144 pthread_t st;
00145 pthread_create(&st, NULL,sharedListenerThread<T>,(T*) p);
00146 }
00147
00148 public:
00149
00150 SharedListener();
00151
00153 ~SharedListener();
00154
00160 bool registerHandler(MessageHandler *handler,string id,Time addedDelay = Time(0,0));
00161
00162 };
00163
00164
00165
00166
00167
00168 };
00169
00170
00171 #endif
00172
00173
00174
00175
00176
00177