Imported Upstream version 0.3.3
[anytun.git] / src / packetSource.cpp
index 0882de5..a5443ad 100644 (file)
@@ -32,6 +32,7 @@
 
 #include <boost/asio.hpp>
 #include <boost/bind.hpp>
+#include <boost/thread.hpp>
 
 #include "datatypes.h"
 #include "packetSource.h"
 #include "resolver.h"
 #include "options.h"
 #include "signalController.h"
+#include "anytunError.h"
 
 void PacketSource::waitUntilReady()
 {
   ready_sem_.down();
 }
 
-UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) : sock_(io_service_)
+UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port)
 {
   gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType());
 }
 
-void UDPPacketSource::onResolve(const boost::asio::ip::udp::endpoint& e)
+UDPPacketSource::~UDPPacketSource()
 {
-  cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e;
-  sock_.open(e.protocol());
-  sock_.bind(e);
+  std::list<SocketsElement>::iterator it = sockets_.begin();
+  for(;it != sockets_.end(); ++it) {
+/// this might be a needed by the receiver thread, TODO cleanup
+//    delete[](it->buf_);
+//    delete(it->sem_);
+//    delete(it->sock_);
+  }
+}
+
+void UDPPacketSource::onResolve(PacketSourceResolverIt& it)
+{
+  while(it != PacketSourceResolverIt()) {
+    PacketSourceEndpoint e = *it;
+    cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e;
+
+    SocketsElement sock;
+    sock.buf_ = NULL;
+    sock.len_ = 0;
+    sock.sem_ = NULL;
+    sock.sock_ = new proto::socket(io_service_);
+    if(!sock.sock_)
+      AnytunError::throwErr() << "memory error";
+
+    sock.sock_->open(e.protocol());
+#ifndef _MSC_VER
+    if(e.protocol() == proto::v6())
+      sock.sock_->set_option(boost::asio::ip::v6_only(true));
+#endif
+    sock.sock_->bind(e);
+    sockets_.push_back(sock);
+
+    it++;
+  }
+
+      // prepare multi-socket recv
+  if(sockets_.size() > 1) {
+    std::list<SocketsElement>::iterator it = sockets_.begin();
+    for(;it != sockets_.end(); ++it) {
+      it->len_ = MAX_PACKET_LENGTH;
+      it->buf_ = new u_int8_t[it->len_];
+      if(!it->buf_)
+        AnytunError::throwErr() << "memory error";
+      
+      it->sem_ = new Semaphore();
+      if(!it->sem_) {
+        delete[](it->buf_);
+        AnytunError::throwErr() << "memory error";
+      }
+
+      boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, it));
+      it->sem_->up();
+    }
+
+  }
+
   ready_sem_.up();
 }
 
@@ -63,13 +117,52 @@ void UDPPacketSource::onError(const std::runtime_error& e)
   gSignalController.inject(SIGERROR, e.what());
 }
 
+void UDPPacketSource::recv_thread(std::list<SocketsElement>::iterator it)
+{
+  cLog.msg(Log::PRIO_INFO) << "started receiver thread for " << it->sock_->local_endpoint();
+
+  ThreadResult result;
+  result.it_ = it;
+  for(;;) {
+    it->sem_->down();
+    result.len_ = static_cast<u_int32_t>(it->sock_->receive_from(boost::asio::buffer(it->buf_, it->len_), result.remote_));
+    {
+      Lock lock(thread_result_mutex_);
+      thread_result_queue_.push(result);
+    }
+    thread_result_sem_.up();
+  }
+}
+
 u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote)
 {
-  return static_cast<u_int32_t>(sock_.receive_from(boost::asio::buffer(buf, len), remote));
+  if(sockets_.size() == 1)
+    return static_cast<u_int32_t>(sockets_.front().sock_->receive_from(boost::asio::buffer(buf, len), remote));
+
+  thread_result_sem_.down();
+  ThreadResult result;
+  {
+    Lock lock(thread_result_mutex_);
+    result = thread_result_queue_.front();
+    thread_result_queue_.pop();
+  }
+  remote = result.remote_;
+  std::memcpy(buf, result.it_->buf_, (len < result.len_) ? len : result.len_);
+  len = (len < result.len_) ? len : result.len_;
+  result.it_->sem_->up();
+
+  return len;
 }
 
 void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote)
 {
-  sock_.send_to(boost::asio::buffer(buf, len), remote);
+  std::list<SocketsElement>::iterator it = sockets_.begin();
+  for(;it != sockets_.end(); ++it) {
+    if(it->sock_->local_endpoint().protocol() == remote.protocol()) {
+      it->sock_->send_to(boost::asio::buffer(buf, len), remote);
+      return;
+    }
+  }
+  cLog.msg(Log::PRIO_WARNING) << "no suitable socket found for remote endpoint protocol: " << remote;
 }