2 Copyright (C) 2000-2004 SKYRIX Software AG
4 This file is part of OpenGroupware.org.
6 OGo is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License as published by the
8 Free Software Foundation; either version 2, or (at your option) any
11 OGo is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
14 License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with OGo; see the file COPYING. If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
24 #include "NGBufferedStream.h"
// Byte that writeBytes:count: looks for when flags._flushOnNewline is set.
26 #define NEWLINE_CHAR '\n'
// Size threshold (in bytes) above which the write and flush code logs a WARNING via NSLog.
27 #define WRITE_WARN_SIZE (1024 * 1024 * 100) /* 100MB */
29 @implementation NGBufferedStream
// Buffer size in bytes used by the initializers that take no bufferSize: argument.
31 static const unsigned DEFAULT_BUFFER_SIZE = 512;
// Cached NGDataStream class; sources whose class pointer equals it are handed back
// unwrapped by the factory and the initializers below.
32 static Class DataStreamClass = Nil;
// The class is looked up by name at runtime.
// NOTE(review): the header of the enclosing method is not visible here - presumably +initialize; confirm.
35 DataStreamClass = NSClassFromString(@"NGDataStream");
// Returns the number of bytes that were already consumed from the read
// buffer, i.e. the distance between the buffer start and the read position.
// Evaluates to 0 when read buffering is disabled (readBufferSize == 0).
#define numberOfConsumedReadBufferBytes(self) \
  (((self)->readBufferSize == 0) \
   ? 0 : ((self)->readBufferPos - (self)->readBuffer))

// Returns the number of bytes that can still be read from the buffer without
// accessing the source (fill size minus the bytes consumed so far).
#define numberOfAvailableReadBufferBytes(self) \
  ((self)->readBufferFillSize - numberOfConsumedReadBufferBytes(self))

// Checks whether all bytes in the buffer were consumed and, if so, resets the
// buffer: the read position goes back to the buffer start and the fill size
// becomes 0.
// The body is wrapped in do { } while (0) so that the macro expands to exactly
// one statement; a bare 'if' block would break when the macro is used as the
// unbraced branch of an if/else. Invoke it with a trailing semicolon.
#define checkReadBufferFillState(self) \
  do { \
    if (numberOfAvailableReadBufferBytes(self) == 0) { \
      (self)->readBufferPos      = (self)->readBuffer; \
      (self)->readBufferFillSize = 0; \
    } \
  } while (0)
