X-Git-Url: https://git.syn-net.org/debian/?p=anytun.git;a=blobdiff_plain;f=src%2Fanyrtpproxy%2Fanyrtpproxy.cpp;h=1daf35bdc247d39d8e942c01bcbd5b07ea00e138;hp=7c5514c4bb4a03a325866fa2f5525b6791d38293;hb=ef0cacf2508418915d3f64b04003be3c13fed3cc;hpb=ece844834d2cecc028ce81ca283f5d441088580e diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp index 7c5514c..1daf35b 100644 --- a/src/anyrtpproxy/anyrtpproxy.cpp +++ b/src/anyrtpproxy/anyrtpproxy.cpp @@ -11,7 +11,7 @@ * tunneling and relaying of packets of any protocol. * * - * Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl, + * Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl, * Christian Pointner * * This file is part of Anytun. @@ -66,64 +66,62 @@ void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2 { cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started"; - try - { - Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + try { + Buffer buf(uint32_t(MAX_PACKET_SIZE)); RtpSession::proto::endpoint remote_end; while(1) { buf.setLength(MAX_PACKET_SIZE); - u_int32_t len=0; - if(dir == 1) - len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000); - else if(dir == 2) - len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000); - else break; + uint32_t len=0; + if(dir == 1) { + len = 0; //sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000); + } else if(dir == 2) { + len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000); + } else { break; } RtpSession& session = gRtpSessionTable.getSession(call_id); if(session.isDead()) { - cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting"; + cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting"; break; } - if(!len) + if(!len) { continue; + } buf.setLength(len); - - if((dir == 1 && remote_end != session.getRemoteEnd1()) || - (dir == 2 && remote_end != session.getRemoteEnd2())) - { + + if((dir == 1 && remote_end != session.getRemoteEnd1()) || + (dir == 2 && remote_end != session.getRemoteEnd2())) { if(gOpt.getNat() || - (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || - (dir == 2 && !session.getSeen2())))) - { + (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || + (dir == 2 && !session.getSeen2())))) { cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to " << remote_end; - if(dir == 1) + if(dir == 1) { session.setRemoteEnd1(remote_end); - if(dir == 2) + } + if(dir == 2) { session.setRemoteEnd2(remote_end); - + } + if(!gOpt.getNat()) { // with nat enabled sync is not needed SyncRtpCommand sc(call_id); queue->push(sc); } - } - else + } else { continue; - } + } + } session.setSeen1(); session.setSeen2(); - if(dir == 1) + if(dir == 1) { sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2()); - else if(dir == 2) + } else if(dir == 2) { sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1()); - else break; - } - } - catch(std::exception &e) - { + } else { break; } + } + } catch(std::exception& e) { cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what(); } *running = false; @@ -150,21 +148,19 @@ void listenerManager(void* p) SyncQueue* queue_ = reinterpret_cast(p); std::map listenerMap; - while(1) - { - try - { + while(1) { + try { std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id gCallIdQueue.pop(); RtpSession& session = gRtpSessionTable.getSession(call_id); - if(!session.isComplete()) + if(!session.isComplete()) { continue; + } std::map::iterator it; it = listenerMap.find(call_id); - if(it == listenerMap.end()) // listener Threads not existing yet - { + if(it == listenerMap.end()) { // listener Threads not existing yet ListenerData* ld = new ListenerData(); ld->sock1_.open(session.getLocalEnd1().protocol()); @@ -181,8 +177,7 @@ void listenerManager(void* p) continue; } - if(!it->second->running1_ && !it->second->running2_) - { + if(!it->second->running1_ && !it->second->running2_) { cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up"; if(it->second->thread1_) { it->second->thread1_->join(); @@ -197,10 +192,8 @@ void listenerManager(void* p) gRtpSessionTable.delSession(call_id); continue; } - // TODO: reinit if session changed - } - catch(std::exception &e) - { + // TODO: reinit if session changed + } catch(std::exception& e) { cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what(); usleep(500); // in case of an hard error don't block cpu (this is ugly) } @@ -210,33 +203,28 @@ void listenerManager(void* p) void chrootAndDrop(string const& chrootdir, string const& username) { - if (getuid() != 0) - { - std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl; - exit(-1); - } - - struct passwd *pw = getpwnam(username.c_str()); - if(pw) { - if(chroot(chrootdir.c_str())) - { + if(getuid() != 0) { + std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl; + exit(-1); + } + + struct passwd* pw = getpwnam(username.c_str()); + if(pw) { + if(chroot(chrootdir.c_str())) { std::cerr << "can't chroot to " << chrootdir << std::endl; exit(-1); - } + } std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl; chdir("/"); - if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) - { - std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl; - exit(-1); - } + if(initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) { + std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl; + exit(-1); + } std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl; - } - else - { + } else { std::cerr << "unknown user " << username << std::endl; exit(-1); - } + } } void daemonize() @@ -244,65 +232,62 @@ void daemonize() pid_t pid; pid = fork(); - if(pid) exit(0); + if(pid) { exit(0); } setsid(); pid = fork(); - if(pid) exit(0); - -// std::cout << "running in background now..." << std::endl; + if(pid) { exit(0); } + + // std::cout << "running in background now..." << std::endl; int fd; -// for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors - for (fd=0;fd<=2;fd++) // close all file descriptors + // for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors + for(fd=0; fd<=2; fd++) { // close all file descriptors close(fd); + } fd=open("/dev/null",O_RDWR); // stdin dup(fd); // stdout dup(fd); // stderr - umask(027); + umask(027); } class ThreadParam { public: - ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_) + ThreadParam(SyncQueue& queue_,OptionConnectTo& connto_) : queue(queue_),connto(connto_) - {}; - SyncQueue & queue; - OptionConnectTo & connto; + {}; + SyncQueue& queue; + OptionConnectTo& connto; }; void syncConnector(void* p) { - ThreadParam* param = reinterpret_cast(p); + ThreadParam* param = reinterpret_cast(p); - SyncClient sc ( param->connto.host, param->connto.port); - sc.run(); + SyncClient sc(param->connto.host, param->connto.port); + sc.run(); } -void syncListener(SyncQueue * queue) +void syncListener(SyncQueue* queue) { - try - { + try { boost::asio::io_service io_service; - SyncTcpConnection::proto::resolver resolver(io_service); - SyncTcpConnection::proto::endpoint e; - if(gOpt.getLocalSyncAddr()!="") - { - SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort()); - e = *resolver.resolve(query); - } else { - SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort()); - e = *resolver.resolve(query); - } + SyncTcpConnection::proto::resolver resolver(io_service); + SyncTcpConnection::proto::endpoint e; + if(gOpt.getLocalSyncAddr()!="") { + SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort()); + e = *resolver.resolve(query); + } else { + SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort()); + e = *resolver.resolve(query); + } SyncServer server(io_service,e); - server.onConnect=boost::bind(syncOnConnect,_1); - queue->setSyncServerPtr(&server); + server.onConnect=boost::bind(syncOnConnect,_1); + queue->setSyncServerPtr(&server); io_service.run(); - } - catch (std::exception& e) - { + } catch(std::exception& e) { std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr(); cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort() << " (" << e.what() << ")" << std::endl; @@ -312,9 +297,8 @@ void syncListener(SyncQueue * queue) int main(int argc, char* argv[]) { -// std::cout << "anyrtpproxy" << std::endl; - if(!gOpt.parse(argc, argv)) - { + // std::cout << "anyrtpproxy" << std::endl; + if(!gOpt.parse(argc, argv)) { gOpt.printUsage(); exit(-1); } @@ -330,17 +314,19 @@ int main(int argc, char* argv[]) } } - if(gOpt.getChroot()) + if(gOpt.getChroot()) { chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername()); - if(gOpt.getDaemonize()) + } + if(gOpt.getDaemonize()) { daemonize(); + } if(pidFile.is_open()) { pid_t pid = getpid(); pidFile << pid; pidFile.close(); } - + SignalController sig; sig.init(); @@ -350,38 +336,38 @@ int main(int argc, char* argv[]) boost::thread listenerManagerThread(boost::bind(listenerManager,&queue)); -// #ifndef ANYTUN_NOSYNC -// boost::thread * syncListenerThread; -// if(gOpt.getLocalSyncPort() != "") -// syncListenerThread = new boost::thread(boost::bind(syncListener,&queue)); - -// std::list connectThreads; -// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { -// ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it); -// connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point))); -// } -// #endif + // #ifndef ANYTUN_NOSYNC + // boost::thread * syncListenerThread; + // if(gOpt.getLocalSyncPort() != "") + // syncListenerThread = new boost::thread(boost::bind(syncListener,&queue)); + + // std::list connectThreads; + // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { + // ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it); + // connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point))); + // } + // #endif -// pthread_t syncListenerThread; + // pthread_t syncListenerThread; -// ConnectToList connect_to = gOpt.getConnectTo(); -// ThreadParam p( queue,*(new OptionConnectTo())); -// if ( gOpt.getLocalSyncPort()) -// pthread_create(&syncListenerThread, NULL, syncListener, &p); + // ConnectToList connect_to = gOpt.getConnectTo(); + // ThreadParam p( queue,*(new OptionConnectTo())); + // if ( gOpt.getLocalSyncPort()) + // pthread_create(&syncListenerThread, NULL, syncListener, &p); -// std::list connectThreads; -// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) -// { -// connectThreads.push_back(pthread_t()); -// ThreadParam * point = new ThreadParam(queue,*it); -// pthread_create(& connectThreads.back(), NULL, syncConnector, point); -// } + // std::list connectThreads; + // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) + // { + // connectThreads.push_back(pthread_t()); + // ThreadParam * point = new ThreadParam(queue,*it); + // pthread_create(& connectThreads.back(), NULL, syncConnector, point); + // } - PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort()); + PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort()); CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window); - + int ret = sig.run(); return ret; }