CollectingSharedQueueStream.st
changeset 500 f1df3a589fb4
child 501 3299cf782635
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CollectingSharedQueueStream.st	Wed Mar 05 14:50:52 1997 +0100
@@ -0,0 +1,171 @@
+Collection subclass:#CollectingSharedQueueStream
+	instanceVariableNames:'contents readPosition writePosition dataAvailable closed'
+	classVariableNames:''
+	poolDictionaries:''
+	category:'Collections-Ordered'
+!
+
+!CollectingSharedQueueStream class methodsFor:'documentation'!
+
+documentation
+"
+    This class combines the functionality of a growable collection
+    with sharedQueue and double-stream-like behavior.
+    The reader side may read from it using #next, and possibly access
+    elements via #at:.
+    Reading may start immediately, but will block until enough elements
+    have been added by another process, the writer.
+    Instances of this class may be useful to start processing on
+    big document/data collection immediately, while the data is still being
+    read; a concrete application is the HTMLDocumentReader, which starts
+    processing and displaying the document while the rest is still being
+    read.
+
+    [author:]
+        Claus Gittinger
+
+    [see also:]
+        Stream OrderedCollection SharedQueue
+"
+!
+
+examples
+"
+                                                        [exBegin]
+    |s reader|
+
+    s := CollectingSharedQueueStream new.
+    reader := [
+                [s atEnd] whileFalse:[
+                    Transcript showCR:s next
+                ].
+              ] fork.
+
+    1 to:10 do:[:i |
+        Delay waitForSeconds:1.
+        s nextPut:i
+    ].
+                                                        [exEnd]
+
+  the reader reads from a (slow) pipe;
+  the writer sends it to the transcript.
+                                                        [exBegin]
+    |pipe s reader|
+
+    s := CollectingSharedQueueStream new.
+    reader := [
+                [s atEnd] whileFalse:[
+                    Transcript showCR:s next
+                ].
+              ] forkAt:9.
+
+    pipe := PipeStream readingFrom:'ls -lR /usr'.
+    pipe notNil ifTrue:[
+        [pipe atEnd] whileFalse:[
+            pipe readWait.
+            s nextPut:(pipe nextLine).
+        ].
+        pipe close.
+    ].
+    s close
+                                                        [exEnd]
+"
+
+! !
+
+!CollectingSharedQueueStream class methodsFor:'instance creation'!
+
+new
+    ^ self basicNew initialize
+
+    "Created: 5.3.1997 / 14:30:36 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'accessing'!
+
+at:index
+    "synchronized read - possibly wait for elements up to index
+     being added (by someone else); then return it."
+
+    writePosition > index ifTrue:[
+        ^ contents at:index
+    ].
+
+    [writePosition <= index] whileTrue:[
+        closed ifTrue:[
+            ^ self subscriptBoundsError:index
+        ].
+        dataAvailable wait.
+    ].
+    ^ contents at:index
+
+    "Created: 5.3.1997 / 14:44:41 / cg"
+!
+
+close
+    "signal the end of input; to be used by the writer"
+
+    closed := true.
+    dataAvailable signal
+
+    "Modified: 5.3.1997 / 14:45:11 / cg"
+!
+
+next
+    "return the next value in the queue; if there is none,
+     wait 'til something is put into the receiver."
+
+    |value|
+
+    [readPosition >= writePosition] whileTrue:[
+        closed ifTrue:[
+            ^ nil
+        ].
+        dataAvailable wait
+    ].
+
+    value := contents at:readPosition.
+    readPosition := readPosition + 1.
+    ^ value
+
+    "Created: 5.3.1997 / 14:28:57 / cg"
+    "Modified: 5.3.1997 / 14:45:54 / cg"
+!
+
+nextPut:anObject
+    "append anObject to the queue; if anyone is waiting, tell him"
+
+    |value|
+
+    contents add:anObject.
+    writePosition := writePosition + 1.
+    dataAvailable signal
+
+    "Created: 5.3.1997 / 14:33:44 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'initialization'!
+
+initialize
+    readPosition := writePosition := 1.
+    dataAvailable := Semaphore new.
+    contents := OrderedCollection new.
+    closed := false.
+
+    "Modified: 5.3.1997 / 14:34:55 / cg"
+! !
+
+!CollectingSharedQueueStream methodsFor:'queries'!
+
+atEnd
+    closed ifFalse:[^ false].
+    ^ readPosition >= writePosition
+
+    "Modified: 5.3.1997 / 14:41:04 / cg"
+! !
+
+!CollectingSharedQueueStream class methodsFor:'documentation'!
+
+version
+    ^ '$Header: /cvs/stx/stx/libbasic2/CollectingSharedQueueStream.st,v 1.1 1997-03-05 13:50:52 cg Exp $'
+! !