transforms/Xtreams__CollectReadStream.st
author Martin Kobetic
Sun, 17 Nov 2013 00:23:18 -0500
changeset 147 bd6be28aa924
parent 111 44ac233b2f83
permissions -rw-r--r--
merging

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

"Declares CollectReadStream as a subclass of ReadStream with the instance variables block, cache, contentsSpecies and direct. It imports the XtreamsPool pool dictionary and is filed under the Xtreams-Transforms category."
ReadStream subclass:#CollectReadStream
	instanceVariableNames:'block cache contentsSpecies direct'
	classVariableNames:''
	poolDictionaries:'XtreamsPool'
	category:'Xtreams-Transforms'
!

CollectReadStream comment:'Converts elements being read using the provided conversion block. When the contentsSpecies of the source doesn''t match the desired contentsSpecies of this stream, we optimize bulk reads by reading into an internal buffer.

Instance Variables
	block	<BlockClosure> collecting block (same style as collect: blocks)
	cache	<SequenceableCollection | nil> caches elements before transformation when direct is false
	contentsSpecies	<Class> species for collections of elements of this stream
	direct	<Boolean> true when we read straight into the destination without an intermediate buffer (i.e. when source''s contentsSpecies matches ours); false when reads go through cache

'
!


!CollectReadStream class methodsFor:'instance creation'!

on: aReadStream block: block
	"Create a new instance and set it up through the instance-side #on:block: with aReadStream as the source and block as the conversion block. Answer whatever that message answers."
	^self new on: aReadStream block: block
! !

!CollectReadStream methodsFor:'accessing'!

block
	"Answer the conversion block held by this stream."
	^block
!

get
	"Fetch one element from source with #get and answer the result of evaluating the conversion block on it."
	^block value: source get
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Fill aSequenceableCollection, beginning at startIndex, with anInteger converted elements. When direct is set the source reads straight into the destination; otherwise the read goes through the cache. Answer anInteger."
	direct ifTrue: [
		self directRead: anInteger into: aSequenceableCollection at: startIndex.
		^anInteger].
	self bufferRead: anInteger into: aSequenceableCollection at: startIndex.
	^anInteger
! !

!CollectReadStream methodsFor:'initialize-release'!

close
	"Let the superclass close the stream first, then send #recycle to the cache if one was allocated."
	super close.
	cache isNil ifFalse: [cache recycle]
!

contentsSpecies
	"Answer the class recorded as the species for collections of elements of this stream."
	^contentsSpecies
!

contentsSpecies: aClass
	"Record aClass as the species of this stream and recompute the read strategy.
	 direct becomes true when aClass is identical to the species of the source; the cache is then dropped.
	 Otherwise a fresh cache of the species of the source is obtained via #newRecycled:, sized like the previous cache when there was a non-empty one, else DefaultBufferSize.
	 The size of the previous cache is captured BEFORE it is recycled, so the buffer is never queried after it has been given back (presumably to a shared pool - confirm against the #recycle implementation)."
	| previousSize |
	contentsSpecies := aClass.
	previousSize := cache isNil ifTrue: [0] ifFalse: [cache size].
	cache isNil ifFalse: [cache recycle].
	direct := contentsSpecies == source contentsSpecies.
	cache := direct
		ifTrue:	[nil]
		ifFalse:	[
			source contentsSpecies newRecycled: (
				previousSize > 0
					ifTrue: [previousSize]
					ifFalse: [DefaultBufferSize])]
!

on: aSource block: aBlock
	"Attach this stream to aSource through #on:, then remember aBlock as the conversion block. The stream starts out in direct mode and adopts the species of aSource as its own."
	self on: aSource.
	direct := true.
	contentsSpecies := aSource contentsSpecies.
	block := aBlock
! !

!CollectReadStream methodsFor:'private'!

bufferRead: anInteger into: aSequenceableCollection at: startIndex
	"Read anInteger elements from source in chunks no larger than the cache, convert each one with the block and store it into aSequenceableCollection from startIndex onwards.
	 An Incomplete signalled by the source is caught so that the elements already read are still converted; a new Incomplete carrying the total converted count is then raised."
	| done chunk got base |
	done := 0.
	[done >= anInteger] whileFalse: [
		chunk := cache size min: anInteger - done.
		got := [source read: chunk into: cache at: 1. chunk]
			on: Incomplete
			do: [:incomplete | incomplete count].
		"base + offset is the destination slot for the cache element at offset"
		base := startIndex + done - 1.
		1 to: got do: [:offset |
			aSequenceableCollection at: base + offset put: (block value: (cache at: offset))].
		done := done + got.
		got < chunk ifTrue: [
			(Incomplete on: aSequenceableCollection count: done at: startIndex) raise]]
!

directRead: anInteger into: aSequenceableCollection at: startIndex
	"Have source read anInteger elements straight into aSequenceableCollection at startIndex, then replace each element read by its converted value in place.
	 An Incomplete signalled by the source is caught so that the elements delivered are still converted; a new Incomplete carrying that count is then raised."
	| got last |
	got := [source read: anInteger into: aSequenceableCollection at: startIndex. anInteger]
		on: Incomplete
		do: [:incomplete | incomplete count].
	last := startIndex + got - 1.
	startIndex to: last do: [:slot |
		aSequenceableCollection at: slot put: (block value: (aSequenceableCollection at: slot))].
	got >= anInteger ifFalse: [
		(Incomplete on: aSequenceableCollection count: got at: startIndex) raise]
! !

!CollectReadStream methodsFor:'seeking'!

++ anInteger
	"Forward the ++ message with anInteger to source and answer its result. No conversion is involved."
	^source ++ anInteger
!

-- anInteger
	"Forward the -- message with anInteger to source and answer its result. No conversion is involved."
	^source -- anInteger
!

length
	"Answer the length reported by source."
	^source length
!

position
	"Answer the position reported by source."
	^source position
!

position: anInteger
	"Forward #position: with anInteger to source and answer its result."
	^source position: anInteger
! !

!CollectReadStream methodsFor:'testing'!

isPositionable
	"Answer whatever source answers to #isPositionable."
	^source isPositionable
! !

!CollectReadStream class methodsFor:'documentation'!

version_HG
    "Answer the changeset keyword string for this class. The keyword is not expanded in this copy. NOTE(review): presumably filled in by the Mercurial tooling - confirm."

    ^ '$Changeset: <not expanded> $'
!

version_SVN
    "Answer the Id keyword string for this class. The keyword is not expanded in this copy. NOTE(review): presumably a leftover Subversion keyword - confirm."
    ^ '$Id$'
! !