Implement an asynchronous name resolver class
[libs/net.git] / source / net / resolve.cpp
index 88497faa4b2dead177bcb72f54dceee72713ee77..992973a01dea4f73c51b54600748b0015ce0542b 100644 (file)
 
 using namespace std;
 
+namespace {
+
+void parse_host_serv(const string &str, string &host, string &serv)
+{
+       if(str[0]=='[')
+       {
+               string::size_type bracket = str.find(']');
+               host = str.substr(1, bracket-1);
+               string::size_type colon = str.find(':', bracket);
+               if(colon!=string::npos)
+                       serv = str.substr(colon+1);
+       }
+       else
+       {
+               string::size_type colon = str.find(':');
+               if(colon!=string::npos)
+               {
+                       host = str.substr(0, colon);
+                       serv = str.substr(colon+1);
+               }
+               else
+                       host = str;
+       }
+}
+
+}
+
+
 namespace Msp {
 namespace Net {
 
@@ -52,27 +80,158 @@ SockAddr *resolve(const string &host, const string &serv, Family family)
 SockAddr *resolve(const string &str, Family family)
 {
        string host, serv;
-       if(str[0]=='[')
+       parse_host_serv(str, host, serv);
+
+       return resolve(host, serv, family);
+}
+
+
+Resolver::Resolver():
+       event_disp(0),
+       next_tag(1)
+{
+       thread.get_notify_pipe().signal_data_available.connect(sigc::mem_fun(this, &Resolver::task_done));
+}
+
+void Resolver::use_event_dispatcher(IO::EventDispatcher *ed)
+{
+       if(event_disp)
+               event_disp->remove(thread.get_notify_pipe());
+       event_disp = ed;
+       if(event_disp)
+               event_disp->add(thread.get_notify_pipe());
+}
+
+unsigned Resolver::resolve(const string &host, const string &serv, Family family)
+{
+       Task task;
+       task.tag = next_tag++;
+       task.host = host;
+       task.serv = serv;
+       task.family = family;
+       thread.add_task(task);
+       return task.tag;
+}
+
+unsigned Resolver::resolve(const string &str, Family family)
+{
+       string host, serv;
+       parse_host_serv(str, host, serv);
+
+       return resolve(host, serv, family);
+}
+
+void Resolver::tick()
+{
+       if(IO::poll(thread.get_notify_pipe(), IO::P_INPUT, Time::zero))
+               task_done();
+}
+
+void Resolver::task_done()
+{
+       char buf[64];
+       thread.get_notify_pipe().read(buf, sizeof(buf));
+
+       while(Task *task = thread.get_complete_task())
        {
-               string::size_type bracket = str.find(']');
-               host = str.substr(1, bracket-1);
-               string::size_type colon = str.find(':', bracket);
-               if(colon!=string::npos)
-                       serv = str.substr(colon+1);
+               if(task->addr)
+                       signal_address_resolved.emit(task->tag, *task->addr);
+               else if(task->error)
+                       signal_resolve_failed.emit(task->tag, *task->error);
+               thread.pop_complete_task();
        }
+}
+
+
+Resolver::Task::Task():
+       tag(0),
+       family(UNSPEC),
+       addr(0),
+       error(0)
+{ }
+
+
+Resolver::WorkerThread::WorkerThread():
+       Thread("Resolver"),
+       sem(1),
+       done(false)
+{
+       launch();
+}
+
+Resolver::WorkerThread::~WorkerThread()
+{
+       done = true;
+       sem.signal();
+       join();
+}
+
+void Resolver::WorkerThread::add_task(const Task &t)
+{
+       MutexLock lock(queue_mutex);
+       bool was_starved = (queue.empty() || queue.back().is_complete());
+       queue.push_back(t);
+       if(was_starved)
+               sem.signal();
+}
+
+Resolver::Task *Resolver::WorkerThread::get_complete_task()
+{
+       MutexLock lock(queue_mutex);
+       if(!queue.empty() && queue.front().is_complete())
+               return &queue.front();
        else
+               return 0;
+}
+
+void Resolver::WorkerThread::pop_complete_task()
+{
+       MutexLock lock(queue_mutex);
+       if(!queue.empty() && queue.front().is_complete())
        {
-               string::size_type colon = str.find(':');
-               if(colon!=string::npos)
+               delete queue.front().addr;
+               delete queue.front().error;
+               queue.pop_front();
+       }
+}
+
+void Resolver::WorkerThread::main()
+{
+       bool wait = true;
+       while(!done)
+       {
+               if(wait)
+                       sem.wait();
+               wait = false;
+
+               Task *task = 0;
                {
-                       host = str.substr(0, colon);
-                       serv = str.substr(colon+1);
+                       MutexLock lock(queue_mutex);
+                       for(list<Task>::iterator i=queue.begin(); (!task && i!=queue.end()); ++i)
+                               if(!i->is_complete())
+                                       task = &*i;
+               }
+
+               if(task)
+               {
+                       try
+                       {
+                               SockAddr *addr = Net::resolve(task->host, task->serv, task->family);
+                               {
+                                       MutexLock lock(queue_mutex);
+                                       task->addr = addr;
+                               }
+                       }
+                       catch(const runtime_error &e)
+                       {
+                               MutexLock lock(queue_mutex);
+                               task->error = new runtime_error(e);
+                       }
+                       notify_pipe.put(1);
                }
                else
-                       host = str;
+                       wait = true;
        }
-
-       return resolve(host, serv, family);
 }
 
                /*sockaddr sa;