transforms/Xtreams__CollectReadStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:34:28 +0000
changeset 97 2a7827f4dce2
parent 72 d16c7d84d4a8
child 111 44ac233b2f83
permissions -rw-r--r--
pool name fixes

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

"Read stream that passes every element read from its source through a conversion block.
 Instance variables: block (the conversion block), cache (intermediate buffer used when direct is false),
 contentsSpecies (species of the collections this stream produces), direct (true when bulk reads go
 straight into the destination and are converted in place)."
ReadStream subclass:#CollectReadStream
	instanceVariableNames:'block cache contentsSpecies direct'
	classVariableNames:''
	poolDictionaries:'Xtreams::XtreamsPool'
	category:'Xtreams-Transforms'
!

CollectReadStream comment:'Converts elements being read using the provided conversion block. When the contentsSpecies of the source doesn''t match the desired contentsSpecies of this stream, we optimize bulk reads by reading into an internal buffer.

Instance Variables
	block	<BlockClosure> collecting block (same style as collect: blocks)
	cache	<SequenceableCollection | nil> caches elements before transformation when direct is false
	contentsSpecies	<Class> species for collections of elements of this stream
	direct	<Boolean> true when the source''s contentsSpecies matches ours, so bulk reads go straight into the destination and are converted in place; false when an intermediate buffer (cache) is used

'
!


!CollectReadStream class methodsFor:'instance creation'!

on: aReadStream block: block
	"Create a new instance and initialize it with aReadStream as its source and block
	 as its conversion block. Answers whatever the instance-side on:block: answers."
	^self new on: aReadStream block: block
! !

!CollectReadStream methodsFor:'accessing'!

block
	"Answer the conversion block that is applied to the elements read from the source."
	^block
!

get
	"Fetch a single element from the source and answer the result of
	 passing it through the conversion block."
	| element |
	element := source get.
	^block value: element
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Read anInteger converted elements into aSequenceableCollection beginning at startIndex.
	 Dispatches on the direct flag: either the source reads straight into the destination
	 (directRead:into:at:) or goes through the intermediate cache (bufferRead:into:at:).
	 Answers anInteger; both helpers raise Incomplete when the source delivers fewer elements."
	direct ifTrue: [
		self directRead: anInteger into: aSequenceableCollection at: startIndex.
		^anInteger].
	self bufferRead: anInteger into: aSequenceableCollection at: startIndex.
	^anInteger
! !

!CollectReadStream methodsFor:'initialize-release'!

close
	"Close the stream via the superclass, then recycle the intermediate
	 cache if one has been allocated."
	super close.
	cache isNil ifFalse: [cache recycle]
!

contentsSpecies
	"Answer the species recorded for collections of elements of this stream.
	 Initially the species of the source (see on:block:); changed via contentsSpecies:."
	^contentsSpecies
!

contentsSpecies: aClass
	"Set the species of collections produced by this stream to aClass and reconfigure buffering.
	 When aClass is identical to the species of the source, reads become direct and no cache is kept.
	 Otherwise a cache of the species of the source is allocated; it keeps the size of the previous
	 cache when there was a non-empty one, and uses DefaultBufferSize otherwise."
	| bufferSize |
	contentsSpecies := aClass.
	bufferSize := DefaultBufferSize.
	cache ifNotNil: [
		"Remember the size BEFORE recycling: once recycled, the buffer is no longer ours to inspect."
		cache size > 0 ifTrue: [bufferSize := cache size].
		cache recycle].
	direct := contentsSpecies == source contentsSpecies.
	cache := direct
		ifTrue: [nil]
		ifFalse: [source contentsSpecies newRecycled: bufferSize]
!

on: aSource block: aBlock
	"Initialize the receiver to read from aSource and convert each element with aBlock.
	 The contents species starts out as that of the source, so reads are direct
	 until contentsSpecies: says otherwise."
	self on: aSource.
	direct := true.
	contentsSpecies := aSource contentsSpecies.
	block := aBlock
! !

!CollectReadStream methodsFor:'private'!

bufferRead: anInteger into: aSequenceableCollection at: startIndex
	"Read anInteger elements from the source in chunks of at most the cache size.
	 Each chunk lands in the cache first; its elements are then converted with the block
	 and stored into aSequenceableCollection, starting at startIndex.
	 If the source runs short, the elements that did arrive are still converted and stored,
	 and Incomplete is raised carrying the number of elements delivered so far."
	| transferred chunkSize received target |
	transferred := 0.
	[transferred < anInteger] whileTrue: [
		chunkSize := (anInteger - transferred) min: cache size.
		"An Incomplete from the source is absorbed here; its count tells how much of the chunk arrived."
		received := [source read: chunkSize into: cache at: 1. chunkSize]
			on: Incomplete
			do: [:incomplete | incomplete count].
		target := transferred + startIndex.
		1 to: received do: [:offset |
			aSequenceableCollection
				at: target + offset - 1
				put: (block value: (cache at: offset))].
		transferred := transferred + received.
		received < chunkSize ifTrue: [
			(Incomplete on: aSequenceableCollection count: transferred at: startIndex) raise]]
!

directRead: anInteger into: aSequenceableCollection at: startIndex
	"Let the source read anInteger elements straight into aSequenceableCollection at startIndex,
	 then replace each element that arrived with its converted value, in place.
	 If the source delivered fewer than anInteger elements, Incomplete is raised after the
	 conversion, carrying the number of elements actually delivered."
	| received lastIndex |
	received := [source read: anInteger into: aSequenceableCollection at: startIndex. anInteger]
		on: Incomplete
		do: [:incomplete | incomplete count].
	lastIndex := startIndex + received - 1.
	startIndex to: lastIndex do: [:slot |
		aSequenceableCollection
			at: slot
			put: (block value: (aSequenceableCollection at: slot))].
	received < anInteger ifTrue: [
		(Incomplete on: aSequenceableCollection count: received at: startIndex) raise]
! !

!CollectReadStream methodsFor:'seeking'!

++ anInteger
	"Forward the ++ request with anInteger to the source and answer its result.
	 The conversion block is not involved."
	^source ++ anInteger
!

-- anInteger
	"Forward the -- request with anInteger to the source and answer its result.
	 The conversion block is not involved."
	^source -- anInteger
!

length
	"Answer the length reported by the source."
	^source length
!

position
	"Answer the position reported by the source."
	^source position
!

position: anInteger
	"Ask the source to move to position anInteger and answer its result."
	^source position: anInteger
! !

!CollectReadStream methodsFor:'testing'!

isPositionable
	"Answer whether the source is positionable; all seeking here is delegated to the source."
	^source isPositionable
! !

!CollectReadStream class methodsFor:'documentation'!

version_SVN
    "Answer the version identification string; $Id$ is a keyword placeholder
     (presumably expanded by the version control system - confirm)."
    ^ '$Id$'
! !