/*
  Copyright (C) 2000-2005 SKYRIX Software AG

  This file is part of SOPE.

  SOPE is free software; you can redistribute it and/or modify it under
  the terms of the GNU Lesser General Public License as published by the
  Free Software Foundation; either version 2, or (at your option) any
  later version.

  SOPE is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
  License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with SOPE; see the file COPYING.  If not, write to the
  Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
  02111-1307, USA.
*/
22 #include <NGStreams/NGBufferedStream.h>
25 #define NEWLINE_CHAR '\n'
26 #define WRITE_WARN_SIZE (1024 * 1024 * 100) /* 100MB */
28 @implementation NGBufferedStream
30 static const unsigned DEFAULT_BUFFER_SIZE = 512;
31 static Class DataStreamClass = Nil;
/*
  Looks up the NGDataStream class by name and caches it in DataStreamClass,
  which the factory method and the initializers compare their source against.
*/
+ (void)initialize {
  /* may run more than once (e.g. for subclasses); the lookup is idempotent */
  if (DataStreamClass == Nil)
    DataStreamClass = NSClassFromString(@"NGDataStream");
}
// Number of bytes that were already consumed from the read buffer. Evaluates
// to 0 when read buffering is disabled (readBufferSize == 0). The macro
// argument is parenthesised so that any pointer expression can be passed.
#define numberOfConsumedReadBufferBytes(self) \
  (((self)->readBufferSize == 0) \
   ? 0 : ((self)->readBufferPos - (self)->readBuffer))
// Number of bytes that can still be served from the read buffer without
// accessing the source: the fill size minus the bytes already consumed.
#define numberOfAvailableReadBufferBytes(self) \
  ((self)->readBufferFillSize - numberOfConsumedReadBufferBytes(self))
// Checks whether all bytes in the read buffer were consumed and, if so,
// resets the buffer to its empty state (position at the start, fill size 0).
// Wrapped in do { } while (0) so that the macro expands to exactly one
// statement and cannot capture a following 'else'.
#define checkReadBufferFillState(self) \
  do { \
    if (numberOfAvailableReadBufferBytes(self) == 0) { \
      (self)->readBufferPos      = (self)->readBuffer; \
      (self)->readBufferFillSize = 0; \
    } \
  } while (0)
