Implement an asynchronous name resolver class
authorMikko Rasa <tdb@tdb.fi>
Wed, 14 Sep 2016 10:50:29 +0000 (13:50 +0300)
committerMikko Rasa <tdb@tdb.fi>
Wed, 14 Sep 2016 10:50:29 +0000 (13:50 +0300)
source/net/resolve.cpp
source/net/resolve.h

index 88497faa4b2dead177bcb72f54dceee72713ee77..992973a01dea4f73c51b54600748b0015ce0542b 100644 (file)
 
 using namespace std;
 
+namespace {
+
+void parse_host_serv(const string &str, string &host, string &serv)
+{
+       if(str[0]=='[')
+       {
+               string::size_type bracket = str.find(']');
+               host = str.substr(1, bracket-1);
+               string::size_type colon = str.find(':', bracket);
+               if(colon!=string::npos)
+                       serv = str.substr(colon+1);
+       }
+       else
+       {
+               string::size_type colon = str.find(':');
+               if(colon!=string::npos)
+               {
+                       host = str.substr(0, colon);
+                       serv = str.substr(colon+1);
+               }
+               else
+                       host = str;
+       }
+}
+
+}
+
+
 namespace Msp {
 namespace Net {
 
@@ -52,27 +80,158 @@ SockAddr *resolve(const string &host, const string &serv, Family family)
 SockAddr *resolve(const string &str, Family family)
 {
        string host, serv;
-       if(str[0]=='[')
+       parse_host_serv(str, host, serv);
+
+       return resolve(host, serv, family);
+}
+
+
+Resolver::Resolver():
+       event_disp(0),
+       next_tag(1)
+{
+       thread.get_notify_pipe().signal_data_available.connect(sigc::mem_fun(this, &Resolver::task_done));
+}
+
+void Resolver::use_event_dispatcher(IO::EventDispatcher *ed)
+{
+       if(event_disp)
+               event_disp->remove(thread.get_notify_pipe());
+       event_disp = ed;
+       if(event_disp)
+               event_disp->add(thread.get_notify_pipe());
+}
+
+unsigned Resolver::resolve(const string &host, const string &serv, Family family)
+{
+       Task task;
+       task.tag = next_tag++;
+       task.host = host;
+       task.serv = serv;
+       task.family = family;
+       thread.add_task(task);
+       return task.tag;
+}
+
+unsigned Resolver::resolve(const string &str, Family family)
+{
+       string host, serv;
+       parse_host_serv(str, host, serv);
+
+       return resolve(host, serv, family);
+}
+
+void Resolver::tick()
+{
+       if(IO::poll(thread.get_notify_pipe(), IO::P_INPUT, Time::zero))
+               task_done();
+}
+
+void Resolver::task_done()
+{
+       char buf[64];
+       thread.get_notify_pipe().read(buf, sizeof(buf));
+
+       while(Task *task = thread.get_complete_task())
        {
-               string::size_type bracket = str.find(']');
-               host = str.substr(1, bracket-1);
-               string::size_type colon = str.find(':', bracket);
-               if(colon!=string::npos)
-                       serv = str.substr(colon+1);
+               if(task->addr)
+                       signal_address_resolved.emit(task->tag, *task->addr);
+               else if(task->error)
+                       signal_resolve_failed.emit(task->tag, *task->error);
+               thread.pop_complete_task();
        }
+}
+
+
+Resolver::Task::Task():
+       tag(0),
+       family(UNSPEC),
+       addr(0),
+       error(0)
+{ }
+
+
+Resolver::WorkerThread::WorkerThread():
+       Thread("Resolver"),
+       sem(1),
+       done(false)
+{
+       launch();
+}
+
+Resolver::WorkerThread::~WorkerThread()
+{
+       done = true;
+       sem.signal();
+       join();
+}
+
+void Resolver::WorkerThread::add_task(const Task &t)
+{
+       MutexLock lock(queue_mutex);
+       bool was_starved = (queue.empty() || queue.back().is_complete());
+       queue.push_back(t);
+       if(was_starved)
+               sem.signal();
+}
+
+Resolver::Task *Resolver::WorkerThread::get_complete_task()
+{
+       MutexLock lock(queue_mutex);
+       if(!queue.empty() && queue.front().is_complete())
+               return &queue.front();
        else
+               return 0;
+}
+
+void Resolver::WorkerThread::pop_complete_task()
+{
+       MutexLock lock(queue_mutex);
+       if(!queue.empty() && queue.front().is_complete())
        {
-               string::size_type colon = str.find(':');
-               if(colon!=string::npos)
+               delete queue.front().addr;
+               delete queue.front().error;
+               queue.pop_front();
+       }
+}
+
+void Resolver::WorkerThread::main()
+{
+       bool wait = true;
+       while(!done)
+       {
+               if(wait)
+                       sem.wait();
+               wait = false;
+
+               Task *task = 0;
                {
-                       host = str.substr(0, colon);
-                       serv = str.substr(colon+1);
+                       MutexLock lock(queue_mutex);
+                       for(list<Task>::iterator i=queue.begin(); (!task && i!=queue.end()); ++i)
+                               if(!i->is_complete())
+                                       task = &*i;
+               }
+
+               if(task)
+               {
+                       try
+                       {
+                               SockAddr *addr = Net::resolve(task->host, task->serv, task->family);
+                               {
+                                       MutexLock lock(queue_mutex);
+                                       task->addr = addr;
+                               }
+                       }
+                       catch(const runtime_error &e)
+                       {
+                               MutexLock lock(queue_mutex);
+                               task->error = new runtime_error(e);
+                       }
+                       notify_pipe.put(1);
                }
                else
-                       host = str;
+                       wait = true;
        }
-
-       return resolve(host, serv, family);
 }
 
                /*sockaddr sa;
index 256455fcb18febb08384fd7e6f50e17d35064310..21737ff4794bb0def530bd364cae00a2291ec1c2 100644 (file)
@@ -2,6 +2,11 @@
 #define MSP_NET_RESOLVE_H_
 
 #include <string>
+#include <msp/core/mutex.h>
+#include <msp/core/semaphore.h>
+#include <msp/core/thread.h>
+#include <msp/io/eventdispatcher.h>
+#include <msp/io/pipe.h>
 #include "constants.h"
 
 namespace Msp {
@@ -20,6 +25,80 @@ separated by a colon.  If the host part contains colons, such as is the case
 with a numeric IPv6 address, it must be enclosed in brackets. */
 SockAddr *resolve(const std::string &, Family = UNSPEC);
 
+
+/**
+An asynchronous name resolver.  Blocking calls are performed in a thread and
+completion is notified with one of the two signals.
+*/
+class Resolver
+{
+private:
+       struct Task
+       {
+               unsigned tag;
+               std::string host;
+               std::string serv;
+               Family family;
+               SockAddr *addr;
+               std::runtime_error *error;
+
+               Task();
+
+               bool is_complete() const { return addr || error; }
+       };
+
+       class WorkerThread: public Thread
+       {
+       private:
+               std::list<Task> queue;
+               Mutex queue_mutex;
+               Semaphore sem;
+               IO::Pipe notify_pipe;
+               bool done;
+
+       public:
+               WorkerThread();
+               ~WorkerThread();
+
+               void add_task(const Task &);
+               Task *get_complete_task();
+               void pop_complete_task();
+
+               IO::Pipe &get_notify_pipe() { return notify_pipe; }
+
+       private:
+               virtual void main();
+       };
+
+public:
+       sigc::signal<void, unsigned, const SockAddr &> signal_address_resolved;
+       sigc::signal<void, unsigned, const std::exception &> signal_resolve_failed;
+
+private:
+       IO::EventDispatcher *event_disp;
+       WorkerThread thread;
+       unsigned next_tag;
+
+public:
+       Resolver();
+
+       /** Sets the event dispatcher to use.  With no event dispatcher, tick() must
+       be called regularly to emit the completion signals. */
+       void use_event_dispatcher(IO::EventDispatcher *);
+
+       /** Resolves host and service names into a socket address.  Semantics are
+       the same as the namespace-scope resolve function. */
+       unsigned resolve(const std::string &, const std::string &, Family = UNSPEC);
+
+       unsigned resolve(const std::string &, Family = UNSPEC);
+
+       /** Checks for any completed tasks and emits the appropriate singals. */
+       void tick();
+
+private:
+       void task_done();
+};
+
 } // namespace Net
 } // namespace Msp