terminals/Xtreams__BlockClosureGenerateStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:41:14 +0000
changeset 99 677c81c943e4
parent 78 a9dd8b69b39f
child 109 9587e2df7029
permissions -rw-r--r--
build files regenerated

"{ Package: 'stx:goodies/xtreams/terminals' }"

"{ NameSpace: Xtreams }"

"Declares BlockClosureGenerateStream as a ReadStream subclass in category 'Xtreams-Terminals'.
 Note: the instance variable 'writing' is declared here but is not referenced by any method in this file."
ReadStream subclass:#BlockClosureGenerateStream
	instanceVariableNames:'process current contentsSpecies writing readingSemaphore
		writingSemaphore closed'
	classVariableNames:''
	poolDictionaries:''
	category:'Xtreams-Terminals'
!

BlockClosureGenerateStream comment:'Read stream on a one-argument block; the block is evaluated once for the lifetime of the stream. Once the block finishes execution, the stream is closed. The block is expected to write to the block argument, which will block execution until elements are read from the stream.
{{{
	"In this example, we have a -hard- loop, not using collection protocol, which will only run one element at a time."
	[:out | 1 to: 10 do: [:i | out put: i]] reading read: 5.
}}}
{{{
	"Fibonacci"
	[:out | | a b x |
	a := 0. b := 1.
	[out put: a.
	x := a.
	a := b.
	b := b + x] repeat] reading ++ 500; get.
}}}

Instance Variables
	current	<Object>	the current element in the stream
	closed	<Boolean>	true if the block has finished execution
	contentsSpecies	<Class>	species for collections of elements of this stream
	process	<Process>	the process executing the block
	readingSemaphore	<Semaphore>	attempts to read will wait on this
	writingSemaphore	<Semaphore>	attempts to write will wait on this, signaled when an attempt to read is performed

'
!


!BlockClosureGenerateStream methodsFor:'accessing'!

get
	"Answer the next element produced by the generator block.
	 Raises Incomplete (via 'Incomplete zero raise') if the stream is marked closed at the time of the check below."
	"NOTE(review): #close also signals readingSemaphore after setting closed. A reader woken that way
	 falls through to '^current' without re-checking closed, so it appears to answer whatever current
	 last held - confirm whether that is intended."

	"Signal writingSemaphore, on which the generator side waits before storing an element."
	writingSemaphore signal.
	closed ifTrue: [Incomplete zero raise].
	"Wait until readingSemaphore is signalled, which happens after current has been assigned."
	readingSemaphore wait.
	^current
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Fetch anInteger elements one at a time with #get and store them in aSequenceableCollection
	 at consecutive indices beginning with startIndex. Answers anInteger.
	 If Incomplete is signalled while fetching, a new Incomplete is raised that carries the
	 collection, the number of elements stored so far, and startIndex."

	| filled |
	filled := 0.
	[[filled >= anInteger] whileFalse: [
		aSequenceableCollection
			at: startIndex + filled
			put: self get.
		filled := filled + 1]]
		on: Incomplete
		do: [(Incomplete on: aSequenceableCollection count: filled at: startIndex) raise].
	^anInteger
! !

!BlockClosureGenerateStream methodsFor:'initialize-release'!

close
	"Mark the stream closed, terminate the process that evaluates the generator block,
	 and signal readingSemaphore."

	"closed is set first, before the process is terminated and before the semaphore is signalled."
	closed := true.
	process terminate.
	"readingSemaphore is the semaphore #get waits on."
	readingSemaphore signal
!

contentsSpecies
	"Answer the class stored in contentsSpecies. #on: sets it to Array; #contentsSpecies: can replace it."

	^contentsSpecies
!

contentsSpecies: aClass
	"Store aClass as the value answered by #contentsSpecies."

	contentsSpecies := aClass
!

on: aBlockClosure
	"Initialize the stream on aBlockClosure and fork the process that evaluates it.
	 aBlockClosure is called with the result of sending #writing to a one-argument block.
	 For each element handed to it, that inner block waits on writingSemaphore, stores the
	 element in current, and signals readingSemaphore.
	 contentsSpecies defaults to Array, and closed starts out false."
	"NOTE(review): after aBlockClosure returns, closed is set to true but readingSemaphore is not
	 signalled here. A reader already waiting in #get at that moment looks like it would stay
	 blocked (for example when the block writes nothing) - confirm."

	super on: aBlockClosure.
	contentsSpecies := Array.
	readingSemaphore := Semaphore new.
	writingSemaphore := Semaphore new.
	closed := false.
	process :=
		[aBlockClosure value: [:in |
			"Wait for a signal on writingSemaphore (sent by #get) before publishing the element."
			writingSemaphore wait.
			current := in.
			readingSemaphore signal] writing.
		"Reached only when aBlockClosure returns."
		closed := true] fork
! !

!BlockClosureGenerateStream class methodsFor:'documentation'!

version_SVN
    "Answer the version-control keyword string for this file; the $Id$ keyword is unexpanded in this copy."

    ^ '$Id$'
! !