52 // ******************** constructors ********************
/*
  Returns an autoreleased buffered stream that wraps _source and uses
  buffers of _size bytes.

  Returns nil for a nil source. A source that is exactly an instance of the
  cached NGDataStream class is returned unchanged instead of being wrapped.
*/
+ (id)filterWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  if (_source == nil)
    return nil;

  /*
    Ask the object for its class instead of dereferencing its first word:
    the raw isa slot is not guaranteed to hold a plain Class pointer on
    current runtimes, which would make the comparison silently fail.
  */
  if ([(id)_source isMemberOfClass:DataStreamClass])
    return _source;

  return [[[self alloc] initWithSource:_source bufferSize:_size] autorelease];
}
// TODO: can we reduce the duplicated code here?
/*
  Initializes a stream that buffers both reading and writing, using one
  buffer of _size bytes per direction.

  Returns nil (releasing the receiver) for a nil source or when a buffer
  cannot be allocated. If _source is exactly an instance of the cached
  NGDataStream class, the receiver is released and the retained source is
  returned in its place.
*/
- (id)initWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  if (_source == nil) {
    [self release];
    return nil;
  }
  /* class check by message; the raw isa slot is not portable */
  if ([(id)_source isMemberOfClass:DataStreamClass]) {
    [self release];
    return [_source retain];
  }

  if ((self = [super initWithSource:_source])) {
    self->readBuffer  = calloc(_size, 1);
    self->writeBuffer = calloc(_size, 1);

    /* calloc(0, 1) may legitimately return NULL, so only a sized request
       that yields NULL counts as an allocation failure */
    if ((_size > 0) &&
        ((self->readBuffer == NULL) || (self->writeBuffer == NULL))) {
      [self release];
      return nil;
    }

    self->readBufferPos       = self->readBuffer;
    self->readBufferSize      = _size;
    self->readBufferFillSize  = 0; // no bytes were read from the source yet
    self->writeBufferFillSize = 0;
    self->writeBufferSize     = _size;
    self->flags._flushOnNewline = 1;
  }
  return self;
}
/*
  Initializes a read-only buffered stream with a read buffer of _s bytes;
  no write buffer is set up.

  Returns nil (releasing the receiver) for a nil source or when the buffer
  cannot be allocated. If _source is exactly an instance of the cached
  NGDataStream class, the receiver is released and the retained source is
  returned in its place.
*/
- (id)initWithInputSource:(id<NGInputStream>)_source bufferSize:(unsigned)_s {
  if (_source == nil) {
    [self release];
    return nil;
  }
  /* class check by message; the raw isa slot is not portable */
  if ([(id)_source isMemberOfClass:DataStreamClass]) {
    [self release];
    return [_source retain];
  }

  if ((self = [super initWithInputSource:_source])) {
    self->readBuffer = calloc(_s, 1);

    /* calloc(0, 1) may return NULL without that being an error */
    if ((_s > 0) && (self->readBuffer == NULL)) {
      [self release];
      return nil;
    }

    self->readBufferPos      = self->readBuffer;
    self->readBufferSize     = _s;
    self->readBufferFillSize = 0; // no bytes were read from the source yet
    self->flags._flushOnNewline = 1;
  }
  return self;
}
/*
  Initializes a write-only buffered stream with a write buffer of _s bytes;
  no read buffer is set up.

  Returns nil (releasing the receiver) for a nil source or when the buffer
  cannot be allocated. If _src is exactly an instance of the cached
  NGDataStream class, the receiver is released and the retained source is
  returned in its place.
*/
- (id)initWithOutputSource:(id<NGOutputStream>)_src bufferSize:(unsigned)_s {
  if (_src == nil) {
    [self release];
    return nil;
  }
  /* class check by message; the raw isa slot is not portable */
  if ([(id)_src isMemberOfClass:DataStreamClass]) {
    [self release];
    return [_src retain];
  }

  if ((self = [super initWithOutputSource:_src])) {
    self->writeBuffer = calloc(_s, 1);

    /* calloc(0, 1) may return NULL without that being an error */
    if ((_s > 0) && (self->writeBuffer == NULL)) {
      [self release];
      return nil;
    }

    self->writeBufferFillSize = 0;
    self->writeBufferSize     = _s;
    self->flags._flushOnNewline = 1;
  }
  return self;
}
/* Read/write buffering with the default buffer size. */
- (id)initWithSource:(id<NGStream>)_source {
  const unsigned size = DEFAULT_BUFFER_SIZE;

  return [self initWithSource:_source bufferSize:size];
}
/* Read buffering with the default buffer size. */
- (id)initWithInputSource:(id<NGInputStream>)_source {
  const unsigned size = DEFAULT_BUFFER_SIZE;

  return [self initWithInputSource:_source bufferSize:size];
}
/* Write buffering with the default buffer size. */
- (id)initWithOutputSource:(id<NGOutputStream>)_source {
  const unsigned size = DEFAULT_BUFFER_SIZE;

  return [self initWithOutputSource:_source bufferSize:size];
}
/*
  Writes out any pending output, frees both buffers and resets the
  bookkeeping fields before handing over to the superclass.
*/
- (void)dealloc {
  /* bytes still sitting in the write buffer would otherwise be lost */
  [self flush];

  /* free(NULL) is a no-op, so no guards are needed */
  free(self->readBuffer);
  self->readBuffer         = NULL;
  self->readBufferPos      = NULL;
  self->readBufferFillSize = 0;
  self->readBufferSize     = 0;

  free(self->writeBuffer);
  self->writeBuffer         = NULL;
  self->writeBufferFillSize = 0;
  self->writeBufferSize     = 0;

  [super dealloc];
}
/*
  Changes the size of the read buffer.

  A size of 0 frees the buffer and switches read buffering off. Any other
  size (re)allocates the buffer; bytes that were buffered but not yet
  consumed are dropped, because the fill size and read position are reset.
  If the allocation fails, the previous buffer and size stay in effect.
*/
- (void)setReadBufferSize:(unsigned)_size {
  void *resized;

  if (_size == self->readBufferSize)
    return;

  if (_size == 0) {
    free(self->readBuffer);
    self->readBuffer         = NULL;
    self->readBufferPos      = NULL;
    self->readBufferSize     = 0;
    self->readBufferFillSize = 0; /* do not keep a stale fill size around */
    return;
  }

  /* keep the result in a temporary: assigning realloc() directly would
     leak the old block and install a NULL buffer on failure */
  resized = (self->readBuffer != NULL)
    ? realloc(self->readBuffer, _size)
    : calloc(_size, 1);

  if (resized == NULL) {
    NSLog(@"WARNING(%s): could not allocate a read buffer of %u bytes",
          __PRETTY_FUNCTION__, _size);
    return;
  }

  self->readBuffer         = resized;
  self->readBufferSize     = _size;
  self->readBufferPos      = self->readBuffer;
  self->readBufferFillSize = 0; // no bytes are read from source
}
/* Capacity of the read buffer in bytes; 0 means reads are not buffered. */
- (unsigned)readBufferSize {
  unsigned capacity = self->readBufferSize;

  return capacity;
}
/*
  Changes the size of the write buffer.

  Pending bytes are flushed first, so that shrinking can never leave the
  fill size beyond the end of the new buffer. A size of 0 frees the buffer.
  If flushing or the allocation fails, the previous buffer and size stay in
  effect.
*/
- (void)setWriteBufferSize:(unsigned)_size {
  void *resized;

  if (_size == self->writeBufferSize)
    return;

  if (![self flush])
    return;

  if (_size == 0) {
    /* realloc(ptr, 0) is implementation-defined, free explicitly */
    free(self->writeBuffer);
    self->writeBuffer     = NULL;
    self->writeBufferSize = 0;
    return;
  }

  /* keep the result in a temporary: assigning realloc() directly would
     leak the old block and install a NULL buffer on failure */
  resized = realloc(self->writeBuffer, _size);
  if (resized == NULL) {
    NSLog(@"WARNING(%s): could not allocate a write buffer of %u bytes",
          __PRETTY_FUNCTION__, _size);
    return;
  }

  self->writeBuffer     = resized;
  self->writeBufferSize = _size;
}
/* Capacity of the write buffer in bytes. */
- (unsigned)writeBufferSize {
  unsigned capacity = self->writeBufferSize;

  return capacity;
}
/*
  Reports whether an operation in the given mode would block.

  The buffers are consulted first; if they cannot answer the question, it is
  forwarded to the source when the source implements -wouldBlockInMode:.
*/
- (BOOL)wouldBlockInMode:(NGStreamMode)_mode {
  BOOL hasBufferedInput  = NO;
  BOOL hasBufferedOutput = NO;

  if (self->readBufferSize != 0)
    hasBufferedInput = (numberOfAvailableReadBufferBytes(self) > 0);

  /* NOTE(review): output counts as ready when bytes are pending in the
     write buffer, not when the buffer has free room - confirm intent */
  if (self->writeBufferSize != 0)
    hasBufferedOutput = (self->writeBufferFillSize > 0);

  if (((_mode == NGStreamMode_readWrite) &&
       hasBufferedInput && hasBufferedOutput) ||
      ((_mode == NGStreamMode_readOnly) && hasBufferedInput) ||
      ((_mode == NGStreamMode_writeOnly) && hasBufferedOutput))
    return NO;

  if (![self->source respondsToSelector:@selector(wouldBlockInMode:)])
    return YES;

  return [(id)self->source wouldBlockInMode:_mode];
}
/*
  Reads up to _len bytes into _buf and returns the number of bytes
  delivered, or NGStreamError if the source reports an error.

  - read buffering disabled: the request goes straight to the source
  - bytes buffered: they are served from the buffer (possibly fewer than
    requested; the source is not touched in that call)
  - buffer empty, request larger than the buffer: the source reads directly
    into _buf, bypassing the buffer
  - buffer empty otherwise: the buffer is refilled from the source and the
    request is served from it
*/
- (unsigned)readBytes:(void *)_buf count:(unsigned)_len {
  unsigned buffered;

  if (self->readBufferSize == 0) {
    /* no read buffering is done (buffersize==0) */
    if (readBytes != NULL)
      return readBytes(source, _cmd, _buf, _len);
    return [source readBytes:_buf count:_len];
  }

  buffered = numberOfAvailableReadBufferBytes(self);

  if ((buffered == 0) && (_len > 0)) {
    /* nothing is buffered, so the source has to be consulted */
    NSAssert(self->readBufferPos == self->readBuffer,
             @"read buffer position is not reset");
    NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");

    if (_len > self->readBufferSize) {
      /* the request exceeds the buffer capacity: read past the buffer */
      unsigned received;

      received = (readBytes != NULL)
        ? (unsigned)readBytes(source, _cmd, _buf, _len)
        : [source readBytes:_buf count:_len];

      if (received == NGStreamError)
        return NGStreamError;

      NSAssert(received != 0, @"readBytes:count: may never return zero !");

      return received;
    }

    /* fetch the next block of input into the buffer */
    self->readBufferFillSize = (readBytes != NULL)
      ? (unsigned)readBytes(source,_cmd, self->readBuffer,self->readBufferSize)
      : [source readBytes:self->readBuffer count:self->readBufferSize];

    if (self->readBufferFillSize == NGStreamError) {
      self->readBufferFillSize = 0;
      return NGStreamError;
    }

    NSAssert(self->readBufferFillSize != 0,
             @"readBytes:count: may never return zero !");

    buffered = self->readBufferFillSize;
  }

  if (buffered >= _len) {
    /* the buffer holds enough bytes for the complete request */
    memcpy(_buf, self->readBufferPos, _len);
    self->readBufferPos += _len;
    checkReadBufferFillState(self); /* reset once everything is consumed */
    return _len;
  }

  /* hand out whatever is buffered and leave the buffer empty */
  memcpy(_buf, self->readBufferPos, buffered);
  self->readBufferPos      = self->readBuffer;
  self->readBufferFillSize = 0;
  return buffered;
}
/*
  Returns the next byte. It is taken from the read buffer when buffering is
  enabled and at least one byte is available there; in every other case the
  superclass implementation is used.
*/
- (int)readByte {
  if ((self->readBufferSize != 0) &&
      (numberOfAvailableReadBufferBytes(self) >= 1)) {
    unsigned char value = *(unsigned char *)self->readBufferPos;

    self->readBufferPos++;
    checkReadBufferFillState(self); /* reset once everything is consumed */
    return value;
  }

  return [super readByte];
}
/*
  Appends _len bytes from _buf to the write buffer, handing the buffer to
  the source every time it runs full.

  Returns _len on success and NGStreamError if the source rejects a write or
  a flush fails.

  With flush-on-newline enabled the buffer is flushed when the written data
  contains a newline. (The previous scan compared the loop index with
  NEWLINE_CHAR instead of the byte, so it fired for every write longer than
  ten bytes, whatever its content.)

  Without a write buffer (writeBufferSize == 0) the data is passed straight
  to the source; the buffering loop could never make progress in that case.
*/
- (unsigned)writeBytes:(const void *)_buf count:(unsigned)_len {
  const unsigned char *cursor    = _buf;
  unsigned            remaining = _len;

  if (_len > WRITE_WARN_SIZE) {
    NSLog(@"WARNING(%s): got passed in length %uMB (%u bytes, errcode=%u) ...",
          __PRETTY_FUNCTION__, (_len / 1024 / 1024), _len, NGStreamError);
  }

  if (_len == 0)
    return 0;

  if (self->writeBufferSize == 0) {
    if (![self->source safeWriteBytes:_buf count:_len])
      return NGStreamError;
    return _len;
  }

  while (remaining > 0) {
    /* copy as much as fits into the free part of the buffer */
    unsigned chunk = self->writeBufferSize - self->writeBufferFillSize;

    if (chunk > remaining)
      chunk = remaining;

    memcpy((unsigned char *)self->writeBuffer + self->writeBufferFillSize,
           cursor, chunk);
    cursor    += chunk;
    remaining -= chunk;
    self->writeBufferFillSize += chunk;

    if (self->writeBufferFillSize == self->writeBufferSize) {
      /* buffer is full, hand it to the source */
      BOOL ok;

      ok = [self->source safeWriteBytes:self->writeBuffer
                         count:self->writeBufferFillSize];
      if (!ok) return NGStreamError;

      self->writeBufferFillSize = 0;
    }
  }

  if (self->flags._flushOnNewline == 1) {
    /* look at the written bytes themselves for a newline */
    if (memchr(_buf, NEWLINE_CHAR, _len) != NULL) {
      if (![self flush])
        return NGStreamError;
    }
  }

  return _len;
}
/*
  Closes the stream: pending output is flushed, both buffers are freed and
  the superclass performs the actual close.

  Returns NO without releasing anything if the pending output could not be
  written; otherwise returns the result of the superclass.
*/
- (BOOL)close {
  /* freeing the write buffer first would silently drop unwritten bytes */
  if (![self flush])
    return NO;

  /* free(NULL) is a no-op, so no guards are needed */
  free(self->readBuffer);
  self->readBuffer         = NULL;
  self->readBufferPos      = NULL;
  self->readBufferFillSize = 0;
  self->readBufferSize     = 0;

  free(self->writeBuffer);
  self->writeBuffer         = NULL;
  self->writeBufferFillSize = 0;
  self->writeBufferSize     = 0;

  return [super close];
}
/*
  Writes the content of the write buffer to the source and marks the buffer
  as empty. Returns YES when there was nothing to write or the write
  succeeded, NO when the source rejected it (the buffer is kept then).
*/
- (BOOL)flush {
  BOOL ok;

  if (self->writeBufferFillSize == 0)
    return YES;

  if (self->writeBufferFillSize > WRITE_WARN_SIZE) {
    NSLog(@"WARNING(%s): shall flush %uMB (%u bytes, errcode=%u) ...",
          __PRETTY_FUNCTION__, (self->writeBufferFillSize/1024/1024),
          self->writeBufferFillSize, NGStreamError);
  }

  ok = [self->source
            safeWriteBytes:self->writeBuffer
            count:self->writeBufferFillSize];

  /* should check exception for fill size ? ... */
  if (!ok)
    return NO;

  self->writeBufferFillSize = 0;
  return YES;
}
440 @end /* NGBufferedStream */
@implementation NGStream(NGBufferedStreamExtensions)

/* Returns what +[NGBufferedStream filterWithSource:] yields for the receiver. */
- (NGBufferedStream *)bufferedStream {
  id buffered = [NGBufferedStream filterWithSource:self];

  return buffered;
}

@end /* NGStream(NGBufferedStreamExtensions) */
@implementation NGBufferedStream(NGBufferedStreamExtensions)

/* A buffered stream is its own buffered stream; it is not wrapped again. */
- (NGBufferedStream *)bufferedStream {
  return self;
}

@end /* NGBufferedStream(NGBufferedStreamExtensions) */