core/Xtreams__WriteStream.st
author Martin Kobetic <mkobetic@gmail.com>
Mon, 22 Aug 2011 15:42:51 +0000
changeset 2 faf220cbe5b9
child 7 e1b3d2820de7
permissions -rw-r--r--
first cut

"{ Package: 'stx:goodies/xtreams/core' }"

"{ NameSpace: Xtreams }"

Object subclass:#WriteStream
	instanceVariableNames:'destination'
	classVariableNames:'Backspace Bell CarriageReturn Delete DoubleQuote Escape FormFeed
		LineFeed Quote Space Tab VerticalTab'
	poolDictionaries:'Tools.BehaviorIcons'
	category:'My Classes'
!

WriteStream comment:'Abstract superclass of all write streams; defines the API.

Write streams are created by sending #writing to a concrete resource (a.k.a terminal), such as a Collection, SocketAccessor, Filename, etc.

	String new writing write: ''testing''; close; terminal

Transform write streams are created through one of the messages in the ''transforming'' protocol sent to other write streams.

	(String new writing collecting: #asUppercase) write: ''testing''; close; terminal

Subclasses must implement the following messages:
	#read:into:at:
	#contentsSpecies

Instance Variables
	destination     <Object> a stream or "terminal" consuming written elements

Shared Variables
	Backspace       <Character>
	Bell    <Character>
	CarriageReturn  <Character>
	Delete  <Character>
	DoubleQuote     <Character>
	Escape  <Character>
	FormFeed        <Character>
	LineFeed        <Character>
	Quote   <Character>
	Space   <Character>
	Tab     <Character>
	VerticalTab     <Character>

'
!


!WriteStream class methodsFor:'instance creation'!

on: aDestination
	^self new on: aDestination
! !

!WriteStream class methodsFor:'class initialization'!

initialize
        Backspace := String with: Character backspace.
        Bell := String with: (Character value: 7).
        CarriageReturn := String with: (Character value: 13).
        Delete := String with: (Character value: 127).
        DoubleQuote := String with: $".
        Escape := String with: (Character value: 27).
        FormFeed := String with: Character newPage.
        LineFeed := String with: Character lf.
        Quote := String with: $'.
        Space := String with: Character space.
        Tab := String with: Character tab.
        VerticalTab := String with: (Character value: 11)
! !

!WriteStream methodsFor:'accessing'!

conclusion
	"Close the stream and return the object at the bottom of the stream."
	"       ^<Collection | Buffer | IOAccessor | BlockClosure> "
	self close.
	^self terminal
!

destination

	^destination
!

insert: aStreamable
	"Insert aStreamable into self at current position."
	"       aStreamable     <SequenceableCollection | ReadStream | Buffer>  the source to write in to the destination
		^<Integer>      the number of elements written to the destination"
	"
		' World!!' copy writing insert: 'Hello' reading; -= 0; close; destination
	"
	^aStreamable streamingInsertInto: self
!

insert: anInteger from: aStreamable
	"Insert anIntegers worth of elements from aStreamable into self at current position."
	"       anInteger       <Integer>       the number of elements to insert
		aStreamable     <ReadStream | SequenceableCollection | Buffer > the source to write into the destination
		startIndex      <Integer>       the index into aSequenceableCollection to start writing from
		^<Integer>      number of elements inserted
	""
		' World!!' copy writing insert: 5 from: 'Hello Underworld!!' reading; -= 0; close; destination
	"
	aStreamable streamingInsert: anInteger into: self.
	^anInteger
!

insert: anInteger from: aSequenceableCollection at: startIndex
	"Insert anIntegers worth of elements from aSequenceableCollection starting at startIndex into self at current position."
	"       anInteger       <Integer>       the number of elements to insert
		aStreamable     <SequenceableCollection>        the source to write into the destination
		startIndex      <Integer>       the index into aSequenceableCollection to start writing from
		^<Integer>      number of elements inserted
	""
		' World!!' copy writing insert: 5 from: 'Hello' at: 1; -= 0; close; destination
	"
	self write: anInteger from: aSequenceableCollection at: startIndex.
	^anInteger
!

put: anObject
	"Write anObject into self."
	"       anObject                <Object>        the object to write in to the destination
		^                               <Object>        the object that was written to the destination
	""
		String new writing put: $h; close; destination
	"
	| cache |
	cache := self contentsSpecies newRecycled: 1.
	cache at: 1 put: anObject.
	self write: 1 from: cache at: 1.
	cache recycle.
	^anObject
!

terminal
	"Return the object at the bottom of the stream."
	"       ^<Collection | Buffer | IOAccessor | BlockClosure>
	"
	^(destination isKindOf: WriteStream)
		ifTrue: [ destination terminal ]
		ifFalse: [ destination ]
!

write: aStreamable
	"Write aStreamable into self."
	"       aStreamable     <SequenceableCollection | ReadStream | Buffer>  the source to write in to the destination
		^<Integer>      the number of elements written to the destination"
	"
		String new writing write: 'Hello' reading; close; destination
	"
	^aStreamable streamingWriteInto: self
!

write: anInteger from: aStreamable
	"Write anInteger's worth of elements from aStreamable into self."
	"       anInteger       <Integer>       the number of elements to write
		aStreamable     <SequenceableCollection | ReadStream | Buffer>  the source to write in to the destination
		^<Integer>      number of elements written
	""
		String new writing write: 3 from: 'Hello' reading; close; destination
	"
	^aStreamable streamingWrite: anInteger into: self
!

write: anInteger from: aSequenceableCollection at: startIndex
	"Write anIntegers worth of elements from aSequenceableCollection starting at startIndex into self."
	"       anInteger       <Integer>       the number of elements to write
		aStreamable     <SequenceableCollection>        the source to write in to the destination
		startIndex      <Integer>       the index into aSequenceableCollection to start writing from
		^<Integer>      number of elements written
	""
		String new writing write: 3 from: 'Hello' at: 2; close; destination
	"
	^self subclassResponsibility
! !

!WriteStream methodsFor:'characters'!

backspace
	self write: Backspace
!

bell
	self write: Bell
!

cr
	self write: CarriageReturn
!

delete
	self write: Delete
!

escape
	self write: Escape
!

ff
	self write: FormFeed
!

lf
	self write: LineFeed
!

print: anObject
	anObject streamingPrintOn: self
!

q
	self write: Quote
!

qq
	self write: DoubleQuote
!

space
	self write: Space
!

space: anInteger
	anInteger timesRepeat: [self space]
!

tab
	self write: Tab
!

tab: anInteger
	anInteger timesRepeat: [self tab]
!

vtab
	self write: VerticalTab
! !

!WriteStream methodsFor:'converting'!

writing
	^[:object | self nextPut: object] writing
		contentsSpecies: self contentsSpecies;
		yourself
! !

!WriteStream methodsFor:'initialize-release'!

close
	"Close the destination from any more writes."

	self flush.
	destination close
!

contentsSpecies
	"The class of collection that is able to hold the kind of elements that this stream consumes."
	"       ^       <Class> collection class
	"
	^self subclassResponsibility
!

flush
	"Make sure all the previously written elements are pushed down into the destination."
	destination flush
!

on: aDestination
	destination := aDestination
! !


!WriteStream methodsFor:'printing'!

printOn: aStream
	| stream |
	stream := String new writing.
	self streamingPrintOn: stream.
	aStream nextPutAll: stream conclusion.
	aStream cr.
	destination printOn: aStream.
!

streamingPrintOn: aStream
	aStream write: self class name
! !

!WriteStream methodsFor:'private'!

streamingInsert: anInteger from: aReadStream
	| cache count |
	cache := self contentsSpecies newRecycledAtLeast: anInteger.
	count := [aReadStream read: anInteger into: cache at: 1. anInteger] on: Incomplete do: [ :ex | ex count ].
	self insert: count from: cache at: 1.
	cache recycle.
	count < anInteger ifTrue: [(Incomplete count: count) raise]
!

streamingInsertFrom: aReadStream
	| count cache |
	count := 0.
	cache := self contentsSpecies newRecycledDefaultSize.
	[[aReadStream read: cache size into: cache at: 1] on: Incomplete do: [:exception |
		self insert: exception.
		cache recycle.
		^count + exception count].
	self insert: cache size from: cache at: 1.
	count := count + cache size] repeat
!

streamingWrite: anInteger from: aReadStream
	| cache toDo continue amount |
	cache := self contentsSpecies newRecycledDefaultSize.
	toDo := anInteger. continue := true.
	[ continue and: [ toDo > 0 ] ] whileTrue: [
		amount := [ aReadStream read: (cache size min: toDo) into: cache at: 1 ] on: Incomplete do: [ :ex | continue := false. ex count ].
		self write: amount from: cache at: 1.
		toDo := toDo - amount ].
	cache recycle.
	toDo > 0 ifTrue: [(Incomplete count: anInteger - toDo) raise].
	^anInteger
!

streamingWriteFrom: aReadStream
	| count cache |
	count := 0.
	cache := self contentsSpecies newRecycledDefaultSize.
	[[aReadStream read: cache size into: cache at: 1] on: Incomplete do: [:exception |
		self write: exception.
		cache recycle.
		^count + exception count].
	self write: cache size from: cache at: 1.
	count := count + cache size] repeat
! !

!WriteStream methodsFor:'seeking'!

++ anInteger
	"Seek forward by anInteger elements. The stream must be positionable."
	"       anInteger       <Integer>       the number of elements to go forward by.
		^<Integer>      the number of elements actually skipped
	"
	"
		'Hello Would' copy writing ++ 6; write: 'World'; close; destination
	"
	"Subclasses should reimplement this method if the stream is positionable."
	self isPositionable
		ifFalse:        [Incomplete zero raise]
		ifTrue: [self subclassResponsibility]
!

+= anInteger
	"Seek from the start of the stream by anInteger elements. The stream must be positionable."
	"       anInteger       <Integer>       The number of elements to go forward by."
	"
		String new writing write: 'Hello Would'; += 6; write: 'World'; close; destination
	"
	^self position: anInteger
!

-- anInteger
	"Seek backward by anInteger elements. The stream must be positionable."
	"       anInteger       <Integer>       The number of elements to go back by."
	"
		String new writing write: 'helio'; -- 2; write: 'lo'; close; destination
	"
	"Subclasses should reimplement this method if the stream is positionable."
	self isPositionable
		ifFalse:        [self error: 'This stream is not positionable.']
		ifTrue: [self subclassResponsibility]
!

-= anInteger
	"Seek backwards from the end of the stream by anInteger elements. The stream must be positionable."
	"       anInteger       <Integer>       The number of elements to go back by.
		^<Integer>      the number of elements actually skipped"
	"
		'Hello Would' copy writing -= 3; write: 'rld'; close; terminal
	"
	| available |
	available := anInteger min: self length.
	self position: self length - available.
	available = anInteger ifTrue: [ ^anInteger ].
	^(Incomplete count: available) raise
!

available
	"Return the number of elements from the current position to the end of the stream. The stream must be positionable."
	"       ^       <Integer>       the number of elements available"
	"
		String new writing write: 'Hello World'; -- 5; available
	"
	^self length - self position
!

explore: aBlock
	" Explore the stream within the block but return to where we started when the block completes. The stream must be positionable."
	"       aBlock  <BlockClosure>  defines the exploration activity
		^               <Object>        result of aBlock"
	"
		String new writing explore: [ :s | s write: 'Hello' ]; write: 'World'; close; destination
	"
	| position |
	position := self position.
	^[aBlock cull: self] ensure: [self position: position]
!

length
	"Return total length of the stream. The stream must be positionable."
	"       ^       <Integer>       the total number of elements in the stream. (position + available)"
	"
		'Hello World' copy writing ++ 5; length
	"
	"Subclasses should reimplement this method if the stream is positionable."
	^self isPositionable
		ifFalse:        [self error: 'This stream is not positionable.']
		ifTrue: [self subclassResponsibility]
!

position
	"Return current position of the stream. The stream must be positionable."
	"       ^       <Integer>       current position of the stream."
	"
		'Hello World' copy writing -= 5; position
	"
	"Subclasses should reimplement this method if the stream is positionable."
	^self isPositionable
		ifFalse:        [self error: 'This stream is not positionable.']
		ifTrue: [self subclassResponsibility]
!

position: anInteger
	"Change current position of the stream to anInteger. The stream must be positionable."
	"       anInteger       <Integer>       current position of the stream."
	"
		'Hello Would' copy writing position: 6; write: 'World'; close; destination
	"
	"Subclasses should reimplement this method if the stream is positionable."
	self isPositionable
		ifFalse:        [self error: 'This stream is not positionable.']
		ifTrue: [self subclassResponsibility]
! !


!WriteStream methodsFor:'testing'!

isPositionable
	"Can this stream be positioned. Positionable streams come with extra API: #position, #position:, etc."

	^false
!

isReadable
	^false
!

isWritable
	^true
! !

!WriteStream methodsFor:'transforming'!

buffering: bufferSize
	"Delays committing its content to its underlying stream until it has reached a certain size ,#flush is sent, or the stream is closed."
	"       bufferSize      <Integer> The size of the buffer to start with.
		^<PositionWriteStream>"
	"
		(ByteArray new writing buffering: 5)
			write: (ByteArray withAll: (1 to: 11));
			conclusion
	"
	^BufferedWriteStream on: self bufferSize: bufferSize
!

positioning
	"If necessary add positioning layer. Note that positiong layer employs buffering to implement the positioning ability. The default buffering strategy will grow the buffer up to the full size of the underlying stream if not released. Consequently other Buffer types might be more suitable for specific circumstances, e.g. if only last n elements need to be buffered, a fixed size RingBuffer can be substitued with #buffer: accessor."
	"       ^       <WriteStream>   a positionable read stream
	""
		[ :x | Transcript space; print: x ] writing positioning write: (1 to: 10); -- 5; write: (11 to: 15); close
	"
	^self isPositionable
		ifTrue: [self]
		ifFalse:        [PositionWriteStream on: self]
! !

!WriteStream class methodsFor:'documentation'!

version_SVN
    ^ '$Id$'
! !

WriteStream initialize!