transforms/Xtreams__InterpretedReadStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:34:28 +0000
changeset 97 2a7827f4dce2
parent 72 d16c7d84d4a8
child 111 44ac233b2f83
permissions -rw-r--r--
pool name fixes

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

"Declares InterpretedReadStream as a subclass of ReadStream in category Xtreams-Transforms.
 The five instance variables are described in the class comment that follows this definition.
 NOTE(review): DefaultBufferSize, used by the instance-side initializer, is presumably
 supplied by the Xtreams::XtreamsPool pool dictionary imported here - confirm."
ReadStream subclass:#InterpretedReadStream
	instanceVariableNames:'elementSize cache cacheSize operation contentsSpecies'
	classVariableNames:''
	poolDictionaries:'Xtreams::XtreamsPool'
	category:'Xtreams-Transforms'
!

InterpretedReadStream comment:'Interprets bytes from a binary source as values of preconfigured (primitive) CType, e.g. float, long etc.

Instance Variables
	elementSize	<Integer> byte size of elements of the pre-configured CType
	cache	<InterpretedBytes> caches bulk-read bytes before they are interpreted, for speed; holds at least elementSize * cacheSize bytes (and never fewer than DefaultBufferSize)
	cacheSize	<SmallInteger> how many elements (not bytes) do we want to cache
	operation	<BlockClosure> CType translation primitive used to read from the buffer
	contentsSpecies	<Class> collection type to use to hold collections of elements

'
!


!InterpretedReadStream class methodsFor:'instance creation'!

on: aReadStream bytesPerElement: anInteger contentsSpecies: aClass operation: aBlock cacheSize: aSize
	"Create a new instance and configure it through the instance-side initializer of the same name.
	 aReadStream - the source stream to read from
	 anInteger   - byte size of one element
	 aClass      - collection class answered by contentsSpecies
	 aBlock      - two-argument block evaluated with the cache and a byte offset to decode one element
	 aSize       - number of elements (not bytes) to read from the source per batch
	 Answer whatever the instance-side initializer answers."

	| stream |
	stream := self new.
	^stream
		on: aReadStream
		bytesPerElement: anInteger
		contentsSpecies: aClass
		operation: aBlock
		cacheSize: aSize
!

on: aReadStream type: aSymbol cacheSize: anInteger
	"Create a stream over aReadStream for the element type named by aSymbol.
	 aSymbol is performed on InterpretedBytes; the answer is indexed as a three-slot
	 collection holding, in order: bytes per element, contents species, decoding operation.
	 anInteger is the cache size in elements."

	| spec |
	spec := InterpretedBytes perform: aSymbol.
	^self
		on: aReadStream
		bytesPerElement: (spec at: 1)
		contentsSpecies: (spec at: 2)
		operation: (spec at: 3)
		cacheSize: anInteger
! !

!InterpretedReadStream methodsFor:'accessing'!

read: anInteger into: aSequenceableCollection at: startIndex
	"Read anInteger elements from the source and store them into aSequenceableCollection
	 beginning at startIndex. Bytes are pulled from the source into cache in batches of at
	 most cacheSize elements; each element is decoded by evaluating operation with the cache
	 and the 1-based byte offset of that element.
	 Answer anInteger when every requested element was read.
	 If the source signals Incomplete, decode the whole elements of the partial batch and
	 raise a new Incomplete on aSequenceableCollection whose count is the number of
	 elements actually stored."

	| count amount complete decode |
	count := 0.
	"Decode the first n elements held in cache into the destination, placing them after
	 the count elements that were already stored."
	decode := [:n |
		0 to: n - 1 do: [:index |
			aSequenceableCollection
				at: startIndex + count + index
				put: (operation value: cache value: index * elementSize + 1)]].
	[[count = anInteger] whileFalse:
		[amount := cacheSize min: (anInteger - count).
		source read: amount * elementSize into: cache at: 1.
		decode value: amount.
		count := count + amount]]
			on: Incomplete do: [:exception |
				"The count of the source exception is treated as a byte count. Only whole
				 elements are usable, so truncate with // - plain / would put a Fraction
				 into the re-raised count when the source stops in the middle of an element."
				complete := exception count // elementSize.
				decode value: complete.
				(Incomplete on: aSequenceableCollection count: count + complete at: startIndex) raise].
	^anInteger
