#include <boost/asio.hpp>
#include <boost/bind.hpp>
+#include <boost/thread.hpp>
#include "datatypes.h"
#include "packetSource.h"
#include "resolver.h"
#include "options.h"
#include "signalController.h"
+#include "anytunError.h"
void PacketSource::waitUntilReady()
{
ready_sem_.down();
}
-UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) : sock_(io_service_)
+UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port)
{
gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType());
}
-void UDPPacketSource::onResolve(const boost::asio::ip::udp::endpoint& e)
+UDPPacketSource::~UDPPacketSource()
{
- cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e;
- sock_.open(e.protocol());
- sock_.bind(e);
+ std::list<SocketsElement>::iterator it = sockets_.begin();
+ for(;it != sockets_.end(); ++it) {
+/// this might be a needed by the receiver thread, TODO cleanup
+// delete[](it->buf_);
+// delete(it->sem_);
+// delete(it->sock_);
+ }
+}
+
+void UDPPacketSource::onResolve(PacketSourceResolverIt& it)
+{
+ while(it != PacketSourceResolverIt()) {
+ PacketSourceEndpoint e = *it;
+ cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e;
+
+ SocketsElement sock;
+ sock.buf_ = NULL;
+ sock.len_ = 0;
+ sock.sem_ = NULL;
+ sock.sock_ = new proto::socket(io_service_);
+ if(!sock.sock_)
+ AnytunError::throwErr() << "memory error";
+
+ sock.sock_->open(e.protocol());
+#ifndef _MSC_VER
+ if(e.protocol() == proto::v6())
+ sock.sock_->set_option(boost::asio::ip::v6_only(true));
+#endif
+ sock.sock_->bind(e);
+ sockets_.push_back(sock);
+
+ it++;
+ }
+
+ // prepare multi-socket recv
+ if(sockets_.size() > 1) {
+ std::list<SocketsElement>::iterator it = sockets_.begin();
+ for(;it != sockets_.end(); ++it) {
+ it->len_ = MAX_PACKET_LENGTH;
+ it->buf_ = new u_int8_t[it->len_];
+ if(!it->buf_)
+ AnytunError::throwErr() << "memory error";
+
+ it->sem_ = new Semaphore();
+ if(!it->sem_) {
+ delete[](it->buf_);
+ AnytunError::throwErr() << "memory error";
+ }
+
+ boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, it));
+ it->sem_->up();
+ }
+
+ }
+
ready_sem_.up();
}
gSignalController.inject(SIGERROR, e.what());
}
+void UDPPacketSource::recv_thread(std::list<SocketsElement>::iterator it)
+{
+ cLog.msg(Log::PRIO_INFO) << "started receiver thread for " << it->sock_->local_endpoint();
+
+ ThreadResult result;
+ result.it_ = it;
+ for(;;) {
+ it->sem_->down();
+ result.len_ = static_cast<u_int32_t>(it->sock_->receive_from(boost::asio::buffer(it->buf_, it->len_), result.remote_));
+ {
+ Lock lock(thread_result_mutex_);
+ thread_result_queue_.push(result);
+ }
+ thread_result_sem_.up();
+ }
+}
+
u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote)
{
- return static_cast<u_int32_t>(sock_.receive_from(boost::asio::buffer(buf, len), remote));
+ if(sockets_.size() == 1)
+ return static_cast<u_int32_t>(sockets_.front().sock_->receive_from(boost::asio::buffer(buf, len), remote));
+
+ thread_result_sem_.down();
+ ThreadResult result;
+ {
+ Lock lock(thread_result_mutex_);
+ result = thread_result_queue_.front();
+ thread_result_queue_.pop();
+ }
+ remote = result.remote_;
+ std::memcpy(buf, result.it_->buf_, (len < result.len_) ? len : result.len_);
+ len = (len < result.len_) ? len : result.len_;
+ result.it_->sem_->up();
+
+ return len;
}
void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote)
{
- sock_.send_to(boost::asio::buffer(buf, len), remote);
+ std::list<SocketsElement>::iterator it = sockets_.begin();
+ for(;it != sockets_.end(); ++it) {
+ if(it->sock_->local_endpoint().protocol() == remote.protocol()) {
+ it->sock_->send_to(boost::asio::buffer(buf, len), remote);
+ return;
+ }
+ }
+ cLog.msg(Log::PRIO_WARNING) << "no suitable socket found for remote endpoint protocol: " << remote;
}