]> err.no Git - sope/blob - sope-core/NGStreams/NGBufferedStream.m
Id fixes
[sope] / sope-core / NGStreams / NGBufferedStream.m
1 /*
2   Copyright (C) 2000-2004 SKYRIX Software AG
3
4   This file is part of OpenGroupware.org.
5
6   OGo is free software; you can redistribute it and/or modify it under
7   the terms of the GNU Lesser General Public License as published by the
8   Free Software Foundation; either version 2, or (at your option) any
9   later version.
10
11   OGo is distributed in the hope that it will be useful, but WITHOUT ANY
12   WARRANTY; without even the implied warranty of MERCHANTABILITY or
13   FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
14   License for more details.
15
16   You should have received a copy of the GNU Lesser General Public
17   License along with OGo; see the file COPYING.  If not, write to the
18   Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19   02111-1307, USA.
20 */
21 // $Id$
22
23 #include "common.h"
24 #include "NGBufferedStream.h"
25
/* character looked for by the flush-on-newline scan in -writeBytes:count: */
#define NEWLINE_CHAR '\n'

/* writes/flushes bigger than this are reported via NSLog in DEBUG builds */
#define WRITE_WARN_SIZE (100 * 1024 * 1024) /* 100MB */
28
29 @implementation NGBufferedStream
30
/* buffer size (in bytes) used by the initializers which take no size */
static const unsigned DEFAULT_BUFFER_SIZE = 512;

/* the NGDataStream class, resolved by name in +initialize */
static Class DataStreamClass = Nil;

/*
  Looks up the NGDataStream class by name and caches it in DataStreamClass;
  the factory method and the initializers compare sources against it.
*/
+ (void)initialize {
  Class dataStream = NSClassFromString(@"NGDataStream");

  DataStreamClass = dataStream;
}
37
// returns the number of bytes which were already read from the buffer
// (always 0 if read buffering is disabled, that is, readBufferSize == 0)
#define numberOfConsumedReadBufferBytes(self) \
  (((self)->readBufferSize == 0) \
   ? 0 : ((self)->readBufferPos - (self)->readBuffer))

// returns the number of bytes which can be read from buffer (without source
// access): the fill size minus the bytes which were already consumed
#define numberOfAvailableReadBufferBytes(self) \
  ((self)->readBufferFillSize - numberOfConsumedReadBufferBytes(self))

// look whether all bytes in the buffer were consumed, if so, reset the buffer
// (wrapped in do/while(0) so that the macro behaves like a single statement
// and can safely be used in an unbraced if/else)
#define checkReadBufferFillState(self) \
  do { \
    if (numberOfAvailableReadBufferBytes(self) == 0) { \
      (self)->readBufferPos      = (self)->readBuffer; \
      (self)->readBufferFillSize = 0; \
    } \
  } while (0)
