X-Git-Url: http://git.tdb.fi/?p=libs%2Fcore.git;a=blobdiff_plain;f=source%2Fio%2Fpipe.cpp;fp=source%2Fio%2Fpipe.cpp;h=b3735aa63b6d43a7724aaa13d1109ce96a78b0d2;hp=0000000000000000000000000000000000000000;hb=6e0fd758970bcb5bad5e3f2454b694cc4d7b4b66;hpb=b97d4e9f86e90254ab9edef7ee62a910f6333c78 diff --git a/source/io/pipe.cpp b/source/io/pipe.cpp new file mode 100644 index 0000000..b3735aa --- /dev/null +++ b/source/io/pipe.cpp @@ -0,0 +1,182 @@ +#ifndef WIN32 +#include +#include +#endif +#include +#include "pipe.h" + +using namespace std; + +namespace Msp { +namespace IO { + +Pipe::Pipe() +{ +#ifdef WIN32 + string name = format("\\\\.\\pipe\\%u.%p", GetCurrentProcessId(), this); + handle[0] = CreateNamedPipe(name.c_str(), PIPE_ACCESS_INBOUND|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE, 1, 1024, 1024, 0, 0); + if(handle[0]==INVALID_HANDLE_VALUE) + throw SystemError("Unable to create pipe", GetLastError()); + + handle[1] = CreateFile(name.c_str(), GENERIC_WRITE, 0, 0, OPEN_EXISTING, 0, 0); + if(handle[1]==INVALID_HANDLE_VALUE) + { + unsigned err = GetLastError(); + CloseHandle(handle[0]); + throw SystemError("Unable to create pipe", err); + } + + overlapped = 0; + event = CreateEvent(0, true, false, 0); + buf_size = 1024; + buffer = new char[buf_size]; + buf_avail = 0; + buf_next = buffer; +#else + if(pipe(handle)==-1) + throw SystemError("Unable to create pipe", errno); +#endif + + set_events(P_INPUT); +} + +Pipe::~Pipe() +{ + close(); +} + +void Pipe::close() +{ + set_events(P_NONE); + + signal_flush_required.emit(); +#ifdef WIN32 + CloseHandle(handle[0]); + CloseHandle(handle[1]); +#else + ::close(handle[0]); + ::close(handle[1]); + signal_closed.emit(); +#endif +} + +void Pipe::set_block(bool b) +{ + mode = (mode&~M_NONBLOCK); + if(b) + mode = (mode|M_NONBLOCK); + +#ifndef WIN32 + int flags = fcntl(handle[0], F_GETFD); + fcntl(handle[0], F_SETFL, (flags&O_NONBLOCK)|(b?0:O_NONBLOCK)); + flags = fcntl(handle[1], F_GETFD); + fcntl(handle[1], F_SETFL, (flags&O_NONBLOCK)|(b?0:O_NONBLOCK)); +#endif +} + +unsigned Pipe::do_write(const char *buf, unsigned size) +{ + if(size==0) + return 0; + +#ifdef WIN32 + DWORD ret; + if(!WriteFile(handle[1], buf, size, &ret, 0)) + throw SystemError("Writing to pipe failed", GetLastError()); +#else + int ret = ::write(handle[1], buf, size); + if(ret==-1) + { + if(errno==EAGAIN) + return 0; + else + throw SystemError("Writing to pipe failed", errno); + } +#endif + + return ret; +} + +unsigned Pipe::do_read(char *buf, unsigned size) +{ + if(size==0) + return 0; + +#ifdef WIN32 + // Initiate overlapped read if needed + get_event_handle(); + + if(overlapped) + { + DWORD ret; + if(!GetOverlappedResult(handle[0], overlapped, &ret, !buf_avail)) + throw SystemError("Reading from pipe failed", GetLastError()); + else + { + buf_avail += ret; + delete overlapped; + overlapped = 0; + } + } + + unsigned ret = min(buf_avail, size); + memcpy(buf, buf_next, ret); + buf_next += ret; + buf_avail -= ret; + + // Initiate another overlapped read in case someone is polling us + get_event_handle(); +#else + int ret = ::read(handle[0], buf, size); + if(ret==-1) + { + if(errno==EAGAIN) + return 0; + else + throw SystemError("Reading from pipe failed", errno); + } +#endif + + if(ret==0) + { + eof_flag = true; + signal_end_of_file.emit(); + } + + return ret; +} + +Handle Pipe::get_event_handle() +{ +#ifdef WIN32 + if(!overlapped && !buf_avail) + { + overlapped = new OVERLAPPED; + memset(overlapped, 0, sizeof(OVERLAPPED)); + overlapped->hEvent = event; + + DWORD ret; + buf_next = buffer; + if(!ReadFile(handle[0], buffer, buf_size, &ret, overlapped)) + { + unsigned err = GetLastError(); + if(err!=ERROR_IO_PENDING) + throw SystemError("Failed to start an overlapped read", err); + } + else + { + buf_avail = ret; + delete overlapped; + overlapped = 0; + SetEvent(event); + } + } + + return event; +#else + return handle[0]; +#endif +} + +} // namespace IO +} // namespace Msp