transforms/Xtreams__TransformReadStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:34:28 +0000
changeset 97 2a7827f4dce2
parent 72 d16c7d84d4a8
child 111 44ac233b2f83
permissions -rw-r--r--
pool name fixes

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

"Class definition. TransformReadStream is a ReadStream subclass whose state is the internal buffer, a write stream on that buffer, an end-of-source flag, the transformation block and the close block. It imports the shared pool Xtreams::XtreamsPool; NOTE(review): DefaultBufferSize, used in #on:block:, presumably comes from that pool -- confirm."
ReadStream subclass:#TransformReadStream
	instanceVariableNames:'buffer bufferWriting sourceAtEnd block closeBlock'
	classVariableNames:''
	poolDictionaries:'Xtreams::XtreamsPool'
	category:'Xtreams-Transforms'
!

TransformReadStream comment:'Transform read stream provides the most expressive form of transformation. The transformation is described by a binary block that is given two arguments, @input and @output. Both @input and @output are streams themselves. The block can read an arbitrary number of elements from @input (including none) and write an arbitrary number of elements into @output (including none). The block will be invoked as many times as necessary to produce the required number of elements, or until an Incomplete is raised. Consequently, if the block handles Incomplete from the input, it has to raise another Incomplete at some point, otherwise the stream will never end.

	Note that if the contentsSpecies of the source doesn''t fit the output of the transformation, the #contentsSpecies of the transform stream has to be set explicitly. The #contentsSpecies determines the type of collection employed by the internal buffer that is used as the storage for the virtual output stream of the transformation block. In some cases it might be desirable to tweak the buffering strategy of the virtual output stream. In that case the buffer of the stream can be set manually to any kind of Buffer that matches the requirements of the transformation.
	The closing behavior of the stream can be customized through the closeBlock. The default closeBlock simply propagates #close to the source as with any other transform stream.

Instance Variables
	buffer	<Buffer> holds the contents of the virtual output stream
	bufferWriting	<BufferWriteStream> the virtual output stream that is passed to the transformation block as the second argument
	sourceAtEnd	<Boolean> flag indicating that the source has signalled an Incomplete
	block	<BlockClosure> binary transformation block that reads elements from input (first argument) and writes elements into output (second argument)
	closeBlock	<BlockClosure> block invoked in response to the #close message; it is evaluated with #cull:cull: and may therefore take up to two arguments (the source and this stream); allows customizing the close behavior

'
!


!TransformReadStream class methodsFor:'instance creation'!

on: aReadStream block: block
	"Create a new instance and initialize it to read from aReadStream, transforming its elements with the given block. Answers whatever the instance-side #on:block: answers."
	| stream |
	stream := self new.
	^stream on: aReadStream block: block
! !

!TransformReadStream methodsFor:'accessing'!

block
	"Answer the transformation block that was supplied to #on:block:."
	^block
!

buffer
	"Answer the internal buffer that holds elements written by the transformation block and not yet read. Answers nil once the stream has been closed, because #close clears it."
	^buffer
!

closeBlock
	"Answer the block that #close evaluates to release the source."
	^closeBlock
!

get
	"Answer the next transformed element. If the buffer is empty and cannot be refilled, raise Incomplete (created with #zero) before touching the buffer."
	| ready |
	ready := self fillBufferIfRequired: 1.
	ready ifFalse: [Incomplete zero raise].
	^buffer get
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Copy anInteger transformed elements into aSequenceableCollection, starting at startIndex, and answer anInteger. Answers 0 immediately when anInteger is zero. If the buffer runs dry and cannot be refilled, raise an Incomplete that records the destination, the number of elements copied so far and the start index."
	| done chunk |
	anInteger isZero ifTrue: [^0].
	done := 0.
	[done < anInteger] whileTrue: [
		(self fillBufferIfRequired: anInteger - done) ifFalse: [
			(Incomplete on: aSequenceableCollection count: done at: startIndex) raise].
		"Take no more than what is still missing and no more than what is buffered."
		chunk := buffer readSize min: anInteger - done.
		buffer read: chunk into: aSequenceableCollection at: startIndex + done.
		done := done + chunk].
	^anInteger
! !

!TransformReadStream methodsFor:'initialize-release'!

