]> git.tdb.fi Git - libs/gl.git/commitdiff
Rewrite ResourceManager internals for more scalability
authorMikko Rasa <tdb@tdb.fi>
Tue, 30 Sep 2014 17:05:45 +0000 (20:05 +0300)
committerMikko Rasa <tdb@tdb.fi>
Tue, 30 Sep 2014 17:05:45 +0000 (20:05 +0300)
Since loading small resources can often take less than one frame,
especially with vsync enabled, the thread can now handle more than one
resource at once.  The code is also somewhat better structured for using
more than one thread in the future.

source/resourcemanager.cpp
source/resourcemanager.h

index 999b86c190451dae5a8d33967a3c461c5282862c..7bf4412f41c7bdbc7d372ab08aba0d9280f1cdfc 100644 (file)
@@ -15,8 +15,7 @@ ResourceManager::ResourceManager():
        frame(0),
        min_retain_frames(30),
        max_retain_frames(0),
-       next_unload(0),
-       thread(*this)
+       next_unload(0)
 { }
 
 ResourceManager::~ResourceManager()
@@ -90,7 +89,7 @@ void ResourceManager::load_resource(Resource &r)
        {
                managed.start_loading();
                while(!managed.loader->process()) ;
-               managed.finish_loading();
+               managed.finish_loading(true);
        }
 }
 
@@ -102,15 +101,21 @@ void ResourceManager::resource_used(const Resource &r)
 
        managed->last_used = frame;
        if(max_retain_frames && !next_unload)
-               next_unload = frame+max_retain_frames;
+               next_unload = frame+max_retain_frames+1;
 }
 
 void ResourceManager::remove_resource(Resource &r)
 {
-       ManagedResource *loading = thread.get_resource();
-       if(loading && loading->resource==&r)
-               thread.set_resource(0);
-
+       ManagedResource &managed = get_item(resources, &r);
+       ManagedResource::State state = managed.state;
+       if(state==ManagedResource::LOAD_QUEUED)
+       {
+               LoadQueue::iterator i = find(queue.begin(), queue.end(), &managed);
+               if(i!=queue.end())
+                       queue.erase(i);
+       }
+       else if(state>ManagedResource::LOAD_QUEUED && state<ManagedResource::LOADED)
+               thread.remove_resource(managed);
        remove_existing(resources, &r);
 }
 
@@ -126,62 +131,85 @@ void ResourceManager::unwatch_resource(const Resource &r, ResourceWatcher &w)
 
 void ResourceManager::tick()
 {
-       LoadingThread::State thread_state = thread.get_state();
-       bool check_total_size = false;
-       if(thread_state==LoadingThread::SYNC_PENDING || thread_state==LoadingThread::LOAD_FINISHED)
+       ++frame;
+
+       bool do_unload = (frame>=next_unload);
+       if(thread.sync())
+               do_unload = true;
+
+       if(thread.needs_work() && !queue.empty())
+               dispatch_work();
+
+       if(do_unload)
        {
-               thread.sync();
-               check_total_size = true;
+               if(max_retain_frames && frame>=next_unload)
+               {
+                       unload_by_age();
+
+                       next_unload = frame;
+                       for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
+                               if(i->second.state==ManagedResource::LOADED)
+                                       next_unload = min(next_unload, i->second.last_used);
+                       next_unload = (next_unload<frame ? next_unload+max_retain_frames : 0);
+               }
+
+               if(size_limit)
+                       unload_by_size();
+       }
+}
+
+void ResourceManager::dispatch_work()
+{
+       queue.sort(age_order);
+
+       if(queue.front()->last_used+10<frame)
+       {
+               for(LoadQueue::iterator i=queue.begin(); i!=queue.end(); ++i)
+                       (*i)->state = ManagedResource::NOT_LOADED;
+               queue.clear();
+               return;
        }
-       else if(thread_state==LoadingThread::IDLE && !queue.empty())
+
+       while(thread.needs_work() && !queue.empty())
        {
                ManagedResource *managed = queue.front();
                queue.pop_front();
-               thread.set_resource(managed);
+               thread.add_resource(*managed);
        }
+}
 
