00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045
00046 #include <sstream>
00047
00048 #include "OSGConfig.h"
00049 #include "OSGLog.h"
00050 #include "OSGBaseFunctions.h"
00051 #include "OSGSocketSelection.h"
00052 #include "OSGSocketException.h"
00053 #include "OSGGroupSockConnection.h"
00054 #include "OSGConnectionType.h"
00055
00056 OSG_USING_NAMESPACE
00057
00062
00063
00064
00068 GroupSockConnection::GroupSockConnection():
00069 GroupConnection(0)
00070 {
00071 _acceptSocket.open();
00072 _acceptSocket.setReusePort(true);
00073
00074 _socketReadBuffer.resize(131071);
00075 _socketWriteBuffer.resize( _socketReadBuffer.size() );
00076
00077 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00078 _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00079 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00080 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00081 }
00082
00085 GroupSockConnection::~GroupSockConnection(void)
00086 {
00087
00088 while(_sockets.size())
00089 {
00090 try
00091 {
00092 _sockets.begin()->close();
00093 _sockets.erase(_sockets.begin());
00094 }
00095 catch(...)
00096 {
00097 }
00098 }
00099 _acceptSocket.close();
00100 }
00101
00104 const ConnectionType *GroupSockConnection::getType()
00105 {
00106 return &_type;
00107 }
00108
00109
00110
00111
00115 GroupConnection::Channel GroupSockConnection::connectPoint(
00116 const std::string &address,
00117 Time timeout)
00118 {
00119 Channel channel = -1;
00120 StreamSocket socket;
00121 SocketAddress destination;
00122 if(connectSocket(socket,address,destination,timeout))
00123 {
00124 channel = newChannelIndex(_sockets.size());
00125 _sockets.push_back(socket);
00126 _remoteAddresses.push_back(destination);
00127 _readIndex = 0;
00128 }
00129 return channel;
00130 }
00131
00134 void GroupSockConnection::disconnect(Channel channel)
00135 {
00136 ChannelIndex index = channelToIndex(channel);
00137 try
00138 {
00139 _sockets[index].close();
00140 }
00141 catch(...)
00142 {
00143 }
00144 _sockets.erase(_sockets.begin() + index);
00145 delChannelIndex(index);
00146 _readIndex = 0;
00147 }
00148
00152 GroupConnection::Channel GroupSockConnection::acceptPoint(Time timeout)
00153 {
00154 StreamSocket from;
00155 SocketAddress destination;
00156 if(GroupSockConnection::acceptSocket(_acceptSocket,from,destination,timeout))
00157 {
00158 Channel channel = newChannelIndex(_sockets.size());
00159 _sockets.push_back(from);
00160 _remoteAddresses.push_back(destination);
00161 _readIndex = 0;
00162 return channel;
00163 }
00164 else
00165 {
00166 return -1;
00167 }
00168 }
00169
00176 std::string GroupSockConnection::bind(const std::string &address)
00177 {
00178 int port=0;
00179 char localhost[256];
00180 char host[256];
00181 char portStr[256];
00182 std::string interf;
00183 std::string boundedAddress;
00184
00185
00186 osgGetHostname(localhost,255);
00187
00188 if(!getInterface().empty())
00189 interf = getInterface();
00190 else
00191 interf = localhost;
00192
00193 if(!address.empty())
00194 if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1)
00195 if(sscanf(address.c_str(),":%d",&port) != 1)
00196 port = 0;
00197
00198 _acceptSocket.setReusePort(true);
00199 _acceptSocket.bind(SocketAddress(interf.c_str(),port));
00200 SINFO << "Connection bound to "
00201 << _acceptSocket.getAddress().getHost() << ":"
00202 << _acceptSocket.getAddress().getPort() << std::endl;
00203 _acceptSocket.listen();
00204
00205 sprintf(portStr,"%d",_acceptSocket.getAddress().getPort());
00206 return interf + ":" + portStr;
00207 }
00208
00211 void GroupSockConnection::setParams(const std::string ¶ms)
00212 {
00213 if(params.empty())
00214 return;
00215
00216 std::string option = "bufferSize=";
00217 std::string::size_type i = 0;
00218 if((i=params.find(option)) != std::string::npos)
00219 {
00220 std::string str = params.substr(i + option.size());
00221
00222 std::stringstream ss;
00223 std::string::size_type j = 0;
00224 while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00225 {
00226 ss << str[j++];
00227 }
00228 UInt32 bufferSize;
00229 ss >> bufferSize;
00230
00231
00232 readBufClear();
00233 writeBufClear();
00234
00235 _socketReadBuffer.resize(bufferSize);
00236 _socketWriteBuffer.resize(_socketReadBuffer.size());
00237
00238
00239 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00240 _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00241 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00242 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00243
00244 FINFO(("GroupSockConnection::setParams : setting buffer size to %u.\n", bufferSize));
00245 }
00246 }
00247
00248
00249
00250
00254 Connection::Channel GroupSockConnection::selectChannel(Time timeout)
00255 throw (ReadError)
00256 {
00257 Int32 maxnread=0,nread;
00258 ChannelIndex index;
00259 SocketSelection selection,result;
00260
00261
00262 if(_zeroCopyThreshold != 1 &&
00263 _currentReadBuffer != readBufEnd())
00264 {
00265 FFATAL(("Channel change ignores data in current buffer"))
00266 return indexToChannel(_readIndex);
00267 }
00268
00269 if(_selection[_readIndex] &&
00270 _sockets[_readIndex].getAvailable())
00271 {
00272 return indexToChannel(_readIndex);;
00273 }
00274
00275
00276 for(index = 0 ; index < _sockets.size() ; ++index)
00277 {
00278 if(_selection[index])
00279 selection.setRead(_sockets[index]);
00280 }
00281
00282 try
00283 {
00284
00285 if(!selection.select(timeout,result))
00286 return -1;
00287
00288
00289 for(index = 0 ; index < _sockets.size() ; ++index)
00290 {
00291 if(result.isSetRead(_sockets[index]))
00292 {
00293 nread=_sockets[index].getAvailable();
00294 if(maxnread < nread)
00295 {
00296 maxnread = nread;
00297 _readIndex=index;
00298 }
00299 }
00300 }
00301 }
00302 catch(SocketException &e)
00303 {
00304 throw ReadError(e.what());
00305 }
00306
00307
00308 return indexToChannel(_readIndex);
00309 }
00310
00311
00312
00313
00316 bool GroupSockConnection::wait(Time timeout) throw (ReadError)
00317 {
00318 UInt32 len;
00319 UInt32 index;
00320 UInt32 tag=314156;
00321 UInt32 missing = _sockets.size();
00322 SocketSelection selection,result;
00323
00324 for(index = 0 ; index < _sockets.size() ; ++index)
00325 selection.setRead(_sockets[index]);
00326
00327 try
00328 {
00329 while(missing)
00330 {
00331 if(!selection.select(timeout,result))
00332 return false;
00333 for(index = 0 ; index < _sockets.size() ; ++index)
00334 {
00335 if(result.isSetRead(_sockets[index]))
00336 {
00337 len = _sockets[index].recv(&tag,sizeof(tag));
00338 tag = osgntohl(tag);
00339 if(len == 0)
00340 throw ReadError("Channel closed");
00341 selection.clearRead(_sockets[index]);
00342 missing--;
00343 if(tag != 314156)
00344 {
00345 FFATAL(("Stream out of sync in SockConnection\n"));
00346 throw ReadError("Stream out of sync");
00347 }
00348 }
00349 }
00350 }
00351 }
00352 catch(SocketException &e)
00353 {
00354 throw ReadError(e.what());
00355 }
00356 return true;
00357 }
00358
00361 void GroupSockConnection::signal(void) throw (WriteError)
00362 {
00363 UInt32 tag=osghtonl(314156);
00364 UInt32 index;
00365
00366 try
00367 {
00368 for(index = 0 ; index<_sockets.size() ; ++index)
00369 _sockets[index].send(&tag,sizeof(tag));
00370 }
00371 catch(SocketError &e)
00372 {
00373 throw WriteError(e.what());
00374 }
00375 }
00376
00377
00378
00382 GroupConnection *GroupSockConnection::create(void)
00383 {
00384 return new GroupSockConnection();
00385 }
00386
00387
00388
00389
00397 void GroupSockConnection::read(MemoryHandle mem,UInt32 size)
00398 {
00399 int len;
00400
00401
00402 len=_sockets[_readIndex].recv(mem,size);
00403 if(len==0)
00404 {
00405
00406 throw ReadError("Channel closed");
00407 }
00408 }
00409
00417 void GroupSockConnection::readBuffer()
00418 {
00419 int size;
00420 int len;
00421
00422
00423 len=_sockets[_readIndex].recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader));
00424 if(len==0)
00425 throw ReadError("Channel closed");
00426
00427 size=osgntohl(((SocketBufferHeader*)&_socketReadBuffer[0])->size);
00428 len=_sockets[_readIndex].recv(&_socketReadBuffer[sizeof(SocketBufferHeader)],
00429 size);
00430 if(len==0)
00431 throw ReadError("Channel closed");
00432 readBufBegin()->setDataSize(size);
00433 }
00434
00442 void GroupSockConnection::write(MemoryHandle mem,UInt32 size)
00443 {
00444 Int32 index;
00445
00446 try
00447 {
00448
00449 for(index = 0 ; index < _sockets.size() ; ++index)
00450 _sockets[index].send(mem,size);
00451 }
00452 catch(SocketException &e)
00453 {
00454 throw WriteError(e.what());
00455 }
00456 }
00457
00463 void GroupSockConnection::writeBuffer(void)
00464 {
00465 Int32 index;
00466 UInt32 size = writeBufBegin()->getDataSize();
00467
00468 ((SocketBufferHeader*)&_socketWriteBuffer[0])->size=osghtonl(size);
00469 if(size)
00470 {
00471
00472 for(index = 0 ; index < _sockets.size() ; ++index)
00473 {
00474
00475 _sockets[index].send(&_socketWriteBuffer[0],
00476 size+sizeof(SocketBufferHeader));
00477 }
00478 }
00479 }
00480
00481
00482
00483
00486 bool GroupSockConnection::connectSocket(StreamSocket &socket,
00487 std::string address,
00488 SocketAddress &destination,
00489 Time timeout)
00490 {
00491 std::string host="unknown";
00492 int port=0;
00493 Time startTime = getSystemTime();
00494 bool connected=false;
00495
00496 int pos = address.find(':');
00497 if(pos>=0)
00498 {
00499 host = std::string(address,0,pos);
00500 port = atoi(std::string(address,pos+1,std::string::npos).c_str());
00501 }
00502 else
00503 {
00504 host = address;
00505 }
00506
00507 socket.open();
00508 socket.setDelay(false);
00509 socket.setReadBufferSize(1048576);
00510 socket.setWriteBufferSize(1048576);
00511 destination = SocketAddress(host.c_str(),port);
00512 while(!connected &&
00513 (timeout == -1 || (getSystemTime()-startTime) < timeout))
00514 {
00515 try
00516 {
00517 socket.connect(destination);
00518 connected = true;
00519 }
00520 catch(...)
00521 {
00522 }
00523 }
00524 if(connected)
00525 return true;
00526 else
00527 return false;
00528 }
00529
00532 bool GroupSockConnection::acceptSocket(StreamSocket &accept,
00533 StreamSocket &from,
00534 SocketAddress &destination,
00535 Time timeout)
00536 {
00537 if(!accept.waitReadable(timeout))
00538 return false;
00539 from=accept.acceptFrom(destination);
00540 from.setDelay(false);
00541 from.setReadBufferSize(1048576);
00542 from.setWriteBufferSize(1048576);
00543 return true;
00544 }
00545
00546
00547
00548
00549 ConnectionType GroupSockConnection::_type(
00550 &GroupSockConnection::create,
00551 "StreamSock");
00552
00553
00554
00555
00556 #ifdef __sgi
00557 #pragma set woff 1174
00558 #endif
00559
00560 #ifdef OSG_LINUX_ICC
00561 #pragma warning( disable : 177 )
00562 #endif
00563
00564 namespace
00565 {
00566 static Char8 cvsid_cpp [] = "@(#)$Id: $";
00567 static Char8 cvsid_hpp [] = OSG_GROUPSOCKCONNECTION_HEADER_CVSID;
00568 }
00569