4 * The secure anycast tunneling protocol (satp) defines a protocol used
5 * for communication between any combination of unicast and anycast
6 * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
7 * mode and allows tunneling of every ETHER TYPE protocol (e.g.
8 * ethernet, ip, arp ...). satp directly includes cryptography and
9 * message authentication based on the methods used by SRTP. It is
10 * intended to deliver a generic, scaleable and secure solution for
11 * tunneling and relaying of packets of any protocol.
14 * Copyright (C) 2007-2014 Markus Grüneis, Othmar Gsenger, Erwin Nindl,
15 * Christian Pointner <satp@wirdorange.org>
17 * This file is part of Anytun.
19 * Anytun is free software: you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation, either version 3 of the License, or
24 * Anytun is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with Anytun. If not, see <http://www.gnu.org/licenses/>.
32 * In addition, as a special exception, the copyright holders give
33 * permission to link the code of portions of this program with the
34 * OpenSSL library under certain conditions as described in each
35 * individual source file, and distribute linked combinations
37 * You must obey the GNU General Public License in all respects
38 * for all of the code used other than OpenSSL. If you modify
39 * file(s) with this exception, you may extend this exception to your
40 * version of the file(s), but you are not obligated to do so. If you
41 * do not wish to do so, delete this exception statement from your
42 * version. If you delete this exception statement from all source
43 * files in the program, then also delete it here.
48 #include <boost/asio.hpp>
54 #include "../datatypes.h"
57 #include "../signalController.h"
58 #include "../buffer.h"
59 #include "connectionList.h"
60 #include "rtpSessionTable.h"
61 #include "syncRtpCommand.h"
62 #include "../syncQueue.h"
63 #include "../syncClient.h"
64 #include "syncOnConnect.hpp"
66 #include "../threadUtils.hpp"
68 #include "commandHandler.h"
69 #include "callIdQueue.h"
72 #include "portWindow.h"
76 #define MAX_PACKET_SIZE 1500
78 void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
80 cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
83 Buffer buf(uint32_t(MAX_PACKET_SIZE));
84 RtpSession::proto::endpoint remote_end;
87 buf.setLength(MAX_PACKET_SIZE);
90 len = 0; //sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
92 len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
95 RtpSession& session = gRtpSessionTable.getSession(call_id);
96 if(session.isDead()) {
97 cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
106 if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
107 (dir == 2 && remote_end != session.getRemoteEnd2())) {
109 (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
110 (dir == 2 && !session.getSeen2())))) {
111 cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
114 session.setRemoteEnd1(remote_end);
117 session.setRemoteEnd2(remote_end);
120 if(!gOpt.getNat()) { // with nat enabled sync is not needed
121 SyncRtpCommand sc(call_id);
132 sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
133 } else if(dir == 2) {
134 sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
137 } catch(std::exception& e) {
138 cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
141 gCallIdQueue.push(call_id);
147 ListenerData() : sock1_(ios1_), sock2_(ios2_) {}
149 boost::asio::io_service ios1_;
150 boost::asio::io_service ios2_;
151 RtpSession::proto::socket sock1_;
152 RtpSession::proto::socket sock2_;
153 boost::thread* thread1_;
154 boost::thread* thread2_;
159 void listenerManager(void* p)
161 SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
163 std::map<std::string, ListenerData*> listenerMap;
166 std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
169 RtpSession& session = gRtpSessionTable.getSession(call_id);
170 if(!session.isComplete()) {
174 std::map<std::string, ListenerData*>::iterator it;
175 it = listenerMap.find(call_id);
176 if(it == listenerMap.end()) { // listener Threads not existing yet
177 ListenerData* ld = new ListenerData();
179 ld->sock1_.open(session.getLocalEnd1().protocol());
180 ld->sock1_.bind(session.getLocalEnd1());
182 ld->sock2_.open(session.getLocalEnd2().protocol());
183 ld->sock2_.bind(session.getLocalEnd2());
185 ld->thread1_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 1, queue_, &(ld->running1_)));
186 ld->thread2_ = new boost::thread(boost::bind(listener, &(ld->sock1_), &(ld->sock2_), call_id, 2, queue_, &(ld->running2_)));
188 std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
189 ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
193 if(!it->second->running1_ && !it->second->running2_) {
194 cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
195 if(it->second->thread1_) {
196 it->second->thread1_->join();
197 delete it->second->thread1_;
199 if(it->second->thread2_) {
200 it->second->thread2_->join();
201 delete it->second->thread2_;
204 listenerMap.erase(it);
205 gRtpSessionTable.delSession(call_id);
208 // TODO: reinit if session changed
209 } catch(std::exception& e) {
210 cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
211 usleep(500); // in case of an hard error don't block cpu (this is ugly)
214 cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
217 void chrootAndDrop(string const& chrootdir, string const& username)
220 std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
224 struct passwd* pw = getpwnam(username.c_str());
226 if(chroot(chrootdir.c_str())) {
227 std::cerr << "can't chroot to " << chrootdir << std::endl;
230 std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
232 if(initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) {
233 std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
236 std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
238 std::cerr << "unknown user " << username << std::endl;
253 // std::cout << "running in background now..." << std::endl;
256 // for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
257 for(fd=0; fd<=2; fd++) { // close all file descriptors
260 fd=open("/dev/null",O_RDWR); // stdin
269 ThreadParam(SyncQueue& queue_,OptionConnectTo& connto_)
270 : queue(queue_),connto(connto_)
273 OptionConnectTo& connto;
276 void syncConnector(void* p)
278 ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
280 SyncClient sc(param->connto.host, param->connto.port);
284 void syncListener(SyncQueue* queue)
287 boost::asio::io_service io_service;
288 SyncTcpConnection::proto::resolver resolver(io_service);
289 SyncTcpConnection::proto::endpoint e;
290 if(gOpt.getLocalSyncAddr()!="") {
291 SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
292 e = *resolver.resolve(query);
294 SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
295 e = *resolver.resolve(query);
299 SyncServer server(io_service,e);
300 server.onConnect=boost::bind(syncOnConnect,_1);
301 queue->setSyncServerPtr(&server);
303 } catch(std::exception& e) {
304 std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
305 cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
306 << " (" << e.what() << ")" << std::endl;
311 int main(int argc, char* argv[])
313 // std::cout << "anyrtpproxy" << std::endl;
314 if(!gOpt.parse(argc, argv)) {
319 cLog.setLogName("anyrtpproxy");
320 cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
322 std::ofstream pidFile;
323 if(gOpt.getPidFile() != "") {
324 pidFile.open(gOpt.getPidFile().c_str());
325 if(!pidFile.is_open()) {
326 std::cout << "can't open pid file" << std::endl;
330 if(gOpt.getChroot()) {
331 chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
333 if(gOpt.getDaemonize()) {
337 if(pidFile.is_open()) {
338 pid_t pid = getpid();
343 SignalController sig;
349 boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
352 // #ifndef ANYTUN_NOSYNC
353 // boost::thread * syncListenerThread;
354 // if(gOpt.getLocalSyncPort() != "")
355 // syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
357 // std::list<boost::thread *> connectThreads;
358 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
359 // ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
360 // connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
366 // pthread_t syncListenerThread;
368 // ConnectToList connect_to = gOpt.getConnectTo();
369 // ThreadParam p( queue,*(new OptionConnectTo()));
370 // if ( gOpt.getLocalSyncPort())
371 // pthread_create(&syncListenerThread, NULL, syncListener, &p);
373 // std::list<pthread_t> connectThreads;
374 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
376 // connectThreads.push_back(pthread_t());
377 // ThreadParam * point = new ThreadParam(queue,*it);
378 // pthread_create(& connectThreads.back(), NULL, syncConnector, point);
381 PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
382 CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);