]> git.tdb.fi Git - libs/core.git/blob - source/pipe.cpp
Rewrite Buffered to support read-write buffering correctly
[libs/core.git] / source / pipe.cpp
1 #ifndef WIN32
2 #include <fcntl.h>
3 #include <errno.h>
4 #endif
5 #include <msp/strings/formatter.h>
6 #include "pipe.h"
7
8 using namespace std;
9
10 namespace Msp {
11 namespace IO {
12
13 Pipe::Pipe()
14 {
15 #ifdef WIN32
16         string name=format("\\\\.\\pipe\\%u.%p", GetCurrentProcessId(), this);
17         handle[0]=CreateNamedPipe(name.c_str(), PIPE_ACCESS_INBOUND|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE, 1, 1024, 1024, 0, 0);
18         if(handle[0]==INVALID_HANDLE_VALUE)
19                 throw SystemError("Unable to create pipe", GetLastError());
20
21         handle[1]=CreateFile(name.c_str(), GENERIC_WRITE, 0, 0, OPEN_EXISTING, 0, 0);
22         if(handle[1]==INVALID_HANDLE_VALUE)
23         {
24                 unsigned err=GetLastError();
25                 CloseHandle(handle[0]);
26                 throw SystemError("Unable to create pipe", err);
27         }
28
29         overlapped=0;
30         event=CreateEvent(0, true, false, 0);
31         buf_size=1024;
32         buffer=new char[buf_size];
33         buf_avail=0;
34         buf_next=buffer;
35 #else
36         if(pipe(handle)==-1)
37                 throw SystemError("Unable to create pipe", errno);
38 #endif
39
40         set_events(P_INPUT);
41 }
42
43 void Pipe::set_block(bool b)
44 {
45         mode=(mode&~M_NONBLOCK);
46         if(b)
47                 mode=(mode|M_NONBLOCK);
48
49 #ifndef WIN32
50         int flags=fcntl(handle[0], F_GETFD);
51         fcntl(handle[0], F_SETFL, (flags&O_NONBLOCK)|(b?0:O_NONBLOCK));
52         flags=fcntl(handle[1], F_GETFD);
53         fcntl(handle[1], F_SETFL, (flags&O_NONBLOCK)|(b?0:O_NONBLOCK));
54 #endif
55 }
56
57 void Pipe::close()
58 {
59         set_events(P_NONE);
60
61         signal_flush_required.emit();
62 #ifdef WIN32
63         CloseHandle(handle[0]);
64         CloseHandle(handle[1]);
65 #else
66         ::close(handle[0]);
67         ::close(handle[1]);
68         signal_closed.emit();
69 #endif
70 }
71
72 Handle Pipe::get_event_handle()
73 {
74 #ifdef WIN32
75         if(!overlapped && !buf_avail)
76         {
77                 overlapped=new OVERLAPPED;
78                 memset(overlapped, 0, sizeof(OVERLAPPED));
79                 overlapped->hEvent=event;
80
81                 DWORD ret;
82                 buf_next=buffer;
83                 if(!ReadFile(handle[0], buffer, buf_size, &ret, overlapped))
84                 {
85                         unsigned err=GetLastError();
86                         if(err!=ERROR_IO_PENDING)
87                                 throw SystemError("Failed to start an overlapped read", err);
88                 }
89                 else
90                 {
91                         buf_avail=ret;
92                         delete overlapped;
93                         overlapped=0;
94                         SetEvent(event);
95                 }
96         }
97
98         return event;
99 #else
100         return handle[0];
101 #endif
102 }
103
104 Pipe::~Pipe()
105 {
106         close();
107 }
108
109 unsigned Pipe::do_write(const char *buf, unsigned size)
110 {
111         if(size==0)
112                 return 0;
113
114 #ifdef WIN32
115         DWORD ret;
116         if(!WriteFile(handle[1], buf, size, &ret, 0))
117                 throw SystemError("Writing to pipe failed", GetLastError());
118 #else
119         int ret=::write(handle[1], buf, size);
120         if(ret==-1)
121         {
122                 if(errno==EAGAIN)
123                         return 0;
124                 else
125                         throw SystemError("Writing to pipe failed", errno);
126         }
127 #endif
128
129         return ret;
130 }
131
132 unsigned Pipe::do_read(char *buf, unsigned size)
133 {
134         if(size==0)
135                 return 0;
136
137 #ifdef WIN32
138         // Initiate overlapped read if needed
139         get_event_handle();
140
141         if(overlapped)
142         {
143                 DWORD ret;
144                 if(!GetOverlappedResult(handle[0], overlapped, &ret, !buf_avail))
145                         throw SystemError("Reading from pipe failed", GetLastError());
146                 else
147                 {
148                         buf_avail+=ret;
149                         delete overlapped;
150                         overlapped=0;
151                 }
152         }
153
154         unsigned ret=min(buf_avail, size);
155         memcpy(buf, buf_next, ret);
156         buf_next+=ret;
157         buf_avail-=ret;
158
159         // Initiate another overlapped read in case someone is polling us
160         get_event_handle();
161 #else
162         int ret=::read(handle[0], buf, size);
163         if(ret==-1)
164         {
165                 if(errno==EAGAIN)
166                         return 0;
167                 else
168                         throw SystemError("Reading from pipe failed", errno);
169         }
170 #endif
171
172         if(ret==0)
173         {
174                 eof_flag=true;
175                 signal_end_of_file.emit();
176         }
177
178         return ret;
179 }
180
181 } // namespace IO
182 } // namespace Msp