"{ Package: 'stx:goodies/xtreams/terminals' }"
"{ NameSpace: Xtreams }"
"Defines ExternalReadStream as a subclass of ReadStream in category Xtreams-Terminals.
 Instance state: cache (read-ahead buffer), cachePosition (elements already consumed
 from the buffer) and cacheDataSize (number of valid elements in the buffer).
 XtreamsPool is imported as a pool dictionary."
ReadStream subclass:#ExternalReadStream
instanceVariableNames:'cache cachePosition cacheDataSize'
classVariableNames:''
poolDictionaries:'XtreamsPool'
category:'Xtreams-Terminals'
!
ExternalReadStream comment:'Used to read from BlockableIOAccessors (e.g. sockets or pipes). Elements are read ahead, but only as many as are currently available. This is a binary stream (it produces bytes/ByteArrays).
{{{
[ :in :out |
[ out writing write: ''Hello''; close.
in reading read: 5
] ensure: [ in close. out close ]
] valueWithArguments: SocketAccessor openPair
}}}
{{{
[ :in :out |
[ out writing write: ''Hello''; close.
in reading read: 5
] ensure: [ in close. out close ]
] valueWithArguments: OSSystemSupport concreteClass pipeAccessorClass openPair
}}}
Instance Variables
cache <ByteArray | ByteString> read-ahead buffer
cachePosition <SmallInteger> position in the buffer
cacheDataSize <SmallInteger> size of valid data in the buffer
'
!
!ExternalReadStream methodsFor:'accessing'!
get
    "Answer the next element from the source.
     When the read-ahead cache is exhausted it is refilled first: at least one element
     is requested, at most a cache-full, and otherwise as many as #bytesForRead reports.
     If the refill delivers nothing, #incompleteRead: is sent with the Incomplete
     answered by 'Incomplete zero'."

    cachePosition < cacheDataSize ifFalse: [ | wanted |
        "Mark the cache empty before touching the source. If querying or reading the
         source signals an exception, the stream must not be left claiming that stale
         bytes in the buffer are valid data."
        cachePosition := 0.
        cacheDataSize := 0.
        wanted := (1 max: self bytesForRead) min: cache size.
        cacheDataSize := self readBytes: wanted into: cache startingAt: 1.
        cacheDataSize isZero ifTrue: [ self incompleteRead: Incomplete zero ] ].
    ^cache at: (cachePosition := cachePosition + 1)
!
read: anInteger into: aSequenceableCollection at: startIndex
    "Read anInteger elements into aSequenceableCollection, storing them from startIndex
     onwards, and answer anInteger. A request for zero elements answers 0 immediately.
     Elements are served from the read-ahead cache, which is refilled from the source
     whenever it runs dry. If a refill delivers nothing, #incompleteRead: is sent with
     an Incomplete built from the destination collection, the number of elements
     transferred so far and startIndex."

    | index toRead |
    anInteger isZero ifTrue: [^0].
    index := startIndex.
    toRead := anInteger.
    [toRead > 0] whileTrue: [ | amount wanted |
        cachePosition < cacheDataSize ifFalse: [
            "Mark the cache empty before touching the source. If querying or reading
             the source signals an exception, the stream must not be left claiming
             that stale bytes in the buffer are valid data."
            cachePosition := 0.
            cacheDataSize := 0.
            "Ask for the larger of what is still needed and what is available,
             but never more than the buffer can hold."
            wanted := (toRead max: self bytesForRead) min: cache size.
            cacheDataSize := self readBytes: wanted into: cache startingAt: 1.
            cacheDataSize isZero ifTrue: [ self incompleteRead: (Incomplete on: aSequenceableCollection count: anInteger - toRead at: startIndex) ] ].
        "Copy as much as the cache currently holds, limited by what is still wanted."
        amount := (cacheDataSize - cachePosition) min: toRead.
        aSequenceableCollection replaceFrom: index to: index + amount - 1 with: cache startingAt: cachePosition + 1.
        cachePosition := cachePosition + amount.
        index := index + amount.
        toRead := toRead - amount ].
    ^anInteger
! !
!ExternalReadStream methodsFor:'initialize-release'!
close
    "Close the stream (via super) and hand the read-ahead buffer back for recycling.
     The buffer reference is dropped once it has been recycled, so closing the stream
     a second time cannot recycle the same buffer again. The cache bookkeeping is
     reset as well, because no buffered data remains valid after the buffer is gone."

    super close.
    cache ifNotNil: [
        cache recycle.
        cache := nil.
        cachePosition := 0.
        cacheDataSize := 0 ]
!
contentsSpecies
    "Answer the class of the current read-ahead buffer."

    | buffer |
    buffer := cache.
    ^buffer class
!
contentsSpecies: aClass
    "Switch the read-ahead buffer to a recycled instance of aClass sized
     DefaultBufferSize. Any buffer held so far is recycled first, and the cache
     bookkeeping is reset so that the new buffer starts out empty."

    cache notNil ifTrue: [ cache recycle ].
    cache := aClass newRecycled: DefaultBufferSize.
    cachePosition := cacheDataSize := 0
!
on: anAccessor
    "Attach the stream to anAccessor (via super), then set up a ByteArray
     read-ahead buffer through #contentsSpecies:."

    super on: anAccessor.
    self contentsSpecies: ByteArray.
    ^self
! !
!ExternalReadStream methodsFor:'private'!
bytesForRead
    "Answer what the source reports through #numAvailable; used to size cache refills."

    | available |
    available := source numAvailable.
    ^available
!
incompleteRead: incomplete
    "Raise the given incomplete-read notification. Called by the reading methods
     when a cache refill delivered no data."

    incomplete raise.
    ^self
!
readBytes:count into:aByteBuffer startingAt:firstIndex
    "Forward the read request to the source via #nextBytes:into:startingAt: and
     answer its result. The callers in this class treat that result as the number
     of elements actually delivered."

    | delivered |
    delivered := source
        nextBytes:count
        into:aByteBuffer
        startingAt:firstIndex.
    ^delivered
! !
!ExternalReadStream class methodsFor:'documentation'!
version_HG
    "Answer the version-control keyword string for this class. In this copy the
     Changeset keyword still carries its unexpanded placeholder value.
     NOTE(review): the HG suffix presumably refers to Mercurial - confirm."
^ '$Changeset: <not expanded> $'
! !