Imported Upstream version 0.3.4
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
1 /*
2  *  anytun
3  *
4  *  The secure anycast tunneling protocol (satp) defines a protocol used
5  *  for communication between any combination of unicast and anycast
6  *  tunnel endpoints.  It has less protocol overhead than IPSec in Tunnel
7  *  mode and allows tunneling of every ETHER TYPE protocol (e.g.
8  *  ethernet, ip, arp ...). satp directly includes cryptography and
9  *  message authentication based on the methodes used by SRTP.  It is
10  *  intended to deliver a generic, scaleable and secure solution for
11  *  tunneling and relaying of packets of any protocol.
12  *
13  *
14  *  Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl,
15  *                          Christian Pointner <satp@wirdorange.org>
16  *
17  *  This file is part of Anytun.
18  *
19  *  Anytun is free software: you can redistribute it and/or modify
20  *  it under the terms of the GNU General Public License as published by
21  *  the Free Software Foundation, either version 3 of the License, or
22  *  any later version.
23  *
24  *  Anytun is distributed in the hope that it will be useful,
25  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
26  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *  GNU General Public License for more details.
28  *
29  *  You should have received a copy of the GNU General Public License
30  *  along with anytun.  If not, see <http://www.gnu.org/licenses/>.
31  */
32
33 #include <iostream>
34
35 #include <boost/asio.hpp>
36
37 #include <fcntl.h>
38 #include <pwd.h>
39 #include <grp.h>
40
41 #include "../datatypes.h"
42
43 #include "../log.h"
44 #include "../signalController.h"
45 #include "../buffer.h"
46 #include "connectionList.h"
47 #include "rtpSessionTable.h"
48 #include "syncRtpCommand.h"
49 #include "../syncQueue.h"
50 #include "../syncClient.h"
51 #include "syncOnConnect.hpp"
52
53 #include "../threadUtils.hpp"
54
55 #include "commandHandler.h"
56 #include "callIdQueue.h"
57
58 #include "options.h"
59 #include "portWindow.h"
60 #include <map>
61 #include <fstream>
62
63 #define MAX_PACKET_SIZE 1500
64
65 void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
66 {
67   cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
68
69   try {
70     Buffer buf(uint32_t(MAX_PACKET_SIZE));
71     RtpSession::proto::endpoint remote_end;
72
73     while(1) {
74       buf.setLength(MAX_PACKET_SIZE);
75       uint32_t len=0;
76       if(dir == 1) {
77         len = 0;  //sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
78       } else if(dir == 2) {
79         len = 0;  //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
80       } else { break; }
81
82       RtpSession& session = gRtpSessionTable.getSession(call_id);
83       if(session.isDead()) {
84         cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
85         break;
86       }
87
88       if(!len) {
89         continue;
90       }
91       buf.setLength(len);
92
93       if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
94           (dir == 2 && remote_end != session.getRemoteEnd2())) {
95         if(gOpt.getNat() ||
96             (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
97                                       (dir == 2 && !session.getSeen2())))) {
98           cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
99                                      << remote_end;
100           if(dir == 1) {
101             session.setRemoteEnd1(remote_end);
102           }
103           if(dir == 2) {
104             session.setRemoteEnd2(remote_end);
105           }
106
107           if(!gOpt.getNat()) { // with nat enabled sync is not needed
108             SyncRtpCommand sc(call_id);
109             queue->push(sc);
110           }
111         } else {
112           continue;
113         }
114       }
115       session.setSeen1();
116       session.setSeen2();
117
118       if(dir == 1) {
119         sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
120       } else if(dir == 2) {
121         sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
122       } else { break; }
123     }
124   } catch(std::exception& e) {
125     cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
126   }
127   *running = false;
128   gCallIdQueue.push(call_id);
129 }
130
131 class ListenerData
132 {
133 public:
134   ListenerData() : sock1_(ios1_), sock2_(ios2_) {}
135
136   boost::asio::io_service ios1_;
137   boost::asio::io_service ios2_;
138   RtpSession::proto::socket sock1_;
139   RtpSession::proto::socket sock2_;
140   boost::thread* thread1_;
141   boost::thread* thread2_;
142   bool running1_;
143   bool running2_;
144 };
145
146 void listenerManager(void* p)
147 {
148   SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
149
150   std::map<std::string, ListenerData*> listenerMap;
151   while(1) {
152     try {
153       std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
154       gCallIdQueue.pop();
155
156       RtpSession& session = gRtpSessionTable.getSession(call_id);
157       if(!session.isComplete()) {
158         continue;
159       }
160
161       std::map<std::string, ListenerData*>::iterator it;
162       it = listenerMap.find(call_id);
163       if(it == listenerMap.end()) { // listener Threads not existing yet
164         ListenerData* ld = new ListenerData();
165
166         ld->sock1_.open(session.getLocalEnd1().protocol());
167         ld->sock1_.bind(session.getLocalEnd1());
168
169         ld->sock2_.open(session.getLocalEnd2().protocol());
170         ld->sock2_.bind(session.getLocalEnd2());
171
172         ld->thread1_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 1, queue_, &(ld->running1_)));
173         ld->thread2_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 2, queue_, &(ld->running2_)));
174
175         std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
176         ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
177         continue;
178       }
179
180       if(!it->second->running1_ && !it->second->running2_) {
181         cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
182         if(it->second->thread1_) {
183           it->second->thread1_->join();
184           delete it->second->thread1_;
185         }
186         if(it->second->thread2_) {
187           it->second->thread2_->join();
188           delete it->second->thread2_;
189         }
190         delete it->second;
191         listenerMap.erase(it);
192         gRtpSessionTable.delSession(call_id);
193         continue;
194       }
195       // TODO: reinit if session changed
196     } catch(std::exception& e) {
197       cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
198       usleep(500); // in case of an hard error don't block cpu (this is ugly)
199     }
200   }
201   cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
202 }
203
204 void chrootAndDrop(string const& chrootdir, string const& username)
205 {
206   if(getuid() != 0) {
207     std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
208     exit(-1);
209   }
210
211   struct passwd* pw = getpwnam(username.c_str());
212   if(pw) {
213     if(chroot(chrootdir.c_str())) {
214       std::cerr << "can't chroot to " << chrootdir << std::endl;
215       exit(-1);
216     }
217     std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
218     chdir("/");
219     if(initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) {
220       std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
221       exit(-1);
222     }
223     std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
224   } else {
225     std::cerr << "unknown user " << username << std::endl;
226     exit(-1);
227   }
228 }
229
230 void daemonize()
231 {
232   pid_t pid;
233
234   pid = fork();
235   if(pid) { exit(0); }
236   setsid();
237   pid = fork();
238   if(pid) { exit(0); }
239
240   //  std::cout << "running in background now..." << std::endl;
241
242   int fd;
243   //  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
244   for(fd=0; fd<=2; fd++) { // close all file descriptors
245     close(fd);
246   }
247   fd=open("/dev/null",O_RDWR);        // stdin
248   dup(fd);                            // stdout
249   dup(fd);                            // stderr
250   umask(027);
251 }
252
253 class ThreadParam
254 {
255 public:
256   ThreadParam(SyncQueue& queue_,OptionConnectTo& connto_)
257     : queue(queue_),connto(connto_)
258   {};
259   SyncQueue& queue;
260   OptionConnectTo& connto;
261 };
262
263 void syncConnector(void* p)
264 {
265   ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
266
267   SyncClient sc(param->connto.host, param->connto.port);
268   sc.run();
269 }
270
271 void syncListener(SyncQueue* queue)
272 {
273   try {
274     boost::asio::io_service io_service;
275     SyncTcpConnection::proto::resolver resolver(io_service);
276     SyncTcpConnection::proto::endpoint e;
277     if(gOpt.getLocalSyncAddr()!="") {
278       SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
279       e = *resolver.resolve(query);
280     } else {
281       SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
282       e = *resolver.resolve(query);
283     }
284
285
286     SyncServer server(io_service,e);
287     server.onConnect=boost::bind(syncOnConnect,_1);
288     queue->setSyncServerPtr(&server);
289     io_service.run();
290   } catch(std::exception& e) {
291     std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
292     cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
293                             << " (" << e.what() << ")" << std::endl;
294   }
295
296 }
297
298 int main(int argc, char* argv[])
299 {
300   //  std::cout << "anyrtpproxy" << std::endl;
301   if(!gOpt.parse(argc, argv)) {
302     gOpt.printUsage();
303     exit(-1);
304   }
305
306   cLog.setLogName("anyrtpproxy");
307   cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
308
309   std::ofstream pidFile;
310   if(gOpt.getPidFile() != "") {
311     pidFile.open(gOpt.getPidFile().c_str());
312     if(!pidFile.is_open()) {
313       std::cout << "can't open pid file" << std::endl;
314     }
315   }
316
317   if(gOpt.getChroot()) {
318     chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
319   }
320   if(gOpt.getDaemonize()) {
321     daemonize();
322   }
323
324   if(pidFile.is_open()) {
325     pid_t pid = getpid();
326     pidFile << pid;
327     pidFile.close();
328   }
329
330   SignalController sig;
331   sig.init();
332
333   SyncQueue queue;
334
335
336   boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
337
338
339   // #ifndef ANYTUN_NOSYNC
340   //     boost::thread * syncListenerThread;
341   //     if(gOpt.getLocalSyncPort() != "")
342   //       syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
343
344   //     std::list<boost::thread *> connectThreads;
345   //     for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
346   //       ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
347   //       connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
348   //     }
349   // #endif
350
351
352
353   //   pthread_t syncListenerThread;
354
355   //    ConnectToList connect_to = gOpt.getConnectTo();
356   //    ThreadParam p( queue,*(new OptionConnectTo()));
357   //   if ( gOpt.getLocalSyncPort())
358   //     pthread_create(&syncListenerThread, NULL, syncListener, &p);
359
360   //   std::list<pthread_t> connectThreads;
361   //   for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
362   //   {
363   //     connectThreads.push_back(pthread_t());
364   //     ThreadParam * point = new ThreadParam(queue,*it);
365   //     pthread_create(& connectThreads.back(),  NULL, syncConnector, point);
366   //   }
367
368   PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
369   CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
370
371   int ret = sig.run();
372   return ret;
373 }
374