Main Page | Class Hierarchy | Class List | Directories | File List | Class Members | File Members | Related Pages | Examples

ArCentralForwarder.cpp

Go to the documentation of this file.
00001 /*
00002 MobileRobots Advanced Robotics Interface for Applications (ARIA)
00003 Copyright (C) 2004, 2005 ActivMedia Robotics LLC
00004 Copyright (C) 2006, 2007 MobileRobots Inc.
00005 
00006      This program is free software; you can redistribute it and/or modify
00007      it under the terms of the GNU General Public License as published by
00008      the Free Software Foundation; either version 2 of the License, or
00009      (at your option) any later version.
00010 
00011      This program is distributed in the hope that it will be useful,
00012      but WITHOUT ANY WARRANTY; without even the implied warranty of
00013      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014      GNU General Public License for more details.
00015 
00016      You should have received a copy of the GNU General Public License
00017      along with this program; if not, write to the Free Software
00018      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00019 
00020 If you wish to redistribute ARIA under different terms, contact 
00021 MobileRobots for information about a commercial version of ARIA at 
00022 robots@mobilerobots.com or 
00023 MobileRobots Inc, 19 Columbia Drive, Amherst, NH 03031; 800-639-9481
00024 */
00025 
00026 #include "Aria.h"
00027 #include "ArExport.h"
00028 #include "ArCentralForwarder.h"
00029 
00030 AREXPORT ArCentralForwarder::ArCentralForwarder(
00031         ArServerBase *mainServer, ArSocket *socket,
00032         const char *robotName, int port, 
00033         std::set<int> *usedPorts) : 
00034   myReceiveDataFunctor(this, &ArCentralForwarder::receiveData),
00035   myRequestChangedFunctor(this, &ArCentralForwarder::requestChanged),
00036   myRequestOnceFunctor(this, &ArCentralForwarder::requestOnce),
00037   myServerClientRemovedCB(this, &ArCentralForwarder::serverClientRemoved),
00038   myNetCentralHeartbeatCB(this, &ArCentralForwarder::netCentralHeartbeat)
00039 {
00040   myMainServer = mainServer;
00041   mySocket = socket;
00042   myRobotName = robotName;
00043   myStartingPort = port;
00044   myUsedPorts = usedPorts;
00045 
00046   myServer = NULL;
00047   myClient = NULL;
00048   myPort = 0;
00049   myState = STATE_STARTING;
00050 
00051 }
00052 
00053 AREXPORT ArCentralForwarder::~ArCentralForwarder()
00054 {
00055   if (myServer != NULL)
00056   {
00057     myServer->close();
00058     delete myServer;
00059   }
00060   
00061   if (myClient != NULL)
00062   {
00063     if (myClient->isConnected())
00064       myClient->disconnect();
00065     delete myClient;
00066   }
00067 
00068   if (myRequestOnces.begin() != myRequestOnces.end())
00069     ArUtil::deleteSetPairs(myRequestOnces.begin(), myRequestOnces.end());
00070   myRequestOnces.clear();
00071   if (myLastRequest.begin() != myLastRequest.end())
00072     ArUtil::deleteSetPairs(myLastRequest.begin(), myLastRequest.end());
00073   myLastRequest.clear();
00074   if (myLastBroadcast.begin() != myLastBroadcast.end())
00075     ArUtil::deleteSetPairs(myLastBroadcast.begin(), myLastBroadcast.end());
00076   myLastBroadcast.clear();
00077 }
00078 
00079 AREXPORT bool ArCentralForwarder::callOnce(double heartbeatTimeout)
00080 {
00081   if (myState == STATE_CONNECTED)
00082   {
00083     return connectedCallOnce(heartbeatTimeout);
00084   }
00085   else if (myState == STATE_CONNECTING)
00086   {
00087     return connectingCallOnce(heartbeatTimeout);
00088   }
00089   else if (myState == STATE_STARTING)
00090   {
00091     return startingCallOnce(heartbeatTimeout);
00092   }
00093   else
00094   {
00095     ArLog::log(ArLog::Normal, "%s in bad state, disconnecting", 
00096                myRobotName.c_str());
00097     return false;
00098   }
00099 
00100 }
00101 
00102 AREXPORT bool ArCentralForwarder::startingCallOnce(double heartbeatTimeout)
00103 {
00104   myClient = new ArClientBase;
00105   
00106   if (!myClient->internalBlockingConnect("", 0, true, "", "", 
00107                                          mySocket))
00108   {
00109     ArLog::log(ArLog::Normal, 
00110                "Could not connect to switching client %s from %s",
00111                myRobotName.c_str(), mySocket->getIPString());
00112     return false;
00113   }
00114   myState = STATE_CONNECTING;  
00115   myLastHeartbeat.setToNow();
00116   return callOnce(heartbeatTimeout);
00117 }
00118 
00119 AREXPORT bool ArCentralForwarder::connectingCallOnce(double heartbeatTimeout)
00120 {
00121   // if we have a heartbeat timeout make sure we've heard the
00122   // heartbeat within that range
00123   if (heartbeatTimeout >= -.00000001 && 
00124       myLastHeartbeat.secSince() / 60.0 > heartbeatTimeout)
00125   {
00126     ArLog::log(ArLog::Normal, 
00127                "Haven't connected to %s in %g minutes, dropping connection", 
00128                myRobotName.c_str(), heartbeatTimeout);
00129     return false;
00130   }
00131 
00132   if (!myClient->getReceivedDataList() || 
00133       !myClient->getReceivedArgRetList() || 
00134       !myClient->getReceivedGroupAndFlagsList())
00135   {
00136     myClient->loopOnce();
00137     return true;
00138   }
00139 
00140   ArLog::log(ArLog::Normal, "Connected to switching client %s from %s",
00141              myRobotName.c_str(), mySocket->getIPString());
00142   //clientBase->logDataList();
00143   myServer = new ArServerBase(false);
00144 
00145   ArTime startedOpening;
00146   startedOpening.setToNow();
00147   int port;
00148   bool foundPort;
00149   // walk through our ports starting at our starting port
00150   for (port = myStartingPort, foundPort = false; 
00151        !foundPort && port < 65536; 
00152        port++)
00153   {
00154     // if we've already used the port then skip it
00155     if (myUsedPorts->find(port) != myUsedPorts->end())
00156       continue;
00157     // try to open it
00158     if (myServer->open(port))
00159     {
00160       foundPort = true;
00161       myPort = port;
00162     }
00163   }
00164   
00165   if (!foundPort)
00166   {
00167     ArLog::log(ArLog::Normal, "Could not find port for %s", 
00168                myRobotName.c_str());
00169   }
00170   myServer->setUserInfo(myMainServer->getUserInfo());
00171 
00172   std::map<unsigned int, ArClientData *>::const_iterator dIt;
00173   ArClientData *clientData;
00174 
00175   myServer->addClientRemovedCallback(&myServerClientRemovedCB);
00176 
00177   myClient->addHandler("centralHeartbeat", &myNetCentralHeartbeatCB);
00178   myClient->request("centralHeartbeat", 1000);
00179 
00180   myLastHeartbeat.setToNow();
00181 
00182   for (dIt = myClient->getDataMap()->begin(); 
00183        dIt != myClient->getDataMap()->end(); 
00184        dIt++)
00185   {
00186     clientData = (*dIt).second;
00187     
00188     if (myMainServer->dataHasFlag(clientData->getName(), 
00189                                 "MAIN_SERVER_ONLY"))
00190     {
00191       ArLog::log(ArLog::Normal, 
00192                  "Not forwarding %s since that is MAIN_SERVER_ONLY",
00193                  clientData->getName());
00194       continue;
00195     }
00196     else if (clientData->hasDataFlag("RETURN_NONE"))
00197     {
00198       myReturnTypes[clientData->getCommand()] = RETURN_NONE;
00199     }
00200     else if (clientData->hasDataFlag("RETURN_SINGLE"))
00201     {
00202       myReturnTypes[clientData->getCommand()] = RETURN_SINGLE;
00203       myRequestOnces[clientData->getCommand()] = 
00204                     new std::list<ArServerClient *>;
00205     }
00206     else if (clientData->hasDataFlag("RETURN_VIDEO"))
00207     {
00208       ArLog::log(ArLog::Normal, 
00209                  "Forwarding %s that is RETURN_VIDEO",
00210                  clientData->getName());
00211       myReturnTypes[clientData->getCommand()] = RETURN_VIDEO;
00212       myRequestOnces[clientData->getCommand()] = 
00213                     new std::list<ArServerClient *>;
00214     }
00215     else if (clientData->hasDataFlag("RETURN_UNTIL_EMPTY"))
00216     {
00217       myReturnTypes[clientData->getCommand()] = RETURN_UNTIL_EMPTY;
00218       myRequestOnces[clientData->getCommand()] = 
00219                     new std::list<ArServerClient *>;
00220 
00221     }
00222     else if (clientData->hasDataFlag("RETURN_COMPLEX"))
00223     {
00224       ArLog::log(ArLog::Normal, 
00225                  "Not forwarding %s since it is a complex return",
00226                  clientData->getName());
00227       continue;
00228     }
00229     else
00230     {
00231       ArLog::log(ArLog::Normal, 
00232                  "Not forwarding %s since it is an unknown return (data flags %s)",
00233                  clientData->getName(), clientData->getDataFlagsString());
00234       continue;
00235     }
00236 
00237     myLastRequest[clientData->getCommand()] = new ArTime;
00238     myLastBroadcast[clientData->getCommand()] = new ArTime;
00239 
00240     myServer->addDataAdvanced(
00241             clientData->getName(), clientData->getDescription(),
00242             NULL, clientData->getArgumentDescription(),
00243             clientData->getReturnDescription(), clientData->getCommandGroup(),
00244             clientData->getDataFlagsString(), clientData->getCommand(), 
00245             &myRequestChangedFunctor, &myRequestOnceFunctor);
00246 
00247     myClient->addHandler(clientData->getName(), &myReceiveDataFunctor);
00248   }
00249   myState = STATE_CONNECTED;
00250   return callOnce(heartbeatTimeout);
00251 }
00252 
00253 AREXPORT bool ArCentralForwarder::connectedCallOnce(double heartbeatTimeout)
00254 {
00255   if (!myClient->isConnected())
00256   {
00257     ArLog::log(ArLog::Normal, "Lost connection to server %s", 
00258                myRobotName.c_str());
00259     return false;
00260   }
00261   myClient->loopOnce();
00262   myServer->loopOnce();
00263   
00264   // if we have a heartbeat timeout make sure we've heard the
00265   // heartbeat within that range
00266   if (heartbeatTimeout >= -.00000001 && 
00267       myLastHeartbeat.secSince() / 60.0 > heartbeatTimeout)
00268   {
00269     ArLog::log(ArLog::Normal, "Haven't heard from %s in %g minutes, dropping connection", myRobotName.c_str(), heartbeatTimeout);
00270     return false;
00271   }
00272 
00273   return true;
00274 }
00275 
00276 void ArCentralForwarder::serverClientRemoved(ArServerClient *client)
00277 {
00278   std::map<unsigned int, std::list<ArServerClient *> *>::iterator rIt;
00279   std::list<ArServerClient *> *requestList = NULL;
00280   std::list<ArServerClient *>::iterator scIt;
00281 
00282   printf("Client disconnected\n");
00283   for (rIt = myRequestOnces.begin(); rIt != myRequestOnces.end(); rIt++)
00284   {
00285     requestList = (*rIt).second;
00286     // while we have entries for this client we loop and replace them
00287     // with NULLs
00288     bool foundOne = true;
00289     while (foundOne)
00290     {
00291       foundOne = false;
00292       // see if we found one
00293       for (scIt = requestList->begin(); 
00294            !foundOne && scIt != requestList->end(); 
00295            scIt++)
00296       {
00297         if ((*scIt) == client)
00298         {
00299           foundOne = true;
00300           printf("Got...\n");
00301           requestList->insert(scIt, NULL);        
00302           for (scIt = requestList->begin(); 
00303                scIt != requestList->end(); 
00304                scIt++)
00305           {
00306             if ((*scIt) == client)
00307             {
00308               requestList->erase(scIt);
00309               printf("Removed request for client %p\n", client);
00310               break;
00311             }
00312           }
00313         }
00314       }
00315     }
00316   }
00317 }
00318 
00319 
00320 void ArCentralForwarder::receiveData(ArNetPacket *packet)
00321 {
00322   ReturnType returnType;
00323   std::list<ArServerClient *>::iterator it;
00324   ArServerClient *client;
00325 
00326   // chop off the old footer
00327   //packet->setLength(packet->getLength() - ArNetPacket::FOOTER_LENGTH);
00328   packet->setAddedFooter(true);
00329 
00330   /*
00331   if (strcmp(myClient->getName(packet->getCommand(), true), 
00332              "getPictureCam1") == 0)
00333     printf("Got getPictureCam1...\n");
00334   */
00335   returnType = myReturnTypes[packet->getCommand()];
00336   //printf("Got a packet in for %s %d\n", myClient->getName(packet->getCommand(), true), packet->getCommand());
00337 
00338   // this part is seeing if it came from a request_once, if so we
00339   // don't service anything else (so we take care of those things that
00340   // only happen once better then the ones that are mixing... but that
00341   // should be okay)
00342   if ((returnType == RETURN_SINGLE || returnType == RETURN_UNTIL_EMPTY) &&
00343       (it = myRequestOnces[packet->getCommand()]->begin()) != 
00344       myRequestOnces[packet->getCommand()]->end())
00345   {
00346     //if (returnType == RETURN_UNTIL_EMPTY)
00347     //printf("Got a packet for %s with length %d %d\n", myClient->getName(packet->getCommand(), true), packet->getDataLength(), packet->getLength());
00348 
00349     client = (*it);
00350     if (client != NULL)
00351     {
00352       if (packet->getPacketSource() == ArNetPacket::TCP)
00353         client->sendPacketTcp(packet);
00354       else if (packet->getPacketSource() == ArNetPacket::UDP)
00355         client->sendPacketUdp(packet);
00356       else
00357       {
00358         client->sendPacketTcp(packet);
00359         ArLog::log(ArLog::Normal, 
00360                    "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)", 
00361                    myClient->getName(packet->getCommand(), true), 
00362                    packet->getPacketSource());
00363       }
00364     }
00365     if ((returnType == RETURN_UNTIL_EMPTY && packet->getDataLength() == 0) || 
00366         returnType == RETURN_SINGLE)
00367     {
00368       //if (returnType == RETURN_UNTIL_EMPTY)
00369       //printf("Got final packet for for %s\n", myClient->getName(packet->getCommand(), true));
00370       myRequestOnces[packet->getCommand()]->pop_front();
00371     }
00372   }
00373   else if (returnType == RETURN_VIDEO)
00374   {
00375     // what we do here is send it to the ones that have requested it
00376     // but aren't listening for the broadcast... then we broadcast it
00377     // to everyone whose listening... this should ensure that everyone
00378     // just gets the packet once as often as we see it
00379 
00380     while ((it = myRequestOnces[packet->getCommand()]->begin()) != 
00381            myRequestOnces[packet->getCommand()]->end())
00382     {
00383       //printf("Sent a single return_single_and_broadcast for %s\n", myClient->getName(packet->getCommand(), true));
00384       client = (*it);
00385       if (client != NULL && client->getFrequency(packet->getCommand()) == -2)
00386       {
00387         if (packet->getPacketSource() == ArNetPacket::TCP)
00388           client->sendPacketTcp(packet);
00389         else if (packet->getPacketSource() == ArNetPacket::UDP)
00390           client->sendPacketUdp(packet);
00391         else
00392         {
00393           client->sendPacketTcp(packet);
00394           ArLog::log(ArLog::Normal, 
00395                      "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)", 
00396                      myClient->getName(packet->getCommand(), true), 
00397                      packet->getPacketSource());
00398         }
00399       }
00400       myRequestOnces[packet->getCommand()]->pop_front();
00401     }
00402     //printf("Broadcast return_single_and_broadcast for %s\n", myClient->getName(packet->getCommand(), true));
00403     myLastBroadcast[packet->getCommand()]->setToNow();
00404     if (packet->getPacketSource() == ArNetPacket::TCP)
00405     {
00406       myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00407     }
00408     else if (packet->getPacketSource() == ArNetPacket::UDP)
00409     {
00410       myServer->broadcastPacketUdpByCommand(packet, packet->getCommand());
00411     }
00412     else
00413     {
00414       myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00415       ArLog::log(ArLog::Normal, 
00416                  "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)", 
00417                  myClient->getName(packet->getCommand(), true), 
00418                  packet->getPacketSource());
00419     }
00420 
00421   }
00422   else
00423   {
00424     myLastBroadcast[packet->getCommand()]->setToNow();
00425     if (packet->getPacketSource() == ArNetPacket::TCP)
00426     {
00427       myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00428     }
00429     else if (packet->getPacketSource() == ArNetPacket::UDP)
00430     {
00431       myServer->broadcastPacketUdpByCommand(packet, packet->getCommand());
00432     }
00433     else
00434     {
00435       myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00436       ArLog::log(ArLog::Normal, 
00437                  "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)", 
00438                  myClient->getName(packet->getCommand(), true), 
00439                  packet->getPacketSource());
00440     }
00441   }
00442 
00443 }
00444 
00445 void ArCentralForwarder::requestChanged(long interval, 
00446                                              unsigned int command)
00447 {
00448   /*
00449   if (strcmp(myClient->getName(command, true), 
00450              "getPictureCam1") == 0)
00451     printf("Changed getPictureCam1 to %d...\n", interval);
00452   */
00453   if (interval == -2)
00454   {
00455     ArLog::log(ArLog::Verbose, "Stopping request for %s", 
00456                myClient->getName(command, true));
00457     myClient->requestStopByCommand(command);
00458     myLastRequest[command]->setToNow();
00459   }
00460   else 
00461   {
00462     ReturnType returnType;
00463     
00464     returnType = myReturnTypes[command];    
00465     if (returnType == RETURN_VIDEO && interval != -1)
00466     {
00467       ArLog::log(ArLog::Verbose, "Ignoring a RETURN_VIDEO attempted request of %s at %d interval since RETURN_VIDEOs cannot request at an interval", 
00468                  myClient->getName(command, true), interval);
00469       return;
00470     }
00471     
00472     ArLog::log(ArLog::Verbose, "Requesting %s at interval of %ld", 
00473                myClient->getName(command, true), interval);
00474     myClient->requestByCommand(command, interval);
00475     myLastRequest[command]->setToNow();
00476     // if the interval is -1 then also requestOnce it so that anyone
00477     // connecting after the first connection can actually get data too
00478     myClient->requestOnceByCommand(command);
00479   }
00480 }
00481 
00482 void ArCentralForwarder::requestOnce(ArServerClient *client, ArNetPacket *packet)
00483 {
00484   ReturnType returnType;
00485   
00486   returnType = myReturnTypes[packet->getCommand()];
00487 
00488   // chop off the footer
00489   packet->setAddedFooter(true);
00490   /*
00491   if (strcmp(myClient->getName(packet->getCommand(), true), 
00492              "getPictureCam1") == 0)
00493     printf("Request once for getPictureCam1...\n");
00494   */
00495   // if its video and the last broadcast or request was very recently
00496   // then ignore it so we don't wind up using up our wireless
00497   // bandwidth
00498   if (returnType == RETURN_VIDEO && 
00499       (myLastBroadcast[packet->getCommand()]->mSecSince() < 25 ||
00500        myLastRequest[packet->getCommand()]->mSecSince() < 25))
00501   {
00502     ArLog::log(ArLog::Normal, "Ignoring a RETURN_VIDEO of %s since request or broadcast was recent", myClient->getName(packet->getCommand(), true));
00503     return;
00504   }
00505   // if its a type where we keep track then put it into the list
00506   if (returnType == RETURN_SINGLE || returnType == RETURN_UNTIL_EMPTY || 
00507       returnType == RETURN_VIDEO)
00508   {
00509     //if (returnType == RETURN_UNTIL_EMPTY)
00510     //printf("Trying to request once %s\n", myClient->getName(packet->getCommand(), true));
00511     myRequestOnces[packet->getCommand()]->push_back(client);
00512   }
00513   
00514   ArLog::log(ArLog::Verbose, "Requesting %s once", 
00515              myClient->getName(packet->getCommand()));
00516   myClient->requestOnceByCommand(packet->getCommand(), packet);
00517   myLastRequest[packet->getCommand()]->setToNow();
00518 }
00519 
00520 
00521 AREXPORT void ArCentralForwarder::netCentralHeartbeat(ArNetPacket *packet)
00522 {
00523   myLastHeartbeat.setToNow();
00524 }

Generated on Tue Feb 20 10:51:49 2007 for ArNetworking by  doxygen 1.4.0