--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CollectingSharedQueueStream.st Wed Mar 05 14:50:52 1997 +0100
@@ -0,0 +1,171 @@
+Collection subclass:#CollectingSharedQueueStream
+ instanceVariableNames:'contents readPosition writePosition dataAvailable closed'
+ classVariableNames:''
+ poolDictionaries:''
+ category:'Collections-Ordered'
+!
+
+!CollectingSharedQueueStream class methodsFor:'documentation'!
+
+documentation
+"
+ This class combines the functionality of a growable collection
+ with sharedQueue and double-stream-like behavior.
+ The reader side may read from it using #next, and possibly access
+ elements via #at:.
+ Reading may start immediately, but will block until enough elements
+ have been added by another process, the writer.
+ Instances of this class may be useful to start processing on
+ big document/data collection immediately, while the data is still being
+ read; a concrete application is the HTMLDocumentReader, which starts
+ processing and displaying the document while the rest is still being
+ read.
+
+ [author:]
+ Claus Gittinger
+
+ [see also:]
+ Stream OrderedCollection SharedQueue
+"
+!
+
+examples
+"
+ [exBegin]
+ |s reader|
+
+ s := CollectingSharedQueueStream new.
+ reader := [
+ [s atEnd] whileFalse:[
+ Transcript showCR:s next
+ ].
+ ] fork.
+
+ 1 to:10 do:[:i |
+ Delay waitForSeconds:1.
+ s nextPut:i
+ ].
+ [exEnd]
+
+ the reader reads from a (slow) pipe;
+ the writer sends it to the transcript.
+ [exBegin]
+ |pipe s reader|
+
+ s := CollectingSharedQueueStream new.
+ reader := [
+ [s atEnd] whileFalse:[
+ Transcript showCR:s next
+ ].
+ ] forkAt:9.
+
+ pipe := PipeStream readingFrom:'ls -lR /usr'.
+ pipe notNil ifTrue:[
+ [pipe atEnd] whileFalse:[
+ pipe readWait.
+ s nextPut:(pipe nextLine).
+ ].
+ pipe close.
+ ].
+ s close
+ [exEnd]
+"
+
+! !
+
+!CollectingSharedQueueStream class methodsFor:'instance creation'!
+
+new
+ ^ self basicNew initialize
+
+ "Created: 5.3.1997 / 14:30:36 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'accessing'!
+
+at:index
+ "synchronized read - possibly wait for elements up to index
+ being added (by someone else); then return it."
+
+ writePosition > index ifTrue:[
+ ^ contents at:index
+ ].
+
+ [writePosition <= index] whileTrue:[
+ closed ifTrue:[
+ ^ self subscriptBoundsError:index
+ ].
+ dataAvailable wait.
+ ].
+ ^ contents at:index
+
+ "Created: 5.3.1997 / 14:44:41 / cg"
+!
+
+close
+ "signal the end of input; to be used by the writer"
+
+ closed := true.
+ dataAvailable signal
+
+ "Modified: 5.3.1997 / 14:45:11 / cg"
+!
+
+next
+ "return the next value in the queue; if there is none,
+ wait 'til something is put into the receiver."
+
+ |value|
+
+ [readPosition >= writePosition] whileTrue:[
+ closed ifTrue:[
+ ^ nil
+ ].
+ dataAvailable wait
+ ].
+
+ value := contents at:readPosition.
+ readPosition := readPosition + 1.
+ ^ value
+
+ "Created: 5.3.1997 / 14:28:57 / cg"
+ "Modified: 5.3.1997 / 14:45:54 / cg"
+!
+
+nextPut:anObject
+ "append anObject to the queue; if anyone is waiting, tell him"
+
+ |value|
+
+ contents add:anObject.
+ writePosition := writePosition + 1.
+ dataAvailable signal
+
+ "Created: 5.3.1997 / 14:33:44 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'initialization'!
+
+initialize
+ readPosition := writePosition := 1.
+ dataAvailable := Semaphore new.
+ contents := OrderedCollection new.
+ closed := false.
+
+ "Modified: 5.3.1997 / 14:34:55 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'queries'!
+
+atEnd
+ closed ifFalse:[^ false].
+ ^ readPosition >= writePosition
+
+ "Modified: 5.3.1997 / 14:41:04 / cg"
+! !
+
+!CollectingSharedQueueStream class methodsFor:'documentation'!
+
+version
+ ^ '$Header: /cvs/stx/stx/libbasic2/CollectingSharedQueueStream.st,v 1.1 1997-03-05 13:50:52 cg Exp $'
+! !