53 // ******************** constructors ********************
/*
  Factory method: returns nil for a nil source, the source itself when its
  class pointer equals the cached NGDataStream class, and otherwise an
  autoreleased instance of the receiver wrapping the source with buffers of
  _size bytes.
*/
+ (id)filterWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  id stream;

  if (_source == nil)
    return nil;

  /* the class pointer is read directly from the start of the object */
  if (*(Class *)_source == DataStreamClass) {
    stream = _source;
  }
  else {
    stream = [[self alloc] initWithSource:_source bufferSize:_size];
    stream = [stream autorelease];
  }
  return stream;
}
61 // TODO: can we reduce the duplicated code here ...
// Initializes a buffered stream on a bidirectional source, with a read buffer and a
// write buffer of _size bytes each.
63 - (id)initWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
// The class pointer at the start of the source object is compared with the cached
// NGDataStream class; on a match the retained source is returned instead of the receiver.
68 if (*(Class *)_source == DataStreamClass) {
70 return [_source retain];
73 if ((self = [super initWithSource:_source])) {
// Both buffers are zero-filled allocations of _size bytes.
// NOTE(review): the calloc() results are not checked for NULL in the visible code.
74 self->readBuffer = calloc(_size, 1);
75 self->writeBuffer = calloc(_size, 1);
// The read position starts at the beginning of the still empty read buffer.
77 self->readBufferPos = self->readBuffer;
78 self->readBufferSize = _size;
79 self->readBufferFillSize = 0; // no bytes are read from source
80 self->writeBufferFillSize = 0;
81 self->writeBufferSize = _size;
// Turns on the newline check that writeBytes:count: performs after buffering data.
82 self->flags._flushOnNewline = 1;
// Initializes a buffered stream on an input-only source; only a read buffer of _s bytes is
// allocated here, the write buffer fields are not touched.
87 - (id)initWithInputSource:(id<NGInputStream>)_source bufferSize:(unsigned)_s {
// A source whose class pointer equals the cached NGDataStream class is returned (retained)
// instead of the receiver.
92 if (*(Class *)_source == DataStreamClass) {
94 return [_source retain];
97 if ((self = [super initWithInputSource:_source])) {
// Zero-filled read buffer; the read position starts at its beginning.
// NOTE(review): the calloc() result is not checked for NULL in the visible code.
98 self->readBuffer = calloc(_s, 1);
99 self->readBufferPos = self->readBuffer;
100 self->readBufferSize = _s;
101 self->readBufferFillSize = 0; // no bytes are read from source
102 self->flags._flushOnNewline = 1;
// Initializes a buffered stream on an output-only source; only a write buffer of _s bytes is
// allocated here, the read buffer fields are not touched.
106 - (id)initWithOutputSource:(id<NGOutputStream>)_src bufferSize:(unsigned)_s {
// A source whose class pointer equals the cached NGDataStream class is returned (retained)
// instead of the receiver.
111 if (*(Class *)_src == DataStreamClass) {
113 return [_src retain];
116 if ((self = [super initWithOutputSource:_src])) {
// Zero-filled write buffer that starts out empty.
// NOTE(review): the calloc() result is not checked for NULL in the visible code.
117 self->writeBuffer = calloc(_s, 1);
118 self->writeBufferFillSize = 0;
119 self->writeBufferSize = _s;
// Turns on the newline check that writeBytes:count: performs after buffering data.
120 self->flags._flushOnNewline = 1;
/*
  Convenience initializer: forwards to -initWithSource:bufferSize: using
  DEFAULT_BUFFER_SIZE as the buffer size.
*/
- (id)initWithSource:(id<NGStream>)_source {
  const unsigned bufferSize = DEFAULT_BUFFER_SIZE;

  self = [self initWithSource:_source bufferSize:bufferSize];
  return self;
}
/*
  Convenience initializer: forwards to -initWithInputSource:bufferSize: using
  DEFAULT_BUFFER_SIZE as the buffer size.
*/
- (id)initWithInputSource:(id<NGInputStream>)_source {
  const unsigned bufferSize = DEFAULT_BUFFER_SIZE;

  self = [self initWithInputSource:_source bufferSize:bufferSize];
  return self;
}
/*
  Convenience initializer: forwards to -initWithOutputSource:bufferSize: using
  DEFAULT_BUFFER_SIZE as the buffer size.
*/
- (id)initWithOutputSource:(id<NGOutputStream>)_source {
  const unsigned bufferSize = DEFAULT_BUFFER_SIZE;

  self = [self initWithOutputSource:_source bufferSize:bufferSize];
  return self;
}
// NOTE(review): the header of the enclosing method is not visible here - presumably the
// deallocation path; confirm.
// Frees the read buffer (if one was allocated) and clears the pointers into it.
138 if (self->readBuffer) {
139 free(self->readBuffer);
140 self->readBuffer = NULL;
141 self->readBufferPos = NULL;
// The read bookkeeping is zeroed as well.
143 self->readBufferFillSize = 0;
144 self->readBufferSize = 0;
// Same treatment for the write buffer.
// NOTE(review): bytes still pending in the write buffer are dropped here unless a flush
// happened in the lines that are not visible - confirm.
146 if (self->writeBuffer) {
147 free(self->writeBuffer);
148 self->writeBuffer = NULL;
150 self->writeBufferFillSize = 0;
151 self->writeBufferSize = 0;
// Changes the capacity of the read buffer to _size bytes.
157 - (void)setReadBufferSize:(unsigned)_size {
// Compared against the current size first (the statement guarded by this check is not
// visible here; presumably an early return - confirm).
160 if (_size == self->readBufferSize)
// Path that drops the buffer entirely: the memory is freed and the position pointer cleared.
// NOTE(review): the condition selecting this path is not visible; presumably _size == 0 - confirm.
// readBufferFillSize is not reset on this path in the visible code.
164 if (self->readBuffer != NULL) {
165 free(self->readBuffer);
166 self->readBuffer = NULL;
168 self->readBufferSize = _size;
169 self->readBufferPos = NULL;
// Path that (re)allocates the buffer: an existing buffer is resized, otherwise a zero-filled
// one is created.
// NOTE(review): the realloc() result is assigned directly, so the old block would be lost
// (and readBuffer become NULL) if the reallocation fails.
172 if (self->readBuffer != NULL)
173 self->readBuffer = realloc(self->readBuffer, _size);
175 self->readBuffer = calloc(_size, 1);
// The position and fill size are reset, so bytes that were buffered but not yet consumed
// are discarded.
177 self->readBufferSize = _size;
178 self->readBufferPos = self->readBuffer;
179 self->readBufferFillSize = 0; // no bytes are read from source
/* Returns the capacity of the read buffer in bytes, as stored in the ivar. */
- (unsigned)readBufferSize {
  unsigned size;

  size = self->readBufferSize;
  return size;
}
// Changes the capacity of the write buffer to _size bytes.
186 - (void)setWriteBufferSize:(unsigned)_size {
// Compared against the current size first (the statement guarded by this check is not
// visible here; presumably an early return - confirm).
189 if (_size == self->writeBufferSize)
// The buffer is resized in place.
// NOTE(review): the realloc() result is assigned directly, so the old block would be lost if
// the reallocation fails; writeBufferFillSize is not adjusted in the visible code, which only
// is safe if the buffer was flushed beforehand - confirm.
192 self->writeBuffer = realloc(self->writeBuffer, _size);
193 self->writeBufferSize = _size;
/* Returns the capacity of the write buffer in bytes, as stored in the ivar. */
- (unsigned)writeBufferSize {
  unsigned size;

  size = self->writeBufferSize;
  return size;
}
// Answers the wouldBlockInMode: query from the buffer state where possible and otherwise
// asks the source.
201 - (BOOL)wouldBlockInMode:(NGStreamMode)_mode {
202 BOOL canRead, canWrite;
// With read buffering enabled, canRead is true when unconsumed bytes are waiting in the
// read buffer (the value used when readBufferSize is 0 is not visible here).
204 if (self->readBufferSize == 0)
207 canRead = (numberOfAvailableReadBufferBytes(self) > 0);
// NOTE(review): with write buffering enabled, canWrite is derived from the buffer holding
// data (fill size > 0) rather than from free space - confirm that this is intended.
209 canWrite = (self->writeBufferSize == 0)
211 : (self->writeBufferFillSize > 0);
// One check per stream mode; the statements guarded by them are not visible here.
213 if ((_mode == NGStreamMode_readWrite) && canRead && canWrite)
215 if ((_mode == NGStreamMode_readOnly) && canRead) {
218 if ((_mode == NGStreamMode_writeOnly) && canWrite)
// Otherwise the question is forwarded to the source if it implements wouldBlockInMode:.
221 return ([self->source respondsToSelector:@selector(wouldBlockInMode:)])
222 ? [(id)self->source wouldBlockInMode:_mode]
// Reads up to _len bytes into _buf, serving the request from the read buffer where possible.
// Visible paths: unbuffered pass-through, request fully served from the buffer, hand-out of
// the buffered remainder, buffer bypass for requests larger than the buffer, and buffer refill.
// An NGStreamError result from the source is passed on to the caller.
228 - (unsigned)readBytes:(void *)_buf count:(unsigned)_len {
229 register unsigned availBytes = numberOfAvailableReadBufferBytes(self);
// Without a read buffer the call goes straight to the source: through the readBytes function
// pointer when it is set (presumably a cached method implementation - confirm), otherwise
// through a regular message send.
231 if (self->readBufferSize == 0) { // no read buffering is done (buffersize==0)
232 return (readBytes != NULL)
233 ? readBytes(source, _cmd, _buf, _len)
234 : [source readBytes:_buf count:_len];
237 if (availBytes >= _len) {
238 // there are enough bytes in the buffer to fulfill the request
// Single-byte copy and the general memcpy() variant; both advance the read position by the
// number of bytes handed out (the condition choosing between them is not visible here).
240 *(unsigned char *)_buf = *(unsigned char *)self->readBufferPos;
241 self->readBufferPos++;
244 memcpy(_buf, self->readBufferPos, _len);
245 self->readBufferPos += _len; // update read position (consumed-size)
247 checkReadBufferFillState(self); // check whether all bytes were consumed
250 else if (availBytes > 0) {
251 // there are some bytes in the buffer, these are returned
// Fewer bytes than requested are buffered: they are handed out without touching the source
// and the buffer is marked empty.
253 memcpy(_buf, self->readBufferPos, availBytes);// copy all bytes from buffer
254 self->readBufferPos = self->readBuffer; // reset position
255 self->readBufferFillSize = 0; // no bytes available in buffer anymore
258 else if (_len > self->readBufferSize) {
260 requested _len is bigger than the buffersize, so we can bypass the
261 buffer (which is empty, as guaranteed by the previous 'ifs'
264 NSAssert(self->readBufferPos == self->readBuffer,
265 @"read buffer position is not reset");
266 NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");
// The source reads directly into the caller's memory; availBytes is reused for its result.
268 availBytes = (readBytes != NULL)
269 ? (unsigned)readBytes(source, _cmd, _buf, _len)
270 : [source readBytes:_buf count:_len];
272 if (availBytes == NGStreamError)
273 return NGStreamError;
275 NSAssert(availBytes != 0, @"readBytes:count: may never return zero !");
277 return availBytes; // return the number of bytes which could be read
281 no bytes are available and the requested _len is smaller than the
282 possible buffer size, we have to read the next block of input from the
286 NSAssert(self->readBufferPos == self->readBuffer,
287 @"read buffer position is not reset");
288 NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");
// Refill: the source is asked for up to readBufferSize bytes and its result becomes the new
// fill size.
290 self->readBufferFillSize = (readBytes != NULL)
291 ? (unsigned)readBytes(source,_cmd, self->readBuffer,self->readBufferSize)
292 : [source readBytes:self->readBuffer count:self->readBufferSize];
// On error the fill size is set back to 0 so that the error code is not taken for a byte count.
294 if (self->readBufferFillSize == NGStreamError) {
295 self->readBufferFillSize = 0;
296 return NGStreamError;
299 NSAssert(self->readBufferFillSize != 0,
300 @"readBytes:count: may never return zero !");
303 now comes a section which is roughly the same like the first to
304 conditionals in this method
306 if (self->readBufferFillSize >= _len) {
307 // there are enough bytes in the buffer to fulfill the request
309 memcpy(_buf, self->readBufferPos, _len);
310 self->readBufferPos += _len; // update read position (consumed-size)
311 checkReadBufferFillState(self); // check whether all bytes were consumed
314 else { // (readBufferFillSize > 0) (this is ensured by the above assert)
315 // there are some bytes in the buffer, these are returned
// The refill delivered fewer bytes than requested: all of them are handed out and the buffer
// is marked empty again.
317 availBytes = self->readBufferFillSize;
318 // copy all bytes from buffer
319 memcpy(_buf, self->readBufferPos, self->readBufferFillSize);
320 self->readBufferPos = self->readBuffer; // reset position
321 self->readBufferFillSize = 0; // no bytes available in buffer anymore
// NOTE(review): the method header is not visible here; the [super readByte] calls suggest
// that this is the readByte override - confirm.
// Without a read buffer the request is delegated to the superclass implementation.
328 if (self->readBufferSize == 0) // no read buffering is done (buffersize==0)
329 return [super readByte];
// At least one buffered byte: it is taken from the read position, which is then advanced;
// the buffer is reset once its last byte was consumed.
331 if (numberOfAvailableReadBufferBytes(self) >= 1) {
332 unsigned char byte = *(unsigned char *)self->readBufferPos;
333 self->readBufferPos++;
334 checkReadBufferFillState(self); // check whether all bytes were consumed
// No buffered byte available: fall back to the superclass implementation.
337 return [super readByte];
/*
  Writes _len bytes from _buf through the write buffer.

  The bytes are copied into the write buffer chunk by chunk; whenever the
  buffer becomes full its contents are handed to the source using
  -safeWriteBytes:count: and the buffer starts over empty. If
  flags._flushOnNewline is set and the written data contains a NEWLINE_CHAR,
  the buffer is flushed before the method returns.
  If no write buffer is configured (writeBufferSize == 0) the data is passed
  straight to the source, since an empty buffer can never take any bytes.

  Returns _len on success and NGStreamError if the source rejected a write or
  the flush failed.
*/
- (unsigned)writeBytes:(const void *)_buf count:(unsigned)_len {
  register const unsigned char *track     = (const unsigned char *)_buf;
  register unsigned            remaining = _len;
  register unsigned            chunk     = 0;

  if (_len > WRITE_WARN_SIZE) {
    NSLog(@"WARNING(%s): got passed in length %uMB (%u bytes, errcode=%u) ...",
          __PRETTY_FUNCTION__, (_len / 1024 / 1024), _len, NGStreamError);
  }

  if (_len == 0) /* nothing to buffer, nothing to scan */
    return 0;

  if (self->writeBufferSize == 0) {
    /*
      Unbuffered operation. Running the copy loop with a zero-sized buffer
      would never make progress, so the source is written to directly.
    */
    return [self->source safeWriteBytes:_buf count:_len]
      ? _len
      : NGStreamError;
  }

  while (remaining > 0) {
    /* free space in the buffer, capped to what is still to be written */
    chunk = self->writeBufferSize - self->writeBufferFillSize;
    if (chunk > remaining)
      chunk = remaining;

    memcpy((unsigned char *)self->writeBuffer + self->writeBufferFillSize,
           track, chunk);
    track     += chunk;
    remaining -= chunk;
    self->writeBufferFillSize += chunk;

    if (self->writeBufferFillSize == self->writeBufferSize) {
      /* the buffer is full, hand its contents to the source */
      BOOL ok;

      ok = [self->source safeWriteBytes:self->writeBuffer
                 count:self->writeBufferFillSize];
      if (!ok) return NGStreamError;

      self->writeBufferFillSize = 0;
    }
  }

  if (self->flags._flushOnNewline == 1) {
    /*
      Flush if the data just written contains a newline. The bytes themselves
      have to be inspected here; comparing the loop index with NEWLINE_CHAR
      would trigger a flush for every write longer than 10 bytes and never
      for shorter ones.
    */
    if (memchr(_buf, NEWLINE_CHAR, _len) != NULL) {
      if (![self flush])
        return NGStreamError;
    }
  }

  return _len;
}
// NOTE(review): the method header is not visible here; the trailing [super close] suggests
// that this is the close override - confirm.
// Frees the read buffer (if one was allocated) and clears the pointers into it.
397 if (self->readBuffer) {
398 free(self->readBuffer);
399 self->readBuffer = NULL;
400 self->readBufferPos = NULL;
// With both sizes at 0 the read/write methods treat the stream as unbuffered.
402 self->readBufferFillSize = 0;
403 self->readBufferSize = 0;
// Same treatment for the write buffer.
// NOTE(review): pending bytes in the write buffer are dropped here unless a flush happened
// in the lines that are not visible - confirm.
405 if (self->writeBuffer) {
406 free(self->writeBuffer);
407 self->writeBuffer = NULL;
409 self->writeBufferFillSize = 0;
410 self->writeBufferSize = 0;
// Finally the superclass closes the stream; its result is returned to the caller.
412 return [super close];
// NOTE(review): the method header is not visible here; the log message below suggests that
// this is the flush implementation - confirm.
// Only acts when there are buffered bytes waiting to be written.
416 if (self->writeBufferFillSize > 0) {
// Unusually large flushes (more than WRITE_WARN_SIZE bytes) are reported via NSLog.
420 if (self->writeBufferFillSize > WRITE_WARN_SIZE) {
421 NSLog(@"WARNING(%s): shall flush %uMB (%u bytes, errcode=%u) ...",
422 __PRETTY_FUNCTION__, (self->writeBufferFillSize/1024/1024),
423 self->writeBufferFillSize, NGStreamError);
// The buffered bytes are written with safeWriteBytes:count: (the receiver of this message
// send is not visible here; presumably the source - confirm).
429 safeWriteBytes:self->writeBuffer
430 count:self->writeBufferFillSize];
432 /* should check exception for fill size ? ... */
// Afterwards the write buffer is marked empty.
436 self->writeBufferFillSize = 0;
441 @end /* NGBufferedStream */
@implementation NGStream(NGBufferedStreamExtensions)

/*
  Returns the result of +[NGBufferedStream filterWithSource:] invoked with the
  receiver as the source.
*/
- (NGBufferedStream *)bufferedStream {
  NGBufferedStream *buffered;

  buffered = [NGBufferedStream filterWithSource:self];
  return buffered;
}

@end /* NGStream(NGBufferedStreamExtensions) */
// Category providing -bufferedStream for receivers that already are NGBufferedStream objects.
451 @implementation NGBufferedStream(NGBufferedStreamExtensions)
// NOTE(review): the method body is not visible here; presumably it returns the receiver
// itself instead of wrapping it in another buffer - confirm.
453 - (NGBufferedStream *)bufferedStream {
457 @end /* NGBufferedStream(NGBufferedStreamExtensions) */