-       ++frame;
-       if(frame==next_unload)
-       {
-               unsigned unload_limit = frame-max_retain_frames;
-               next_unload = 0;
+void ResourceManager::unload_by_age()
+{
+       unsigned unload_limit = frame-max_retain_frames;
 
-               for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
-                       if(i->second.state==ManagedResource::LOADED)
-                       {
-                               if(i->second.last_used<=unload_limit)
-                                       i->second.unload();
-                               else if(!next_unload || i->second.last_used<next_unload)
-                                       next_unload = i->second.last_used;
-                       }
+       for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
+               if(i->second.state==ManagedResource::LOADED && i->second.last_used<unload_limit)
+                       i->second.unload();
+}
 
-               if(next_unload)
-                       next_unload += max_retain_frames;
-       }
+void ResourceManager::unload_by_size()
+{
+       unsigned unload_limit = frame-min_retain_frames;
 
-       if(check_total_size)
+       while(get_total_data_size()>size_limit)
        {
-               while(get_total_data_size()>size_limit)
-               {
-                       unsigned unload_limit = frame-min_retain_frames;
-                       ManagedResource *best = 0;
-                       UInt64 best_impact = 0;
-                       for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
-                               if(i->second.state==ManagedResource::LOADED && i->second.last_used<unload_limit)
+               ManagedResource *best = 0;
+               UInt64 best_impact = 0;
+               for(ResourceMap::iterator i=resources.begin(); i!=resources.end(); ++i)
+                       if(i->second.state==ManagedResource::LOADED && i->second.last_used<unload_limit)
+                       {
+                               UInt64 impact = i->second.data_size*(frame-i->second.last_used);
+                               if(!best || impact>best_impact)
                                {
-                                       UInt64 impact = i->second.data_size*(frame-i->second.last_used);
-                                       if(!best || impact>best_impact)
-                                       {
-                                               best = &i->second;
-                                               best_impact = impact;
-                                       }
+                                       best = &i->second;
+                                       best_impact = impact;
                                }
+                       }
 
-                       if(!best)
-                               break;
+               if(!best)
+                       break;
 
-                       best->unload();
-               }
+               best->unload();
        }
 }
 
@@ -194,6 +222,11 @@ UInt64 ResourceManager::get_total_data_size() const
        return total;
 }
 
+bool ResourceManager::age_order(ManagedResource *mr1, ManagedResource *mr2)
+{
+       return mr1->last_used>mr2->last_used;
+}
+
 
 ResourceManager::ManagedResource::ManagedResource(Resource &r):
        resource(&r),
@@ -218,17 +251,40 @@ void ResourceManager::ManagedResource::start_loading()
        state = LOADING;
 }
 
-void ResourceManager::ManagedResource::finish_loading()
+bool ResourceManager::ManagedResource::process(bool sync)
+{
+       while(state!=LOAD_FINISHED && loader->needs_sync()==sync)
+               if(loader->process())
+                       state = LOAD_FINISHED;
+
+       return state==LOAD_FINISHED;
+}
+
+void ResourceManager::ManagedResource::finish_loading(bool successful)
 {
        delete loader;
        loader = 0;
-       state = LOADED;
        delete io;
        io = 0;
-       data_size = resource->get_data_size();
 
-       for(vector<ResourceWatcher *>::const_iterator i=watchers.begin(); i!=watchers.end(); ++i)
-               (*i)->resource_loaded(*resource);
+       if(successful)
+       {
+               state = LOADED;
+               data_size = resource->get_data_size();
+
+               for(vector<ResourceWatcher *>::const_iterator i=watchers.begin(); i!=watchers.end(); ++i)
+                       (*i)->resource_loaded(*resource);
+       }
+       else
+       {
+               resource->unload();
+               state = NOT_LOADED;
+       }
+}
+
+void ResourceManager::ManagedResource::finish_loading()
+{
+       finish_loading(state==LOAD_FINISHED);
 }
 
 void ResourceManager::ManagedResource::unload()
