]> git.tdb.fi Git - libs/net.git/blob - source/net/resolve.cpp
Add a dynamic receiver class for more flexible packet handling
[libs/net.git] / source / net / resolve.cpp
1 #include "resolve.h"
2 #include <msp/core/systemerror.h>
3 #include <msp/strings/format.h>
4 #include "platform_api.h"
5 #include "sockaddr_private.h"
6 #include "socket.h"
7
8 using namespace std;
9
10 namespace {
11
12 void parse_host_serv(const string &str, string &host, string &serv)
13 {
14         if(str[0]=='[')
15         {
16                 string::size_type bracket = str.find(']');
17                 host = str.substr(1, bracket-1);
18                 string::size_type colon = str.find(':', bracket);
19                 if(colon!=string::npos)
20                         serv = str.substr(colon+1);
21         }
22         else
23         {
24                 string::size_type colon = str.find(':');
25                 if(colon!=string::npos)
26                 {
27                         host = str.substr(0, colon);
28                         serv = str.substr(colon+1);
29                 }
30                 else
31                         host = str;
32         }
33 }
34
35 }
36
37
38 namespace Msp {
39 namespace Net {
40
41 SockAddr *resolve(const string &host, const string &serv, Family family)
42 {
43         const char *chost = (host.empty() ? nullptr : host.c_str());
44         const char *cserv = (serv.empty() ? nullptr : serv.c_str());
45         int flags = 0;
46         if(host=="*")
47         {
48                 flags = AI_PASSIVE;
49                 chost = nullptr;
50         }
51
52         addrinfo hints = { flags, family_to_sys(family), 0, 0, 0, nullptr, nullptr, nullptr };
53         addrinfo *res;
54
55         int err = getaddrinfo(chost, cserv, &hints, &res);
56         if(err==0)
57         {
58                 SockAddr::SysAddr sa;
59                 sa.size = res->ai_addrlen;
60                 const char *sptr = reinterpret_cast<const char *>(res->ai_addr);
61                 char *dptr = reinterpret_cast<char *>(&sa.addr);
62                 copy(sptr, sptr+res->ai_addrlen, dptr);
63                 SockAddr *addr = SockAddr::new_from_sys(sa);
64                 freeaddrinfo(res);
65                 return addr;
66         }
67         else
68 #ifdef _WIN32
69                 throw system_error("getaddrinfo", WSAGetLastError());
70 #else
71                 throw system_error("getaddrinfo", gai_strerror(err));
72 #endif
73 }
74
75 SockAddr *resolve(const string &str, Family family)
76 {
77         string host, serv;
78         parse_host_serv(str, host, serv);
79
80         return resolve(host, serv, family);
81 }
82
83
84 Resolver::Resolver()
85 {
86         thread.get_notify_pipe().signal_data_available.connect(sigc::mem_fun(this, &Resolver::task_done));
87 }
88
89 void Resolver::use_event_dispatcher(IO::EventDispatcher *ed)
90 {
91         if(event_disp)
92                 event_disp->remove(thread.get_notify_pipe());
93         event_disp = ed;
94         if(event_disp)
95                 event_disp->add(thread.get_notify_pipe());
96 }
97
98 unsigned Resolver::resolve(const string &host, const string &serv, Family family)
99 {
100         Task task;
101         task.tag = next_tag++;
102         task.host = host;
103         task.serv = serv;
104         task.family = family;
105         thread.add_task(move(task));
106         return task.tag;
107 }
108
109 unsigned Resolver::resolve(const string &str, Family family)
110 {
111         string host, serv;
112         parse_host_serv(str, host, serv);
113
114         return resolve(host, serv, family);
115 }
116
117 void Resolver::tick()
118 {
119         if(IO::poll(thread.get_notify_pipe(), IO::P_INPUT, Time::zero))
120                 task_done();
121 }
122
123 void Resolver::task_done()
124 {
125         char buf[64];
126         thread.get_notify_pipe().read(buf, sizeof(buf));
127
128         while(Task *task = thread.get_complete_task())
129         {
130                 if(task->addr)
131                         signal_address_resolved.emit(task->tag, *task->addr);
132                 else if(task->error)
133                 {
134                         if(signal_resolve_failed.empty())
135                         {
136                                 unique_ptr<runtime_error> err = move(task->error);
137                                 thread.pop_complete_task();
138                                 throw *err;
139                         }
140                         signal_resolve_failed.emit(task->tag, *task->error);
141                 }
142                 thread.pop_complete_task();
143         }
144 }
145
146
147 Resolver::WorkerThread::WorkerThread():
148         Thread("Resolver"),
149         sem(1)
150 {
151         launch();
152 }
153
154 Resolver::WorkerThread::~WorkerThread()
155 {
156         done = true;
157         sem.signal();
158         join();
159 }
160
161 void Resolver::WorkerThread::add_task(Task &&t)
162 {
163         MutexLock lock(queue_mutex);
164         bool was_starved = (queue.empty() || queue.back().is_complete());
165         queue.push_back(move(t));
166         if(was_starved)
167                 sem.signal();
168 }
169
170 Resolver::Task *Resolver::WorkerThread::get_complete_task()
171 {
172         MutexLock lock(queue_mutex);
173         if(!queue.empty() && queue.front().is_complete())
174                 return &queue.front();
175         else
176                 return nullptr;
177 }
178
179 void Resolver::WorkerThread::pop_complete_task()
180 {
181         MutexLock lock(queue_mutex);
182         if(!queue.empty() && queue.front().is_complete())
183                 queue.pop_front();
184 }
185
186 void Resolver::WorkerThread::main()
187 {
188         bool wait = true;
189         while(!done)
190         {
191                 if(wait)
192                         sem.wait();
193                 wait = false;
194
195                 Task *task = nullptr;
196                 {
197                         MutexLock lock(queue_mutex);
198                         for(auto i=queue.begin(); (!task && i!=queue.end()); ++i)
199                                 if(!i->is_complete())
200                                         task = &*i;
201                 }
202
203                 if(task)
204                 {
205                         try
206                         {
207                                 unique_ptr<SockAddr> addr(Net::resolve(task->host, task->serv, task->family));
208                                 {
209                                         MutexLock lock(queue_mutex);
210                                         task->addr = move(addr);
211                                 }
212                         }
213                         catch(const runtime_error &e)
214                         {
215                                 MutexLock lock(queue_mutex);
216                                 task->error = make_unique<runtime_error>(e);
217                         }
218                         notify_pipe.put(1);
219                 }
220                 else
221                         wait = true;
222         }
223 }
224
225 } // namespace Net
226 } // namespace Msp