Imported Upstream version 0.3.4
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
index 7c5514c..1daf35b 100644 (file)
@@ -11,7 +11,7 @@
  *  tunneling and relaying of packets of any protocol.
  *
  *
- *  Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl, 
+ *  Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl,
  *                          Christian Pointner <satp@wirdorange.org>
  *
  *  This file is part of Anytun.
@@ -66,64 +66,62 @@ void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2
 {
   cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
 
-  try 
-  {
-    Buffer buf(u_int32_t(MAX_PACKET_SIZE));
+  try {
+    Buffer buf(uint32_t(MAX_PACKET_SIZE));
     RtpSession::proto::endpoint remote_end;
 
     while(1) {
       buf.setLength(MAX_PACKET_SIZE);
-      u_int32_t len=0;
-      if(dir == 1)
-        len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
-      else if(dir == 2)
-        len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
-                       else break;
+      uint32_t len=0;
+      if(dir == 1) {
+        len = 0;  //sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
+      } else if(dir == 2) {
+        len = 0;  //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
+      } else { break; }
 
       RtpSession& session = gRtpSessionTable.getSession(call_id);
       if(session.isDead()) {
-        cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting"; 
+        cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
         break;
       }
 
-      if(!len)
+      if(!len) {
         continue;
+      }
       buf.setLength(len);
-      
-      if((dir == 1 && remote_end != session.getRemoteEnd1()) || 
-         (dir == 2 && remote_end != session.getRemoteEnd2()))
-      {
+
+      if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
+          (dir == 2 && remote_end != session.getRemoteEnd2())) {
         if(gOpt.getNat() ||
-           (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || 
-                                     (dir == 2 && !session.getSeen2()))))
-        {
+            (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
+                                      (dir == 2 && !session.getSeen2())))) {
           cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
                                      << remote_end;
-          if(dir == 1)
+          if(dir == 1) {
             session.setRemoteEnd1(remote_end);
-          if(dir == 2)
+          }
+          if(dir == 2) {
             session.setRemoteEnd2(remote_end);
-          
+          }
+
           if(!gOpt.getNat()) { // with nat enabled sync is not needed
             SyncRtpCommand sc(call_id);
             queue->push(sc);
           }
-        }
-        else
+        } else {
           continue;
-                       }
+        }
+      }
       session.setSeen1();
       session.setSeen2();
 
-      if(dir == 1)
+      if(dir == 1) {
         sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
-      else if(dir == 2)
+      } else if(dir == 2) {
         sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
-      else break;
-    }  
-  }
-  catch(std::exception &e)
-  {
+      } else { break; }
+    }
+  } catch(std::exception& e) {
     cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
   }
   *running = false;
@@ -150,21 +148,19 @@ void listenerManager(void* p)
   SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
 
   std::map<std::string, ListenerData*> listenerMap;