@@ -254,87 +310,133 @@ void ResourceManager::ManagedResource::remove_watcher(ResourceWatcher &w)
 }
 
 
-ResourceManager::LoadingThread::LoadingThread(ResourceManager &m):
-       manager(m),
+ResourceManager::LoadingThread::LoadingThread():
        sem(1),
-       resource(0),
-       state(IDLE)
+       capacity(2),
+       size(0),
+       done(false)
 {
        launch();
 }
 
 void ResourceManager::LoadingThread::main()
 {
-       while(state!=TERMINATING)
+       bool wait_for_work = false;
+       while(!done)
        {
-               sem.wait();
+               if(wait_for_work)
+                       sem.wait();
 
-               if(state==BUSY)
+               if(ManagedResource *managed = front(async_queue))
                {
-                       Resource::AsyncLoader *ldr = resource->loader;
-                       bool finished = false;
-                       while(!finished && !ldr->needs_sync())
-                               finished = ldr->process();
-
-                       if(finished)
-                               state = LOAD_FINISHED;
-                       else
-                               state = SYNC_PENDING;
+                       managed->process(false);
+
+                       MutexLock lock(queue_mutex);
+                       sync_queue.splice(sync_queue.end(), async_queue, async_queue.begin());
+                       wait_for_work = async_queue.empty();
                }
+               else
+                       wait_for_work = true;
        }
 }
 
-void ResourceManager::LoadingThread::set_resource(ManagedResource *r)
+ResourceManager::ManagedResource *ResourceManager::LoadingThread::front(LoadQueue &queue)
 {
-       if(state!=IDLE)
+       MutexLock lock(queue_mutex);
+       if(queue.empty())
+               return 0;
+
+       return queue.front();
+}
+
+void ResourceManager::LoadingThread::add_resource(ManagedResource &r)
+{
+       r.start_loading();
+
+       MutexLock lock(queue_mutex);
+       if(r.loader->needs_sync())
+               sync_queue.push_back(&r);
+       else
        {
-               while(state==BUSY) ;
-               // Force finish to clean up the loader
-               state = LOAD_FINISHED;
-               sync();
+               bool was_empty = async_queue.empty();
+               async_queue.push_back(&r);
+               if(was_empty)
+                       sem.signal();
        }
 
-       resource = r;
-       if(resource)
+       ++size;
+}
+
+void ResourceManager::LoadingThread::remove_resource(ManagedResource &r)
+{
+       while(!try_remove_resource(r))
+               Time::sleep(Time::msec);
+
+       r.finish_loading();
+}
+
+bool ResourceManager::LoadingThread::try_remove_resource(ManagedResource &r)
+{
+       MutexLock lock(queue_mutex);
+
+       LoadQueue::iterator i = find(async_queue.begin(), async_queue.end(), &r);
+       if(i==async_queue.end())
        {
-               resource->start_loading();
-               state = BUSY;
-               sem.signal();
+               i = find(sync_queue.begin(), sync_queue.end(), &r);
+               if(i!=sync_queue.end())
+                       sync_queue.erase(i);
        }
+       else if(i==async_queue.begin())
+               return false;
+       else
+               async_queue.erase(i);
+
+       return true;
 }
 