! !

!InterpretedReadStream methodsFor:'initialize-release'!

close
	"Perform the inherited close, then hand the cache back via recycle and drop the reference.
	 The cache is released only while it is still held, so closing a stream a second time
	 no longer sends recycle to nil."

	super close.
	cache isNil ifTrue: [^self].
	cache recycle.
	cache := nil
!

contentsSpecies
	"Answer the collection class that was supplied when this stream was configured."
	^contentsSpecies
!

on: aReadStream bytesPerElement: anInteger contentsSpecies: aClass operation: aBlock cacheSize: aSize
	"Configure the receiver to read from aReadStream.
	 anInteger - byte size of one element
	 aClass    - collection class answered by contentsSpecies
	 aBlock    - block used to decode one element out of the cache
	 aSize     - number of elements (not bytes) fetched from the source per batch
	 The cache is obtained from InterpretedBytes newRecycled: and is sized to hold a full
	 batch, but never fewer than DefaultBufferSize bytes."

	| cacheBytes |
	super on: aReadStream.
	elementSize := anInteger.
	cacheSize := aSize.
	operation := aBlock.
	contentsSpecies := aClass.
	cacheBytes := elementSize * cacheSize.
	cache := InterpretedBytes newRecycled: (cacheBytes max: DefaultBufferSize)
! !

!InterpretedReadStream methodsFor:'seeking'!

++ anInteger
	"Skip forward by anInteger elements; a negative argument is delegated to -- with the
	 negated value. Answer anInteger when the whole distance was skipped.
	 If the source signals Incomplete, raise a new Incomplete whose count is the count of
	 the source exception converted to whole elements."

	anInteger < 0 ifTrue: [ ^self -- anInteger negated ].
	"The parentheses are required: binary selectors evaluate left to right, so
	 source ++ anInteger * elementSize would skip anInteger BYTES and then multiply
	 the answer of ++ by elementSize."
	[source ++ (anInteger * elementSize)]
		on: Incomplete do: [:exception |
			"Use // so that a skip ending in the middle of an element does not produce a Fraction count."
			(Incomplete count: exception count // elementSize) raise].
	^anInteger
!

-- anInteger
	"Skip backward by anInteger elements; a negative argument is delegated to ++ with the
	 negated value. Answer anInteger when the whole distance was skipped.
	 If the source signals Incomplete, raise a new Incomplete whose count is the count of
	 the source exception converted to whole elements."

	anInteger < 0 ifTrue: [ ^self ++ anInteger negated ].
	"The parentheses are required: binary selectors evaluate left to right, so
	 source -- anInteger * elementSize would step back anInteger BYTES and then multiply
	 the answer of -- by elementSize."
	[source -- (anInteger * elementSize)]
		on: Incomplete do: [:exception |
			"Use // so that a skip ending in the middle of an element does not produce a Fraction count."
			(Incomplete count: exception count // elementSize) raise].
	^anInteger
!

length
	"Answer the number of whole elements in the source: its length divided by elementSize.
	 Integer division is used so that a source whose length is not a multiple of
	 elementSize answers an Integer rather than a Fraction, in line with position:."

	^source length // elementSize
!

position
	"Answer the current position in elements: the source position divided by elementSize.
	 Integer division is used so that the answer is always an Integer, in line with position:."

	^source position // elementSize
!

position: aPosition
	"Move the source to element index aPosition, i.e. to aPosition * elementSize in the source.
	 Answer the value answered by the source, integer-divided by elementSize.
	 If the source signals Incomplete, the exception is not re-raised; its count is used
	 as the value to convert instead."

	| reached |
	reached := [source position: aPosition * elementSize]
		on: Incomplete
		do: [:ex | ex count].
	^reached // elementSize
! !

!InterpretedReadStream methodsFor:'testing'!

isPositionable
	"Answer whether the source reports itself as positionable; the receiver simply delegates."

	^source isPositionable
! !

!InterpretedReadStream class methodsFor:'documentation'!

version_SVN
    "Answer the version identification string of this class.
     NOTE(review): the literal looks like a version-control keyword placeholder that is
     expanded on checkout - confirm before editing it."
    ^ '$Id$'
! !