terminals/Xtreams__BlockClosureGenerateStream.st
author Martin Kobetic
Sun, 17 Nov 2013 00:21:39 -0500
changeset 141 263190106319
parent 109 9587e2df7029
permissions -rw-r--r--
merging

"{ Package: 'stx:goodies/xtreams/terminals' }"

"{ NameSpace: Xtreams }"

"Class definition. The instance variables hold the generator process, the element most
 recently handed over by the generator (current), the species used for collections of
 elements, the two semaphores used for the reader/generator hand-over, and the closed flag.
 NOTE(review): the instance variable named writing is not referenced by any method visible
 in this file - confirm whether it is still needed before removing it."
ReadStream subclass:#BlockClosureGenerateStream
	instanceVariableNames:'process current contentsSpecies writing readingSemaphore
		writingSemaphore closed'
	classVariableNames:''
	poolDictionaries:''
	category:'Xtreams-Terminals'
!

BlockClosureGenerateStream comment:'Read stream on a one-argument block; the block is evaluated once for the lifetime of the stream. Once the block finishes execution, the stream is closed. The block is expected to write to its argument, which blocks execution of the block until elements are read from the stream.
{{{
	"In this example, we have a -hard- loop, not using collection protocol, which will only run one element at a time."
	[:out | 1 to: 10 do: [:i | out put: i]] reading read: 5.
}}}
{{{
	"Fibonacci"
	[:out | | a b x |
	a := 0. b := 1.
	[out put: a.
	x := a.
	a := b.
	b := b + x] repeat] reading ++ 500; get.
}}}

Instance Variables
	current <Object>        the current element in the stream
	closed  <Boolean>       true if the block has finished execution
	contentsSpecies <Class> species for collections of elements of this stream
	process <Process>       the process executing the block
	readingSemaphore        <Semaphore>     attempts to read will wait on this
	writingSemaphore        <Semaphore>     attempts to write will wait on this, signaled when an attempt to read is performed

'
!


!BlockClosureGenerateStream methodsFor:'accessing'!

get
	"Answer the next element written by the generator block.
	 Raises Incomplete when the generator has terminated, or the stream has been closed,
	 without delivering an element for this request."

	| pending |
	"Unique marker. The generator overwrites current only when it delivers an element,
	 so finding the marker still in place after waking up means that the wake-up came
	 from termination or close and not from a write."
	pending := Object new.
	current := pending.
	"Allow the generator process to perform one write."
	writingSemaphore signal.
	"Already closed and nothing delivered: do not wait, nobody would write anymore."
	(closed and: [current == pending]) ifTrue: [Incomplete zero raise].
	readingSemaphore wait.
	"Decide by the marker rather than by the closed flag: the generator may legitimately
	 finish right after writing its last element, and that element must still be answered."
	current == pending ifTrue: [Incomplete zero raise].
	^current
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Store anInteger elements obtained via get into aSequenceableCollection, starting at
	 startIndex, and answer anInteger.
	 If get raises Incomplete before all elements were stored, raise a new Incomplete that
	 carries the collection, the number of elements actually stored and the start index."

	| count |
	count := 0.
	[[count < anInteger] whileTrue:
		[aSequenceableCollection at: startIndex + count put: self get.
		count := count + 1]]
			on: Incomplete do: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
	^anInteger
! !

!BlockClosureGenerateStream methodsFor:'initialize-release'!

close
	"Mark the stream as closed, terminate the generator process and signal the reading
	 semaphore so that a reader waiting in get is released."

	closed := true.
	process terminate.
	readingSemaphore signal
!

contentsSpecies
	"Answer the class used for collections of elements of this stream."

	^contentsSpecies
!

contentsSpecies: aClass
	"Set the class used for collections of elements of this stream."

	contentsSpecies := aClass
!

on: aBlockClosure
	"Initialize the stream on aBlockClosure and fork the generator process, which evaluates
	 aBlockClosure once with a write stream as its argument. Every write into that stream
	 waits until a reader asks for an element, stores the element in current and then
	 releases the reader."

	super on: aBlockClosure.
	contentsSpecies := Array.
	readingSemaphore := Semaphore new.
	writingSemaphore := Semaphore new.
	closed := false.
	process :=
		[[aBlockClosure value: [:in |
			writingSemaphore wait.
			current := in.
			readingSemaphore signal] writing]
				ensure:
					["Runs on normal completion as well as on unwinding, so the stream is
					  always marked closed. The signal releases a reader that is already
					  waiting in get; without it such a reader would wait forever."
					closed := true.
					readingSemaphore signal]] fork
! !

!BlockClosureGenerateStream class methodsFor:'documentation'!

version_HG
    "Answer the changeset identification string of this source file.
     The literal is a keyword placeholder, presumably expanded by the Mercurial (HG)
     tooling on checkout - keep its exact form."

    ^ '$Changeset: <not expanded> $'
! !