00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "Aria.h"
00027 #include "ArExport.h"
00028 #include "ArCentralManager.h"
00029
00030 ArCentralManager::ArCentralManager(ArServerBase *robotServer,
00031 ArServerBase *clientServer) :
00032 myNetSwitchCB(this, &ArCentralManager::netServerSwitch),
00033 myNetClientListCB(this, &ArCentralManager::netClientList),
00034 myAriaExitCB(this, &ArCentralManager::close)
00035 {
00036 myRobotServer = robotServer;
00037 myClientServer = clientServer;
00038
00039 myAriaExitCB.setName("ArCentralManager");
00040 Aria::addExitCallback(&myAriaExitCB, 25);
00041
00042 myHeartbeatTimeout = 2;
00043 Aria::getConfig()->addParam(
00044 ArConfigArg("RobotTimeoutInMins", &myHeartbeatTimeout,
00045 "The amount of time we can go without hearing a robot's heartbeat without disconnecting it. A number less than 0 means that the robots will never timeout. The time is in minutes but takes doubles (ie .5)", -1),
00046 "Central Server Settings", ArPriority::DETAILED);
00047
00048
00049 myRobotServer->addData("switch", "switches the direction of the connection, after this is requested it sends an empty packet denoting acceptance of the switch, then switches this to a client connection",
00050 &myNetSwitchCB, "string: robotName", "empty packet", "RobotInfo",
00051 "RETURN_SINGLE");
00052 myClientServer->addData("clientList", "Lists the clients that are connected",
00053 &myNetClientListCB, "none",
00054 "ubyte2: numClients; repeating for <numClients> [string: hostname (empty means this host); ubyte2: port; string: robot name; string: flags; string: robot ip address]",
00055 "RobotInfo", "RETURN_SINGLE");
00056 myClientServer->addData("clientAdded", "Broadcast when a client is added",
00057 NULL, "none",
00058 "string: hostname (empty means this host); ubyte2: port; string: robot name; string: flags; string: robot ip address",
00059 "RobotInfo", "RETURN_SINGLE");
00060 myClientServer->addData("clientRemoved", "Broadcast when a client is removed",
00061 NULL, "none",
00062 "string: hostname (empty means this host); ubyte2: port; string: robot name; string: flags; string: robot ip address",
00063 "RobotInfo", "RETURN_SINGLE");
00064
00065 myClientServer = clientServer;
00066 runAsync();
00067 }
00068
00069 ArCentralManager::~ArCentralManager()
00070 {
00071 }
00072
00073 void ArCentralManager::close(void)
00074 {
00075 std::list<ArCentralForwarder *>::iterator fIt;
00076 ArCentralForwarder *forwarder;
00077
00078 ArLog::log(ArLog::Normal, "ArCentralManager closing");
00079
00080 while ((fIt = myForwarders.begin()) != myForwarders.end())
00081 {
00082 forwarder = (*fIt);
00083 std::multimap<int, ArFunctor1<ArCentralForwarder *> *>::iterator it;
00084 for (it = myForwarderRemovedCBList.begin();
00085 it != myForwarderRemovedCBList.end();
00086 it++)
00087 (*it).second->invoke(forwarder);
00088
00089 myForwarders.pop_front();
00090 delete forwarder;
00091
00092 }
00093
00094 ArLog::log(ArLog::Normal, "ArCentralManager closed");
00095 }
00096
00097 void ArCentralManager::netServerSwitch(ArServerClient *client, ArNetPacket *packet)
00098 {
00099 char robotName[512];
00100 std::string uniqueName;
00101
00102 robotName[0] = '\0';
00103
00104
00105
00106 packet->bufToStr(robotName, sizeof(robotName));
00107
00108 ArNetPacket sendPacket;
00110 client->sendPacketTcp(&sendPacket);
00111
00112 ArSocket *clientSocket = new ArSocket;
00113 clientSocket->transfer(client->getTcpSocket());
00114
00115 client->tcpCallback();
00116 client->forceDisconnect(true);
00117
00118
00119
00120 if (robotName[0] != '\0')
00121 uniqueName = robotName;
00122 else
00123 uniqueName = client->getIPString();
00124
00125
00126
00127 myDataMutex.lock();
00128
00129 bool nameIsUnique = false;
00130 std::list<ArCentralForwarder *>::iterator fIt;
00131 ArCentralForwarder *forwarder;
00132 while (!nameIsUnique)
00133 {
00134 nameIsUnique = true;
00135 for (fIt = myForwarders.begin();
00136 fIt != myForwarders.end() && nameIsUnique;
00137 fIt++)
00138 {
00139 forwarder = (*fIt);
00140 if (strcasecmp(forwarder->getRobotName(), uniqueName.c_str()) == 0)
00141 nameIsUnique = false;
00142 }
00143 if (!nameIsUnique)
00144 uniqueName += "_";
00145 }
00146 myClientSockets.push_back(clientSocket);
00147 myClientNames.push_back(uniqueName);
00148
00149
00150
00151 if (strcmp(uniqueName.c_str(), robotName) == 0)
00152 ArLog::log(ArLog::Normal, "Got switch request from %s from %s",
00153 client->getIPString(), uniqueName.c_str());
00154
00155 if (strcmp(uniqueName.c_str(), robotName) != 0)
00156 ArLog::log(ArLog::Normal,
00157 "Got switch request from %s from %s that started as %s",
00158 client->getIPString(), uniqueName.c_str(), robotName);
00159
00160 myDataMutex.unlock();
00161 }
00162
00163 void ArCentralManager::netClientList(ArServerClient *client, ArNetPacket *packet)
00164 {
00165 ArNetPacket sendPacket;
00166 std::list<ArCentralForwarder *>::iterator fIt;
00167 ArCentralForwarder *forwarder;
00168
00169 myDataMutex.lock();
00170 sendPacket.uByte2ToBuf(myForwarders.size());
00171 for (fIt = myForwarders.begin(); fIt != myForwarders.end(); fIt++)
00172 {
00173 forwarder = (*fIt);
00174 sendPacket.strToBuf("");
00175 sendPacket.uByte2ToBuf(forwarder->getPort());
00176 sendPacket.strToBuf(forwarder->getRobotName());
00177 sendPacket.strToBuf("");
00178 sendPacket.strToBuf(
00179 forwarder->getClient()->getTcpSocket()->getIPString());
00180 }
00181 myDataMutex.unlock();
00182 client->sendPacketTcp(&sendPacket);
00183 }
00184
00186 void *ArCentralManager::runThread(void *arg)
00187 {
00188 std::list<ArSocket *>::iterator sIt;
00189 std::list<std::string>::iterator nIt;
00190 std::list<ArCentralForwarder *>::iterator fIt;
00191 ArSocket *socket;
00192 std::string robotName;
00193 ArCentralForwarder *forwarder;
00194
00195 threadStarted();
00196 setThreadName("serverSwitch");
00197 while (getRunning())
00198 {
00199 myDataMutex.lock();
00200
00201 while ((sIt = myClientSockets.begin()) != myClientSockets.end() &&
00202 (nIt = myClientNames.begin()) != myClientNames.end())
00203 {
00204 socket = (*sIt);
00205 robotName = (*nIt);
00206
00207 myClientSockets.pop_front();
00208 myClientNames.pop_front();
00209
00210 ArLog::log(ArLog::Normal,
00211 "New forwarder for socket from %s with name %s",
00212 socket->getIPString(), robotName.c_str());
00213
00214 forwarder = new ArCentralForwarder(myClientServer, socket,
00215 robotName.c_str(),
00216 myClientServer->getTcpPort() + 1,
00217 &myUsedPorts);
00218 myForwarders.push_back(forwarder);
00219 }
00220
00221 std::list<ArCentralForwarder *> removeList;
00222 for (fIt = myForwarders.begin(); fIt != myForwarders.end(); fIt++)
00223 {
00224 forwarder = (*fIt);
00225 bool connected = forwarder->isConnected();
00226 bool removed = false;
00227 if (!forwarder->callOnce(myHeartbeatTimeout))
00228 {
00229 ArLog::log(ArLog::Normal, "Will remove forwarder from %s",
00230 forwarder->getRobotName());
00231 removeList.push_back(forwarder);
00232 removed = true;
00233 }
00234 if (!connected && !removed && forwarder->isConnected())
00235 {
00236 ArLog::log(ArLog::Normal, "Adding forwarder %s",
00237 forwarder->getRobotName());
00238 myUsedPorts.insert(forwarder->getPort());
00239
00240 std::multimap<int, ArFunctor1<ArCentralForwarder *> *>::iterator it;
00241 for (it = myForwarderAddedCBList.begin();
00242 it != myForwarderAddedCBList.end();
00243 it++)
00244 {
00245 if ((*it).second->getName() == NULL ||
00246 (*it).second->getName()[0] == '\0')
00247 ArLog::log(ArLog::Normal, "Calling unnamed add functor at %d",
00248 -(*it).first);
00249 else
00250 ArLog::log(ArLog::Normal, "Calling %s add functor at %d",
00251 (*it).second->getName(), -(*it).first);
00252 (*it).second->invoke(forwarder);
00253 }
00254 ArLog::log(ArLog::Normal, "Added forwarder %s",
00255 forwarder->getRobotName());
00256 ArNetPacket sendPacket;
00257 sendPacket.strToBuf("");
00258 sendPacket.uByte2ToBuf(forwarder->getPort());
00259 sendPacket.strToBuf(forwarder->getRobotName());
00260 sendPacket.strToBuf("");
00261 sendPacket.strToBuf(
00262 forwarder->getClient()->getTcpSocket()->getIPString());
00263 myClientServer->broadcastPacketTcp(&sendPacket, "clientAdded");
00264 }
00265 }
00266
00267 while ((fIt = removeList.begin()) != removeList.end())
00268 {
00269 forwarder = (*fIt);
00270
00271 ArLog::log(ArLog::Normal, "Removing forwarder %s",
00272 forwarder->getRobotName());
00273 std::multimap<int, ArFunctor1<ArCentralForwarder *> *>::iterator it;
00274 for (it = myForwarderRemovedCBList.begin();
00275 it != myForwarderRemovedCBList.end();
00276 it++)
00277 {
00278 if ((*it).second->getName() == NULL ||
00279 (*it).second->getName()[0] == '\0')
00280 ArLog::log(ArLog::Normal, "Calling unnamed remove functor at %d",
00281 -(*it).first);
00282 else
00283 ArLog::log(ArLog::Normal, "Calling %s remove functor at %d",
00284 (*it).second->getName(), -(*it).first);
00285 (*it).second->invoke(forwarder);
00286 }
00287
00288 ArLog::log(ArLog::Normal, "Called forwarder removed for %s",
00289 forwarder->getRobotName());
00290 ArNetPacket sendPacket;
00291 sendPacket.strToBuf("");
00292 sendPacket.uByte2ToBuf(forwarder->getPort());
00293 sendPacket.strToBuf(forwarder->getRobotName());
00294 sendPacket.strToBuf("");
00295 sendPacket.strToBuf(
00296 forwarder->getClient()->getTcpSocket()->getIPString());
00297 myClientServer->broadcastPacketTcp(&sendPacket, "clientRemoved");
00298
00299 myUsedPorts.erase(forwarder->getPort());
00300 myForwarders.remove(forwarder);
00301 delete forwarder;
00302 removeList.pop_front();
00303 ArLog::log(ArLog::Normal, "Removed forwarder");
00304
00305 }
00306 myDataMutex.unlock();
00307
00308 ArUtil::sleep(100);
00309 }
00310 return NULL;
00311 }
00312
00313
00314 AREXPORT void ArCentralManager::addForwarderAddedCallback(
00315 ArFunctor1<ArCentralForwarder *> *functor, int priority)
00316 {
00317 myCallbackMutex.lock();
00318 myForwarderAddedCBList.insert(
00319 std::pair<int, ArFunctor1<ArCentralForwarder *> *>(-priority,
00320 functor));
00321 myCallbackMutex.unlock();
00322 }
00323
00324 AREXPORT void ArCentralManager::remForwarderAddedCallback(
00325 ArFunctor1<ArCentralForwarder *> *functor)
00326 {
00327 myCallbackMutex.lock();
00328
00329 std::multimap<int, ArFunctor1<ArCentralForwarder *> *>::iterator it;
00330
00331 for (it = myForwarderAddedCBList.begin();
00332 it != myForwarderAddedCBList.end();
00333 ++it)
00334 {
00335 if ((*it).second == functor)
00336 {
00337 myForwarderAddedCBList.erase(it);
00338 myCallbackMutex.unlock();
00339 remForwarderAddedCallback(functor);
00340 return;
00341 }
00342 }
00343 myCallbackMutex.unlock();
00344 }
00345
00346 AREXPORT void ArCentralManager::addForwarderRemovedCallback(
00347 ArFunctor1<ArCentralForwarder *> *functor, int priority)
00348 {
00349 myCallbackMutex.lock();
00350 myForwarderRemovedCBList.insert(
00351 std::pair<int, ArFunctor1<ArCentralForwarder *> *>(-priority,
00352 functor));
00353 myCallbackMutex.unlock();
00354 }
00355
00356 AREXPORT void ArCentralManager::remForwarderRemovedCallback(
00357 ArFunctor1<ArCentralForwarder *> *functor)
00358 {
00359 myCallbackMutex.lock();
00360 std::multimap<int, ArFunctor1<ArCentralForwarder *> *>::iterator it;
00361
00362 for (it = myForwarderRemovedCBList.begin();
00363 it != myForwarderRemovedCBList.end();
00364 ++it)
00365 {
00366 if ((*it).second == functor)
00367 {
00368 myForwarderRemovedCBList.erase(it);
00369 myCallbackMutex.unlock();
00370 remForwarderRemovedCallback(functor);
00371 return;
00372 }
00373 }
00374 myCallbackMutex.unlock();
00375 }