terminals/Xtreams__ExternalReadStream.st
author Martin Kobetic
Sun, 17 Nov 2013 00:21:39 -0500
changeset 141 263190106319
parent 109 9587e2df7029
permissions -rw-r--r--
merging

"{ Package: 'stx:goodies/xtreams/terminals' }"

"{ NameSpace: Xtreams }"

"Read stream that keeps a read-ahead cache in front of its source.
 cache is the buffer, cachePosition is the number of cached elements already consumed,
 cacheDataSize is the number of valid elements currently held in the buffer."
ReadStream subclass:#ExternalReadStream
	instanceVariableNames:'cache cachePosition cacheDataSize'
	classVariableNames:''
	poolDictionaries:'XtreamsPool'
	category:'Xtreams-Terminals'
!

ExternalReadStream comment:'Used to read from BlockableIOAccessors (e.g. sockets or pipes). Elements are read ahead, but only as many as are currently available. This is a binary stream (produces bytes/ByteArrays).

{{{
	[ :in :out |
		[       out writing write: ''Hello''; close.
			in reading read: 5
		] ensure: [ in close. out close ]
	] valueWithArguments: SocketAccessor openPair
}}}
{{{
	[ :in :out |
		[       out writing write: ''Hello''; close.
			in reading read: 5
		] ensure: [ in close. out close ]
	] valueWithArguments: OSSystemSupport concreteClass pipeAccessorClass openPair
}}}

Instance Variables
	cache   <ByteArray | ByteString> read-ahead buffer
	cachePosition   <SmallInteger> position in the buffer
	cacheDataSize   <SmallInteger> size of valid data in the buffer

'
!


!ExternalReadStream methodsFor:'accessing'!

get
	"Answer the next element from the read-ahead cache. When every cached element has
	 been consumed, refill the cache from the source first. A refill requests at least
	 one element and at most the cache capacity, preferring the amount that bytesForRead
	 reports. If the refill delivers nothing, an Incomplete with zero count is handed to
	 incompleteRead:."

	| request filled |
	cachePosition < cacheDataSize ifFalse: [
		request := (1 max: self bytesForRead) min: cache size.
		filled := self readBytes: request into: cache startingAt: 1.
		"Commit the new cache bounds only after the read has returned. If bytesForRead or
		 readBytes:into:startingAt: signals an error, the cache stays marked as exhausted
		 instead of exposing stale buffer contents to the next read."
		cachePosition := 0.
		cacheDataSize := filled.
		filled isZero ifTrue: [ self incompleteRead: Incomplete zero ] ].
	^cache at: (cachePosition := cachePosition + 1)
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Copy anInteger elements into aSequenceableCollection, beginning at startIndex,
	 draining the read-ahead cache and refilling it from the source as needed.
	 Answer anInteger (0 immediately when nothing was requested).
	 If a refill delivers nothing before the request is satisfied, an Incomplete that
	 records the destination, the count copied so far and startIndex is handed to
	 incompleteRead:."

	| index toRead amount request filled |
	anInteger isZero ifTrue: [^0].
	index := startIndex.
	toRead := anInteger.
	[toRead > 0] whileTrue: [
		cachePosition < cacheDataSize ifFalse: [
			"Ask for at least what is still missing, but never more than the cache holds."
			request := (toRead max: self bytesForRead) min: cache size.
			filled := self readBytes: request into: cache startingAt: 1.
			"Commit the new cache bounds only after the read has returned. If bytesForRead
			 or readBytes:into:startingAt: signals an error, the cache stays marked as
			 exhausted instead of replaying stale buffer contents on the next read."
			cachePosition := 0.
			cacheDataSize := filled.
			filled isZero ifTrue: [ self incompleteRead: (Incomplete on: aSequenceableCollection count: anInteger - toRead at: startIndex) ] ].
		"Move as much as the cache can supply, capped by what is still wanted."
		amount := (cacheDataSize - cachePosition) min: toRead.
		aSequenceableCollection replaceFrom: index to: index + amount - 1 with: cache startingAt: cachePosition + 1.
		cachePosition := cachePosition + amount.
		index := index + amount.
		toRead := toRead - amount ].
	^anInteger
! !

!ExternalReadStream methodsFor:'initialize-release'!

close
	"Perform the inherited close, then hand the read-ahead buffer back for recycling."

	super close.
	cache recycle
!

contentsSpecies
	"Answer the class of the current read-ahead buffer."

	^cache class
!

contentsSpecies: aClass
	"Replace the read-ahead buffer with a recycled instance of aClass holding
	 DefaultBufferSize elements. A previously installed buffer is recycled first.
	 The cache is marked empty afterwards, so elements that were buffered but not
	 yet consumed are dropped."

	cache isNil ifFalse: [ cache recycle ].
	cache := aClass newRecycled: DefaultBufferSize.
	cachePosition := cacheDataSize := 0.
!

on: anAccessor
	"Attach the stream to anAccessor via the inherited behavior, then install a
	 ByteArray read-ahead buffer (which also marks the cache as empty)."

	super on: anAccessor.
	self contentsSpecies: ByteArray
! !

!ExternalReadStream methodsFor:'private'!

bytesForRead
	"Answer what the source reports via numAvailable. The refill code in get and
	 read:into:at: uses this as the preferred request size."

	^source numAvailable
!

incompleteRead: incomplete
	"Raise the given Incomplete. Invoked by get and read:into:at: when a cache refill
	 delivered no elements."

	incomplete raise
!

readBytes:count into:aByteBuffer startingAt:firstIndex
    "Forward the request to the source via nextBytes:into:startingAt: and answer its result.
     Callers in this class store the answer as the number of valid elements now in the buffer."

    ^source nextBytes:count into:aByteBuffer startingAt:firstIndex
! !

!ExternalReadStream class methodsFor:'documentation'!

version_HG
    "Answer the Mercurial changeset keyword string recorded for this file."

    ^ '$Changeset: <not expanded> $'
! !