52
53 // ******************** constructors ********************
54
/*
  Returns an autoreleased buffered stream on _source which uses _size bytes
  for its buffers. A nil source yields nil; a source whose class is exactly
  DataStreamClass is returned as is (no wrapper is created).
*/
+ (id)filterWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  id stream;

  if (_source == nil)
    return nil;

  /* peek at the isa pointer of the source */
  if (*(Class *)_source == DataStreamClass)
    return _source;

  stream = [[self alloc] initWithSource:_source bufferSize:_size];
  return [stream autorelease];
}
60
// TODO: can we reduce the duplicated code here ...
62
/*
  Initializes a stream which buffers reads and writes, using _size bytes for
  each of the two buffers. If _source is nil the receiver is released and nil
  is returned; if the class of _source is exactly DataStreamClass the receiver
  is released and the retained source is returned in its place.
*/
- (id)initWithSource:(id<NGStream>)_source bufferSize:(unsigned)_size {
  if (_source == nil || *(Class *)_source == DataStreamClass) {
    /* messaging nil returns nil, so a nil source results in nil */
    [self release];
    return [_source retain];
  }

  self = [super initWithSource:_source];
  if (self == nil)
    return nil;

  /* read side */
  self->readBuffer         = calloc(_size, 1);
  self->readBufferPos      = self->readBuffer;
  self->readBufferSize     = _size;
  self->readBufferFillSize = 0; // nothing was read from the source yet

  /* write side */
  self->writeBuffer         = calloc(_size, 1);
  self->writeBufferSize     = _size;
  self->writeBufferFillSize = 0;

  self->flags._flushOnNewline = 1;
  return self;
}
86
/*
  Initializes a stream which only sets up a read buffer (of _s bytes); the
  write buffer fields are left untouched. If _source is nil the receiver is
  released and nil is returned; if the class of _source is exactly
  DataStreamClass the receiver is released and the retained source is
  returned in its place.
*/
- (id)initWithInputSource:(id<NGInputStream>)_source bufferSize:(unsigned)_s {
  if (_source == nil || *(Class *)_source == DataStreamClass) {
    /* messaging nil returns nil, so a nil source results in nil */
    [self release];
    return [_source retain];
  }

  self = [super initWithInputSource:_source];
  if (self == nil)
    return nil;

  self->readBuffer         = calloc(_s, 1);
  self->readBufferPos      = self->readBuffer;
  self->readBufferSize     = _s;
  self->readBufferFillSize = 0; // nothing was read from the source yet

  self->flags._flushOnNewline = 1;
  return self;
}
/*
  Initializes a stream which only sets up a write buffer (of _s bytes); the
  read buffer fields are left untouched. If _src is nil the receiver is
  released and nil is returned; if the class of _src is exactly
  DataStreamClass the receiver is released and the retained source is
  returned in its place.
*/
- (id)initWithOutputSource:(id<NGOutputStream>)_src bufferSize:(unsigned)_s {
  if (_src == nil || *(Class *)_src == DataStreamClass) {
    /* messaging nil returns nil, so a nil source results in nil */
    [self release];
    return [_src retain];
  }

  self = [super initWithOutputSource:_src];
  if (self == nil)
    return nil;

  self->writeBuffer         = calloc(_s, 1);
  self->writeBufferSize     = _s;
  self->writeBufferFillSize = 0;

  self->flags._flushOnNewline = 1;
  return self;
}
124
/* the initializers below use DEFAULT_BUFFER_SIZE as the buffer size */

/* read/write stream with default sized buffers */
- (id)initWithSource:(id<NGStream>)_source {
  const unsigned size = DEFAULT_BUFFER_SIZE;

  return [self initWithSource:_source bufferSize:size];
}

/* input stream with a default sized read buffer */
- (id)initWithInputSource:(id<NGInputStream>)_source {
  const unsigned size = DEFAULT_BUFFER_SIZE;

  return [self initWithInputSource:_source bufferSize:size];
}

/* output stream with a default sized write buffer */
- (id)initWithOutputSource:(id<NGOutputStream>)_source {
  const unsigned size = DEFAULT_BUFFER_SIZE;

  return [self initWithOutputSource:_source bufferSize:size];
}
134
/*
  Flushes pending output (the result of the flush is ignored), frees both
  buffers, resets all buffer bookkeeping and hands over to super.
*/
- (void)dealloc {
  [self flush];

  /* free(NULL) is a no-op, so no NULL checks are required */
  free(self->readBuffer);
  self->readBuffer         = NULL;
  self->readBufferPos      = NULL;
  self->readBufferFillSize = 0;
  self->readBufferSize     = 0;

  free(self->writeBuffer);
  self->writeBuffer         = NULL;
  self->writeBufferFillSize = 0;
  self->writeBufferSize     = 0;

  [super dealloc];
}
154
155 /* accessors */
156
/*
  Changes the size of the read buffer. Pending output is flushed first. A
  size of 0 frees the buffer, which disables read buffering. Any bytes which
  are buffered but not yet consumed are discarded when the size changes.

  If the new buffer cannot be allocated, the old buffer and all bookkeeping
  are left exactly as they were (previously the buffer pointer was
  overwritten with NULL while the new size was still recorded).
*/
- (void)setReadBufferSize:(unsigned)_size {
  void *newBuffer;

  [self flush];

  if (_size == self->readBufferSize)
    return;

  if (_size == 0) {
    if (self->readBuffer != NULL) {
      free(self->readBuffer);
      self->readBuffer = NULL;
    }
    self->readBufferSize     = 0;
    self->readBufferPos      = NULL;
    self->readBufferFillSize = 0; // do not keep a stale fill size around
    return;
  }

  newBuffer = (self->readBuffer != NULL)
    ? realloc(self->readBuffer, _size)
    : calloc(_size, 1);
  if (newBuffer == NULL) {
    /* a failed realloc() leaves the old block valid, keep using it */
    return;
  }

  self->readBuffer         = newBuffer;
  self->readBufferSize     = _size;
  self->readBufferPos      = self->readBuffer;
  self->readBufferFillSize = 0; // no bytes are read from source
}

