From d184ad8a88156a5b0cfe926e5aa66fd574556560 Mon Sep 17 00:00:00 2001 From: Mikko Rasa Date: Tue, 30 Sep 2014 20:05:45 +0300 Subject: [PATCH] Rewrite ResourceManager internals for more scalability Since loading small resources can often take less than one frame, especially with vsync enabled, the thread can now handle more than one resource at once. The code is also somewhat better structured for using more than one thread in the future. --- source/resourcemanager.cpp | 302 +++++++++++++++++++++++++------------ source/resourcemanager.h | 52 ++++--- 2 files changed, 233 insertions(+), 121 deletions(-) diff --git a/source/resourcemanager.cpp b/source/resourcemanager.cpp index 999b86c1..7bf4412f 100644 --- a/source/resourcemanager.cpp +++ b/source/resourcemanager.cpp @@ -15,8 +15,7 @@ ResourceManager::ResourceManager(): frame(0), min_retain_frames(30), max_retain_frames(0), - next_unload(0), - thread(*this) + next_unload(0) { } ResourceManager::~ResourceManager() @@ -90,7 +89,7 @@ void ResourceManager::load_resource(Resource &r) { managed.start_loading(); while(!managed.loader->process()) ; - managed.finish_loading(); + managed.finish_loading(true); } } @@ -102,15 +101,21 @@ void ResourceManager::resource_used(const Resource &r) managed->last_used = frame; if(max_retain_frames && !next_unload) - next_unload = frame+max_retain_frames; + next_unload = frame+max_retain_frames+1; } void ResourceManager::remove_resource(Resource &r) { - ManagedResource *loading = thread.get_resource(); - if(loading && loading->resource==&r) - thread.set_resource(0); - + ManagedResource &managed = get_item(resources, &r); + ManagedResource::State state = managed.state; + if(state==ManagedResource::LOAD_QUEUED) + { + LoadQueue::iterator i = find(queue.begin(), queue.end(), &managed); + if(i!=queue.end()) + queue.erase(i); + } + else if(state>ManagedResource::LOAD_QUEUED && state=next_unload); + if(thread.sync()) + do_unload = true; + + if(thread.needs_work() && !queue.empty()) + dispatch_work(); + + if(do_unload) { - thread.sync(); - check_total_size = true; + if(max_retain_frames && frame>=next_unload) + { + unload_by_age(); + + next_unload = frame; + for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i) + if(i->second.state==ManagedResource::LOADED) + next_unload = min(next_unload, i->second.last_used); + next_unload = (next_unloadlast_used+10state = ManagedResource::NOT_LOADED; + queue.clear(); + return; } - else if(thread_state==LoadingThread::IDLE && !queue.empty()) + + while(thread.needs_work() && !queue.empty()) { ManagedResource *managed = queue.front(); queue.pop_front(); - thread.set_resource(managed); + thread.add_resource(*managed); } +} - ++frame; - if(frame==next_unload) - { - unsigned unload_limit = frame-max_retain_frames; - next_unload = 0; +void ResourceManager::unload_by_age() +{ + unsigned unload_limit = frame-max_retain_frames; - for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i) - if(i->second.state==ManagedResource::LOADED) - { - if(i->second.last_used<=unload_limit) - i->second.unload(); - else if(!next_unload || i->second.last_usedsecond.last_used; - } + for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i) + if(i->second.state==ManagedResource::LOADED && i->second.last_usedsecond.unload(); +} - if(next_unload) - next_unload += max_retain_frames; - } +void ResourceManager::unload_by_size() +{ + unsigned unload_limit = frame-min_retain_frames; - if(check_total_size) + while(get_total_data_size()>size_limit) { - while(get_total_data_size()>size_limit) - { - unsigned unload_limit = frame-min_retain_frames; - ManagedResource *best = 0; - UInt64 best_impact = 0; - for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i) - if(i->second.state==ManagedResource::LOADED && i->second.last_usedsecond.state==ManagedResource::LOADED && i->second.last_usedsecond.data_size*(frame-i->second.last_used); + if(!best || impact>best_impact) { - UInt64 impact = i->second.data_size*(frame-i->second.last_used); - if(!best || impact>best_impact) - { - best = &i->second; - best_impact = impact; - } + best = &i->second; + best_impact = impact; } + } - if(!best) - break; + if(!best) + break; - best->unload(); - } + best->unload(); } } @@ -194,6 +222,11 @@ UInt64 ResourceManager::get_total_data_size() const return total; } +bool ResourceManager::age_order(ManagedResource *mr1, ManagedResource *mr2) +{ + return mr1->last_used>mr2->last_used; +} + ResourceManager::ManagedResource::ManagedResource(Resource &r): resource(&r), @@ -218,17 +251,40 @@ void ResourceManager::ManagedResource::start_loading() state = LOADING; } -void ResourceManager::ManagedResource::finish_loading() +bool ResourceManager::ManagedResource::process(bool sync) +{ + while(state!=LOAD_FINISHED && loader->needs_sync()==sync) + if(loader->process()) + state = LOAD_FINISHED; + + return state==LOAD_FINISHED; +} + +void ResourceManager::ManagedResource::finish_loading(bool successful) { delete loader; loader = 0; - state = LOADED; delete io; io = 0; - data_size = resource->get_data_size(); - for(vector::const_iterator i=watchers.begin(); i!=watchers.end(); ++i) - (*i)->resource_loaded(*resource); + if(successful) + { + state = LOADED; + data_size = resource->get_data_size(); + + for(vector::const_iterator i=watchers.begin(); i!=watchers.end(); ++i) + (*i)->resource_loaded(*resource); + } + else + { + resource->unload(); + state = NOT_LOADED; + } +} + +void ResourceManager::ManagedResource::finish_loading() +{ + finish_loading(state==LOAD_FINISHED); } void ResourceManager::ManagedResource::unload() @@ -254,87 +310,133 @@ void ResourceManager::ManagedResource::remove_watcher(ResourceWatcher &w) } -ResourceManager::LoadingThread::LoadingThread(ResourceManager &m): - manager(m), +ResourceManager::LoadingThread::LoadingThread(): sem(1), - resource(0), - state(IDLE) + capacity(2), + size(0), + done(false) { launch(); } void ResourceManager::LoadingThread::main() { - while(state!=TERMINATING) + bool wait_for_work = false; + while(!done) { - sem.wait(); + if(wait_for_work) + sem.wait(); - if(state==BUSY) + if(ManagedResource *managed = front(async_queue)) { - Resource::AsyncLoader *ldr = resource->loader; - bool finished = false; - while(!finished && !ldr->needs_sync()) - finished = ldr->process(); - - if(finished) - state = LOAD_FINISHED; - else - state = SYNC_PENDING; + managed->process(false); + + MutexLock lock(queue_mutex); + sync_queue.splice(sync_queue.end(), async_queue, async_queue.begin()); + wait_for_work = async_queue.empty(); } + else + wait_for_work = true; } } -void ResourceManager::LoadingThread::set_resource(ManagedResource *r) +ResourceManager::ManagedResource *ResourceManager::LoadingThread::front(LoadQueue &queue) { - if(state!=IDLE) + MutexLock lock(queue_mutex); + if(queue.empty()) + return 0; + + return queue.front(); +} + +void ResourceManager::LoadingThread::add_resource(ManagedResource &r) +{ + r.start_loading(); + + MutexLock lock(queue_mutex); + if(r.loader->needs_sync()) + sync_queue.push_back(&r); + else { - while(state==BUSY) ; - // Force finish to clean up the loader - state = LOAD_FINISHED; - sync(); + bool was_empty = async_queue.empty(); + async_queue.push_back(&r); + if(was_empty) + sem.signal(); } - resource = r; - if(resource) + ++size; +} + +void ResourceManager::LoadingThread::remove_resource(ManagedResource &r) +{ + while(!try_remove_resource(r)) + Time::sleep(Time::msec); + + r.finish_loading(); +} + +bool ResourceManager::LoadingThread::try_remove_resource(ManagedResource &r) +{ + MutexLock lock(queue_mutex); + + LoadQueue::iterator i = find(async_queue.begin(), async_queue.end(), &r); + if(i==async_queue.end()) { - resource->start_loading(); - state = BUSY; - sem.signal(); + i = find(sync_queue.begin(), sync_queue.end(), &r); + if(i!=sync_queue.end()) + sync_queue.erase(i); } + else if(i==async_queue.begin()) + return false; + else + async_queue.erase(i); + + return true; } -void ResourceManager::LoadingThread::sync() +bool ResourceManager::LoadingThread::sync() { - State s = state; - bool finished = (s==LOAD_FINISHED); - if(s==SYNC_PENDING) { - Resource::AsyncLoader *ldr = resource->loader; - while(!finished && ldr->needs_sync()) - finished = ldr->process(); + MutexLock lock(queue_mutex); + unsigned async_size = async_queue.size(); + if(async_size==0 && size==capacity) + ++capacity; + else if(async_size>2 && capacity>2) + --capacity; + } - if(!finished) + bool any_finished = false; + while(ManagedResource *managed = front(sync_queue)) + { + if(managed->process(true)) { - state = BUSY; - sem.signal(); - return; + managed->finish_loading(); + any_finished = true; + --size; + + MutexLock lock(queue_mutex); + sync_queue.pop_front(); + } + else + { + MutexLock lock(queue_mutex); + bool was_empty = async_queue.empty(); + async_queue.splice(async_queue.end(), sync_queue, sync_queue.begin()); + if(was_empty) + sem.signal(); } } - if(finished) - { - resource->finish_loading(); - resource = 0; - state = IDLE; - } + return any_finished; } void ResourceManager::LoadingThread::terminate() { - while(state==BUSY) ; - state = TERMINATING; + done = true; sem.signal(); join(); + async_queue.clear(); + sync_queue.clear(); } } // namespace GL diff --git a/source/resourcemanager.h b/source/resourcemanager.h index 244c67ec..a1ff7aee 100644 --- a/source/resourcemanager.h +++ b/source/resourcemanager.h @@ -26,11 +26,12 @@ public: private: struct ManagedResource { - enum ResourceState + enum State { NOT_LOADED, LOAD_QUEUED, LOADING, + LOAD_FINISHED, LOADED }; @@ -39,7 +40,7 @@ private: std::string name; IO::Seekable *io; Resource::AsyncLoader *loader; - ResourceState state; + State state; unsigned last_used; UInt64 data_size; std::vector watchers; @@ -47,6 +48,8 @@ private: ManagedResource(Resource &); void start_loading(); + bool process(bool); + void finish_loading(bool); void finish_loading(); void unload(); @@ -54,41 +57,40 @@ private: void remove_watcher(ResourceWatcher &); }; + typedef std::list LoadQueue; + class LoadingThread: public Thread { - public: - enum State - { - IDLE, - SYNC_PENDING, - BUSY, - LOAD_FINISHED, - TERMINATING - }; - private: - ResourceManager &manager; Semaphore sem; - ManagedResource *volatile resource; - volatile State state; + Mutex queue_mutex; + LoadQueue async_queue; + LoadQueue sync_queue; + unsigned capacity; + unsigned size; + volatile bool done; public: - LoadingThread(ResourceManager &); + LoadingThread(); private: virtual void main(); + ManagedResource *front(LoadQueue &); + public: - void set_resource(ManagedResource *); - ManagedResource *get_resource() const { return resource; } - void sync(); - State get_state() const { return state; } + void add_resource(ManagedResource &); + void remove_resource(ManagedResource &); + private: + bool try_remove_resource(ManagedResource &); + public: + bool sync(); + bool needs_work() const { return size ResourceMap; - typedef std::list LoadQueue; LoadingPolicy policy; bool async_loads; @@ -122,7 +124,15 @@ public: void unwatch_resource(const Resource &, ResourceWatcher &); void tick(); +private: + void dispatch_work(); + void unload_by_age(); + void unload_by_size(); +public: UInt64 get_total_data_size() const; + +private: + static bool age_order(ManagedResource *, ManagedResource *); }; } // namespace GL -- 2.45.2