From fa637ffb18421300e401a782d28dd729a3960ac4 Mon Sep 17 00:00:00 2001 From: Mikko Rasa Date: Wed, 14 Sep 2016 13:50:29 +0300 Subject: [PATCH] Implement an asynchronous name resolver class --- source/net/resolve.cpp | 185 ++++++++++++++++++++++++++++++++++++++--- source/net/resolve.h | 79 ++++++++++++++++++ 2 files changed, 251 insertions(+), 13 deletions(-) diff --git a/source/net/resolve.cpp b/source/net/resolve.cpp index 88497fa..992973a 100644 --- a/source/net/resolve.cpp +++ b/source/net/resolve.cpp @@ -12,6 +12,34 @@ using namespace std; +namespace { + +void parse_host_serv(const string &str, string &host, string &serv) +{ + if(str[0]=='[') + { + string::size_type bracket = str.find(']'); + host = str.substr(1, bracket-1); + string::size_type colon = str.find(':', bracket); + if(colon!=string::npos) + serv = str.substr(colon+1); + } + else + { + string::size_type colon = str.find(':'); + if(colon!=string::npos) + { + host = str.substr(0, colon); + serv = str.substr(colon+1); + } + else + host = str; + } +} + +} + + namespace Msp { namespace Net { @@ -52,27 +80,158 @@ SockAddr *resolve(const string &host, const string &serv, Family family) SockAddr *resolve(const string &str, Family family) { string host, serv; - if(str[0]=='[') + parse_host_serv(str, host, serv); + + return resolve(host, serv, family); +} + + +Resolver::Resolver(): + event_disp(0), + next_tag(1) +{ + thread.get_notify_pipe().signal_data_available.connect(sigc::mem_fun(this, &Resolver::task_done)); +} + +void Resolver::use_event_dispatcher(IO::EventDispatcher *ed) +{ + if(event_disp) + event_disp->remove(thread.get_notify_pipe()); + event_disp = ed; + if(event_disp) + event_disp->add(thread.get_notify_pipe()); +} + +unsigned Resolver::resolve(const string &host, const string &serv, Family family) +{ + Task task; + task.tag = next_tag++; + task.host = host; + task.serv = serv; + task.family = family; + thread.add_task(task); + return task.tag; +} + +unsigned Resolver::resolve(const string &str, Family family) +{ + string host, serv; + parse_host_serv(str, host, serv); + + return resolve(host, serv, family); +} + +void Resolver::tick() +{ + if(IO::poll(thread.get_notify_pipe(), IO::P_INPUT, Time::zero)) + task_done(); +} + +void Resolver::task_done() +{ + char buf[64]; + thread.get_notify_pipe().read(buf, sizeof(buf)); + + while(Task *task = thread.get_complete_task()) { - string::size_type bracket = str.find(']'); - host = str.substr(1, bracket-1); - string::size_type colon = str.find(':', bracket); - if(colon!=string::npos) - serv = str.substr(colon+1); + if(task->addr) + signal_address_resolved.emit(task->tag, *task->addr); + else if(task->error) + signal_resolve_failed.emit(task->tag, *task->error); + thread.pop_complete_task(); } +} + + +Resolver::Task::Task(): + tag(0), + family(UNSPEC), + addr(0), + error(0) +{ } + + +Resolver::WorkerThread::WorkerThread(): + Thread("Resolver"), + sem(1), + done(false) +{ + launch(); +} + +Resolver::WorkerThread::~WorkerThread() +{ + done = true; + sem.signal(); + join(); +} + +void Resolver::WorkerThread::add_task(const Task &t) +{ + MutexLock lock(queue_mutex); + bool was_starved = (queue.empty() || queue.back().is_complete()); + queue.push_back(t); + if(was_starved) + sem.signal(); +} + +Resolver::Task *Resolver::WorkerThread::get_complete_task() +{ + MutexLock lock(queue_mutex); + if(!queue.empty() && queue.front().is_complete()) + return &queue.front(); else + return 0; +} + +void Resolver::WorkerThread::pop_complete_task() +{ + MutexLock lock(queue_mutex); + if(!queue.empty() && queue.front().is_complete()) { - string::size_type colon = str.find(':'); - if(colon!=string::npos) + delete queue.front().addr; + delete queue.front().error; + queue.pop_front(); + } +} + +void Resolver::WorkerThread::main() +{ + bool wait = true; + while(!done) + { + if(wait) + sem.wait(); + wait = false; + + Task *task = 0; { - host = str.substr(0, colon); - serv = str.substr(colon+1); + MutexLock lock(queue_mutex); + for(list::iterator i=queue.begin(); (!task && i!=queue.end()); ++i) + if(!i->is_complete()) + task = &*i; + } + + if(task) + { + try + { + SockAddr *addr = Net::resolve(task->host, task->serv, task->family); + { + MutexLock lock(queue_mutex); + task->addr = addr; + } + } + catch(const runtime_error &e) + { + MutexLock lock(queue_mutex); + task->error = new runtime_error(e); + } + notify_pipe.put(1); } else - host = str; + wait = true; } - - return resolve(host, serv, family); } /*sockaddr sa; diff --git a/source/net/resolve.h b/source/net/resolve.h index 256455f..21737ff 100644 --- a/source/net/resolve.h +++ b/source/net/resolve.h @@ -2,6 +2,11 @@ #define MSP_NET_RESOLVE_H_ #include +#include +#include +#include +#include +#include #include "constants.h" namespace Msp { @@ -20,6 +25,80 @@ separated by a colon. If the host part contains colons, such as is the case with a numeric IPv6 address, it must be enclosed in brackets. */ SockAddr *resolve(const std::string &, Family = UNSPEC); + +/** +An asynchronous name resolver. Blocking calls are performed in a thread and +completion is notified with one of the two signals. +*/ +class Resolver +{ +private: + struct Task + { + unsigned tag; + std::string host; + std::string serv; + Family family; + SockAddr *addr; + std::runtime_error *error; + + Task(); + + bool is_complete() const { return addr || error; } + }; + + class WorkerThread: public Thread + { + private: + std::list queue; + Mutex queue_mutex; + Semaphore sem; + IO::Pipe notify_pipe; + bool done; + + public: + WorkerThread(); + ~WorkerThread(); + + void add_task(const Task &); + Task *get_complete_task(); + void pop_complete_task(); + + IO::Pipe &get_notify_pipe() { return notify_pipe; } + + private: + virtual void main(); + }; + +public: + sigc::signal signal_address_resolved; + sigc::signal signal_resolve_failed; + +private: + IO::EventDispatcher *event_disp; + WorkerThread thread; + unsigned next_tag; + +public: + Resolver(); + + /** Sets the event dispatcher to use. With no event dispatcher, tick() must + be called regularly to emit the completion signals. */ + void use_event_dispatcher(IO::EventDispatcher *); + + /** Resolves host and service names into a socket address. Semantics are + the same as the namespace-scope resolve function. */ + unsigned resolve(const std::string &, const std::string &, Family = UNSPEC); + + unsigned resolve(const std::string &, Family = UNSPEC); + + /** Checks for any completed tasks and emits the appropriate singals. */ + void tick(); + +private: + void task_done(); +}; + } // namespace Net } // namespace Msp -- 2.45.2