Imported Upstream version 0.3
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
1 /*
2  *  anytun
3  *
4  *  The secure anycast tunneling protocol (satp) defines a protocol used
5  *  for communication between any combination of unicast and anycast
6  *  tunnel endpoints.  It has less protocol overhead than IPSec in Tunnel
7  *  mode and allows tunneling of every ETHER TYPE protocol (e.g.
8  *  ethernet, ip, arp ...). satp directly includes cryptography and
9  *  message authentication based on the methodes used by SRTP.  It is
10  *  intended to deliver a generic, scaleable and secure solution for
11  *  tunneling and relaying of packets of any protocol.
12  *
13  *
14  *  Copyright (C) 2007-2008 Othmar Gsenger, Erwin Nindl, 
15  *                          Christian Pointner <satp@wirdorange.org>
16  *
17  *  This file is part of Anytun.
18  *
19  *  Anytun is free software: you can redistribute it and/or modify
20  *  it under the terms of the GNU General Public License version 3 as
21  *  published by the Free Software Foundation.
22  *
23  *  Anytun is distributed in the hope that it will be useful,
24  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *  GNU General Public License for more details.
27  *
28  *  You should have received a copy of the GNU General Public License
29  *  along with anytun.  If not, see <http://www.gnu.org/licenses/>.
30  */
31
32 #include <iostream>
33
34 #include <boost/asio.hpp>
35
36 #include <fcntl.h>
37 #include <pwd.h>
38 #include <grp.h>
39
40 #include "../datatypes.h"
41
42 #include "../log.h"
43 #include "../signalController.h"
44 #include "../buffer.h"
45 #include "connectionList.h"
46 #include "rtpSessionTable.h"
47 #include "syncRtpCommand.h"
48 #include "../syncQueue.h"
49 #include "../syncClient.h"
50 #include "syncOnConnect.hpp"
51
52 #include "../threadUtils.hpp"
53
54 #include "commandHandler.h"
55 #include "callIdQueue.h"
56
57 #include "options.h"
58 #include "portWindow.h"
59 #include <map>
60 #include <fstream>
61
62 #define MAX_PACKET_SIZE 1500
63
64 void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
65 {
66   cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
67
68   try 
69   {
70     Buffer buf(u_int32_t(MAX_PACKET_SIZE));
71     RtpSession::proto::endpoint remote_end;
72
73     while(1) {
74       buf.setLength(MAX_PACKET_SIZE);
75       u_int32_t len=0;
76       if(dir == 1)
77         len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
78       else if(dir == 2)
79         len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
80                         else break;
81
82       RtpSession& session = gRtpSessionTable.getSession(call_id);
83       if(session.isDead()) {
84         cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting"; 
85         break;
86       }
87
88       if(!len)
89         continue;
90       buf.setLength(len);
91       
92       if((dir == 1 && remote_end != session.getRemoteEnd1()) || 
93          (dir == 2 && remote_end != session.getRemoteEnd2()))
94       {
95         if(gOpt.getNat() ||
96            (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || 
97                                      (dir == 2 && !session.getSeen2()))))
98         {
99           cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
100                                      << remote_end;
101           if(dir == 1)
102             session.setRemoteEnd1(remote_end);
103           if(dir == 2)
104             session.setRemoteEnd2(remote_end);
105           
106           if(!gOpt.getNat()) { // with nat enabled sync is not needed
107             SyncRtpCommand sc(call_id);
108             queue->push(sc);
109           }
110         }
111         else
112           continue;
113                         }
114       session.setSeen1();
115       session.setSeen2();
116
117       if(dir == 1)
118         sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
119       else if(dir == 2)
120         sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
121       else break;
122     }  
123   }
124   catch(std::exception &e)
125   {
126     cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
127   }
128   *running = false;
129   gCallIdQueue.push(call_id);
130 }
131
132 class ListenerData
133 {
134 public:
135   ListenerData() : sock1_(ios1_), sock2_(ios2_) {}
136
137   boost::asio::io_service ios1_;
138   boost::asio::io_service ios2_;
139   RtpSession::proto::socket sock1_;
140   RtpSession::proto::socket sock2_;
141   boost::thread* thread1_;
142   boost::thread* thread2_;
143   bool running1_;
144   bool running2_;
145 };
146
147 void listenerManager(void* p)
148 {
149   SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
150
151   std::map<std::string, ListenerData*> listenerMap;
152   while(1)
153   {
154     try 
155     {
156       std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
157       gCallIdQueue.pop();
158
159       RtpSession& session = gRtpSessionTable.getSession(call_id);
160       if(!session.isComplete())
161         continue;
162
163       std::map<std::string, ListenerData*>::iterator it;
164       it = listenerMap.find(call_id);
165       if(it == listenerMap.end()) // listener Threads not existing yet
166       {        
167         ListenerData* ld = new ListenerData();
168
169         ld->sock1_.open(session.getLocalEnd1().protocol());
170         ld->sock1_.bind(session.getLocalEnd1());
171
172         ld->sock2_.open(session.getLocalEnd2().protocol());
173         ld->sock2_.bind(session.getLocalEnd2());
174
175         ld->thread1_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 1, queue_, &(ld->running1_)));
176         ld->thread2_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 2, queue_, &(ld->running2_)));
177
178         std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
179         ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
180         continue;
181       }
182
183       if(!it->second->running1_ && !it->second->running2_)
184       {
185         cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
186         if(it->second->thread1_) {
187           it->second->thread1_->join();
188           delete it->second->thread1_;
189         }
190         if(it->second->thread2_) {
191           it->second->thread2_->join();
192           delete it->second->thread2_;
193         }
194         delete it->second;
195         listenerMap.erase(it);
196         gRtpSessionTable.delSession(call_id);
197         continue;
198       }
199           // TODO: reinit if session changed
200     }
201     catch(std::exception &e)
202     {
203       cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
204       usleep(500); // in case of an hard error don't block cpu (this is ugly)
205     }
206   }
207   cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
208 }
209
210 void chrootAndDrop(string const& chrootdir, string const& username)
211 {
212         if (getuid() != 0)
213         {
214           std::cerr << "this programm has to be run as root in order to run in a chroot" << std::endl;
215                 exit(-1);
216         }       
217
218   struct passwd *pw = getpwnam(username.c_str());
219         if(pw) {
220                 if(chroot(chrootdir.c_str()))
221                 {
222       std::cerr << "can't chroot to " << chrootdir << std::endl;
223       exit(-1);
224                 }
225     std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
226     chdir("/");
227                 if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) 
228                 {
229                         std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
230                         exit(-1);
231                 }
232     std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
233         }
234         else 
235   {
236     std::cerr << "unknown user " << username << std::endl;
237     exit(-1);
238         }
239 }
240
241 void daemonize()
242 {
243   pid_t pid;
244
245   pid = fork();
246   if(pid) exit(0);  
247   setsid();
248   pid = fork();
249   if(pid) exit(0);
250   
251 //  std::cout << "running in background now..." << std::endl;
252
253   int fd;
254 //  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
255   for (fd=0;fd<=2;fd++) // close all file descriptors
256     close(fd);
257   fd=open("/dev/null",O_RDWR);        // stdin
258   dup(fd);                            // stdout
259   dup(fd);                            // stderr
260   umask(027); 
261 }
262
263 class ThreadParam
264 {
265 public:
266   ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
267     : queue(queue_),connto(connto_)
268     {};
269   SyncQueue & queue;
270   OptionConnectTo & connto;
271 };
272
273 void syncConnector(void* p)
274 {
275         ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
276
277         SyncClient sc ( param->connto.host, param->connto.port);
278         sc.run();
279 }
280
281 void syncListener(SyncQueue * queue)
282 {
283   try
284   {
285     boost::asio::io_service io_service;
286                 SyncTcpConnection::proto::resolver resolver(io_service);
287                 SyncTcpConnection::proto::endpoint e;
288                 if(gOpt.getLocalSyncAddr()!="")
289                 {
290                         SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
291                         e = *resolver.resolve(query);
292                 } else {
293                         SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
294                         e = *resolver.resolve(query);
295                 }
296
297
298     SyncServer server(io_service,e);
299                 server.onConnect=boost::bind(syncOnConnect,_1);
300                 queue->setSyncServerPtr(&server);
301     io_service.run();
302   }
303   catch (std::exception& e)
304   {
305     std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
306     cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
307                             << " (" << e.what() << ")" << std::endl;
308   }
309
310 }
311
312 int main(int argc, char* argv[])
313 {
314 //  std::cout << "anyrtpproxy" << std::endl;
315   if(!gOpt.parse(argc, argv))
316   {
317     gOpt.printUsage();
318     exit(-1);
319   }
320
321   cLog.setLogName("anyrtpproxy");
322   cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
323
324   std::ofstream pidFile;
325   if(gOpt.getPidFile() != "") {
326     pidFile.open(gOpt.getPidFile().c_str());
327     if(!pidFile.is_open()) {
328       std::cout << "can't open pid file" << std::endl;
329     }
330   }
331
332   if(gOpt.getChroot())
333     chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
334   if(gOpt.getDaemonize())
335     daemonize();
336
337   if(pidFile.is_open()) {
338     pid_t pid = getpid();
339     pidFile << pid;
340     pidFile.close();
341   }
342   
343   SignalController sig;
344   sig.init();
345
346   SyncQueue queue;
347
348
349   boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
350
351
352 // #ifndef ANYTUN_NOSYNC
353 //     boost::thread * syncListenerThread;
354 //     if(gOpt.getLocalSyncPort() != "")
355 //       syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
356     
357 //     std::list<boost::thread *> connectThreads;
358 //     for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { 
359 //       ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
360 //       connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
361 //     }
362 // #endif
363
364
365
366 //   pthread_t syncListenerThread;
367
368 //      ConnectToList connect_to = gOpt.getConnectTo();
369 //      ThreadParam p( queue,*(new OptionConnectTo()));
370 //   if ( gOpt.getLocalSyncPort())
371 //     pthread_create(&syncListenerThread, NULL, syncListener, &p);
372
373 //   std::list<pthread_t> connectThreads;
374 //   for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
375 //   {
376 //     connectThreads.push_back(pthread_t());
377 //     ThreadParam * point = new ThreadParam(queue,*it);
378 //     pthread_create(& connectThreads.back(),  NULL, syncConnector, point);
379 //   }
380
381         PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
382   CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
383   
384   int ret = sig.run();
385   return ret;
386 }
387