"{ Package: 'stx:goodies/xtreams/transforms' }"
"{ NameSpace: Xtreams }"
ReadStream subclass:#InterpretedReadStream
instanceVariableNames:'elementSize cache cacheSize operation contentsSpecies'
classVariableNames:''
poolDictionaries:'Xtreams::XtreamsPool'
category:'Xtreams-Transforms'
!
InterpretedReadStream comment:'Interprets bytes from a binary source as values of preconfigured (primitive) CType, e.g. float, long etc.
Instance Variables
elementSize <Integer> byte size of elements of the pre-configured CType
cache <InterpretedBytes> caches bulk read bytes before interpreting for speed; size = elementSize * cacheSize
cacheSize <SmallInteger> how many elements (not bytes) do we want to cache
operation <BlockClosure> CType translation primitive used to read from the buffer
contentsSpecies <Class> collection type to use to hold collections of elements
'
!
!InterpretedReadStream class methodsFor:'instance creation'!
on: aReadStream bytesPerElement: anInteger contentsSpecies: aClass operation: aBlock cacheSize: aSize
^self new on: aReadStream bytesPerElement: anInteger contentsSpecies: aClass operation: aBlock cacheSize: aSize
!
on: aReadStream type: aSymbol cacheSize: anInteger
| interpretation |
interpretation := InterpretedBytes perform: aSymbol.
^self on: aReadStream bytesPerElement: (interpretation at: 1) contentsSpecies: (interpretation at: 2) operation: (interpretation at: 3) cacheSize: anInteger
! !
!InterpretedReadStream methodsFor:'accessing'!
read: anInteger into: aSequenceableCollection at: startIndex
| count amount |
count := 0.
[[count = anInteger] whileFalse:
[amount := cacheSize min: (anInteger - count).
source read: amount * elementSize into: cache at: 1.
0 to: amount - 1 do: [:index | aSequenceableCollection at: startIndex + count + index put: (operation value: cache value: index * elementSize + 1)].
count := count + amount]]
on: Incomplete do: [:exception |
0 to: exception count // elementSize - 1 do: [:index | aSequenceableCollection at: startIndex + count + index put: (operation value: cache value: index * elementSize + 1)].
(Incomplete on: aSequenceableCollection count: count + (exception count / elementSize) at: startIndex) raise].
^anInteger
! !
!InterpretedReadStream methodsFor:'initialize-release'!
close
super close.
cache recycle.
cache := nil
!
contentsSpecies
^contentsSpecies
!
on: aReadStream bytesPerElement: anInteger contentsSpecies: aClass operation: aBlock cacheSize: aSize
super on: aReadStream.
cacheSize := aSize.
elementSize := anInteger.
contentsSpecies := aClass.
operation := aBlock.
cache := InterpretedBytes newRecycled: ((elementSize * cacheSize) max: DefaultBufferSize)
! !
!InterpretedReadStream methodsFor:'seeking'!
++ anInteger
anInteger < 0 ifTrue: [ ^self -- anInteger negated ].
[source ++ anInteger * elementSize]
on: Incomplete do: [:exception | (Incomplete count: exception count / elementSize) raise].
^anInteger
!
-- anInteger
anInteger < 0 ifTrue: [ ^self ++ anInteger negated ].
[source -- anInteger * elementSize]
on: Incomplete do: [:exception | (Incomplete count: exception count / elementSize) raise].
^anInteger
!
length
^source length / elementSize
!
position
^source position / elementSize
!
position: aPosition
^([ source position: aPosition * elementSize ] on: Incomplete do: [ :ex | ex count ]) // elementSize
! !
!InterpretedReadStream methodsFor:'testing'!
isPositionable
^source isPositionable
! !
!InterpretedReadStream class methodsFor:'documentation'!
version_SVN
^ '$Id$'
! !