"{ Package: 'stx:goodies/xtreams/transforms' }"
"{ NameSpace: Xtreams }"
WriteStream subclass:#CollectWriteStream
instanceVariableNames:'block cache'
classVariableNames:''
poolDictionaries:'Xtreams::XtreamsPool'
category:'Xtreams-Transforms'
!
CollectWriteStream comment:'Converts written elements using the provided conversion block. Optimizes bulk writes by batching the conversion results, allowing the destination stream to perform bulk writes as well.
Instance Variables
block <BlockClosure> collecting block (same style as collect: blocks)
cache <SequenceableCollection> caches results after transformation so that they can be bulk written too
'
!
!CollectWriteStream class methodsFor:'instance creation'!
on: aWriteStream block: block
^self new on: aWriteStream block: block
! !
!CollectWriteStream methodsFor:'accessing'!
block
^block
!
insert: anInteger from: aSequenceableCollection at: startIndex
| count amount written offset |
count := 0.
offset := startIndex.
[count < anInteger] whileTrue:
[amount := anInteger min: cache size.
1 to: amount do: [:index | cache at: index put: (block value: (aSequenceableCollection at: offset+index))].
written := [destination insert: anInteger from: cache. amount] on: Incomplete do: [ :ex | ex count ].
count := count + written.
written < amount ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
offset := offset + amount ].
^anInteger
!
put: anObject
destination put: (block value: anObject).
^anObject
!
write: anInteger from: aSequenceableCollection at: startIndex
| count amount written offset |
count := 0.
offset := startIndex - 1.
[count < anInteger] whileTrue:
[amount := anInteger - count min: cache size.
1 to: amount do: [:index | cache at: index put: (block value: (aSequenceableCollection at: offset + index))].
written := [destination write: amount from: cache. amount] on: Incomplete do: [ :ex | ex count ].
count := count + written.
written < amount ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
offset := offset + amount ].
^anInteger
! !
!CollectWriteStream methodsFor:'initialize-release'!
close
super close.
cache recycle.
cache := nil
!
contentsSpecies
^cache species
!
contentsSpecies: aClass
cache ifNotNil: [ cache recycle ].
cache := aClass newRecycled: (
(cache notNil and: [ cache size > 0 ])
ifTrue: [cache size]
ifFalse: [DefaultBufferSize])
!
on: aDestination block: aBlock
self on: aDestination.
block := aBlock.
self contentsSpecies: aDestination contentsSpecies
! !
!CollectWriteStream methodsFor:'seeking'!
++ anInteger
^destination ++ anInteger
!
-- anInteger
^destination -- anInteger
!
length
^destination length
!
position
^destination position
!
position: anInteger
^destination position: anInteger
! !
!CollectWriteStream methodsFor:'testing'!
isPositionable
^destination isPositionable
! !
!CollectWriteStream class methodsFor:'documentation'!
version_SVN
^ '$Id$'
! !