transforms/Xtreams__CollectWriteStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:34:28 +0000
changeset 97 2a7827f4dce2
parent 72 d16c7d84d4a8
child 111 44ac233b2f83
permissions -rw-r--r--
pool name fixes

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

"CollectWriteStream is a WriteStream subclass that passes written elements through a conversion block
 before forwarding them to its destination stream.
 Instance variables: block (the conversion block) and cache (buffer holding converted elements for bulk writes).
 NOTE(review): DefaultBufferSize, used when sizing the cache, is presumably supplied by the XtreamsPool pool dictionary - confirm."
WriteStream subclass:#CollectWriteStream
	instanceVariableNames:'block cache'
	classVariableNames:''
	poolDictionaries:'Xtreams::XtreamsPool'
	category:'Xtreams-Transforms'
!

CollectWriteStream comment:'Converts written elements using the provided conversion block. Optimizes bulk writes by batching the conversion results, allowing the destination stream to perform bulk writes as well.

Instance Variables
	block	<BlockClosure> collecting block (same style as collect: blocks)
	cache	<SequenceableCollection> caches results after transformation so that they can be bulk written too

'
!


!CollectWriteStream class methodsFor:'instance creation'!

on: aWriteStream block: aBlock
	"Create a new instance and initialize it with aWriteStream as the destination
	 and aBlock as the conversion block. Answers the result of that initialization."
	| stream |
	stream := self new.
	^stream on: aWriteStream block: aBlock
! !

!CollectWriteStream methodsFor:'accessing'!

block
	"Answer the conversion block that is applied to elements written to this stream."
	^block
!

insert: anInteger from: aSequenceableCollection at: startIndex
	"Convert anInteger elements of aSequenceableCollection, beginning at startIndex, through the
	 conversion block and insert the results into the destination. The work is done in batches
	 of at most cache size elements so the destination receives bulk inserts.
	 Answers anInteger. If the destination signals Incomplete for a batch, an Incomplete is raised
	 carrying the total number of elements inserted so far.

	 Mirrors write:from:at: in its batching:
	 - offset is the zero-based position before the next source element (startIndex - 1), so that
	   offset + index addresses the correct element for index = 1;
	 - each batch is limited to the elements still remaining (anInteger - count);
	 - only the amount elements actually placed in the cache are handed to the destination."
	| count amount written offset |
	count := 0.
	offset := startIndex - 1.
	[count < anInteger] whileTrue:
		[amount := (anInteger - count) min: cache size.
		1 to: amount do: [:index | cache at: index put: (block value: (aSequenceableCollection at: offset + index))].
		"If the destination could not take the whole batch, use the count it reports instead."
		written := [destination insert: amount from: cache. amount] on: Incomplete do: [ :ex | ex count ].
		count := count + written.
		written < amount ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
		offset := offset + amount ].
	^anInteger
!

put: anObject
	"Convert anObject with the conversion block and put the result on the destination.
	 Answers the original, unconverted anObject."
	| converted |
	converted := block value: anObject.
	destination put: converted.
	^anObject
!

write: anInteger from: aSequenceableCollection at: startIndex
	"Convert anInteger elements of aSequenceableCollection, beginning at startIndex, through the
	 conversion block and write the results to the destination in batches of at most cache size
	 elements. Answers anInteger. If the destination signals Incomplete for a batch, an Incomplete
	 is raised carrying the total number of elements written so far."
	| done batchSize accepted base |
	done := 0.
	"base + i addresses the i-th source element of the current batch"
	base := startIndex - 1.
	[done >= anInteger] whileFalse: [
		batchSize := (anInteger - done) min: cache size.
		1 to: batchSize do: [:i |
			cache
				at: i
				put: (block value: (aSequenceableCollection at: base + i))].
		accepted := [destination write: batchSize from: cache. batchSize]
			on: Incomplete
			do: [:incomplete | incomplete count].
		done := done + accepted.
		accepted < batchSize ifTrue: [
			(Incomplete on: aSequenceableCollection count: done at: startIndex) raise].
		base := base + batchSize].
	^anInteger
! !

!CollectWriteStream methodsFor:'initialize-release'!

close
	"Close via the superclass, then send recycle to the cache buffer and drop the reference to it.
	 The nil check matches the guard in contentsSpecies: and keeps a repeated close
	 (after which cache is already nil) from sending recycle to nil."
	super close.
	cache ifNotNil: [ cache recycle ].
	cache := nil
!

contentsSpecies
	"Answer the species of the cache buffer."

	^cache species
!

contentsSpecies: aClass
	"Replace the cache with a buffer obtained from aClass through newRecycled:.
	 An existing cache is sent recycle first; its size is reused for the new buffer
	 when it is positive, otherwise DefaultBufferSize is used."
	| capacity |
	cache isNil ifFalse: [cache recycle].
	capacity := (cache isNil or: [cache size <= 0])
		ifTrue: [DefaultBufferSize]
		ifFalse: [cache size].
	cache := aClass newRecycled: capacity
!

on: aDestination block: aBlock
	"Initialize the receiver on aDestination, remember aBlock as the conversion block,
	 and set up the cache using the contents species reported by aDestination."
	| species |
	self on: aDestination.
	block := aBlock.
	species := aDestination contentsSpecies.
	self contentsSpecies: species
! !

!CollectWriteStream methodsFor:'seeking'!

++ anInteger
	"Forward ++ with anInteger to the destination and answer its result."
	^destination ++ anInteger
!

-- anInteger
	"Forward -- with anInteger to the destination and answer its result."
	^destination -- anInteger
!

length
	"Answer the length reported by the destination."
	^destination length
!

position
	"Answer the position reported by the destination."
	^destination position
!

position: anInteger
	"Forward position: with anInteger to the destination and answer its result."
	^destination position: anInteger
! !

!CollectWriteStream methodsFor:'testing'!

isPositionable
	"Answer whatever the destination answers to isPositionable; the seeking methods of this class all delegate to the destination."
	^destination isPositionable
! !

!CollectWriteStream class methodsFor:'documentation'!

version_SVN
    "Answer the version identifier string of this class.
     NOTE(review): the $Id$ keyword is presumably expanded by the version control system on checkout - confirm."
    ^ '$Id$'
! !