--- a/CollectingSharedQueueStream.st Wed Mar 05 15:31:35 1997 +0100
+++ b/CollectingSharedQueueStream.st Wed Mar 05 17:26:06 1997 +0100
@@ -1,5 +1,6 @@
-Collection subclass:#CollectingSharedQueueStream
- instanceVariableNames:'contents readPosition writePosition dataAvailable closed'
+SequenceableCollection subclass:#CollectingSharedQueueStream
+ instanceVariableNames:'contents readPosition writePosition dataAvailable closed
+ finalSizeIfKnown signalChanges'
classVariableNames:''
poolDictionaries:''
category:'Streams'
@@ -34,6 +35,7 @@
examples
"
+ two processes synchronized much like with a sharedQueue:
[exBegin]
|s reader|
@@ -50,8 +52,8 @@
].
[exEnd]
- the reader reads from a (slow) pipe;
- the writer sends it to the transcript.
+ the writer reads from a (slow) pipe;
+ the reader sends it to the transcript.
[exBegin]
|pipe s reader|
@@ -72,6 +74,35 @@
].
s close
[exEnd]
+
+
+ the writer reads from a (slow) pipe;
+ the collection is used in a TextView, which
+ will block whenever lines are to be displayed, which have not
+ yet been read:
+ [exBegin]
+ |view pipe buffer reader|
+
+ buffer := CollectingSharedQueueStream new.
+ buffer finalSize:100.
+
+ [
+ pipe := PipeStream readingFrom:'ls -lR /usr'.
+ pipe notNil ifTrue:[
+ [pipe atEnd] whileFalse:[
+ pipe readWait.
+ buffer nextPut:(pipe nextLine).
+ ].
+ pipe close.
+ ].
+ buffer changed:#size.
+ buffer close.
+ ] fork.
+
+ view := ScrollableView for:TextView.
+ view model:buffer; listMessage:#value; aspectMessage:#value.
+ view open.
+ [exEnd]
"
! !
@@ -105,15 +136,6 @@
"Created: 5.3.1997 / 14:44:41 / cg"
!
-close
- "signal the end of input; to be used by the writer"
-
- closed := true.
- dataAvailable signal
-
- "Modified: 5.3.1997 / 14:45:11 / cg"
-!
-
next
"return the next value in the queue; if there is none,
wait 'til something is put into the receiver."
@@ -142,9 +164,56 @@
contents add:anObject.
writePosition := writePosition + 1.
+
+ finalSizeIfKnown isNil ifTrue:[
+ signalChanges ifTrue:[
+ self changed:#size with:nil
+ ]
+ ].
dataAvailable signal
"Created: 5.3.1997 / 14:33:44 / cg"
+ "Modified: 5.3.1997 / 15:42:33 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'accessing - special'!
+
+close
+ "signal the end of input; to be used by the writer"
+
+ closed := true.
+ dataAvailable signal
+
+ "Modified: 5.3.1997 / 14:45:11 / cg"
+!
+
+finalSize:aNumber
+ "can be used by the writer, if the final size is known in
+ advance."
+
+ finalSizeIfKnown := aNumber.
+ signalChanges ifTrue:[
+ self changed:#size
+ ].
+
+ "Created: 5.3.1997 / 15:36:07 / cg"
+ "Modified: 5.3.1997 / 15:57:24 / cg"
+!
+
+signalChanges:aBoolean
+ "controls if I should send out size-changeMessages when new elements arrive"
+
+ signalChanges := aBoolean.
+
+ "Created: 5.3.1997 / 15:40:57 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'dummy converting'!
+
+asStringCollection
+ ^ self
+
+ "Created: 5.3.1997 / 16:02:57 / cg"
! !
!CollectingSharedQueueStream methodsFor:'initialization'!
@@ -165,10 +234,30 @@
^ readPosition >= writePosition
"Modified: 5.3.1997 / 14:41:04 / cg"
+!
+
+currentSize
+ ^ contents size
+
+ "Modified: 5.3.1997 / 15:56:36 / cg"
+!
+
+size
+ closed ifTrue:[^ contents size].
+ finalSizeIfKnown notNil ifTrue:[^ finalSizeIfKnown].
+
+ "/ must wait until closed
+ [closed] whileFalse:[
+ dataAvailable wait
+ ].
+ closed ifTrue:[^ contents size].
+
+ "Created: 5.3.1997 / 15:35:29 / cg"
+ "Modified: 5.3.1997 / 15:57:08 / cg"
! !
!CollectingSharedQueueStream class methodsFor:'documentation'!
version
- ^ '$Header: /cvs/stx/stx/libbasic2/CollectingSharedQueueStream.st,v 1.2 1997-03-05 14:31:35 cg Exp $'
+ ^ '$Header: /cvs/stx/stx/libbasic2/CollectingSharedQueueStream.st,v 1.3 1997-03-05 16:26:06 cg Exp $'
! !