git.tdb.fi Git - libs/gl.git/blob - source/resourcemanager.cpp
Make ResourceManager thread-safe
[libs/gl.git] / source / resourcemanager.cpp
1 #include <algorithm>
2 #include <typeinfo>
3 #include <msp/debug/demangle.h>
4 #include <msp/strings/format.h>
5 #include <msp/time/utils.h>
6 #include "resourcemanager.h"
7 #include "resources.h"
8 #include "resourcewatcher.h"
9
10 using namespace std;
11
12 namespace Msp {
13 namespace GL {
14
15 resource_load_error::resource_load_error(const string &name, const string &err):
16         runtime_error(format("%s: %s", name, err))
17 { }
18
19 resource_load_error::resource_load_error(const string &name, const exception &exc):
20         runtime_error(format("%s: %s: %s", name, Debug::demangle(typeid(exc).name()), exc.what()))
21 { }
22
23
24 ResourceManager::ResourceManager():
25         policy(LOAD_ON_DEMAND),
26         async_loads(true),
27         size_limit(0),
28         frame(0),
29         min_retain_frames(30),
30         max_retain_frames(0),
31         next_unload(0)
32 { }
33
34 ResourceManager::~ResourceManager()
35 {
36         thread.terminate();
37
38         while(!resources.empty())
39                 resources.begin()->second.resource->set_manager(0);
40 }
41
42 void ResourceManager::set_loading_policy(LoadingPolicy p)
43 {
44         policy = p;
45 }
46
47 void ResourceManager::set_async_loads(bool a)
48 {
49         async_loads = a;
50 }
51
52 void ResourceManager::set_size_limit(UInt64 s)
53 {
54         size_limit = s;
55 }
56
57 void ResourceManager::set_min_retain_frames(unsigned f)
58 {
59         min_retain_frames = f;
60 }
61
62 void ResourceManager::set_max_retain_frames(unsigned f)
63 {
64         max_retain_frames = f;
65 }
66
67 void ResourceManager::add_resource(Resource &r)
68 {
69         MutexLock lock(map_mutex);
70         insert_unique(resources, &r, ManagedResource(r));
71 }
72
73 const ResourceManager::ManagedResource &ResourceManager::get_managed_resource(const Resource &r) const
74 {
75         MutexLock lock(map_mutex);
76         return get_item(resources, &r);
77 }
78
79 ResourceManager::ManagedResource &ResourceManager::get_managed_resource(const Resource &r)
80 {
81         MutexLock lock(map_mutex);
82         return get_item(resources, &r);
83 }
84
85 void ResourceManager::set_resource_location(Resource &r, DataFile::Collection &c, const string &n)
86 {
87         set_resource_location(r, ResourceLocation(c, n));
88 }
89
90 void ResourceManager::set_resource_location(Resource &r, const ResourceLocation &l)
91 {
92         {
93                 MutexLock lock(map_mutex);
94                 ManagedResource &managed = get_item(resources, &r);
95                 managed.location = l;
96         }
97
98         if(policy==LOAD_IMMEDIATELY)
99                 load_resource(r);
100 }
101
102 const ResourceManager::ResourceLocation *ResourceManager::get_resource_location(const Resource &r) const
103 {
104         const ManagedResource &managed = get_managed_resource(r);
105         return managed.location.collection ? &managed.location : 0;
106 }
107
108 void ResourceManager::load_resource(Resource &r)
109 {
110         ManagedResource &managed = get_managed_resource(r);
111         if(!managed.location.collection)
112                 throw runtime_error("no location");
113
114         if(managed.state!=ManagedResource::NOT_LOADED)
115                 return;
116
117         if(async_loads)
118         {
119                 managed.state = ManagedResource::LOAD_QUEUED;
120                 LoadQueue::iterator i;
121                 for(i=queue.begin(); (i!=queue.end() && (*i)->load_priority>=managed.load_priority); ++i) ;
122                 queue.insert(i, &managed);
123         }
124         else
125         {
126                 managed.start_loading();
127                 while(!managed.loader->process()) ;
128                 managed.finish_loading(true);
129         }
130 }
131
132 bool ResourceManager::is_resource_loaded(const Resource &r) const
133 {
134         ManagedResource *managed = reinterpret_cast<ManagedResource *>(r.get_manager_data());
135         return managed ? managed->state==ManagedResource::LOADED : false;
136 }
137
138 void ResourceManager::resource_used(const Resource &r)
139 {
140         ManagedResource *managed = reinterpret_cast<ManagedResource *>(r.get_manager_data());
141         if(!managed)
142                 return;
143         if(managed->state==ManagedResource::NOT_LOADED && policy!=LOAD_MANUALLY)
144                 load_resource(*managed->resource);
145
146         managed->last_used = frame;
147         if(max_retain_frames && !next_unload)
148                 next_unload = frame+max_retain_frames+1;
149 }
150
151 void ResourceManager::remove_resource(Resource &r)
152 {
153         ManagedResource &managed = get_managed_resource(r);
154         ManagedResource::State state = managed.state;
155         if(state==ManagedResource::LOAD_QUEUED)
156         {
157                 LoadQueue::iterator i = find(queue.begin(), queue.end(), &managed);
158                 if(i!=queue.end())
159                         queue.erase(i);
160         }
161         else if(state>ManagedResource::LOAD_QUEUED && state<ManagedResource::LOADED)
162                 thread.remove_resource(managed);
163
164         MutexLock lock(map_mutex);
165         remove_existing(resources, &r);
166 }
167
168 void ResourceManager::watch_resource(const Resource &r, ResourceWatcher &w)
169 {
170         get_managed_resource(r).add_watcher(w);
171 }
172
173 void ResourceManager::unwatch_resource(const Resource &r, ResourceWatcher &w)
174 {
175         get_managed_resource(r).remove_watcher(w);
176 }
177
178 void ResourceManager::tick()
179 {
180         ++frame;
181
182         bool do_unload = (frame>=next_unload);
183         if(thread.sync())
184                 do_unload = true;
185
186         if(thread.needs_work() && !queue.empty())
187                 dispatch_work();
188
189         if(do_unload)
190         {
191                 MutexLock lock(map_mutex);
192                 if(max_retain_frames && frame>=next_unload)
193                 {
194                         unload_by_age();
195
196                         next_unload = frame;
197                         for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
198                                 if(i->second.state==ManagedResource::LOADED)
199                                         next_unload = min(next_unload, i->second.last_used);
200                         next_unload = (next_unload<frame ? next_unload+max_retain_frames : 0);
201                 }
202
203                 if(size_limit)
204                         unload_by_size();
205         }
206 }
207
208 void ResourceManager::dispatch_work()
209 {
210         queue.sort(age_order);
211
212         if(queue.front()->last_used+10<frame)
213         {
214                 for(LoadQueue::iterator i=queue.begin(); i!=queue.end(); ++i)
215                         (*i)->state = ManagedResource::NOT_LOADED;
216                 queue.clear();
217                 return;
218         }
219
220         while(thread.needs_work() && !queue.empty())
221         {
222                 ManagedResource *managed = queue.front();
223                 queue.pop_front();
224                 thread.add_resource(*managed);
225         }
226 }
227
228 void ResourceManager::unload_by_age()
229 {
230         unsigned unload_limit = frame-max_retain_frames;
231
232         for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
233                 if(i->second.state==ManagedResource::LOADED && i->second.last_used<unload_limit)
234                         i->second.unload();
235 }
236
237 void ResourceManager::unload_by_size()
238 {
239         unsigned unload_limit = frame-min_retain_frames;
240
241         while(get_total_data_size_()>size_limit)
242         {
243                 ManagedResource *best = 0;
244                 UInt64 best_impact = 0;
245                 for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
246                         if(i->second.state==ManagedResource::LOADED && i->second.last_used<unload_limit)
247                         {
248                                 UInt64 impact = i->second.data_size*(frame-i->second.last_used);
249                                 if(!best || impact>best_impact)
250                                 {
251                                         best = &i->second;
252                                         best_impact = impact;
253                                 }
254                         }
255
256                 if(!best)
257                         break;
258
259                 best->unload();
260         }
261 }
262
263 UInt64 ResourceManager::get_total_data_size_() const
264 {
265         UInt64 total = 0;
266         for(ResourceMap::const_iterator i=resources.begin(); i!=resources.end(); ++i)
267                 if(i->second.state==ManagedResource::LOADED)
268                         total += i->second.data_size;
269         return total;
270 }
271
272 UInt64 ResourceManager::get_total_data_size() const
273 {
274         MutexLock lock(map_mutex);
275         return get_total_data_size_();
276 }
277
278 bool ResourceManager::age_order(ManagedResource *mr1, ManagedResource *mr2)
279 {
280         return mr1->last_used>mr2->last_used;
281 }
282
283
284 ResourceManager::ResourceLocation::ResourceLocation():
285         collection(0)
286 { }
287
288 ResourceManager::ResourceLocation::ResourceLocation(DataFile::Collection &c, const string &n):
289         collection(&c),
290         name(n)
291 { }
292
293
294 ResourceManager::ManagedResource::ManagedResource(Resource &r):
295         resource(&r),
296         load_priority(r.get_load_priority()),
297         io(0),
298         loader(0),
299         state(NOT_LOADED),
300         last_used(0),
301         data_size(0)
302 { }
303
304 void ResourceManager::ManagedResource::start_loading()
305 {
306         io = location.collection->open_raw(location.name);
307         if(!io)
308                 throw resource_load_error(location.name, "open failed");
309
310         const Resources *res = dynamic_cast<Resources *>(location.collection);
311         loader = resource->load(*io, res);
312         if(!loader)
313         {
314                 delete io;
315                 io = 0;
316                 throw logic_error("no loader created");
317         }
318         state = LOADING;
319 }
320
321 bool ResourceManager::ManagedResource::process(bool sync)
322 {
323         while(state!=LOAD_FINISHED && loader->needs_sync()==sync)
324                 if(loader->process())
325                         state = LOAD_FINISHED;
326
327         return state==LOAD_FINISHED;
328 }
329
330 void ResourceManager::ManagedResource::finish_loading(bool successful)
331 {
332         delete loader;
333         loader = 0;
334         delete io;
335         io = 0;
336
337         if(successful)
338         {
339                 state = LOADED;
340                 data_size = resource->get_data_size();
341
342                 for(vector<ResourceWatcher *>::const_iterator i=watchers.begin(); i!=watchers.end(); ++i)
343                         (*i)->resource_loaded(*resource);
344         }
345         else
346         {
347                 resource->unload();
348                 state = NOT_LOADED;
349         }
350 }
351
352 void ResourceManager::ManagedResource::finish_loading()
353 {
354         finish_loading(state==LOAD_FINISHED);
355 }
356
357 void ResourceManager::ManagedResource::unload()
358 {
359         resource->unload();
360         state = NOT_LOADED;
361
362         for(vector<ResourceWatcher *>::const_iterator i=watchers.begin(); i!=watchers.end(); ++i)
363                 (*i)->resource_unloaded(*resource);
364 }
365
366 void ResourceManager::ManagedResource::add_watcher(ResourceWatcher &w)
367 {
368         if(find(watchers.begin(), watchers.end(), &w)==watchers.end())
369                 watchers.push_back(&w);
370 }
371
372 void ResourceManager::ManagedResource::remove_watcher(ResourceWatcher &w)
373 {
374         vector<ResourceWatcher *>::iterator end = remove(watchers.begin(), watchers.end(), &w);
375         if(end!=watchers.end())
376                 watchers.erase(end, watchers.end());
377 }
378
379
380 ResourceManager::LoadingThread::LoadingThread():
381         sem(1),
382         capacity(2),
383         size(0),
384         done(false)
385 {
386         launch();
387 }
388
389 void ResourceManager::LoadingThread::main()
390 {
391         bool wait_for_work = false;
392         while(!done)
393         {
394                 if(wait_for_work)
395                         sem.wait();
396
397                 if(ManagedResource *managed = front(async_queue))
398                 {
399                         try
400                         {
401                                 managed->process(false);
402                         }
403                         catch(const exception &e)
404                         {
405                                 MutexLock lock(queue_mutex);
406                                 error_queue.push_back(resource_load_error(managed->location.name, e));
407                                 managed->state = ManagedResource::LOAD_ERROR;
408                         }
409
410                         MutexLock lock(queue_mutex);
411                         sync_queue.splice(sync_queue.end(), async_queue, async_queue.begin());
412                         wait_for_work = async_queue.empty();
413                 }
414                 else
415                         wait_for_work = true;
416         }
417 }
418
419 ResourceManager::ManagedResource *ResourceManager::LoadingThread::front(LoadQueue &queue)
420 {
421         MutexLock lock(queue_mutex);
422         if(queue.empty())
423                 return 0;
424
425         return queue.front();
426 }
427
428 void ResourceManager::LoadingThread::add_resource(ManagedResource &r)
429 {
430         r.start_loading();
431
432         MutexLock lock(queue_mutex);
433         if(r.loader->needs_sync())
434                 sync_queue.push_back(&r);
435         else
436         {
437                 bool was_empty = async_queue.empty();
438                 async_queue.push_back(&r);
439                 if(was_empty)
440                         sem.signal();
441         }
442
443         ++size;
444 }
445
446 void ResourceManager::LoadingThread::remove_resource(ManagedResource &r)
447 {
448         while(!try_remove_resource(r))
449                 Time::sleep(Time::msec);
450
451         r.finish_loading();
452 }
453
454 bool ResourceManager::LoadingThread::try_remove_resource(ManagedResource &r)
455 {
456         MutexLock lock(queue_mutex);
457
458         LoadQueue::iterator i = find(async_queue.begin(), async_queue.end(), &r);
459         if(i==async_queue.end())
460         {
461                 i = find(sync_queue.begin(), sync_queue.end(), &r);
462                 if(i!=sync_queue.end())
463                 {
464                         sync_queue.erase(i);
465                         --size;
466                 }
467         }
468         else if(i==async_queue.begin())
469                 return false;
470         else
471         {
472                 async_queue.erase(i);
473                 --size;
474         }
475
476         return true;
477 }
478
479 bool ResourceManager::LoadingThread::sync()
480 {
481         {
482                 MutexLock lock(queue_mutex);
483
484                 if(!error_queue.empty())
485                 {
486                         resource_load_error err = error_queue.front();
487                         error_queue.pop_front();
488                         throw err;
489                 }
490
491                 unsigned async_size = async_queue.size();
492                 if(async_size==0 && size==capacity)
493                         ++capacity;
494                 else if(async_size>2 && capacity>2)
495                         --capacity;
496         }
497
498         bool any_finished = false;
499         while(ManagedResource *managed = front(sync_queue))
500         {
501                 if(managed->state==ManagedResource::LOAD_ERROR || managed->process(true))
502                 {
503                         managed->finish_loading();
504                         any_finished = true;
505                         --size;
506
507                         MutexLock lock(queue_mutex);
508                         sync_queue.pop_front();
509                 }
510                 else
511                 {
512                         MutexLock lock(queue_mutex);
513                         bool was_empty = async_queue.empty();
514                         async_queue.splice(async_queue.end(), sync_queue, sync_queue.begin());
515                         if(was_empty)
516                                 sem.signal();
517                 }
518         }
519
520         return any_finished;
521 }
522
523 void ResourceManager::LoadingThread::terminate()
524 {
525         done = true;
526         sem.signal();
527         join();
528         async_queue.clear();
529         sync_queue.clear();
530 }
531
532 } // namespace GL
533 } // namespace Msp