-  while(1)
-  {
-    try 
-    {
+  while(1) {
+    try {
       std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
       gCallIdQueue.pop();
 
       RtpSession& session = gRtpSessionTable.getSession(call_id);
-      if(!session.isComplete())
+      if(!session.isComplete()) {
         continue;
+      }
 
       std::map<std::string, ListenerData*>::iterator it;
       it = listenerMap.find(call_id);
-      if(it == listenerMap.end()) // listener Threads not existing yet
-      {        
+      if(it == listenerMap.end()) { // listener Threads not existing yet
         ListenerData* ld = new ListenerData();
 
         ld->sock1_.open(session.getLocalEnd1().protocol());
@@ -181,8 +177,7 @@ void listenerManager(void* p)
         continue;
       }
 
-      if(!it->second->running1_ && !it->second->running2_)
-      {
+      if(!it->second->running1_ && !it->second->running2_) {
         cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
         if(it->second->thread1_) {
           it->second->thread1_->join();
@@ -197,10 +192,8 @@ void listenerManager(void* p)
         gRtpSessionTable.delSession(call_id);
         continue;
       }
-          // TODO: reinit if session changed
-    }
-    catch(std::exception &e)
-    {
+      // TODO: reinit if session changed
+    } catch(std::exception& e) {
       cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
       usleep(500); // in case of an hard error don't block cpu (this is ugly)
     }
@@ -210,33 +203,28 @@ void listenerManager(void* p)
 
 void chrootAndDrop(string const& chrootdir, string const& username)
 {
-       if (getuid() != 0)
-       {
-         std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
-               exit(-1);
-       }       
-
-  struct passwd *pw = getpwnam(username.c_str());
-       if(pw) {
-               if(chroot(chrootdir.c_str()))
-               {
+  if(getuid() != 0) {
+    std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
+    exit(-1);
+  }
+
+  struct passwd* pw = getpwnam(username.c_str());
+  if(pw) {
+    if(chroot(chrootdir.c_str())) {
       std::cerr << "can't chroot to " << chrootdir << std::endl;
       exit(-1);
-               }
+    }
     std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
     chdir("/");
-               if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) 
-               {
-                       std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
-                       exit(-1);
-               }
+    if(initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) {
+      std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
+      exit(-1);
+    }
     std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
-       }
-       else 
-  {
+  } else {
     std::cerr << "unknown user " << username << std::endl;
     exit(-1);
-       }
+  }
 }
 
 void daemonize()
@@ -244,65 +232,62 @@ void daemonize()
   pid_t pid;
 
   pid = fork();
-  if(pid) exit(0);  
+  if(pid) { exit(0); }
   setsid();
   pid = fork();
-  if(pid) exit(0);
-  
-//  std::cout << "running in background now..." << std::endl;
+  if(pid) { exit(0); }
+
+  //  std::cout << "running in background now..." << std::endl;
 
   int fd;
-//  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
-  for (fd=0;fd<=2;fd++) // close all file descriptors
+  //  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
+  for(fd=0; fd<=2; fd++) { // close all file descriptors
     close(fd);
+  }
   fd=open("/dev/null",O_RDWR);        // stdin
   dup(fd);                            // stdout
   dup(fd);                            // stderr
-  umask(027); 
+  umask(027);
 }
 
 class ThreadParam
 {
 public:
-  ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
+  ThreadParam(SyncQueue& queue_,OptionConnectTo& connto_)
     : queue(queue_),connto(connto_)
-    {};
-  SyncQueue & queue;
-  OptionConnectTo & connto;
+  {};
+  SyncQueue& queue;
+  OptionConnectTo& connto;
 };
 
 void syncConnector(void* p)
 {
-       ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+  ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
 
-       SyncClient sc ( param->connto.host, param->connto.port);
-       sc.run();
+  SyncClient sc(param->connto.host, param->connto.port);
+  sc.run();
 }
 
-void syncListener(SyncQueue * queue)
+void syncListener(SyncQueue* queue)
 {
-  try
-  {
+  try {
     boost::asio::io_service io_service;
-               SyncTcpConnection::proto::resolver resolver(io_service);
-               SyncTcpConnection::proto::endpoint e;
-               if(gOpt.getLocalSyncAddr()!="")
-               {
-                       SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
-                       e = *resolver.resolve(query);
-               } else {
-                       SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
-                       e = *resolver.resolve(query);
-               }
+    SyncTcpConnection::proto::resolver resolver(io_service);
+    SyncTcpConnection::proto::endpoint e;
+    if(gOpt.getLocalSyncAddr()!="") {
+      SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
+      e = *resolver.resolve(query);
+    } else {
+      SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
+      e = *resolver.resolve(query);
+    }
 
 
     SyncServer server(io_service,e);
-               server.onConnect=boost::bind(syncOnConnect,_1);
-               queue->setSyncServerPtr(&server);
+    server.onConnect=boost::bind(syncOnConnect,_1);
+    queue->setSyncServerPtr(&server);
     io_service.run();
-  }
-  catch (std::exception& e)
-  {
+  } catch(std::exception& e) {
     std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
     cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
                             << " (" << e.what() << ")" << std::endl;
@@ -312,9 +297,8 @@ void syncListener(SyncQueue * queue)
 
 int main(int argc, char* argv[])
 {
-//  std::cout << "anyrtpproxy" << std::endl;
-  if(!gOpt.parse(argc, argv))
-  {
+  //  std::cout << "anyrtpproxy" << std::endl;
+  if(!gOpt.parse(argc, argv)) {
     gOpt.printUsage();
     exit(-1);
   }
@@ -330,17 +314,19 @@ int main(int argc, char* argv[])
     }
   }
 
-  if(gOpt.getChroot())
+  if(gOpt.getChroot()) {
     chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
-  if(gOpt.getDaemonize())
+  }
+  if(gOpt.getDaemonize()) {
     daemonize();
+  }
 
   if(pidFile.is_open()) {
     pid_t pid = getpid();
     pidFile << pid;
     pidFile.close();
   }
-  
+
   SignalController sig;
   sig.init();
 
@@ -350,38 +336,38 @@ int main(int argc, char* argv[])
   boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
 
 
-// #ifndef ANYTUN_NOSYNC
-//     boost::thread * syncListenerThread;
-//     if(gOpt.getLocalSyncPort() != "")
-//       syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
-    
-//     std::list<boost::thread *> connectThreads;
-//     for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { 
-//       ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
-//       connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
-//     }
-// #endif
+  // #ifndef ANYTUN_NOSYNC
+  //     boost::thread * syncListenerThread;
+  //     if(gOpt.getLocalSyncPort() != "")
+  //       syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
+
+  //     std::list<boost::thread *> connectThreads;
+  //     for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
+  //       ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
+  //       connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
+  //     }
+  // #endif
 
 
 
-//   pthread_t syncListenerThread;
+  //   pthread_t syncListenerThread;
 
-//     ConnectToList connect_to = gOpt.getConnectTo();
-//     ThreadParam p( queue,*(new OptionConnectTo()));
-//   if ( gOpt.getLocalSyncPort())
-//     pthread_create(&syncListenerThread, NULL, syncListener, &p);
+  //   ConnectToList connect_to = gOpt.getConnectTo();
+  //   ThreadParam p( queue,*(new OptionConnectTo()));
+  //   if ( gOpt.getLocalSyncPort())
+  //     pthread_create(&syncListenerThread, NULL, syncListener, &p);
 
-//   std::list<pthread_t> connectThreads;
-//   for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
-//   {
-//     connectThreads.push_back(pthread_t());
-//     ThreadParam * point = new ThreadParam(queue,*it);
-//     pthread_create(& connectThreads.back(),  NULL, syncConnector, point);
-//   }
+  //   std::list<pthread_t> connectThreads;
+  //   for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
+  //   {
+  //     connectThreads.push_back(pthread_t());
+  //     ThreadParam * point = new ThreadParam(queue,*it);
+  //     pthread_create(& connectThreads.back(),  NULL, syncConnector, point);
+  //   }
 
-       PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
+  PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
   CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
-  
+
   int ret = sig.run();
   return ret;
 }