--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/terminals/Xtreams__SharedQueueReadStream.st Mon Aug 22 16:04:00 2011 +0000
@@ -0,0 +1,84 @@
+"{ Package: 'stx:goodies/xtreams/terminals' }"
+
+"{ NameSpace: Xtreams }"
+
+ReadStream subclass:#SharedQueueReadStream
+ instanceVariableNames:'contentsSpecies'
+ classVariableNames:''
+ poolDictionaries:''
+ category:'My Classes'
+!
+
+SharedQueueReadStream comment:'Read stream on a SharedQueue. Primarily used for data transfer between processes.
+{{{
+ queue := SharedQueue new.
+ in := queue reading.
+ out := queue writing.
+ received := Array new writing.
+ done := Semaphore new.
+ consumer :=
+ [ | size |
+ [ (size := in get) isZero
+ ] whileFalse: [ | word |
+ word := ByteString new: size.
+ in read: size into: word.
+ received put: word ].
+ done signal.
+ ] fork.
+ #(one two three four) do: [ :word | out put: word size; write: word ].
+ out put: 0.
+ done wait.
+ received conclusion
+}}}
+
+Instance Variables
+ contentsSpecies <Class> species for collections of elements of this stream
+
+'
+!
+
+
+!SharedQueueReadStream methodsFor:'accessing'!
+
+get
+ ^source next
+!
+
+read: anInteger into: aSequenceableCollection at: startIndex
+ startIndex to: startIndex + anInteger - 1 do: [:index | aSequenceableCollection at: index put: source next].
+ ^anInteger
+! !
+
+!SharedQueueReadStream methodsFor:'initialize-release'!
+
+close
+!
+
+contentsSpecies
+ ^contentsSpecies
+!
+
+contentsSpecies: aClass
+ contentsSpecies := aClass
+!
+
+on: aSource
+ super on: aSource.
+ contentsSpecies := Array
+! !
+
+!SharedQueueReadStream methodsFor:'private'!
+
+streamingInsertInto: aWriteStream
+ self shouldNotImplement
+!
+
+streamingWriteInto: aWriteStream
+ self shouldNotImplement
+! !
+
+!SharedQueueReadStream class methodsFor:'documentation'!
+
+version_SVN
+ ^ '$Id$'
+! !