/* returns the current size of the read buffer (0 = no read buffering) */
- (unsigned)readBufferSize {
  return self->readBufferSize;
}
185
/*
  Changes the size of the write buffer. Pending output is flushed first.

  The request is ignored (buffer and size stay as they are) if
  - the flush failed and the bytes still pending would not fit into a buffer
    of the new size (otherwise later writes would overrun the buffer), or
  - the new buffer cannot be allocated (a failed realloc() leaves the old
    block valid, so it is kept).

  A size of 0 frees the buffer explicitly instead of relying on the
  implementation-defined behaviour of realloc(ptr, 0).
*/
- (void)setWriteBufferSize:(unsigned)_size {
  void *newBuffer;

  [self flush];

  if (_size == self->writeBufferSize)
    return;

  /* only non-zero if the flush above failed */
  if (self->writeBufferFillSize > _size)
    return;

  if (_size == 0) {
    if (self->writeBuffer != NULL) {
      free(self->writeBuffer);
      self->writeBuffer = NULL;
    }
    self->writeBufferSize = 0;
    return;
  }

  newBuffer = realloc(self->writeBuffer, _size);
  if (newBuffer == NULL)
    return;

  self->writeBuffer     = newBuffer;
  self->writeBufferSize = _size;
}

/* returns the current size of the write buffer */
- (unsigned)writeBufferSize {
  return self->writeBufferSize;
}
198
199 /* blocking .. */
200
/*
  Returns NO if the request in _mode can be answered from the buffer state
  alone. Otherwise the question is passed on to the source if it responds to
  -wouldBlockInMode:, and YES is returned if it does not.
*/
- (BOOL)wouldBlockInMode:(NGStreamMode)_mode {
  BOOL canRead  = NO;
  BOOL canWrite = NO;
  BOOL answeredByBuffer;

  /* readable if read buffering is on and unconsumed bytes are buffered */
  if (self->readBufferSize != 0)
    canRead = (numberOfAvailableReadBufferBytes(self) > 0);

  /*
    NOTE(review): this treats the stream as writable when bytes are *pending*
    in the write buffer, not when free space is left - confirm the intent.
  */
  if (self->writeBufferSize != 0)
    canWrite = (self->writeBufferFillSize > 0);

  answeredByBuffer =
    ((_mode == NGStreamMode_readWrite) && canRead && canWrite) ||
    ((_mode == NGStreamMode_readOnly)  && canRead)             ||
    ((_mode == NGStreamMode_writeOnly) && canWrite);
  if (answeredByBuffer)
    return NO;

  if (![self->source respondsToSelector:@selector(wouldBlockInMode:)])
    return YES;

  return [(id)self->source wouldBlockInMode:_mode];
}
225
226 /* primitives */
227
/*
  Reads up to _len bytes into _buf and returns the number of bytes stored,
  or NGStreamError if the source reported an error.

  The cases are checked in this order:
   1. read buffering is disabled: the request goes straight to the source
   2. the buffer holds at least _len bytes: served from the buffer
   3. the buffer holds some bytes: those are returned (short read)
   4. the buffer is empty and _len exceeds the buffer size: the source reads
      directly into _buf, bypassing the buffer
   5. the buffer is empty: it is refilled from the source and the request is
      served from the fresh contents (possibly as a short read)
*/
- (unsigned)readBytes:(void *)_buf count:(unsigned)_len {
  register unsigned buffered = numberOfAvailableReadBufferBytes(self);
  unsigned got;

  /* 1. no read buffering is done (buffersize==0) */
  if (self->readBufferSize == 0) {
    return (readBytes != NULL)
      ? readBytes(source, _cmd, _buf, _len)
      : [source readBytes:_buf count:_len];
  }

  /* 2. enough bytes are buffered to fulfill the whole request */
  if (buffered >= _len) {
    memcpy(_buf, self->readBufferPos, _len);
    self->readBufferPos += _len;    // advance the consume position
    checkReadBufferFillState(self); // resets the buffer if it is drained
    return _len;
  }

  /* 3. hand out whatever is left in the buffer */
  if (buffered > 0) {
    memcpy(_buf, self->readBufferPos, buffered);
    self->readBufferPos      = self->readBuffer; // buffer is drained now
    self->readBufferFillSize = 0;
    return buffered;
  }

  /* from here on the buffer must be empty and reset */
  NSAssert(self->readBufferPos == self->readBuffer,
           @"read buffer position is not reset");
  NSAssert(self->readBufferFillSize == 0, @"there are bytes in the buffer");

  /* 4. request is bigger than the buffer, let the source fill _buf */
  if (_len > self->readBufferSize) {
    got = (readBytes != NULL)
      ? (unsigned)readBytes(source, _cmd, _buf, _len)
      : [source readBytes:_buf count:_len];

    if (got == NGStreamError)
      return NGStreamError;

    NSAssert(got != 0, @"readBytes:count: may never return zero !");
    return got;
  }

  /* 5. refill the buffer with the next block of input */
  got = (readBytes != NULL)
    ? (unsigned)readBytes(source,_cmd, self->readBuffer,self->readBufferSize)
    : [source readBytes:self->readBuffer count:self->readBufferSize];

  if (got == NGStreamError) {
    self->readBufferFillSize = 0;
    return NGStreamError;
  }
  self->readBufferFillSize = got;

  NSAssert(self->readBufferFillSize != 0,
           @"readBytes:count: may never return zero !");

  if (got < _len) {
    /* the source delivered less than requested, return all of it */
    memcpy(_buf, self->readBufferPos, got);
    self->readBufferPos      = self->readBuffer;
    self->readBufferFillSize = 0;
    return got;
  }

  /* the fresh buffer contents cover the request */
  memcpy(_buf, self->readBufferPos, _len);
  self->readBufferPos += _len;
  checkReadBufferFillState(self);
  return _len;
}
326
/*
  Returns the next byte. The byte is taken from the read buffer if read
  buffering is enabled and at least one unconsumed byte is buffered; in all
  other cases the request is passed on to super's -readByte.
*/
- (int)readByte {
  BOOL fromBuffer;

  fromBuffer = (self->readBufferSize != 0) &&
               (numberOfAvailableReadBufferBytes(self) >= 1);

  if (fromBuffer) {
    unsigned char next = *(unsigned char *)self->readBufferPos;

    self->readBufferPos++;
    checkReadBufferFillState(self); // resets the buffer if it is drained
    return next;
  }

  return [super readByte];
}
339
/*
  Appends _len bytes from _buf to the write buffer. Whenever the buffer runs
  full it is written to the source using -safeWriteBytes:count:. Returns _len
  on success and NGStreamError if writing to the source failed.

  If flush-on-newline is enabled and the written data contains a newline
  character, the buffer is flushed before returning. (The scan used to
  compare the loop index against NEWLINE_CHAR instead of the byte, so it
  flushed on every write of more than 10 bytes and never on short lines.)

  If no write buffer is available (size 0, e.g. after -close), the data is
  passed directly to the source; previously this case looped forever because
  no bytes could ever be moved into the zero-sized buffer.
*/
- (unsigned)writeBytes:(const void *)_buf count:(unsigned)_len {
  register const unsigned char *track     = _buf;
  register unsigned            remaining = _len;
  register unsigned            chunk;

#if DEBUG
  if (_len > WRITE_WARN_SIZE) {
    NSLog(@"WARNING(%s): got passed in length %uMB (%u bytes, errcode=%u) ...",
          __PRETTY_FUNCTION__, (_len / 1024 / 1024), _len, NGStreamError);
  }
#endif

  if (_len == 0)
    return 0;

  if (self->writeBuffer == NULL || self->writeBufferSize == 0) {
    /* no write buffering is possible, write through to the source */
    if (![self->source safeWriteBytes:_buf count:_len])
      return NGStreamError;
    return _len;
  }

  while (remaining > 0) {
    /* free space in the buffer, limited to what is left to be written */
    chunk = self->writeBufferSize - self->writeBufferFillSize;
    if (chunk > remaining)
      chunk = remaining;

    memcpy((unsigned char *)self->writeBuffer + self->writeBufferFillSize,
           track, chunk);
    track                     += chunk;
    remaining                 -= chunk;
    self->writeBufferFillSize += chunk;

    if (self->writeBufferFillSize == self->writeBufferSize) {
      /* buffer is full, write it out before continuing */
      if (![self->source safeWriteBytes:self->writeBuffer
                         count:self->writeBufferFillSize])
        return NGStreamError;

      self->writeBufferFillSize = 0;
    }
  }

  if (self->flags._flushOnNewline == 1) {
    /* look at the written bytes themselves, flush if one is a newline */
    if (memchr(_buf, NEWLINE_CHAR, _len) != NULL) {
      if (![self flush])
        return NGStreamError;
    }
  }

  return _len;
}
392
/*
  Flushes pending output, releases both buffers and closes the stream using
  super's -close. If the flush fails, NO is returned and neither the buffers
  nor the stream are touched.
*/
- (BOOL)close {
  BOOL flushed = [self flush];

  if (!flushed)
    return NO;

  /* free(NULL) is a no-op, so no NULL checks are required */
  free(self->readBuffer);
  self->readBuffer         = NULL;
  self->readBufferPos      = NULL;
  self->readBufferFillSize = 0;
  self->readBufferSize     = 0;

  free(self->writeBuffer);
  self->writeBuffer         = NULL;
  self->writeBufferFillSize = 0;
  self->writeBufferSize     = 0;

  return [super close];
}
414
/*
  Writes the contents of the write buffer to the source using
  -safeWriteBytes:count: and marks the buffer as empty. Returns YES if there
  was nothing to write or the write succeeded; returns NO if the write
  failed, in which case the buffer contents are kept.
*/
- (BOOL)flush {
  if (self->writeBufferFillSize == 0)
    return YES; /* nothing is pending */

#if DEBUG
  if (self->writeBufferFillSize > WRITE_WARN_SIZE) {
    NSLog(@"WARNING(%s): shall flush %uMB (%u bytes, errcode=%u) ...",
          __PRETTY_FUNCTION__, (self->writeBufferFillSize/1024/1024),
          self->writeBufferFillSize, NGStreamError);
  }
#endif

  if (![self->source safeWriteBytes:self->writeBuffer
                     count:self->writeBufferFillSize]) {
    /* should check exception for fill size ? ... */
    return NO;
  }

  self->writeBufferFillSize = 0;
  return YES;
}
440
441 @end /* NGBufferedStream */
442
@implementation NGStream(NGBufferedStreamExtensions)

/* returns the result of +[NGBufferedStream filterWithSource:] on the receiver */
- (NGBufferedStream *)bufferedStream {
  NGBufferedStream *buffered;

  buffered = [NGBufferedStream filterWithSource:self];
  return buffered;
}

@end /* NGStream(NGBufferedStreamExtensions) */
450
@implementation NGBufferedStream(NGBufferedStreamExtensions)

/* a buffered stream is its own buffered stream, no additional wrapper is made */
- (NGBufferedStream *)bufferedStream {
  NGBufferedStream *me = self;

  return me;
}

@end /* NGBufferedStream(NGBufferedStreamExtensions) */