Rewrite Buffered to support read-write buffering correctly
authorMikko Rasa <tdb@tdb.fi>
Mon, 4 Feb 2008 14:42:42 +0000 (14:42 +0000)
committerMikko Rasa <tdb@tdb.fi>
Mon, 4 Feb 2008 14:42:42 +0000 (14:42 +0000)
Rename signal_closing to signal_flush_required

source/base.h
source/buffered.cpp
source/buffered.h
source/file.cpp
source/file.h
source/pipe.cpp

index b4b8627363fbb720f128d2c492f97160091d07cd..c4d4ed94ba66f7f7e8eb14516104a589f3371cd7 100644 (file)
@@ -36,10 +36,10 @@ public:
        sigc::signal<void> signal_end_of_file;
 
        /**
-       Emitted when the I/O object is about to close.  Mainly intended for
-       buffering objects that need to flush their buffers at closing.
+       Emitted when there is a nonlinearity in I/O (such as a file being seeked)
+       and any data buffered by upper layers needs to be flushed.
        */
-       sigc::signal<void> signal_closing;
+       sigc::signal<void> signal_flush_required;
 
        /**
        Emitted when the I/O object has closed.
index 4327413d2c3e136f34f952f70bf52ecd32cf6fe3..09cc0994840d9e4959bef8a28e062208273c0a44 100644 (file)
@@ -15,21 +15,22 @@ namespace IO {
 Buffered::Buffered(Base &b, unsigned s):
        below(b),
        buf_size(s),
-       in_buf(new char[buf_size]),
-       in_ptr(in_buf),
-       in_avail(0),
-       out_buf(new char[buf_size]),
-       out_used(0)
+       buf(new char[buf_size]),
+       begin(buf),
+       end(buf),
+       cur_op(M_NONE)
 {
        mode=below.get_mode();
-       below.signal_closing.connect(sigc::mem_fun(this, &Buffered::below_closing));
+       below.signal_flush_required.connect(sigc::mem_fun(this, &Buffered::flush));
 }
 
 unsigned Buffered::put(char c)
 {
-       if(out_used<buf_size)
+       set_op(M_WRITE);
+
+       if(end<buf+buf_size)
        {
-               out_buf[out_used++]=c;
+               *end++=c;
                return 1;
        }
        else
@@ -38,27 +39,32 @@ unsigned Buffered::put(char c)
 
 void Buffered::flush()
 {
-       if(out_used==0)
-               return;
-
-       unsigned len=below.write(out_buf, out_used);
-       if(len<out_used)
+       if(cur_op==M_WRITE)
        {
-               memmove(out_buf, out_buf+len, out_used-len);
-               out_used-=len;
-               throw Exception("Couldn't flush all data");
+               unsigned used=end-begin;
+               if(used)
+               {
+                       unsigned len=below.write(begin, used);
+
+                       begin=end=buf;
+
+                       if(len<used)
+                               throw Exception("Couldn't flush all data");
+               }
        }
-       out_used=0;
+       else if(cur_op==M_READ)
+               begin=end=buf;
 }
 
 bool Buffered::getline(std::string &line)
 {
-       for(unsigned i=0; i<in_avail; ++i)
-               if(in_ptr[i]=='\n')
+       set_op(M_READ);
+
+       for(char *i=begin; i!=end; ++i)
+               if(*i=='\n')
                {
-                       line.assign(in_ptr, i);
-                       in_ptr+=i+1;
-                       in_avail-=i+1;
+                       line.assign(begin, i-begin);
+                       begin=i+1;
                        return true;
                }
 
@@ -67,11 +73,10 @@ bool Buffered::getline(std::string &line)
 
 int Buffered::get()
 {
-       if(in_avail>0)
-       {
-               --in_avail;
-               return *in_ptr++;
-       }
+       set_op(M_READ);
+
+       if(begin<end)
+               return static_cast<unsigned char>(*begin++);
 
        char c;
        if(do_read(&c, 1)==0)
@@ -84,6 +89,11 @@ Handle Buffered::get_event_handle()
        throw Exception("Buffered doesn't support events");
 }
 
+unsigned Buffered::get_current_size() const
+{
+       return end-begin;
+}
+
 Buffered::~Buffered()
 {
        try
@@ -93,108 +103,87 @@ Buffered::~Buffered()
        catch(...)
        { }
 
-       delete[] in_buf;
-       delete[] out_buf;
+       delete[] buf;
 }
 
-void Buffered::below_closing()
+void Buffered::set_op(Mode op)
 {
-       flush();
+       if(op!=cur_op)
+               flush();
+       cur_op=op;
 }
 
-unsigned Buffered::do_write(const char *buf, unsigned size)
+unsigned Buffered::do_write(const char *data, unsigned size)
 {
-       if(out_used+size<buf_size)
+       set_op(M_WRITE);
+
+       if(end+size<buf+buf_size)
        {
-               // All data fits in the buffer
-               memcpy(out_buf+out_used, buf, size);
-               out_used+=size;
+               // All data fits in buffer with whatever is already there
+               memcpy(end, data, size);
+               end+=size;
 
                return size;
        }
        else
        {
-               int ret=0;
-               bool ok=true;
+               // Clear the buffer to make more room
+               flush();
 
-               while(size>0)
+               if(size<buf_size)
                {
-                       // XXX sub-obtimal - should try to write directly from input first
-                       // Fill the buffer and pass it on
-                       unsigned head=min(buf_size-out_used, size);
-
-                       memcpy(out_buf+out_used, buf, head);
-                       out_used+=head;
-
-                       buf+=head;
-                       size-=head;
-                       ret+=head;
-
-                       if(!ok) break;
+                       // Put new data in the buffer to wait for more
+                       memcpy(end, data, size);
+                       end+=size;
 
-                       unsigned len=below.write(out_buf, out_used);
-
-                       ok=(len==out_used);
-                       if(ok)
-                               out_used=0;
-                       else
-                       {
-                               memmove(out_buf, out_buf+len, buf_size-len);
-                               out_used=buf_size-len;
-                       }
+                       return size;
                }
-
-               return ret;
+               else
+                       // New data still doesn't fit in the buffer, so write it directly
+                       return below.write(data, size);
        }
 }
 
-unsigned Buffered::do_read(char *buf, unsigned size)
+unsigned Buffered::do_read(char *data, unsigned size)
 {
-       if(size<=in_avail)
+       set_op(M_READ);
+
+       if(begin+size<=end)
        {
                // The request can be served from the buffer
-               memcpy(buf, in_ptr, size);
-               in_ptr+=size;
-               in_avail-=size;
+               memcpy(data, begin, size);
+               begin+=size;
+
+               eof_flag=(below.eof() && begin==end);
 
                return size;
        }
        else
        {
-               // Use whatever is left in the buffer
-               memcpy(buf, in_ptr, in_avail);
-
-               buf+=in_avail;
-               size-=in_avail;
-               int ret=in_avail;
+               // Give out whatever is in the buffer already
+               memcpy(data, begin, end-begin);
+               unsigned ret=end-begin;
+               begin=end=buf;
 
-               in_ptr=in_buf;
-               in_avail=0;
+               data+=ret;
+               size-=ret;
 
-               if(size>=buf_size)
-                       ret+=below.read(buf, size);
-               else
+               if(size<buf_size)
                {
-                       // Read more data into the buffer
-                       while(size>0)
-                       {
-                               in_avail=below.read(in_buf, buf_size);
-                               if(in_avail==0)
-                               {
-                                       eof_flag=true;
-                                       break;
-                               }
-
-                               unsigned head=min(size, in_avail);
-                               memcpy(buf, in_buf, head);
-                               buf+=head;
-                               size-=head;
-
-                               in_ptr=in_buf+head;
-                               in_avail-=head;
-                               ret+=head;
-                       }
+                       // Fill the buffer and serve the rest of the request from it
+                       unsigned len=below.read(end, buf+buf_size-end);
+                       end+=len;
+
+                       len=min(static_cast<unsigned>(end-begin), size);
+                       memcpy(data, begin, len);
+                       begin+=len;
+                       ret+=len;
                }
+               else
+                       // Read the rest directly from the underlying object
+                       ret+=below.read(data, size);
+
+               eof_flag=(below.eof() && begin==end);
 
                return ret;
        }
index b6584c1c23e289ce1d974cf02c0a247cf4f52f38..092be19e8f0ad00241d67f915d0931b2697cc9ce 100644 (file)
@@ -14,25 +14,27 @@ namespace IO {
 
 class Buffered: public Base
 {
+private:
+       Base     &below;
+       unsigned buf_size;
+       char     *buf;
+       char     *begin;
+       char     *end;
+       Mode     cur_op;
+
 public:
        Buffered(Base &, unsigned =8192);
+       ~Buffered();
+
        unsigned put(char);
        void flush();
        bool getline(std::string &);
        int  get();
-       int  tell() const;
        Handle get_event_handle();
-       ~Buffered();
+       Mode get_current_op() const { return cur_op; }
+       unsigned get_current_size() const;
 private:
-       Base     &below;
-       unsigned buf_size;
-       char     *in_buf;
-       char     *in_ptr;
-       unsigned in_avail;
-       char     *out_buf;
-       unsigned out_used;
-
-       void  below_closing();
+       void set_op(Mode);
        unsigned do_write(const char *, unsigned);
        unsigned do_read(char *, unsigned);
 };
index a22b832be46a4f56179544fa96cc2cc0d8de331b..ac514121dfa05fe99afa9aee4e51ba9fe66361c1 100644 (file)
@@ -105,7 +105,7 @@ void File::close()
 
        set_events(P_NONE);
 
-       signal_closing.emit();
+       signal_flush_required.emit();
 
 #ifdef WIN32
        CloseHandle(handle);
@@ -134,6 +134,15 @@ void File::set_block(bool b)
 #endif
 }
 
+void File::sync()
+{
+#ifndef WIN32
+       signal_flush_required.emit();
+
+       fsync(handle);
+#endif
+}
+
 /**
 Seeks the file to the given byte offset.
 
@@ -146,6 +155,8 @@ int File::seek(int off, SeekType st)
 {
        check_access(M_NONE);
 
+       signal_flush_required.emit();
+
        int type=sys_seek_type(st);
 #ifdef WIN32
        DWORD ret=SetFilePointer(handle, off, 0, type);
@@ -232,13 +243,6 @@ unsigned File::do_write(const char *buf, unsigned size)
        return ret;
 }
 
-void File::sync()
-{
-#ifndef WIN32
-       fsync(handle);
-#endif
-}
-
 /**
 Reads data from the file.
 
index 49b596bb943077411db57b0bc66044de8de87a92..f2271d02623c439dcd1385af35beb4d4c4fc841f 100644 (file)
@@ -34,22 +34,21 @@ public:
        void close();
 
        void set_block(bool);
-       void enable_events();
 
-       void sync();
+       virtual void sync();
 
-       int  seek(int, SeekType);
-       int  tell() const;
+       virtual int  seek(int, SeekType);
+       virtual int  tell() const;
 
-       Handle get_event_handle() { return handle; }
+       virtual Handle get_event_handle() { return handle; }
 
-       ~File();
+       virtual ~File();
 private:
        Handle handle;
 
-       void      check_access(Mode) const;
-       unsigned  do_write(const char *, unsigned);
-       unsigned  do_read(char *, unsigned);
+       void              check_access(Mode) const;
+       virtual unsigned  do_write(const char *, unsigned);
+       virtual unsigned  do_read(char *, unsigned);
 };
 
 inline File::CreateMode operator|(File::CreateMode m, File::CreateMode n)
index 88dd46210b844602c45ba0d12438a82100a7baac..bc62af1a4ac4e790e3a7fb044cc93485f5e87111 100644 (file)
@@ -58,7 +58,7 @@ void Pipe::close()
 {
        set_events(P_NONE);
 
-       signal_closing.emit();
+       signal_flush_required.emit();
 #ifdef WIN32
        CloseHandle(handle[0]);
        CloseHandle(handle[1]);