"{ Package: 'stx:goodies/xtreams/terminals' }"
"{ NameSpace: Xtreams }"
"Class definition: SharedQueueReadStream is a subclass of ReadStream filed
 under the category Xtreams-Terminals. It declares a single instance
 variable, contentsSpecies, and no class variables or pool dictionaries."
ReadStream subclass:#SharedQueueReadStream
instanceVariableNames:'contentsSpecies'
classVariableNames:''
poolDictionaries:''
category:'Xtreams-Terminals'
!
SharedQueueReadStream comment:'Read stream on a SharedQueue. Primarily used for data transfer between processes.
{{{
queue := SharedQueue new.
in := queue reading.
out := queue writing.
received := Array new writing.
done := Semaphore new.
consumer :=
[ | size |
[ (size := in get) isZero
] whileFalse: [ | word |
word := ByteString new: size.
in read: size into: word.
received put: word ].
done signal.
] fork.
#(one two three four) do: [ :word | out put: word size; write: word ].
out put: 0.
done wait.
received conclusion
}}}
Instance Variables
contentsSpecies <Class> species for collections of elements of this stream
'
!
!SharedQueueReadStream methodsFor:'accessing'!
get
    "Answer one element obtained by sending #next to the underlying source.
     NOTE(review): with a SharedQueue as source this presumably waits while
     the queue is empty - confirm against the SharedQueue implementation."

    | element |
    element := source next.
    ^element
!
read: anInteger into: aSequenceableCollection at: startIndex
    "Take anInteger elements from the source, one #next at a time, and store
     them into consecutive slots of aSequenceableCollection beginning at
     startIndex. Answer anInteger, the requested element count."

    | lastIndex |
    lastIndex := startIndex + anInteger - 1.
    startIndex to: lastIndex do: [:slot |
        aSequenceableCollection at: slot put: source next].
    ^anInteger
! !
!SharedQueueReadStream methodsFor:'initialize-release'!
close
    "Closing is a no-op for this stream: nothing is done to the source.
     Answer the receiver."

    ^self
!
contentsSpecies
"Answer the class currently stored in the contentsSpecies instance variable
 (described in the class comment as the species for collections of
 elements of this stream)."
^contentsSpecies
!
contentsSpecies: aClass
"Store aClass in the contentsSpecies instance variable, replacing the
 previous value. Answers the receiver."
contentsSpecies := aClass
!
on: aSource
    "Initialize the receiver on aSource using the inherited setup, then
     select Array as the contentsSpecies. Answer the receiver."

    super on: aSource.
    "Array is the default species; it can be changed later via #contentsSpecies:"
    contentsSpecies := Array.
    ^self
! !
!SharedQueueReadStream methodsFor:'private'!
streamingInsertInto: aWriteStream
"Not supported by this stream class: the request is rejected by sending
 #shouldNotImplement to the receiver. aWriteStream is never used."
self shouldNotImplement
!
streamingWriteInto: aWriteStream
"Not supported by this stream class: the request is rejected by sending
 #shouldNotImplement to the receiver. aWriteStream is never used."
self shouldNotImplement
! !
!SharedQueueReadStream class methodsFor:'documentation'!
version_HG
"Answer the version-control identification string for this class. The
 Changeset keyword has not been expanded in this copy of the source.
 NOTE(review): the HG suffix suggests Mercurial keyword expansion - confirm."
^ '$Changeset: <not expanded> $'
! !