buffer: aBuffer
	"Replace the internal buffer with aBuffer. The previous buffer is recycled first, and the write stream handed to the transformation block is recreated on the new buffer."
	buffer recycle.
	bufferWriting := (buffer := aBuffer) writing
!

close
	"Close the stream. Marks the source as exhausted, evaluates the close block (offering it the source and this stream via #cull:cull:), then recycles and drops the buffer. A nil buffer means the stream is already closed, in which case nothing happens."
	buffer isNil ifTrue: [^self].
	sourceAtEnd := true.
	closeBlock cull: source cull: self.
	"The buffer is released only after the close block has run."
	buffer recycle.
	buffer := nil
!

closeBlock: anObject
	"Set the block evaluated by #close. It is evaluated with #cull:cull:, being offered the source and this stream."
	closeBlock := anObject
!

contentsSpecies
	"Answer the contents species reported by the internal buffer. NOTE(review): #close sets the buffer to nil, so this message is sent to nil on a closed stream -- confirm whether that is intended."
	^buffer contentsSpecies
!

contentsSpecies: aClass
	"Switch the internal buffer to one whose contents species is aClass. The replacement is of the same buffer class and cache size as the current buffer and is installed through #buffer:."
	| replacement |
	replacement := buffer class new: buffer cacheSize class: aClass.
	self buffer: replacement
!

on: aReadStream block: aBlock
	"Initialize the receiver to read from aReadStream and transform its elements with aBlock. Installs a default close block that closes the source, marks the source as not yet exhausted, and creates an ElasticBuffer of DefaultBufferSize using the contents species of aReadStream, together with a write stream on it."
	self on: aReadStream.
	sourceAtEnd := false.
	block := aBlock.
	closeBlock := [:aSource | aSource close].
	bufferWriting := (buffer := ElasticBuffer new: DefaultBufferSize class: aReadStream contentsSpecies) writing
! !

!TransformReadStream methodsFor:'printing'!

streamingPrintOn: aStream
	"Print a description of the receiver on aStream: the inherited description, the selector of the method in which the transformation block was defined, and the number of buffered elements followed, when there are any, by the buffered contents on a new line. After #close the buffer is nil; in that case ' closed' is printed instead of the buffer details, so that printing a closed stream does not fail by sending buffer messages to nil."
	super streamingPrintOn: aStream.
	aStream
		space;
		write: '#';
		write: block method homeMethod selector.
	buffer isNil ifTrue: [
		aStream write: ' closed'.
		^self].
	aStream
		write: ' buffered: ';
		print: buffer readSize.
	buffer readSize isZero ifTrue: [^self].
	aStream
		cr; tab;
		print: buffer contentsFuture
! !

!TransformReadStream methodsFor:'private'!

fillBuffer: anInteger
	"Run the transformation block repeatedly, passing it the source and the write stream on the buffer, until at least anInteger new elements have appeared in the buffer or the buffer reports that it has no space to write. An Incomplete raised while the block runs sets sourceAtEnd and ends the fill; the handler does not touch the buffer, so anything the block wrote before stays readable. No progress check is made here: a block that neither produces output nor lets an Incomplete escape keeps this loop running."
	| produced before |
	produced := 0.
	[[before := buffer readSize.
	block value: source value: bufferWriting.
	"Count only what this invocation added to the buffer."
	produced := produced + buffer readSize - before.
	(produced >= anInteger or: [buffer hasSpaceToWrite not]) ifTrue: [^self]] repeat]
		on: Incomplete
		do: [:incomplete | ^sourceAtEnd := true]
!

fillBufferIfRequired: anInteger
	"Answer whether the buffer has data to read, attempting a refill when it is empty. Answers false for a closed stream (nil buffer) and when the buffer is empty and the source is already exhausted. Otherwise asks #fillBuffer: for up to anInteger elements and answers whether any data became available; the requested amount is not guaranteed."
	buffer isNil ifTrue: [^false].
	buffer hasDataToRead ifTrue: [^true].
	^sourceAtEnd not and: [
		self fillBuffer: anInteger.
		buffer hasDataToRead]
! !

!TransformReadStream class methodsFor:'documentation'!

version_SVN
    "Answer the version identification string; the literal is an unexpanded $Id$ keyword placeholder."
    ^ '$Id$'
! !