terminals/Xtreams__SharedQueueReadStream.st
changeset 9 6c90659cf105
child 25 02e7c3b6f63c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/terminals/Xtreams__SharedQueueReadStream.st	Mon Aug 22 16:04:00 2011 +0000
@@ -0,0 +1,84 @@
+"{ Package: 'stx:goodies/xtreams/terminals' }"
+
+"{ NameSpace: Xtreams }"
+
+ReadStream subclass:#SharedQueueReadStream
+	instanceVariableNames:'contentsSpecies'
+	classVariableNames:''
+	poolDictionaries:''
+	category:'My Classes'
+!
+
+SharedQueueReadStream comment:'Read stream on a SharedQueue. Primarily used for data transfer between processes.
+{{{
+	queue := SharedQueue new.
+	in := queue reading.
+	out := queue writing.
+	received := Array new writing.
+	done := Semaphore new.
+	consumer :=	
+		[ | size |
+			[	(size := in get) isZero
+			] whileFalse: [ | word |
+				word := ByteString new: size.
+				in read: size into: word.
+				received put: word ].
+			done signal.
+		] fork.
+	#(one two three four) do: [ :word | out put: word size; write: word ].
+	out put: 0.
+	done wait.
+	received conclusion
+}}}
+
+Instance Variables
+	contentsSpecies	<Class> species for collections of elements of this stream
+
+'
+!
+
+
+!SharedQueueReadStream methodsFor:'accessing'!
+
+get
+	^source next
+!
+
+read: anInteger into: aSequenceableCollection at: startIndex
+	startIndex to: startIndex + anInteger - 1 do: [:index | aSequenceableCollection at: index put: source next].
+	^anInteger
+! !
+
+!SharedQueueReadStream methodsFor:'initialize-release'!
+
+close
+!
+
+contentsSpecies
+	^contentsSpecies
+!
+
+contentsSpecies: aClass
+	contentsSpecies := aClass
+!
+
+on: aSource
+	super on: aSource.
+	contentsSpecies := Array
+! !
+
+!SharedQueueReadStream methodsFor:'private'!
+
+streamingInsertInto: aWriteStream
+	self shouldNotImplement
+!
+
+streamingWriteInto: aWriteStream
+	self shouldNotImplement
+! !
+
+!SharedQueueReadStream class methodsFor:'documentation'!
+
+version_SVN
+    ^ '$Id$'
+! !