Imported Upstream version 0.3.5
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
1 /*
2  *  anytun
3  *
4  *  The secure anycast tunneling protocol (satp) defines a protocol used
5  *  for communication between any combination of unicast and anycast
6  *  tunnel endpoints.  It has less protocol overhead than IPSec in Tunnel
7  *  mode and allows tunneling of every ETHER TYPE protocol (e.g.
8  *  ethernet, ip, arp ...). satp directly includes cryptography and
9  *  message authentication based on the methods used by SRTP.  It is
10  *  intended to deliver a generic, scaleable and secure solution for
11  *  tunneling and relaying of packets of any protocol.
12  *
13  *
14  *  Copyright (C) 2007-2014 Markus Grüneis, Othmar Gsenger, Erwin Nindl,
15  *                          Christian Pointner <satp@wirdorange.org>
16  *
17  *  This file is part of Anytun.
18  *
19  *  Anytun is free software: you can redistribute it and/or modify
20  *  it under the terms of the GNU General Public License as published by
21  *  the Free Software Foundation, either version 3 of the License, or
22  *  any later version.
23  *
24  *  Anytun is distributed in the hope that it will be useful,
25  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
26  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *  GNU General Public License for more details.
28  *
29  *  You should have received a copy of the GNU General Public License
30  *  along with Anytun.  If not, see <http://www.gnu.org/licenses/>.
31  *
32  *  In addition, as a special exception, the copyright holders give
33  *  permission to link the code of portions of this program with the
34  *  OpenSSL library under certain conditions as described in each
35  *  individual source file, and distribute linked combinations
36  *  including the two.
37  *  You must obey the GNU General Public License in all respects
38  *  for all of the code used other than OpenSSL.  If you modify
39  *  file(s) with this exception, you may extend this exception to your
40  *  version of the file(s), but you are not obligated to do so.  If you
41  *  do not wish to do so, delete this exception statement from your
42  *  version.  If you delete this exception statement from all source
43  *  files in the program, then also delete it here.
44  */
45
46 #include <iostream>
47
48 #include <boost/asio.hpp>
49
50 #include <fcntl.h>
51 #include <pwd.h>
52 #include <grp.h>
53
54 #include "../datatypes.h"
55
56 #include "../log.h"
57 #include "../signalController.h"
58 #include "../buffer.h"
59 #include "connectionList.h"
60 #include "rtpSessionTable.h"
61 #include "syncRtpCommand.h"
62 #include "../syncQueue.h"
63 #include "../syncClient.h"
64 #include "syncOnConnect.hpp"
65
66 #include "../threadUtils.hpp"
67
68 #include "commandHandler.h"
69 #include "callIdQueue.h"
70
71 #include "options.h"
72 #include "portWindow.h"
73 #include <map>
74 #include <fstream>
75
76 #define MAX_PACKET_SIZE 1500
77
78 void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
79 {
80   cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
81
82   try {
83     Buffer buf(uint32_t(MAX_PACKET_SIZE));
84     RtpSession::proto::endpoint remote_end;
85
86     for(;;) {
87       buf.setLength(MAX_PACKET_SIZE);
88       uint32_t len=0;
89       if(dir == 1) {
90         len = 0;  //sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
91       } else if(dir == 2) {
92         len = 0;  //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
93       } else { break; }
94
95       RtpSession& session = gRtpSessionTable.getSession(call_id);
96       if(session.isDead()) {
97         cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
98         break;
99       }
100
101       if(!len) {
102         continue;
103       }
104       buf.setLength(len);
105
106       if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
107           (dir == 2 && remote_end != session.getRemoteEnd2())) {
108         if(gOpt.getNat() ||
109             (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
110                                       (dir == 2 && !session.getSeen2())))) {
111           cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
112                                      << remote_end;
113           if(dir == 1) {
114             session.setRemoteEnd1(remote_end);
115           }
116           if(dir == 2) {
117             session.setRemoteEnd2(remote_end);
118           }
119
120           if(!gOpt.getNat()) { // with nat enabled sync is not needed
121             SyncRtpCommand sc(call_id);
122             queue->push(sc);
123           }
124         } else {
125           continue;
126         }
127       }
128       session.setSeen1();
129       session.setSeen2();
130
131       if(dir == 1) {
132         sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
133       } else if(dir == 2) {
134         sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
135       } else { break; }
136     }
137   } catch(std::exception& e) {
138     cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
139   }
140   *running = false;
141   gCallIdQueue.push(call_id);
142 }
143
144 class ListenerData
145 {
146 public:
147   ListenerData() : sock1_(ios1_), sock2_(ios2_) {}
148
149   boost::asio::io_service ios1_;
150   boost::asio::io_service ios2_;
151   RtpSession::proto::socket sock1_;
152   RtpSession::proto::socket sock2_;
153   boost::thread* thread1_;
154   boost::thread* thread2_;
155   bool running1_;
156   bool running2_;
157 };
158
159 void listenerManager(void* p)
160 {
161   SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
162
163   std::map<std::string, ListenerData*> listenerMap;
164   for(;;) {
165     try {
166       std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
167       gCallIdQueue.pop();
168
169       RtpSession& session = gRtpSessionTable.getSession(call_id);
170       if(!session.isComplete()) {
171         continue;
172       }
173
174       std::map<std::string, ListenerData*>::iterator it;
175       it = listenerMap.find(call_id);
176       if(it == listenerMap.end()) { // listener Threads not existing yet
177         ListenerData* ld = new ListenerData();
178
179         ld->sock1_.open(session.getLocalEnd1().protocol());
180         ld->sock1_.bind(session.getLocalEnd1());
181
182         ld->sock2_.open(session.getLocalEnd2().protocol());
183         ld->sock2_.bind(session.getLocalEnd2());
184
185         ld->thread1_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 1, queue_, &(ld->running1_)));
186         ld->thread2_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 2, queue_, &(ld->running2_)));
187
188         std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
189         ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
190         continue;
191       }
192
193       if(!it->second->running1_ && !it->second->running2_) {
194         cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
195         if(it->second->thread1_) {
196           it->second->thread1_->join();
197           delete it->second->thread1_;
198         }
199         if(it->second->thread2_) {
200           it->second->thread2_->join();
201           delete it->second->thread2_;
202         }
203         delete it->second;
204         listenerMap.erase(it);
205         gRtpSessionTable.delSession(call_id);
206         continue;
207       }
208       // TODO: reinit if session changed
209     } catch(std::exception& e) {
210       cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
211       usleep(500); // in case of an hard error don't block cpu (this is ugly)
212     }
213   }
214   cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
215 }
216
217 void chrootAndDrop(string const& chrootdir, string const& username)
218 {
219   if(getuid() != 0) {
220     std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
221     exit(-1);
222   }
223
224   struct passwd* pw = getpwnam(username.c_str());
225   if(pw) {
226     if(chroot(chrootdir.c_str())) {
227       std::cerr << "can't chroot to " << chrootdir << std::endl;
228       exit(-1);
229     }
230     std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
231     chdir("/");
232     if(initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) {
233       std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
234       exit(-1);
235     }
236     std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
237   } else {
238     std::cerr << "unknown user " << username << std::endl;
239     exit(-1);
240   }
241 }
242
243 void daemonize()
244 {
245   pid_t pid;
246
247   pid = fork();
248   if(pid) { exit(0); }
249   setsid();
250   pid = fork();
251   if(pid) { exit(0); }
252
253   //  std::cout << "running in background now..." << std::endl;
254
255   int fd;
256   //  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
257   for(fd=0; fd<=2; fd++) { // close all file descriptors
258     close(fd);
259   }
260   fd=open("/dev/null",O_RDWR);        // stdin
261   dup(fd);                            // stdout
262   dup(fd);                            // stderr
263   umask(027);
264 }
265
266 class ThreadParam
267 {
268 public:
269   ThreadParam(SyncQueue& queue_,OptionConnectTo& connto_)
270     : queue(queue_),connto(connto_)
271   {};
272   SyncQueue& queue;
273   OptionConnectTo& connto;
274 };
275
276 void syncConnector(void* p)
277 {
278   ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
279
280   SyncClient sc(param->connto.host, param->connto.port);
281   sc.run();
282 }
283
284 void syncListener(SyncQueue* queue)
285 {
286   try {
287     boost::asio::io_service io_service;
288     SyncTcpConnection::proto::resolver resolver(io_service);
289     SyncTcpConnection::proto::endpoint e;
290     if(gOpt.getLocalSyncAddr()!="") {
291       SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
292       e = *resolver.resolve(query);
293     } else {
294       SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
295       e = *resolver.resolve(query);
296     }
297
298
299     SyncServer server(io_service,e);
300     server.onConnect=boost::bind(syncOnConnect,_1);
301     queue->setSyncServerPtr(&server);
302     io_service.run();
303   } catch(std::exception& e) {
304     std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
305     cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
306                             << " (" << e.what() << ")" << std::endl;
307   }
308
309 }
310
311 int main(int argc, char* argv[])
312 {
313   //  std::cout << "anyrtpproxy" << std::endl;
314   if(!gOpt.parse(argc, argv)) {
315     gOpt.printUsage();
316     exit(-1);
317   }
318
319   cLog.setLogName("anyrtpproxy");
320   cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
321
322   std::ofstream pidFile;
323   if(gOpt.getPidFile() != "") {
324     pidFile.open(gOpt.getPidFile().c_str());
325     if(!pidFile.is_open()) {
326       std::cout << "can't open pid file" << std::endl;
327     }
328   }
329
330   if(gOpt.getChroot()) {
331     chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
332   }
333   if(gOpt.getDaemonize()) {
334     daemonize();
335   }
336
337   if(pidFile.is_open()) {
338     pid_t pid = getpid();
339     pidFile << pid;
340     pidFile.close();
341   }
342
343   SignalController sig;
344   sig.init();
345
346   SyncQueue queue;
347
348
349   boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
350
351
352   // #ifndef ANYTUN_NOSYNC
353   //     boost::thread * syncListenerThread;
354   //     if(gOpt.getLocalSyncPort() != "")
355   //       syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
356
357   //     std::list<boost::thread *> connectThreads;
358   //     for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
359   //       ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
360   //       connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
361   //     }
362   // #endif
363
364
365
366   //   pthread_t syncListenerThread;
367
368   //    ConnectToList connect_to = gOpt.getConnectTo();
369   //    ThreadParam p( queue,*(new OptionConnectTo()));
370   //   if ( gOpt.getLocalSyncPort())
371   //     pthread_create(&syncListenerThread, NULL, syncListener, &p);
372
373   //   std::list<pthread_t> connectThreads;
374   //   for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
375   //   {
376   //     connectThreads.push_back(pthread_t());
377   //     ThreadParam * point = new ThreadParam(queue,*it);
378   //     pthread_create(& connectThreads.back(),  NULL, syncConnector, point);
379   //   }
380
381   PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
382   CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
383
384   int ret = sig.run();
385   return ret;
386 }
387