00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "Aria.h"
00027 #include "ArExport.h"
00028 #include "ArCentralForwarder.h"
00029
00030 AREXPORT ArCentralForwarder::ArCentralForwarder(
00031 ArServerBase *mainServer, ArSocket *socket,
00032 const char *robotName, int port,
00033 std::set<int> *usedPorts) :
00034 myReceiveDataFunctor(this, &ArCentralForwarder::receiveData),
00035 myRequestChangedFunctor(this, &ArCentralForwarder::requestChanged),
00036 myRequestOnceFunctor(this, &ArCentralForwarder::requestOnce),
00037 myServerClientRemovedCB(this, &ArCentralForwarder::serverClientRemoved),
00038 myNetCentralHeartbeatCB(this, &ArCentralForwarder::netCentralHeartbeat)
00039 {
00040 myMainServer = mainServer;
00041 mySocket = socket;
00042 myRobotName = robotName;
00043 myStartingPort = port;
00044 myUsedPorts = usedPorts;
00045
00046 myServer = NULL;
00047 myClient = NULL;
00048 myPort = 0;
00049 myState = STATE_STARTING;
00050
00051 }
00052
00053 AREXPORT ArCentralForwarder::~ArCentralForwarder()
00054 {
00055 if (myServer != NULL)
00056 {
00057 myServer->close();
00058 delete myServer;
00059 }
00060
00061 if (myClient != NULL)
00062 {
00063 if (myClient->isConnected())
00064 myClient->disconnect();
00065 delete myClient;
00066 }
00067
00068 if (myRequestOnces.begin() != myRequestOnces.end())
00069 ArUtil::deleteSetPairs(myRequestOnces.begin(), myRequestOnces.end());
00070 myRequestOnces.clear();
00071 if (myLastRequest.begin() != myLastRequest.end())
00072 ArUtil::deleteSetPairs(myLastRequest.begin(), myLastRequest.end());
00073 myLastRequest.clear();
00074 if (myLastBroadcast.begin() != myLastBroadcast.end())
00075 ArUtil::deleteSetPairs(myLastBroadcast.begin(), myLastBroadcast.end());
00076 myLastBroadcast.clear();
00077 }
00078
00079 AREXPORT bool ArCentralForwarder::callOnce(double heartbeatTimeout)
00080 {
00081 if (myState == STATE_CONNECTED)
00082 {
00083 return connectedCallOnce(heartbeatTimeout);
00084 }
00085 else if (myState == STATE_CONNECTING)
00086 {
00087 return connectingCallOnce(heartbeatTimeout);
00088 }
00089 else if (myState == STATE_STARTING)
00090 {
00091 return startingCallOnce(heartbeatTimeout);
00092 }
00093 else
00094 {
00095 ArLog::log(ArLog::Normal, "%s in bad state, disconnecting",
00096 myRobotName.c_str());
00097 return false;
00098 }
00099
00100 }
00101
00102 AREXPORT bool ArCentralForwarder::startingCallOnce(double heartbeatTimeout)
00103 {
00104 myClient = new ArClientBase;
00105
00106 if (!myClient->internalBlockingConnect("", 0, true, "", "",
00107 mySocket))
00108 {
00109 ArLog::log(ArLog::Normal,
00110 "Could not connect to switching client %s from %s",
00111 myRobotName.c_str(), mySocket->getIPString());
00112 return false;
00113 }
00114 myState = STATE_CONNECTING;
00115 myLastHeartbeat.setToNow();
00116 return callOnce(heartbeatTimeout);
00117 }
00118
00119 AREXPORT bool ArCentralForwarder::connectingCallOnce(double heartbeatTimeout)
00120 {
00121
00122
00123 if (heartbeatTimeout >= -.00000001 &&
00124 myLastHeartbeat.secSince() / 60.0 > heartbeatTimeout)
00125 {
00126 ArLog::log(ArLog::Normal,
00127 "Haven't connected to %s in %g minutes, dropping connection",
00128 myRobotName.c_str(), heartbeatTimeout);
00129 return false;
00130 }
00131
00132 if (!myClient->getReceivedDataList() ||
00133 !myClient->getReceivedArgRetList() ||
00134 !myClient->getReceivedGroupAndFlagsList())
00135 {
00136 myClient->loopOnce();
00137 return true;
00138 }
00139
00140 ArLog::log(ArLog::Normal, "Connected to switching client %s from %s",
00141 myRobotName.c_str(), mySocket->getIPString());
00142
00143 myServer = new ArServerBase(false);
00144
00145 ArTime startedOpening;
00146 startedOpening.setToNow();
00147 int port;
00148 bool foundPort;
00149
00150 for (port = myStartingPort, foundPort = false;
00151 !foundPort && port < 65536;
00152 port++)
00153 {
00154
00155 if (myUsedPorts->find(port) != myUsedPorts->end())
00156 continue;
00157
00158 if (myServer->open(port))
00159 {
00160 foundPort = true;
00161 myPort = port;
00162 }
00163 }
00164
00165 if (!foundPort)
00166 {
00167 ArLog::log(ArLog::Normal, "Could not find port for %s",
00168 myRobotName.c_str());
00169 }
00170 myServer->setUserInfo(myMainServer->getUserInfo());
00171
00172 std::map<unsigned int, ArClientData *>::const_iterator dIt;
00173 ArClientData *clientData;
00174
00175 myServer->addClientRemovedCallback(&myServerClientRemovedCB);
00176
00177 myClient->addHandler("centralHeartbeat", &myNetCentralHeartbeatCB);
00178 myClient->request("centralHeartbeat", 1000);
00179
00180 myLastHeartbeat.setToNow();
00181
00182 for (dIt = myClient->getDataMap()->begin();
00183 dIt != myClient->getDataMap()->end();
00184 dIt++)
00185 {
00186 clientData = (*dIt).second;
00187
00188 if (myMainServer->dataHasFlag(clientData->getName(),
00189 "MAIN_SERVER_ONLY"))
00190 {
00191 ArLog::log(ArLog::Normal,
00192 "Not forwarding %s since that is MAIN_SERVER_ONLY",
00193 clientData->getName());
00194 continue;
00195 }
00196 else if (clientData->hasDataFlag("RETURN_NONE"))
00197 {
00198 myReturnTypes[clientData->getCommand()] = RETURN_NONE;
00199 }
00200 else if (clientData->hasDataFlag("RETURN_SINGLE"))
00201 {
00202 myReturnTypes[clientData->getCommand()] = RETURN_SINGLE;
00203 myRequestOnces[clientData->getCommand()] =
00204 new std::list<ArServerClient *>;
00205 }
00206 else if (clientData->hasDataFlag("RETURN_VIDEO"))
00207 {
00208 ArLog::log(ArLog::Normal,
00209 "Forwarding %s that is RETURN_VIDEO",
00210 clientData->getName());
00211 myReturnTypes[clientData->getCommand()] = RETURN_VIDEO;
00212 myRequestOnces[clientData->getCommand()] =
00213 new std::list<ArServerClient *>;
00214 }
00215 else if (clientData->hasDataFlag("RETURN_UNTIL_EMPTY"))
00216 {
00217 myReturnTypes[clientData->getCommand()] = RETURN_UNTIL_EMPTY;
00218 myRequestOnces[clientData->getCommand()] =
00219 new std::list<ArServerClient *>;
00220
00221 }
00222 else if (clientData->hasDataFlag("RETURN_COMPLEX"))
00223 {
00224 ArLog::log(ArLog::Normal,
00225 "Not forwarding %s since it is a complex return",
00226 clientData->getName());
00227 continue;
00228 }
00229 else
00230 {
00231 ArLog::log(ArLog::Normal,
00232 "Not forwarding %s since it is an unknown return (data flags %s)",
00233 clientData->getName(), clientData->getDataFlagsString());
00234 continue;
00235 }
00236
00237 myLastRequest[clientData->getCommand()] = new ArTime;
00238 myLastBroadcast[clientData->getCommand()] = new ArTime;
00239
00240 myServer->addDataAdvanced(
00241 clientData->getName(), clientData->getDescription(),
00242 NULL, clientData->getArgumentDescription(),
00243 clientData->getReturnDescription(), clientData->getCommandGroup(),
00244 clientData->getDataFlagsString(), clientData->getCommand(),
00245 &myRequestChangedFunctor, &myRequestOnceFunctor);
00246
00247 myClient->addHandler(clientData->getName(), &myReceiveDataFunctor);
00248 }
00249 myState = STATE_CONNECTED;
00250 return callOnce(heartbeatTimeout);
00251 }
00252
00253 AREXPORT bool ArCentralForwarder::connectedCallOnce(double heartbeatTimeout)
00254 {
00255 if (!myClient->isConnected())
00256 {
00257 ArLog::log(ArLog::Normal, "Lost connection to server %s",
00258 myRobotName.c_str());
00259 return false;
00260 }
00261 myClient->loopOnce();
00262 myServer->loopOnce();
00263
00264
00265
00266 if (heartbeatTimeout >= -.00000001 &&
00267 myLastHeartbeat.secSince() / 60.0 > heartbeatTimeout)
00268 {
00269 ArLog::log(ArLog::Normal, "Haven't heard from %s in %g minutes, dropping connection", myRobotName.c_str(), heartbeatTimeout);
00270 return false;
00271 }
00272
00273 return true;
00274 }
00275
00276 void ArCentralForwarder::serverClientRemoved(ArServerClient *client)
00277 {
00278 std::map<unsigned int, std::list<ArServerClient *> *>::iterator rIt;
00279 std::list<ArServerClient *> *requestList = NULL;
00280 std::list<ArServerClient *>::iterator scIt;
00281
00282 printf("Client disconnected\n");
00283 for (rIt = myRequestOnces.begin(); rIt != myRequestOnces.end(); rIt++)
00284 {
00285 requestList = (*rIt).second;
00286
00287
00288 bool foundOne = true;
00289 while (foundOne)
00290 {
00291 foundOne = false;
00292
00293 for (scIt = requestList->begin();
00294 !foundOne && scIt != requestList->end();
00295 scIt++)
00296 {
00297 if ((*scIt) == client)
00298 {
00299 foundOne = true;
00300 printf("Got...\n");
00301 requestList->insert(scIt, NULL);
00302 for (scIt = requestList->begin();
00303 scIt != requestList->end();
00304 scIt++)
00305 {
00306 if ((*scIt) == client)
00307 {
00308 requestList->erase(scIt);
00309 printf("Removed request for client %p\n", client);
00310 break;
00311 }
00312 }
00313 }
00314 }
00315 }
00316 }
00317 }
00318
00319
00320 void ArCentralForwarder::receiveData(ArNetPacket *packet)
00321 {
00322 ReturnType returnType;
00323 std::list<ArServerClient *>::iterator it;
00324 ArServerClient *client;
00325
00326
00327
00328 packet->setAddedFooter(true);
00329
00330
00331
00332
00333
00334
00335 returnType = myReturnTypes[packet->getCommand()];
00336
00337
00338
00339
00340
00341
00342 if ((returnType == RETURN_SINGLE || returnType == RETURN_UNTIL_EMPTY) &&
00343 (it = myRequestOnces[packet->getCommand()]->begin()) !=
00344 myRequestOnces[packet->getCommand()]->end())
00345 {
00346
00347
00348
00349 client = (*it);
00350 if (client != NULL)
00351 {
00352 if (packet->getPacketSource() == ArNetPacket::TCP)
00353 client->sendPacketTcp(packet);
00354 else if (packet->getPacketSource() == ArNetPacket::UDP)
00355 client->sendPacketUdp(packet);
00356 else
00357 {
00358 client->sendPacketTcp(packet);
00359 ArLog::log(ArLog::Normal,
00360 "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)",
00361 myClient->getName(packet->getCommand(), true),
00362 packet->getPacketSource());
00363 }
00364 }
00365 if ((returnType == RETURN_UNTIL_EMPTY && packet->getDataLength() == 0) ||
00366 returnType == RETURN_SINGLE)
00367 {
00368
00369
00370 myRequestOnces[packet->getCommand()]->pop_front();
00371 }
00372 }
00373 else if (returnType == RETURN_VIDEO)
00374 {
00375
00376
00377
00378
00379
00380 while ((it = myRequestOnces[packet->getCommand()]->begin()) !=
00381 myRequestOnces[packet->getCommand()]->end())
00382 {
00383
00384 client = (*it);
00385 if (client != NULL && client->getFrequency(packet->getCommand()) == -2)
00386 {
00387 if (packet->getPacketSource() == ArNetPacket::TCP)
00388 client->sendPacketTcp(packet);
00389 else if (packet->getPacketSource() == ArNetPacket::UDP)
00390 client->sendPacketUdp(packet);
00391 else
00392 {
00393 client->sendPacketTcp(packet);
00394 ArLog::log(ArLog::Normal,
00395 "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)",
00396 myClient->getName(packet->getCommand(), true),
00397 packet->getPacketSource());
00398 }
00399 }
00400 myRequestOnces[packet->getCommand()]->pop_front();
00401 }
00402
00403 myLastBroadcast[packet->getCommand()]->setToNow();
00404 if (packet->getPacketSource() == ArNetPacket::TCP)
00405 {
00406 myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00407 }
00408 else if (packet->getPacketSource() == ArNetPacket::UDP)
00409 {
00410 myServer->broadcastPacketUdpByCommand(packet, packet->getCommand());
00411 }
00412 else
00413 {
00414 myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00415 ArLog::log(ArLog::Normal,
00416 "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)",
00417 myClient->getName(packet->getCommand(), true),
00418 packet->getPacketSource());
00419 }
00420
00421 }
00422 else
00423 {
00424 myLastBroadcast[packet->getCommand()]->setToNow();
00425 if (packet->getPacketSource() == ArNetPacket::TCP)
00426 {
00427 myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00428 }
00429 else if (packet->getPacketSource() == ArNetPacket::UDP)
00430 {
00431 myServer->broadcastPacketUdpByCommand(packet, packet->getCommand());
00432 }
00433 else
00434 {
00435 myServer->broadcastPacketTcpByCommand(packet, packet->getCommand());
00436 ArLog::log(ArLog::Normal,
00437 "ArCentralForward::receiveData: Don't know what type of packet %s is (%d)",
00438 myClient->getName(packet->getCommand(), true),
00439 packet->getPacketSource());
00440 }
00441 }
00442
00443 }
00444
00445 void ArCentralForwarder::requestChanged(long interval,
00446 unsigned int command)
00447 {
00448
00449
00450
00451
00452
00453 if (interval == -2)
00454 {
00455 ArLog::log(ArLog::Verbose, "Stopping request for %s",
00456 myClient->getName(command, true));
00457 myClient->requestStopByCommand(command);
00458 myLastRequest[command]->setToNow();
00459 }
00460 else
00461 {
00462 ReturnType returnType;
00463
00464 returnType = myReturnTypes[command];
00465 if (returnType == RETURN_VIDEO && interval != -1)
00466 {
00467 ArLog::log(ArLog::Verbose, "Ignoring a RETURN_VIDEO attempted request of %s at %d interval since RETURN_VIDEOs cannot request at an interval",
00468 myClient->getName(command, true), interval);
00469 return;
00470 }
00471
00472 ArLog::log(ArLog::Verbose, "Requesting %s at interval of %ld",
00473 myClient->getName(command, true), interval);
00474 myClient->requestByCommand(command, interval);
00475 myLastRequest[command]->setToNow();
00476
00477
00478 myClient->requestOnceByCommand(command);
00479 }
00480 }
00481
00482 void ArCentralForwarder::requestOnce(ArServerClient *client, ArNetPacket *packet)
00483 {
00484 ReturnType returnType;
00485
00486 returnType = myReturnTypes[packet->getCommand()];
00487
00488
00489 packet->setAddedFooter(true);
00490
00491
00492
00493
00494
00495
00496
00497
00498 if (returnType == RETURN_VIDEO &&
00499 (myLastBroadcast[packet->getCommand()]->mSecSince() < 25 ||
00500 myLastRequest[packet->getCommand()]->mSecSince() < 25))
00501 {
00502 ArLog::log(ArLog::Normal, "Ignoring a RETURN_VIDEO of %s since request or broadcast was recent", myClient->getName(packet->getCommand(), true));
00503 return;
00504 }
00505
00506 if (returnType == RETURN_SINGLE || returnType == RETURN_UNTIL_EMPTY ||
00507 returnType == RETURN_VIDEO)
00508 {
00509
00510
00511 myRequestOnces[packet->getCommand()]->push_back(client);
00512 }
00513
00514 ArLog::log(ArLog::Verbose, "Requesting %s once",
00515 myClient->getName(packet->getCommand()));
00516 myClient->requestOnceByCommand(packet->getCommand(), packet);
00517 myLastRequest[packet->getCommand()]->setToNow();
00518 }
00519
00520
00521 AREXPORT void ArCentralForwarder::netCentralHeartbeat(ArNetPacket *packet)
00522 {
00523 myLastHeartbeat.setToNow();
00524 }