Merge commit 'upstream/0.3.2'
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
1 /*
2  *  anytun
3  *
4  *  The secure anycast tunneling protocol (satp) defines a protocol used
5  *  for communication between any combination of unicast and anycast
6  *  tunnel endpoints.  It has less protocol overhead than IPSec in Tunnel
7  *  mode and allows tunneling of every ETHER TYPE protocol (e.g.
8  *  ethernet, ip, arp ...). satp directly includes cryptography and
9  *  message authentication based on the methodes used by SRTP.  It is
10  *  intended to deliver a generic, scaleable and secure solution for
11  *  tunneling and relaying of packets of any protocol.
12  *
13  *
14  *  Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl, 
15  *                          Christian Pointner <satp@wirdorange.org>
16  *
17  *  This file is part of Anytun.
18  *
19  *  Anytun is free software: you can redistribute it and/or modify
20  *  it under the terms of the GNU General Public License as published by
21  *  the Free Software Foundation, either version 3 of the License, or
22  *  any later version.
23  *
24  *  Anytun is distributed in the hope that it will be useful,
25  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
26  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *  GNU General Public License for more details.
28  *
29  *  You should have received a copy of the GNU General Public License
30  *  along with anytun.  If not, see <http://www.gnu.org/licenses/>.
31  */
32
33 #include <iostream>
34
35 #include <boost/asio.hpp>
36
37 #include <fcntl.h>
38 #include <pwd.h>
39 #include <grp.h>
40
41 #include "../datatypes.h"
42
43 #include "../log.h"
44 #include "../signalController.h"
45 #include "../buffer.h"
46 #include "connectionList.h"
47 #include "rtpSessionTable.h"
48 #include "syncRtpCommand.h"
49 #include "../syncQueue.h"
50 #include "../syncClient.h"
51 #include "syncOnConnect.hpp"
52
53 #include "../threadUtils.hpp"
54
55 #include "commandHandler.h"
56 #include "callIdQueue.h"
57
58 #include "options.h"
59 #include "portWindow.h"
60 #include <map>
61 #include <fstream>
62
63 #define MAX_PACKET_SIZE 1500
64
65 void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
66 {
67   cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
68
69   try 
70   {
71     Buffer buf(u_int32_t(MAX_PACKET_SIZE));
72     RtpSession::proto::endpoint remote_end;
73
74     while(1) {
75       buf.setLength(MAX_PACKET_SIZE);
76       u_int32_t len=0;
77       if(dir == 1)
78         len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
79       else if(dir == 2)
80         len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
81                         else break;
82
83       RtpSession& session = gRtpSessionTable.getSession(call_id);
84       if(session.isDead()) {
85         cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting"; 
86         break;
87       }
88
89       if(!len)
90         continue;
91       buf.setLength(len);
92       
93       if((dir == 1 && remote_end != session.getRemoteEnd1()) || 
94          (dir == 2 && remote_end != session.getRemoteEnd2()))
95       {
96         if(gOpt.getNat() ||
97            (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || 
98                                      (dir == 2 && !session.getSeen2()))))
99         {
100           cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
101                                      << remote_end;
102           if(dir == 1)
103             session.setRemoteEnd1(remote_end);
104           if(dir == 2)
105             session.setRemoteEnd2(remote_end);
106           
107           if(!gOpt.getNat()) { // with nat enabled sync is not needed
108             SyncRtpCommand sc(call_id);
109             queue->push(sc);
110           }
111         }
112         else
113           continue;
114                         }
115       session.setSeen1();
116       session.setSeen2();
117
118       if(dir == 1)
119         sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
120       else if(dir == 2)
121         sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
122       else break;
123     }  
124   }
125   catch(std::exception &e)
126   {
127     cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
128   }
129   *running = false;
130   gCallIdQueue.push(call_id);
131 }
132
133 class ListenerData
134 {
135 public:
136   ListenerData() : sock1_(ios1_), sock2_(ios2_) {}
137
138   boost::asio::io_service ios1_;
139   boost::asio::io_service ios2_;
140   RtpSession::proto::socket sock1_;
141   RtpSession::proto::socket sock2_;
142   boost::thread* thread1_;
143   boost::thread* thread2_;
144   bool running1_;
145   bool running2_;
146 };
147
148 void listenerManager(void* p)
149 {
150   SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
151
152   std::map<std::string, ListenerData*> listenerMap;
153   while(1)
154   {
155     try 
156     {
157       std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
158       gCallIdQueue.pop();
159
160       RtpSession& session = gRtpSessionTable.getSession(call_id);
161       if(!session.isComplete())
162         continue;
163
164       std::map<std::string, ListenerData*>::iterator it;
165       it = listenerMap.find(call_id);
166       if(it == listenerMap.end()) // listener Threads not existing yet
167       {        
168         ListenerData* ld = new ListenerData();
169
170         ld->sock1_.open(session.getLocalEnd1().protocol());
171         ld->sock1_.bind(session.getLocalEnd1());
172
173         ld->sock2_.open(session.getLocalEnd2().protocol());
174         ld->sock2_.bind(session.getLocalEnd2());
175
176         ld->thread1_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 1, queue_, &(ld->running1_)));
177         ld->thread2_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 2, queue_, &(ld->running2_)));
178
179         std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
180         ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
181         continue;
182       }
183
184       if(!it->second->running1_ && !it->second->running2_)
185       {
186         cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
187         if(it->second->thread1_) {
188           it->second->thread1_->join();
189           delete it->second->thread1_;
190         }
191         if(it->second->thread2_) {
192           it->second->thread2_->join();
193           delete it->second->thread2_;
194         }
195         delete it->second;
196         listenerMap.erase(it);
197         gRtpSessionTable.delSession(call_id);
198         continue;
199       }
200           // TODO: reinit if session changed
201     }
202     catch(std::exception &e)
203     {
204       cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
205       usleep(500); // in case of an hard error don't block cpu (this is ugly)
206     }
207   }
208   cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
209 }
210
211 void chrootAndDrop(string const& chrootdir, string const& username)
212 {
213         if (getuid() != 0)
214         {
215           std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
216                 exit(-1);
217         }       
218
219   struct passwd *pw = getpwnam(username.c_str());
220         if(pw) {
221                 if(chroot(chrootdir.c_str()))
222                 {
223       std::cerr << "can't chroot to " << chrootdir << std::endl;
224       exit(-1);
225                 }
226     std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
227     chdir("/");
228                 if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) 
229                 {
230                         std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
231                         exit(-1);
232                 }
233     std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
234         }
235         else 
236   {
237     std::cerr << "unknown user " << username << std::endl;
238     exit(-1);
239         }
240 }
241
242 void daemonize()
243 {
244   pid_t pid;
245
246   pid = fork();
247   if(pid) exit(0);  
248   setsid();
249   pid = fork();
250   if(pid) exit(0);
251   
252 //  std::cout << "running in background now..." << std::endl;
253
254   int fd;
255 //  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
256   for (fd=0;fd<=2;fd++) // close all file descriptors
257     close(fd);
258   fd=open("/dev/null",O_RDWR);        // stdin
259   dup(fd);                            // stdout
260   dup(fd);                            // stderr
261   umask(027); 
262 }
263
264 class ThreadParam
265 {
266 public:
267   ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
268     : queue(queue_),connto(connto_)
269     {};
270   SyncQueue & queue;
271   OptionConnectTo & connto;
272 };
273
274 void syncConnector(void* p)
275 {
276         ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
277
278         SyncClient sc ( param->connto.host, param->connto.port);
279         sc.run();
280 }
281
282 void syncListener(SyncQueue * queue)
283 {
284   try
285   {
286     boost::asio::io_service io_service;
287                 SyncTcpConnection::proto::resolver resolver(io_service);
288                 SyncTcpConnection::proto::endpoint e;
289                 if(gOpt.getLocalSyncAddr()!="")
290                 {
291                         SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
292                         e = *resolver.resolve(query);
293                 } else {
294                         SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
295                         e = *resolver.resolve(query);
296                 }
297
298
299     SyncServer server(io_service,e);
300                 server.onConnect=boost::bind(syncOnConnect,_1);
301                 queue->setSyncServerPtr(&server);
302     io_service.run();
303   }
304   catch (std::exception& e)
305   {
306     std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
307     cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
308                             << " (" << e.what() << ")" << std::endl;
309   }
310
311 }
312
313 int main(int argc, char* argv[])
314 {
315 //  std::cout << "anyrtpproxy" << std::endl;
316   if(!gOpt.parse(argc, argv))
317   {
318     gOpt.printUsage();
319     exit(-1);
320   }
321
322   cLog.setLogName("anyrtpproxy");
323   cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
324
325   std::ofstream pidFile;
326   if(gOpt.getPidFile() != "") {
327     pidFile.open(gOpt.getPidFile().c_str());
328     if(!pidFile.is_open()) {
329       std::cout << "can't open pid file" << std::endl;
330     }
331   }
332
333   if(gOpt.getChroot())
334     chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
335   if(gOpt.getDaemonize())
336     daemonize();
337
338   if(pidFile.is_open()) {
339     pid_t pid = getpid();
340     pidFile << pid;
341     pidFile.close();
342   }
343   
344   SignalController sig;
345   sig.init();
346
347   SyncQueue queue;
348
349
350   boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
351
352
353 // #ifndef ANYTUN_NOSYNC
354 //     boost::thread * syncListenerThread;
355 //     if(gOpt.getLocalSyncPort() != "")
356 //       syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
357     
358 //     std::list<boost::thread *> connectThreads;
359 //     for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { 
360 //       ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
361 //       connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
362 //     }
363 // #endif
364
365
366
367 //   pthread_t syncListenerThread;
368
369 //      ConnectToList connect_to = gOpt.getConnectTo();
370 //      ThreadParam p( queue,*(new OptionConnectTo()));
371 //   if ( gOpt.getLocalSyncPort())
372 //     pthread_create(&syncListenerThread, NULL, syncListener, &p);
373
374 //   std::list<pthread_t> connectThreads;
375 //   for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
376 //   {
377 //     connectThreads.push_back(pthread_t());
378 //     ThreadParam * point = new ThreadParam(queue,*it);
379 //     pthread_create(& connectThreads.back(),  NULL, syncConnector, point);
380 //   }
381
382         PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
383   CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
384   
385   int ret = sig.run();
386   return ret;
387 }
388