-void ResourceManager::LoadingThread::sync()
+bool ResourceManager::LoadingThread::sync()
 {
-       State s = state;
-       bool finished = (s==LOAD_FINISHED);
-       if(s==SYNC_PENDING)
        {
-               Resource::AsyncLoader *ldr = resource->loader;
-               while(!finished && ldr->needs_sync())
-                       finished = ldr->process();
+               MutexLock lock(queue_mutex);
+               unsigned async_size = async_queue.size();
+               if(async_size==0 && size==capacity)
+                       ++capacity;
+               else if(async_size>2 && capacity>2)
+                       --capacity;
+       }
 
-               if(!finished)
+       bool any_finished = false;
+       while(ManagedResource *managed = front(sync_queue))
+       {
+               if(managed->process(true))
                {
-                       state = BUSY;
-                       sem.signal();
-                       return;
+                       managed->finish_loading();
+                       any_finished = true;
+                       --size;
+
+                       MutexLock lock(queue_mutex);
+                       sync_queue.pop_front();
+               }
+               else
+               {
+                       MutexLock lock(queue_mutex);
+                       bool was_empty = async_queue.empty();
+                       async_queue.splice(async_queue.end(), sync_queue, sync_queue.begin());
+                       if(was_empty)
+                               sem.signal();
                }
        }
 
-       if(finished)
-       {
-               resource->finish_loading();
-               resource = 0;
-               state = IDLE;
-       }
+       return any_finished;
 }
 
 void ResourceManager::LoadingThread::terminate()
 {
-       while(state==BUSY) ;
-       state = TERMINATING;
+       done = true;
        sem.signal();
        join();
+       async_queue.clear();
+       sync_queue.clear();
 }
 
 } // namespace GL
index 244c67ec29cc5acda96ea303bddda0f7583cf1ee..a1ff7aee2fc32adda60d12b0a63166c635e81822 100644 (file)
@@ -26,11 +26,12 @@ public:
 private:
        struct ManagedResource
        {
-               enum ResourceState
+               enum State
                {
                        NOT_LOADED,
                        LOAD_QUEUED,
                        LOADING,
+                       LOAD_FINISHED,
                        LOADED
                };
 
@@ -39,7 +40,7 @@ private:
                std::string name;
                IO::Seekable *io;
                Resource::AsyncLoader *loader;
-               ResourceState state;
+               State state;
                unsigned last_used;
                UInt64 data_size;
                std::vector<ResourceWatcher *> watchers;
@@ -47,6 +48,8 @@ private:
                ManagedResource(Resource &);
 
                void start_loading();
+               bool process(bool);
+               void finish_loading(bool);
                void finish_loading();
                void unload();
 
@@ -54,41 +57,40 @@ private:
                void remove_watcher(ResourceWatcher &);
        };
 
+       typedef std::list<ManagedResource *> LoadQueue;
+
        class LoadingThread: public Thread
        {
-       public:
-               enum State
-               {
-                       IDLE,
-                       SYNC_PENDING,
-                       BUSY,
-                       LOAD_FINISHED,
-                       TERMINATING
-               };
-
        private:
-               ResourceManager &manager;
                Semaphore sem;
-               ManagedResource *volatile resource;
-               volatile State state;
+               Mutex queue_mutex;
+               LoadQueue async_queue;
+               LoadQueue sync_queue;
+               unsigned capacity;
+               unsigned size;
+               volatile bool done;
 
        public:
-               LoadingThread(ResourceManager &);
+               LoadingThread();
 
        private:
                virtual void main();
 
+               ManagedResource *front(LoadQueue &);
+
        public:
-               void set_resource(ManagedResource *);
-               ManagedResource *get_resource() const { return resource; }
-               void sync();
-               State get_state() const { return state; }
+               void add_resource(ManagedResource &);
+               void remove_resource(ManagedResource &);
+       private:
+               bool try_remove_resource(ManagedResource &);
+       public:
+               bool sync();
+               bool needs_work() const { return size<capacity; }
 
                void terminate();
        };
 
        typedef std::map<const Resource *, ManagedResource> ResourceMap;
-       typedef std::list<ManagedResource *> LoadQueue;
 
        LoadingPolicy policy;
        bool async_loads;
@@ -122,7 +124,15 @@ public:
        void unwatch_resource(const Resource &, ResourceWatcher &);
 
        void tick();
+private:
+       void dispatch_work();
+       void unload_by_age();
+       void unload_by_size();
+public:
        UInt64 get_total_data_size() const;
+
+private:
+       static bool age_order(ManagedResource *, ManagedResource *);
 };
 
 } // namespace GL