author mkobetic
Mon, 21 Nov 2011 06:44:00 +0000
changeset 25 02e7c3b6f63c
parent 9 6c90659cf105
child 39 80fdc4602b14
permissions -rw-r--r--
added XtreamsPool to fix DefaultBufferSize; set proper category names

"{ Package: 'stx:goodies/xtreams/terminals' }"

"{ NameSpace: Xtreams }"

ReadStream subclass:#BlockClosureGenerateStream
	instanceVariableNames:'process current contentsSpecies writing readingSemaphore
		writingSemaphore closed'

BlockClosureGenerateStream comment:'Read stream on a one argument block, evaluates the block once for the life time of the stream. Once the block finishes execution, the stream is closed. The block is expected to write to the block argument, which will block exectuion until elements are read from the stream.
	"In this example, we have a -hard- loop, not using collection protocol, which will only run one element at a time."
	[:out | 1 to: 10 do: [:i | out put: i]] reading read: 5.
	[:out | | a b x |
	a := 0. b := 1.
	[out put: a.
	x := a.
	a := b.
	b := b + x] repeat] reading ++ 500; get.

Instance Variables
	current	<Object>	the current element in the stream
	closed	<Boolean>	true if the block has finished execution
	contentsSpecies	<Class>	species for collections of elements of this stream
	process	<Process>	the process executing the block
	readingSemaphore	<Semaphore>	attempts to read will wait on this
	writingSemaphore	<Semaphore>	attempts to write will wait on this, signaled when an attempt to read is performed


!BlockClosureGenerateStream methodsFor:'accessing'!

	writingSemaphore signal.
	closed ifTrue: [Incomplete zero raise].
	readingSemaphore wait.

read: anInteger into: aSequenceableCollection at: startIndex
	| count |
	count := 0.
	[[count < anInteger] whileTrue:
		[aSequenceableCollection at: startIndex + count put: self get.
		count := count + 1]]
			on: Incomplete do: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
! !

!BlockClosureGenerateStream methodsFor:'initialize-release'!

	closed := true.
	process terminate.
	readingSemaphore signal


contentsSpecies: aClass
	contentsSpecies := aClass

on: aBlockClosure
	super on: aBlockClosure.
	contentsSpecies := Array.
	readingSemaphore := Semaphore new.
	writingSemaphore := Semaphore new.
	closed := false.
	process :=
		[aBlockClosure value: [:in |
			writingSemaphore wait.
			current := in.
			readingSemaphore signal] writing.
		closed := true] fork
! !

!BlockClosureGenerateStream class methodsFor:'documentation'!

    ^ '$Id$'
! !