core/Xtreams__ReadStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 20 Mar 2013 12:03:03 +0000
changeset 120 2ffa5dddf5e8
parent 109 9587e2df7029
permissions -rw-r--r--
Merged heads 8afa49727a25 b7321b3f5858

"{ Package: 'stx:goodies/xtreams/core' }"

"{ NameSpace: Xtreams }"

"Declares ReadStream (in the Xtreams namespace) as a direct subclass of Object.
 It has a single instance variable, source, and imports the XtreamsPool shared pool
 (presumably where DefaultBufferSize, used by #++, is defined - TODO confirm)."
Object subclass:#ReadStream
	instanceVariableNames:'source'
	classVariableNames:''
	poolDictionaries:'XtreamsPool'
	category:'Xtreams-Core'
!

ReadStream comment:'Abstract superclass of all read streams, defines the API.

Read streams are created by sending #reading to a concrete resource (a.k.a terminal), such as a Collection, SocketAccessor, Filename, etc.

	''testing'' reading rest

Transform read streams are created through one of the messages in the ''transforming'' protocol sent to other read streams.

	(''testing'' reading collecting: #asUppercase) rest

Subclasses must implement the following messages:
	#read:into:at:
	#contentsSpecies

Instance Variables
	source  <Object> a read stream or "terminal" producing the elements

'
!


!ReadStream class methodsFor:'instance creation'!

on: aSource
	"Create a new instance of the receiver and attach it to aSource."
	"	aSource	<Object>	the stream or terminal the new stream reads from
		^<Object>	whatever the instance side #on: answers
	"
	| stream |
	stream := self new.
	^stream on: aSource
! !

!ReadStream methodsFor:'accessing'!

get
	"Read a single object from self.
	If there are not any elements left in the stream, the Incomplete exception is raised."
	"	^	<Object> an object read from this stream
	"
	| cache |
	cache := self contentsSpecies newRecycled: 1.
	"The one element cache is handed back in an ensure block so that it is recycled
	 even when the read signals Incomplete, which is how every #do: loop terminates."
	^[self read: 1 into: cache at: 1.
	cache first]
		ensure: [cache recycle]
!

read: anInteger
	"Answer a freshly allocated collection holding the next anInteger elements of the stream.
	The Incomplete exception is raised when the source cannot supply the full anInteger elements."
	"	anInteger	<Integer>	how many elements to read
		^<SequenceableCollection>	a new collection of size anInteger filled with the elements read
	"
	| result |
	result := self contentsSpecies withSize: anInteger.
	self read: anInteger into: result at: 1.
	^result
!

read: anInteger into: aSequenceableCollection
	"Read anInteger elements into aSequenceableCollection, filling it from index 1.
	Convenience form of #read:into:at: with the start index fixed to 1.
	If the full anInteger number of elements cannot be read from the source, the Incomplete exception is raised."
	"       anInteger       <Integer>       the number of elements to read
		aSequenceableCollection <SequenceableCollection>        the destination to read into
		^<Integer>      number of elements read (the answer of #read:into:at:)
	"
	^self read: anInteger into: aSequenceableCollection at: 1
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Read anInteger elements into aSequenceableCollection, storing them from startIndex onwards.
	This is the primitive read operation; it is abstract here and every concrete stream must supply it.
	If the full anInteger number of elements cannot be read from the source, the Incomplete exception is raised."
	"       anInteger       <Integer>       the number of elements to read
		aSequenceableCollection <SequenceableCollection>        the destination to read into
		startIndex      <Integer>       the index into aSequenceableCollection to start writing to
		^<Integer>      number of elements read
	"
	self subclassResponsibility
	"Subclasses must implement this message. Every effort must be made to read anInteger elements, blocking if necessary."
!

rest
	"Drain the stream and answer everything that was still unread."
	"	^	<SequenceableCollection>	a new collection of the elements read from this stream
	"

	"Implementation note: #contents is used rather than #close followed by #destination.
		Sending #close performs a #become:, which is wasted effort because the temporary
		write stream is discarded as soon as this method returns."

	| output |
	output := self contentsSpecies new writing.
	output write: self.
	^output contents
!

source
	"Return the source of this stream, i.e. the object stored by #on:."
	"       ^<Object>       the value of the source instance variable"

	^source
!

terminal
	"Answer the object at the very bottom of the stream stack.
	When the source is itself a ReadStream the request is passed down, otherwise the source is the terminal."
	"	^<Collection | Buffer | IOAccessor | BlockClosure>
	"
	(source isKindOf: ReadStream) ifFalse: [^source].
	^source terminal
! !

!ReadStream methodsFor:'converting'!

reading
	"Answer a new read stream built over a block that pulls elements from the receiver with #next.
	An EndOfStreamNotification originating from the receiver is converted into an Incomplete with zero count;
	notifications from any other originator are passed on. The new stream gets the receiver's contentsSpecies."
	| wrapper |
	wrapper := [[self next]
		on: EndOfStreamNotification
		do: [:notification |
			notification originator == self
				ifTrue: [Incomplete zero raise]
				ifFalse: [notification pass]]] reading.
	wrapper contentsSpecies: self contentsSpecies.
	^wrapper
! !

!ReadStream methodsFor:'enumerating'!

collect: aBlock
	"Read the stream to its end, transform every element with aBlock and answer the results as a collection."
	"	aBlock	<BlockClosure>	#collect: style block transforming the elements being read
		^	<Collection>	the transformed elements, in stream order
	""
		(1 to: 10) reading collect: [ :e | e * e ]
	"
	| output |
	output := self contentsSpecies new writing.
	self do: [:element | output put: (aBlock value: element)].
	output close.
	^output destination
!

detect: aBlock
	"Evaluate aBlock with each of the receiver's elements as the argument. Answer the first element for which aBlock evaluates to true.
	If the stream ends without a match, NotFoundError is raised."
	"       aBlock  <BlockClosure>  #detect: style block
		^               <Object>        first element for which aBlock evaluates to true
	"
	^self detect: aBlock ifNone: [NotFoundError raise]
!

detect: aBlock ifNone: exceptionBlock
	"Read elements one by one and answer the first for which aBlock evaluates to true.
	Reading stops at the match, so later elements stay unread.
	If the stream ends without a match, answer the value of exceptionBlock."
	"	aBlock	<BlockClosure>	#detect: style block
		exceptionBlock	<BlockClosure>	evaluated if no element satisfies aBlock
		^	<Object>	the first matching element, or the value of exceptionBlock
	"
	self do:
		[:candidate |
		(aBlock value: candidate) ifTrue: [^candidate]].
	^exceptionBlock value
!

do: aBlock
        "Read from the stream and pass the read objects to the #do: block one by one until it reaches the end of stream.
        The end of stream is detected by #get raising Incomplete, which terminates the loop quietly.
        Note that the handler also encloses the evaluation of aBlock, so an Incomplete raised from inside aBlock ends the iteration silently as well."
        "       aBlock  <BlockClosure>  #do: style block evaluated with each element
        "
        [[aBlock value: self get] repeat] on: Incomplete do: []
!

do: elementBlock separatedBy: separatorBlock
	"Evaluate elementBlock with every element of the stream and evaluate separatorBlock between
	each pair of consecutive elements (never before the first element or after the last one)."
	"	elementBlock	<BlockClosure>	evaluated with each element
		separatorBlock	<BlockClosure>	evaluated between elements
	"
	| needsSeparator |
	needsSeparator := false.
	self do:
		[:element |
		needsSeparator
			ifTrue: [separatorBlock value]
			ifFalse: [needsSeparator := true].
		elementBlock value: element]
!

fold: binaryBlock
	"Combine all elements of the stream into one value. binaryBlock is first evaluated with the 1st and the 2nd element,
	then with that result and the 3rd element, and so on until the stream comes to an end.
	The first element is fetched with #get, so an empty stream raises Incomplete."
	"	binaryBlock	<BlockClosure>	evaluated with the running result and each further element
		^	<Object>	the final result (the first element itself if the stream has only one)
	"

	| accumulator |
	accumulator := self get.
	self do:
		[:element |
		accumulator := binaryBlock value: accumulator value: element].
	^accumulator
!

groupedBy: aBlock
	"Return a dictionary whose keys are the results of evaluating aBlock for all elements in the stream,
	and the value for each key is a collection of the elements that evaluated to that key."
	"	aBlock	<BlockClosure>	#collect: style block evaluated with each element
		^	<Dictionary>	keys are results of aBlock for all elements, values are collections (of the contentsSpecies class) of the corresponding elements
	""
	     #(1 2 3 4 5) reading groupedBy: [:each | each odd]
	"
	| result groupClass |
	result := Dictionary new.
	self do:
		[:each | | key collection |
		key := aBlock value: each.
		collection := result at: key ifAbsentPut: [OrderedCollection new].
		collection add: each].
	"The groups are accumulated in OrderedCollections. The class to compare against is the
	 element collection class (contentsSpecies), not the species of the stream itself, which
	 is never OrderedCollection and made the conversion below run unconditionally."
	groupClass := self contentsSpecies.
	groupClass ~~ OrderedCollection ifTrue:
		["Convert the result collections to be the right type.
		  Note that it should be safe to modify the dictionary
		  while iterating because we only replace values for existing keys"
		result keysAndValuesDo:
			[:key :value | result at: key put: (groupClass withAll: value)]].

	^result
!

inject: initialValue into: binaryBlock
	"Thread a running value through the whole stream: binaryBlock is evaluated with the running value and each element,
	and its result becomes the new running value. initialValue is the running value used for the first element."
	"	initialValue	<Object>	serves as the previous result for the evaluation of the first element
		binaryBlock	<BlockClosure>	evaluated with the running value and each element
		^	<Object>	the final running value (initialValue if the stream is empty)
	""
		(1 to: 10) reading inject: 0 into: [:subTotal :next | subTotal + next].
	"
	| accumulator |
	accumulator := initialValue.
	self do:
		[:element |
		accumulator := binaryBlock value: accumulator value: element].
	^accumulator
!

reject: aBlock
	"Read the stream to its end and answer a collection of only those elements for which aBlock evaluates to false.
	Implemented on top of #select: with an identity comparison against false."
	"	aBlock	<BlockClosure>	#reject: style block used to filter the elements
		^	<Collection>	all elements that evaluate to false
	""
		(1 to: 10) reading reject: [ :e | e odd ]
	"
	^self select: [:each | false == (aBlock value: each)]
!

select: aBlock
	"Read the stream to its end and answer a collection of only those elements for which aBlock evaluates to true."
	"	aBlock	<BlockClosure>	#select: style block used to filter the elements
		^	<Collection>	all elements that evaluate to true, in stream order
	""
		(1 to: 10) reading select: [ :e | e odd ]
	"
	| output |
	output := self contentsSpecies new writing.
	self do:
		[:element |
		(aBlock value: element) ifTrue: [output put: element]].
	output close.
	^output destination
! !

!ReadStream methodsFor:'initialize-release'!

close
	"Close the stream by forwarding #close to the source. Answers the receiver."
	source close
!

contentsSpecies
	"Returns collection class suitable to hold elements of this stream.
	Abstract here; concrete streams must implement it. The answer is used by this class
	to allocate caches and result collections (see #get, #read:, #rest, #collect:, #select:)."
	^self subclassResponsibility
!

on: aSource
	"Initialize the receiver to read from aSource by storing it in the source instance variable. Answers the receiver."
	source := aSource
! !


!ReadStream methodsFor:'printing'!

printOn: aStream
	"Print the receiver on aStream: first the text produced by #streamingPrintOn:,
	then a line break, then the print string of the source."
	| description |
	description := String new writing.
	self streamingPrintOn: description.
	aStream
		nextPutAll: description conclusion;
		cr.
	source printOn: aStream
!

streamingPrintOn: aStream
	"Write the name of the receiver's class to aStream using #write:. Used by #printOn: to build the first line of the print string."
	aStream write: self class name
! !

!ReadStream methodsFor:'private'!

next
	"This is here for compatibility with the existing StreamEncoders so that they can be re-used with transformation streams for encoding.
	Simply answers the result of #get."

	^self get
!

streamingInsert: anInteger into: aWriteStream
	"Forward the request to aWriteStream as #streamingInsert:from:, passing the receiver as the origin.
	There is no explicit return, so the receiver is answered, unlike the sibling streaming methods which answer the result from aWriteStream."
	aWriteStream streamingInsert: anInteger from: self
!

streamingInsertInto: aWriteStream
	"Forward the request to aWriteStream as #streamingInsertFrom:, passing the receiver as the origin, and answer its result."
	^aWriteStream streamingInsertFrom: self
!

streamingWrite: anInteger into: aWriteStream
	"Forward the request to aWriteStream as #streamingWrite:from:, passing anInteger and the receiver as the origin, and answer its result."
	^aWriteStream streamingWrite: anInteger from: self
!

streamingWriteInto: aWriteStream
	"Forward the request to aWriteStream as #streamingWriteFrom:, passing the receiver as the origin, and answer its result."
	^aWriteStream streamingWriteFrom: self
! !

!ReadStream methodsFor:'seeking'!

++ anInteger
	"Skip forward over anInteger elements by reading them into a scratch buffer and discarding them.
	A negative argument is turned into a backward seek with #--, and zero is a no-op.
	If the stream ends early, Incomplete is raised carrying the number of elements actually skipped."
	"	anInteger	<Integer>	the number of elements to go forward by
		^<Integer>	the number of elements skipped
	""
		'Hello' reading ++ 2; rest
	"
	| skipped chunkSize buffer step |
	anInteger < 0 ifTrue: [^self -- anInteger negated].
	anInteger = 0 ifTrue: [^0].
	skipped := 0.
	chunkSize := DefaultBufferSize min: anInteger.
	buffer := self contentsSpecies newRecycled: DefaultBufferSize.
	[[skipped < anInteger] whileTrue:
		[step := chunkSize min: anInteger - skipped.
		self read: step into: buffer at: 1.
		skipped := skipped + step]]
			on: Incomplete
			do: [:incomplete |
				"Hand the buffer back before reporting how far we got."
				buffer recycle.
				(Incomplete count: skipped + incomplete count) raise].
	buffer recycle.
	^anInteger
!

+= anInteger
	"Seek from the start of the stream by anInteger elements. The stream must be positionable.
	Implemented by setting the absolute position with #position: and answering its result."
	"       anInteger       <Integer>       The number of elements to go forward by, counted from the start."
	"
		'Hello' reading rest; += 2; rest
	"
	^self position: anInteger
!

-- anInteger
	"Seek backward by anInteger elements. The stream must be positionable.
	This default implementation only reports errors: a non-positionable stream signals an error,
	and a positionable one is expected to override this method."
	"	anInteger	<Integer>	the number of elements to go back by"
	"
		'hello' reading rest; -- 3; rest
	"
	"Subclasses should reimplement this method if the stream is positionable."
	self isPositionable
		ifTrue: [self subclassResponsibility]
		ifFalse: [self error: 'This stream is not positionable.']
!

-= anInteger
	"Position the stream anInteger elements before its end. The stream must be positionable,
	with one exception: seeking to the very end (anInteger = 0) also works on a non-positionable
	stream, by skipping forward until the source is exhausted.
	If the stream is shorter than anInteger, it is positioned at its start and Incomplete is raised
	with the number of elements that were actually available."
	"	anInteger	<Integer>	number of elements to go back by
		^<Integer>	number of elements actually skipped
	""
		'Hello' reading -= 3; rest
	"
	| reachable |
	(self isPositionable not and: [anInteger isZero]) ifTrue:
		["Cannot seek, so consume everything until Incomplete signals the end."
		[[self ++ SmallInteger maxVal] repeat] on: Incomplete do: [].
		^0].
	reachable := anInteger min: self length.
	self position: self length - reachable.
	^reachable = anInteger
		ifTrue: [anInteger]
		ifFalse: [(Incomplete count: reachable) raise]
!

available
	"Return the number of elements available, computed as the total length minus the current position.
	The stream must be positionable."
	"       ^       <Integer>       the number of elements available"
	"
		'Hello' reading ++2; available
	"
	^self length - self position
!

explore: aBlock
	"Let aBlock read ahead in the stream and then rewind to the position the stream had on entry.
	The rewind happens in an ensure block, so it also runs when aBlock terminates abnormally.
	The stream must be positionable."
	"	aBlock	<BlockClosure>	defines the exploration activity; may take the stream as its optional argument
		^	<Object>	result of aBlock"
	"
		'Hello' reading explore: [ :s | s -= 0 ]; rest
	"
	| startPosition |
	startPosition := self position.
	^[aBlock cull: self]
		ensure: [self position: startPosition]
!

length
	"Answer the total length of the stream. The stream must be positionable.
	This default implementation only reports errors: a non-positionable stream signals an error,
	and a positionable one is expected to override this method."
	"	^	<Integer>	the total number of elements in the stream. (position + available)"
	"
		'Hello' reading read: 2; length
	"
	"Subclasses should reimplement this method if the stream is positionable."
	^self isPositionable
		ifTrue: [self subclassResponsibility]
		ifFalse: [self error: 'This stream is not positionable.']
!

position
	"Answer the current position of the stream. The stream must be positionable.
	This default implementation only reports errors: a non-positionable stream signals an error,
	and a positionable one is expected to override this method."
	"	^	<Integer>	the current position in the stream."
	"
		'Hello' reading read: 2; position
	"
	"Subclasses should reimplement this method if the stream is positionable."
	^self isPositionable
		ifTrue: [self subclassResponsibility]
		ifFalse: [self error: 'This stream is not positionable.']
!

position: anInteger
	"Move the stream to the absolute position anInteger. The stream must be positionable.
	This default implementation only reports errors: a non-positionable stream signals an error,
	and a positionable one is expected to override this method."
	"	anInteger	<Integer>	the position to set the stream at.
		^<Integer>	the position the stream was set to
	""
		'Hello' reading position: 2; rest
	"
	"Subclasses should reimplement this method if the stream is positionable."
	self isPositionable
		ifTrue: [self subclassResponsibility]
		ifFalse: [self error: 'This stream is not positionable.']
! !


!ReadStream methodsFor:'testing'!

isPositionable
	"Can this stream be positioned. Positionable streams support additional API: #position, #position:, ++, --, ...
	The default answer is false; the seeking methods of this class consult it before signalling their errors."

	^false
!

isReadable
	"Answer true: the receiver is a read stream."
	^true
!

isWritable
	"Answer false: the receiver is a read stream and does not accept writes."
	^false
! !


!ReadStream class methodsFor:'documentation'!

version_HG
    "Answer the Mercurial changeset keyword string for this file. The keyword is stored unexpanded here."

    ^ '$Changeset: <not expanded> $'
!

version_SVN
    "Answer the Subversion Id keyword string recorded for this file."
    ^ '$Id: Xtreams__ReadStream.st 75 2012-01-30 22:33:34Z mkobetic $'
! !