author Martin Kobetic <>
Mon, 22 Aug 2011 16:06:51 +0000
changeset 10 3813193bdf4e
child 27 2cc5a8a3ca14
permissions -rw-r--r--
first cut

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

"Declares CollectReadStream as a subclass of ReadStream with four instance variables:
 block, cache, contentsSpecies and direct.
 NOTE(review): the classVariableNames: and poolDictionaries: keyword parts that a class
 definition message normally carries are not present here - confirm that this shorter selector
 is understood by the target dialect, and where DefaultBufferSize (used in contentsSpecies:) is declared."
ReadStream subclass:#CollectReadStream
	instanceVariableNames:'block cache contentsSpecies direct'
	category:'My Classes'

CollectReadStream comment:'Converts elements being read using the provided conversion block. When the contentsSpecies of the source doesn''t match the desired contentsSpecies of this stream, we optimize bulk reads by reading into an internal buffer.

Instance Variables
	block	<BlockClosure> collecting block (same style as collect: blocks)
	cache	<SequenceableCollection | nil> caches elements before transformation when direct is false
	contentsSpecies	<Class> species for collections of elements of this stream
	direct	<Boolean> true when elements are read straight into the destination and converted in place; false when they go through the intermediate buffer (source''s contentsSpecies doesn''t match ours)


!CollectReadStream class methodsFor:'instance creation'!

on: aReadStream block: block
	"Create a new instance and send it on:block: with aReadStream and block;
	 answer whatever that instance-side message answers."
	^self new on: aReadStream block: block
! !

!CollectReadStream methodsFor:'accessing'!


	"Answer the result of evaluating block with the element obtained by sending get to source.
	 NOTE(review): the selector line of this method is missing above this body; since the body
	 forwards get to source it is presumably 'get' - restore it from the repository."
	^block value: source get

read: anInteger into: aSequenceableCollection at: startIndex
	"Read anInteger converted elements into aSequenceableCollection, beginning at startIndex.
	 When direct is true the source fills the destination itself and the elements are converted
	 in place (directRead:into:at:); otherwise they pass through cache first (bufferRead:into:at:).
	 Either helper raises Incomplete when the source delivers fewer elements than requested.
	 Answers anInteger."

	"The receiver of ifTrue:ifFalse: must be the direct flag; without it the keyword message has no receiver."
	direct
		ifTrue:	[self directRead: anInteger into: aSequenceableCollection at: startIndex]
		ifFalse:	[self bufferRead: anInteger into: aSequenceableCollection at: startIndex].
	^anInteger
! !

!CollectReadStream methodsFor:'initialize-release'!

	"Forward close to the superclass, then, if a cache exists, send it recycle.
	 NOTE(review): the selector line of this method is missing above this body; since the body
	 sends 'super close' it is presumably 'close' - restore it from the repository."
	super close.
	cache ifNotNil: [ cache recycle ]


contentsSpecies: aClass
	"Set the species of the collections this stream produces to aClass and re-evaluate
	 whether reads can go direct. If aClass is identical to the source's contentsSpecies
	 the stream becomes direct and drops its cache; otherwise a buffer of the source's
	 species is obtained via newRecycled:, sized like the previous cache when there was
	 a non-empty one, else DefaultBufferSize."

	| bufferSize |
	contentsSpecies := aClass.
	"Remember the old buffer's size BEFORE recycling it, so the buffer is never
	 queried after it has been handed back."
	bufferSize := (cache notNil and: [cache size > 0])
		ifTrue: [cache size]
		ifFalse: [DefaultBufferSize].
	cache ifNotNil: [cache recycle].
	direct := contentsSpecies == source contentsSpecies.
	cache := direct
		ifTrue: [nil]
		ifFalse: [source contentsSpecies newRecycled: bufferSize]

on: aSource block: aBlock
	"Attach the receiver to aSource (via on:) and remember aBlock as the conversion block.
	 The stream starts out in direct mode, adopting the source's own contentsSpecies;
	 no cache is set up here."

	self on: aSource.
	direct := true.
	contentsSpecies := aSource contentsSpecies.
	block := aBlock
! !

!CollectReadStream methodsFor:'private'!

bufferRead: anInteger into: aSequenceableCollection at: startIndex
	"Fill aSequenceableCollection, from startIndex on, with anInteger converted elements,
	 pulling them from source through cache in chunks of at most cache size.
	 Each chunk is read into cache, passed element by element through block, and stored
	 at the next free destination slots. If the source signals Incomplete, the elements it
	 did deliver are still converted and stored, then Incomplete is raised on the
	 destination with the total number of elements stored so far."

	| chunkSize done got |
	done := 0.
	[done < anInteger] whileTrue: [
		chunkSize := cache size min: anInteger - done.
		"A short read shows up as Incomplete; its count says how much of cache is valid."
		got := [source read: chunkSize into: cache at: 1. chunkSize]
			on: Incomplete
			do: [:ex | ex count].
		1 to: got do: [:offset |
			aSequenceableCollection
				at: startIndex + done + offset - 1
				put: (block value: (cache at: offset))].
		done := done + got.
		got < chunkSize ifTrue: [
			(Incomplete on: aSequenceableCollection count: done at: startIndex) raise]]

directRead: anInteger into: aSequenceableCollection at: startIndex
	"Let source read anInteger elements straight into aSequenceableCollection at startIndex,
	 then replace each element that arrived with the result of passing it through block.
	 If the source signalled Incomplete, only the delivered elements are converted and
	 Incomplete is raised again on the destination with that count."

	| got |
	got := [source read: anInteger into: aSequenceableCollection at: startIndex. anInteger]
		on: Incomplete
		do: [:ex | ex count].
	0 to: got - 1 do: [:offset |
		aSequenceableCollection
			at: startIndex + offset
			put: (block value: (aSequenceableCollection at: startIndex + offset))].
	got < anInteger ifTrue: [
		(Incomplete on: aSequenceableCollection count: got at: startIndex) raise]
! !

!CollectReadStream methodsFor:'seeking'!

++ anInteger
	"Forward ++ with anInteger to source and answer whatever it answers.
	 Presumably a forward skip by anInteger elements - confirm against the source stream's protocol."
	^source ++ anInteger

-- anInteger
	"Forward -- with anInteger to source and answer whatever it answers.
	 Presumably a backward skip by anInteger elements - confirm against the source stream's protocol."
	^source -- anInteger

	"Answer the length reported by source.
	 NOTE(review): the selector line of this method is missing above this body; since the body
	 forwards length to source it is presumably 'length' - restore it from the repository."
	^source length

	"Answer the position reported by source.
	 NOTE(review): the selector line of this method is missing above this body; since the body
	 forwards position to source it is presumably 'position' - restore it from the repository."
	^source position

position: anInteger
	"Forward position: with anInteger to source and answer whatever it answers."
	^source position: anInteger
! !

!CollectReadStream methodsFor:'testing'!

	"Answer whatever source answers to isPositionable.
	 NOTE(review): the selector line of this method is missing above this body; since the body
	 forwards isPositionable to source it is presumably 'isPositionable' - restore it from the repository."
	^source isPositionable
! !

!CollectReadStream class methodsFor:'documentation'!

    "Answer the version-control keyword string for this class.
     NOTE(review): the selector line of this method is missing above this body and cannot be
     determined from here (some version-style selector, presumably) - restore it from the repository."
    ^ '$Id$'
! !