terminals/Xtreams__BlockClosureGenerateStream.st
changeset 9 6c90659cf105
child 25 02e7c3b6f63c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/terminals/Xtreams__BlockClosureGenerateStream.st	Mon Aug 22 16:04:00 2011 +0000
@@ -0,0 +1,93 @@
+"{ Package: 'stx:goodies/xtreams/terminals' }"
+
+"{ NameSpace: Xtreams }"
+
+ReadStream subclass:#BlockClosureGenerateStream
+	instanceVariableNames:'process current contentsSpecies writing readingSemaphore
+		writingSemaphore closed'
+	classVariableNames:''
+	poolDictionaries:''
+	category:'My Classes'
+!
+
+BlockClosureGenerateStream comment:'Read stream on a one argument block, evaluates the block once for the life time of the stream. Once the block finishes execution, the stream is closed. The block is expected to write to the block argument, which will block exectuion until elements are read from the stream.
+{{{
+	"In this example, we have a -hard- loop, not using collection protocol, which will only run one element at a time."
+	[:out | 1 to: 10 do: [:i | out put: i]] reading read: 5.
+}}}
+{{{
+	"Fibonacci"
+	[:out | | a b x |
+	a := 0. b := 1.
+	[out put: a.
+	x := a.
+	a := b.
+	b := b + x] repeat] reading ++ 500; get.
+}}}
+
+Instance Variables
+	current	<Object>	the current element in the stream
+	closed	<Boolean>	true if the block has finished execution
+	contentsSpecies	<Class>	species for collections of elements of this stream
+	process	<Process>	the process executing the block
+	readingSemaphore	<Semaphore>	attempts to read will wait on this
+	writingSemaphore	<Semaphore>	attempts to write will wait on this, signaled when an attempt to read is performed
+
+'
+!
+
+
+!BlockClosureGenerateStream methodsFor:'accessing'!
+
+get
+	writingSemaphore signal.
+	closed ifTrue: [Incomplete zero raise].
+	readingSemaphore wait.
+	^current
+!
+
+read: anInteger into: aSequenceableCollection at: startIndex
+	| count |
+	count := 0.
+	[[count < anInteger] whileTrue:
+		[aSequenceableCollection at: startIndex + count put: self get.
+		count := count + 1]]
+			on: Incomplete do: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
+	^anInteger
+! !
+
+!BlockClosureGenerateStream methodsFor:'initialize-release'!
+
+close
+	closed := true.
+	process terminate.
+	readingSemaphore signal
+!
+
+contentsSpecies
+	^contentsSpecies
+!
+
+contentsSpecies: aClass
+	contentsSpecies := aClass
+!
+
+on: aBlockClosure
+	super on: aBlockClosure.
+	contentsSpecies := Array.
+	readingSemaphore := Semaphore new.
+	writingSemaphore := Semaphore new.
+	closed := false.
+	process :=
+		[aBlockClosure value: [:in |
+			writingSemaphore wait.
+			current := in.
+			readingSemaphore signal] writing.
+		closed := true] fork
+! !
+
+!BlockClosureGenerateStream class methodsFor:'documentation'!
+
+version_SVN
+    ^ '$Id$'
+! !