transforms/extensions.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:34:28 +0000
changeset 97 2a7827f4dce2
parent 72 d16c7d84d4a8
child 111 44ac233b2f83
permissions -rw-r--r--
pool name fixes

"{ Package: 'stx:goodies/xtreams/transforms' }"

!

!Xtreams::ReadStream methodsFor:'transforming'!

collecting: aBlock
	"Transform each element using #collect: style block."
	"	aBlock	<BlockClosure>	a #collect: style block used to tranform each element
		^<CollectReadSteam>
	""
		((1 to: 5) reading collecting: [ :e | e * e ]) rest
	""
		((65 to: 90) reading collecting: [ :e | e asCharacter ]) contentsSpecies: String; rest
	"
	^CollectReadStream on: self block: aBlock
! !
!Xtreams::ReadStream methodsFor:'transforming'!

depairing
	"Transform a stream of associations in to a stream of elements made up of the key and value association components."

	^self transforming: [:in :out |
		| association |
		association := in get.
		out put: association key.
		out put: association value]
! !
!Xtreams::ReadStream methodsFor:'transforming'!

doing: aBlock
	"Perform and action with each passing element using #do: style block."
	"	aBlock	<BlockClosure>	a #do: style block invoked with each element as it passes through the stream
		^<CollectReadSteam>
	""
		((1 to: 5) reading doing: [ :e | Transcript space; print: e * e ]) rest
	"
	^self collecting: [:each | (aBlock value: each). each]
! !
!Xtreams::ReadStream methodsFor:'transforming'!

duplicating: aWriteStream
	"Duplicate all the contents written into @aWriteStream"
	"	aWriteStream <WriteStream>	a stream to copy into
		^<DuplicatingReadSteam>
	""
		| copy |
		copy := ByteArray new writing.
		((0 to: 15) reading duplicating: copy) rest -> copy conclusion
	"
	^DuplicateReadStream on: self duplicate: aWriteStream
! !
!Xtreams::ReadStream methodsFor:'transforming'!

encoding: anEncoding
	"Transform bytes into characters using @anEncoding such as #utf8 or #ascii, etc. Any encoding supported by StreamEncoder is allowed.
	The encoding steam also performs automatic line end conversion from arbitrary platform convention to CRs, unless set into a transparent mode"
	"	anEncoding	<Symbol> encoding identifier recognized by StreamEncoder class>>new:
		^<EncodedReadStream>
	""
		((65 to: 90) reading encoding: #ascii) rest
	""
		| crlf text |
		crlf := String with: Character cr with: Character lf.
		text := ('Hello', crlf, 'World') asByteArrayEncoding: #ascii.
		(text reading encoding: #ascii) rest.
		(text reading encoding: #ascii) setLineEndTransparent; rest
	"
	^EncodeReadStream on: self encoding: anEncoding
! !
!Xtreams::ReadStream methodsFor:'transforming'!

encodingBase64
	"Decodes characters of base-64 encoding into bytes. Ignores any intervening whitespace.
	Automatically ends the stream if it encounters final padding characters $=."
	"	^<TransformReadStream>"
	"
		'AAECAwQFBgcICQo= and the rest should be ignored' reading encodingBase64 rest
	"
	| map cache |
	map := [ :char | ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/' indexOf: char) - 1 ].
	cache := ByteString new: 4.
	^(self transforming: [ :in :out || count end block filter |
		filter := in rejecting: #isSeparator.
		count := [ filter read: 4 into: cache at: 1. 4 ] on: Incomplete do: [ :incomplete | incomplete count].
		count isZero ifTrue: [ Incomplete zero raise ].
		(end := cache indexOf: $=) isZero ifFalse: [ count := count min: end - 1 ].
		count < 2 ifTrue: [ Incomplete zero signal ].
		block := (1 to: 4) inject: 0 into: [ :total :i || sextet |
			sextet := count < i ifTrue: [ 0 ] ifFalse: [ map value: (cache at: i) ].
			sextet negative ifTrue: [ count := i ].
			(total bitShift: 6) + sextet ].
		2 to: count do: [ :i | out put: ((block bitShift: (i - 4) * 8) bitAnd: 255) ].
		count < 4 ifTrue: [ (Incomplete count: count) raise ] ])
			buffer: (RingBuffer on: (ByteArray new: 3));
			yourself
! !
!Xtreams::ReadStream methodsFor:'transforming'!

encodingHex
	"Decodes bytes hex characters."
	"	^<TransformReadStream>"
	"
		(ByteArray withAll: (1 to: 20)) reading encodingHex rest
	"
	| i2c |
	i2c := [ :i | '0123456789abcdef' at: i + 1 ].
	^(self transforming: [ :in :out || byte |
		byte := in get.
		out put: (i2c value: (byte bitShift: -4)).
		out put: (i2c value: (byte bitAnd: 15)) ])
		contentsSpecies: ByteString;
		yourself
! !
!Xtreams::ReadStream methodsFor:'transforming'!

injecting: initialObject into: aBlock
	"Accumulates a running value combined with each passing element using the binary aBlock. aBlock takes the result of the last evaluation and the next element as its arguments. Notable difference from the collection analog is that the streaming variant is a stream of all the intermediate values of the running value."
	"	initialObject	<Object> initial value used as the previous result for the evaluation of the first element
		aBlock	<BlockClosure> binary block combining the value of each element with previous result of its evaluation
		^<CollectingReadStream>"
	"
		((1 to: 10) reading injecting: 0 into: [ :total :each | each + total ]) rest
	"
	| nextObject |
	nextObject := initialObject.
	^self collecting: [:each | nextObject := aBlock cull: nextObject cull: each]
! !
!Xtreams::ReadStream methodsFor:'interpreting'!

interpreting: type
	"Converts bytes from a binary source according to provided @type. It produces elements of corresponding class, e.g. #float -> Float, #double -> Double, etc. Supported types are defined by the Interpretations shared class variable.
	""	type	<Symbol>	identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations)
		^		<InterpretedReadStream>
	""
		| doubles bytes |
		doubles := [ Random new next ] reading.
		bytes := (ByteArray new writing interpreting: #double)
			write: 10 from: doubles;
			close;
			terminal.
		(bytes reading interpreting: #double) read: 10
	"
	^self interpreting: type cacheSize: 1
! !
!Xtreams::ReadStream methodsFor:'interpreting'!

interpreting: type cacheSize: size
	"Converts bytes from a binary source according to provided @type. It produces elements of corresponding class, e.g. #float -> Float, #double -> Double, etc. Supported types are defined on class side of InterpretedBytes.
	""	type	<Symbol>	identifies a (primitive) CType, e.g. #float, #long (mapped via InterpretatedBytes)
		size		<Integer>	requested cache size (in number of elements)
		^		<InterpretedReadStream>
	""
		| doubles bytes |
		doubles := [ Random new next ] reading.
		bytes := (ByteArray new writing interpreting: #double cacheSize: 10)
			write: 10 from: doubles;
			close;
			terminal.
		(bytes reading interpreting: #double) read: 10
	"
	^InterpretedReadStream on: self type: type cacheSize: size
! !
!Xtreams::ReadStream methodsFor:'interpreting'!

interpreting: reader size: byteSize
	"Converts bytes from a binary source according to provided @reader block. The block is evaluated with an instance of InterpretedBytes and and index into it from which it should use byteSize bytes to make an object to return.
	""	reader		<BlockClosure>	reading block, e.g. [ :b :i | (b at: i) @ (b at: i + 1) ]
		byteSize	<Integer>	byte size of an element
		^			<InterpretedReadStream>
	""
		| doubles bytes |
		doubles := [ Random new next ] reading.
		bytes := (ByteArray new writing interpreting: #double)
			write: 10 from: doubles;
			close;
			terminal.
		(bytes reading interpreting: [ :b :i | (b floatAt: i) @ (b floatAt: i + 4) ] size: 8) read: 5
	"
	^InterpretedReadStream on: self bytesPerElement: byteSize contentsSpecies: Array operation: reader cacheSize: 1
! !
!Xtreams::ReadStream methodsFor:'interpreting'!

interpreting: reader size: byteSize cacheSize: cacheSize
	"Converts bytes from a binary source according to provided @reader block. The block is evaluated with an instance of InterpretedBytes and and index into it from which it should use byteSize bytes to make an object to return.
	""	reader		<BlockClosure>	reading block, e.g. [ :b :i | (b at: i) @ (b at: i + 1) ]
		byteSize	<Integer>	byte size of an element
		cacheSize	<Integer>	requested cache size (in number of elements)
		^			<InterpretedReadStream>
	""
		| points bytes |
		points := Random new reading transforming: [ :in :out | out put: in get @ in get ].
		bytes := (ByteArray new writing interpreting: [ :b :i :o | (b floatAt: i put: o x) @ (b floatAt: i + 4 put: o y) ] size: 8 )
			write: 10 from: points;
			close;
			terminal.
		(bytes reading interpreting: [ :b :i | (b floatAt: i) @ (b floatAt: i + 4) ] size: 8 cacheSize: 5) read: 5
	"
	^InterpretedReadStream on: self bytesPerElement: byteSize contentsSpecies: Array operation: reader cacheSize: cacheSize
! !
!Xtreams::ReadStream methodsFor:'interpreting'!

marshaling
	"Marshaling streams are used to encode arbitrary smalltalk objects into a sequence of bytes suitable for binary storage or transport. The format of the binary encoding is defined by an ObjectMarshaler and is identified by particular version ID. A marshaling read stream decodes objects from a binary source previously encoded by a marshaling write stream.
	""	^	<ObjectReadSteam>
	""
		| rectangle bytes |
		rectangle := 5 @ 5 extent: 5 @ 5.
		bytes := ByteArray new writing marshaling put: rectangle; conclusion.
		bytes reading marshaling get
	"
	^ObjectReadStream on: self
! !
!Xtreams::ReadStream methodsFor:'interpreting'!

marshaling: aMarshaler
	"Marshaling streams are used to encode arbitrary smalltalk objects into a sequence of bytes suitable for binary storage or transport. The format of the binary encoding is defined by an ObjectMarshaler and is identified by particular version ID. Custom marshaling schemes can be derived by subclassing ObjectMarshaler. Custom schemes must declare their own (unique) version ID. This method allows to employ a cusomt marshaler instead of the default one (STST2.0).
	A marshaling read stream decodes objects from a binary source previously encoded by a marshaling write stream.
	""	aMarshaler	<ObjectMarshaler>	implements custom marshaling format
		^			<ObjectReadSteam>
	""
		| rectangle bytes |
		rectangle := 5 @ 5 extent: 5 @ 5.
		bytes := (ByteArray new writing marshaling: ObjectMarshaler new) put: rectangle; conclusion.
		bytes reading marshaling get
	"
	^ObjectReadStream on: self marshaler: aMarshaler
! !
!Xtreams::ReadStream methodsFor:'transforming'!

monitoring: aNotificationBlock every: aNotificationInterval
	"Monitor the through-put of the receiver."
	"	aNotificationBlock <BlockClosure>	the block to execute when notifying
		aNotificationInterval <Duration>	how often to notify
		^<PositionReadSubstream>
	"

	"
		| monitor |
		monitor := ObjectMemory imageFilename reading
			monitoring: [:totalTransferred :deltaTransferred :elapsedMicroseconds |
				throughputSpeed := deltaTransferred.
				averageSpeed := (totalTransferred / elapsedMicroseconds) * 1000000.
				Transcript writing cr;
					write: 'average speed: '; print: averageSpeed asFloat;
					write: ' through-put speed: '; print: throughputSpeed asFloat;
					write: ' elapsed-time: '; print: elapsedMicroseconds / 1000000.0]
			every: 1 milliseconds.
		[monitor rest] ensure: [monitor close].
	"

	| previousPosition timer start notifyBlock monitoring notifyProcess notifyFinished |

	start := Time microsecondClock.
	previousPosition := 0.
	monitoring := nil.
	timer := nil.
	notifyFinished := false.

	notifyBlock := [
		aNotificationBlock cull: monitoring position cull: monitoring position - previousPosition cull: Time microsecondClock - start.
		previousPosition := monitoring position].

	notifyProcess := nil.
	notifyProcess := [
		[notifyBlock value. notifyFinished] whileFalse: [notifyProcess suspend]] newProcess.
	notifyProcess priority: ((Processor activeProcess priority + 1) min: 99).

	monitoring := self closing: [
		timer stop.
		notifyProcess resume.
		notifyFinished := true.
		notifyProcess resume.
		self close].

	timer := Timer every: aNotificationInterval resume: notifyProcess.
	^monitoring
! !
!Xtreams::ReadStream methodsFor:'transforming'!

pairing
	"Transform a stream of elements in to a stream of associations between even+odd elements of the stream. This expects the stream to have an even number of elements"

	^self transforming: [:in :out | out put: (Association key: in get value: in get)]
! !
!Xtreams::ReadStream methodsFor:'transforming'!

positioning
	"If necessary add positioning layer. Note that positiong layer employs buffering to implement the positioning ability. The default buffering strategy will grow the buffer up to the full size of the underlying stream if not released. Consequently other Buffer types might be more suitable for specific circumstances, e.g. if only last n elements need to be buffered, a fixed size RingBuffer can be substitued with #buffer: accessor."
	"       ^       <ReadStream>    a positionable read stream
	""
		[ Time now ] reading positioning ++ 3; -- 2; get
	"
	^self isPositionable
		ifTrue: [self]
		ifFalse:        [PositionReadStream on: self]
! !
!Xtreams::ReadStream methodsFor:'transforming'!

rejecting: aBlock
	"Filters elements from the source using aBlock. aBlock has the same form and semantics as the #reject: block on collections."
	"	aBlock	<BlockClosure>	usual #reject: style block used to filter the elements passing through
		^<TransformReadStream>"
	"
		((1 to: 10) reading rejecting: [ :e | e odd ]) rest
	"
	^self transforming: [:input :output |
		| value |
		[value := input get.
		aBlock cull: value] whileTrue.
		output put: value]
! !
!Xtreams::ReadStream methodsFor:'transforming'!

selecting: aBlock
	"Filters elements from the source using aBlock. aBlock has the same form and semantics as the #select: block on collections."
	"	aBlock	<BlockClosure>	usual #select: style block used to filter the elements passing through
		^<TransformReadStream>"
	"
		((1 to: 10) reading selecting: [ :e | e odd ]) rest
	"
	^self transforming: [:input :output |
		| value |
		[value := input get.
		aBlock cull: value] whileFalse.
		output put: value]
! !
!Xtreams::ReadStream methodsFor:'transforming'!

transforming: aBlock
	"This is the most general form of transform stream. The block receives two streams, the source (input) and a virtual stream of elements to be produced by the stream (output). The block can read arbitrary number of elements from input (including none) and write arbitrary number of elements into the output (including none). The block will be invoked as many times as necessary to produce the required number of elements, or until an Incomplete is raised. Consequently if the block handles Incomplete from the input, it has to raise another Incomplete at some point, otherwise the stream will never end.
	Note that if the contentSpecies of the source doesn't fit the output of the transformation, the contents species of the transform stream has to be set explicitly.
	""	aBlock	<BlockClosure>	binary transformation block that reads elements from input (first argument) and writes elements into output (second argument)
		^<TransformReadStream>
	""	Convert text into a stream of words
		('hello world!! bye world!!' reading transforming: [ :in :out || word char |
			word := String new writing.
			[	[  (char := in get) = Character space ] whileFalse: [ word put: char ].
			] ensure: [ out put: (word close; destination) ] ]
		)	contentsSpecies: Array;
			rest
	""	Convert a hex-string into a byte array (2 characters per byte)
		| c2d |
		c2d := [ :char | ('0123456789abcdef' indexOf: char) - 1 ].
		('0123456789abcdef' reading transforming: [ :in :out |
			out put: (c2d value: in get) * 16 + (c2d value: in get) ]
		)	buffer: (RingBuffer on: (ByteArray new: 1));
			rest
	"
	^TransformReadStream on: self block: aBlock
! !
!Xtreams::WriteStream methodsFor:'transforming'!

buffering: bufferSize
	"Delays committing its content to its underlying stream until it has reached a certain size ,#flush is sent, or the stream is closed."
	"       bufferSize      <Integer> The size of the buffer to start with.
		^<PositionWriteStream>"
	"
		(ByteArray new writing buffering: 5)
			write: (ByteArray withAll: (1 to: 11));
			conclusion
	"
	^BufferedWriteStream on: self bufferSize: bufferSize
! !
!Xtreams::WriteStream methodsFor:'transforming'!

collecting: aBlock
	"Transform each written element using #collect: style block."
	"	aBlock	<BlockClosure>	a #collect: style block used to tranform each element
		^<CollectWriteSteam>
	""
		(Array new writing collecting: [ :e | e * e ]) write: (1 to: 5); conclusion
	""
		(String new writing collecting: [ :e | e asCharacter ]) write: (65 to: 90); conclusion
	"
	^CollectWriteStream on: self block: aBlock
! !
!Xtreams::WriteStream methodsFor:'transforming'!

depairing
	"Transform a stream of associations in to a stream of elements made up of the key and value association components."

	^self transforming: [:in :out |
		| association |
		association := in get.
		out put: association key.
		out put: association value]
! !
!Xtreams::WriteStream methodsFor:'transforming'!

doing: aBlock
	"Perform and action with each passing element using #do: style block."
	"	aBlock	<BlockClosure>	a #do: style block invoked with each element as it passes through the stream
		^<CollectWriteSteam>
	""
		(Array new writing doing: [ :e | Transcript space; print: e * e ]) write: (1 to: 10); conclusion
	"
	^self collecting: [:each | (aBlock value: each). each]
! !
!Xtreams::WriteStream methodsFor:'transforming'!

duplicating: aWriteStream
	"Duplicate all the contents written into @aWriteStream"
	"	aWriteStream <WriteStream>	a stream to copy into
		^<DuplicateWriteSteam>
	""
		| original copy |
		original := Array new writing.
		copy := ByteArray new writing.
		(original duplicating: copy) write: (0 to: 15).
		original conclusion -> copy conclusion
	"
	^DuplicateWriteStream on: self duplicate: aWriteStream
! !
!Xtreams::WriteStream methodsFor:'transforming'!

encoding: anEncoding
	"Transform characters into bytes using @anEncoding such as #utf8 or #ascii, etc. Any encoding supported by StreamEncoder is allowed.
	The encoding steam also performs automatic conversion of CRs into the native line-end convention of the underlying platform,
	unless set into a different line-end convention mode"
	"	anEncoding	<Symbol> encoding identifier recognized by StreamEncoder class>>new:
		^<EncodedWriteStream>
	""
		(ByteArray new writing encoding: #ascii) write: 'abcdefghi'; conclusion
	""
		(ByteArray new writing encoding: #ascii) write: 'Hello\World' withCRs; conclusion
	""
		(ByteArray new writing encoding: #ascii) setLineEndCRLF; write: 'Hello\World' withCRs; conclusion
	"
	^EncodeWriteStream on: self encoding: anEncoding
! !
!Xtreams::WriteStream methodsFor:'transforming'!

encodingBase64
	"Encodes bytes into characters of base-64 encoding.
	Emits final padding characters ($=) as required, when the stream is closed."
	"	^<TransformWriteStream>"
	"
		String new writing encodingBase64 write: (ByteArray withAll: (1 to: 20)); conclusion
	"
	| map cache |
	map := [ :i | 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/' at: i + 1 ].
	cache := ByteArray new: 3.
	^(self transforming: [ :in :out | | count block shift |
		count := [ in read: 3 into: cache at: 1. 3 ] on: Incomplete do: [ :incomplete | incomplete count].
		count isZero ifTrue: [ Incomplete zero raise ].
		block := (1 to: count) inject: 0 into: [ :total :byte | (total bitShift: 8) + (cache at: byte)].
		shift := count * -8.
		1 to: count + 1 do: [:i | out put: (map value: ((block bitShift: (shift + (i * 6))) bitAnd: 63))].
		count < 3 ifTrue: [
			3 - count timesRepeat: [ out put: $= ].
			(Incomplete count: count) raise]])
		buffer: (RingBuffer on: (ByteArray new: 3));
		yourself
! !
!Xtreams::WriteStream methodsFor:'transforming'!

encodingHex
	"Encodes hex characters into bytes."
	"	^<TransformReadStream>"
	"
		ByteArray new writing encodingHex write: '010203fdfeff'; terminal
	"
	| c2i |
	c2i := [ :c | ('0123456789abcdef' indexOf: c asLowercase) - 1 ].
	^(self transforming: [ :in :out |
		out put: ((c2i value: in get) bitShift: 4) + (c2i value: in get) ])
		contentsSpecies: ByteString;
		yourself
! !
!Xtreams::WriteStream methodsFor:'transforming'!

injecting: initialObject into: aBlock
	"Accumulates a running value combined with each passing element using the binary aBlock. aBlock takes the result of the last evaluation and the next element as arguments. Notable difference from the collection analog is that the streaming variant is a stream of all the intermediate values of the running value."
	"	initialObject	<Object> initial value used as the previous result for the evaluation of the first element
		aBlock	<BlockClosure> binary block combining the value of each element with previous result of its evaluation
		^<CollectingWriteStream>"
	"
		(Array new writing injecting: 0 into: [ :total :each | each + total ]) write: (1 to: 10); conclusion
	"
	| nextObject |
	nextObject := initialObject.
	^self collecting: [:each | nextObject := aBlock cull: nextObject cull: each]
! !
!Xtreams::WriteStream methodsFor:'interpreting'!

interpreting: type
	"Converts consumed elements into bytes of pre-configured (primitive) CType, e.g. float, long etc. The type of the written elements must match the CType and the underlying destination must be binary.
	""	type	<Symbol>	identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations)
		^		<InterpretedWriteStream>
	""
		| doubles bytes |
		doubles := [ Random new next ] reading.
		bytes := (ByteArray new writing interpreting: #double)
			write: 10 from: doubles;
			close;
			terminal.
		(bytes reading interpreting: #double) read: 10
	"
	^self interpreting: type cacheSize: 1
! !
!Xtreams::WriteStream methodsFor:'interpreting'!

interpreting: type cacheSize: size
	"Converts consumed elements into bytes of pre-configured (primitive) CType, e.g. float, long etc. The type of the written elements must match the CType and the underlying destination must be binary.
	""	type	<Symbol>	identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations)
		size		<Integer>	requested buffer size (in number of elements)
		^		<InterpretedWriteStream>
	""
		| doubles bytes |
		doubles := [ Random new next ] reading.
		bytes := (ByteArray new writing interpreting: #double size: 10)
			write: 10 from: doubles;
			close;
			terminal.
		(bytes reading interpreting: #double) read: 10
	"
	^InterpretedWriteStream on: self type: type cacheSize: size
! !
!Xtreams::WriteStream methodsFor:'interpreting'!

interpreting: writer size: byteSize
	"Converts objects into bytes in a binary destination according to provided @writer block. The block is evaluated with an instance of InterpretedBytes an index and object to write into the bytes.
	""	type		<Symbol>	identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations)
		byteSize	<Integer>	byte size of an element
		^			<InterpretedWriteStream>
	""
		| points bytes |
		points := Random new reading transforming: [ :in :out | out put: in get @ in get ].
		bytes := (ByteArray new writing interpreting: [ :b :i :o | (b floatAt: i put: o x) @ (b floatAt: i + 4 put: o y) ] size: 8 )
			write: 10 from: points;
			close;
			terminal.
		(bytes reading interpreting: [ :b :i | (b floatAt: i) @ (b floatAt: i + 4) ] size: 8 cacheSize: 5) read: 5
	"
	^InterpretedWriteStream on: self bytesPerElement: byteSize contentsSpecies: Array operation: writer cacheSize: 1
! !
!Xtreams::WriteStream methodsFor:'interpreting'!

interpreting: writer size: byteSize cacheSize: cacheSize
	"Converts objects into bytes in a binary destination according to provided @writer block. The block is evaluated with an instance of InterpretedBytes an index and object to write into the bytes.
	""	type		<Symbol>	identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations)
		byteSize	<Integer>	byte size of an element
		cacheSize	<Integer>	requested cache size (in number of elements)
		^			<InterpretedWriteStream>
	""
		| points bytes |
		points := Random new reading transforming: [ :in :out | out put: in get @ in get ].
		bytes := (ByteArray new writing interpreting: [ :b :i :o | (b floatAt: i put: o x) @ (b floatAt: i + 4 put: o y) ] size: 8 )
			write: 10 from: points;
			close;
			terminal.
		(bytes reading interpreting: [ :b :i | (b floatAt: i) @ (b floatAt: i + 4) ] size: 8 cacheSize: 5) read: 5
	"
	^InterpretedWriteStream on: self bytesPerElement: byteSize contentsSpecies: Array operation: writer cacheSize: cacheSize
! !
!Xtreams::WriteStream methodsFor:'interpreting'!

marshaling
	"Marshaling streams are used to encode arbitrary smalltalk objects into a sequence of bytes suitable for binary storage or transport. The format of the binary encoding is defined by an ObjectMarshaler and is identified by particular version ID.
	A marshaling write stream encodes objects into a binary destination stream.
	""	^			<ObjectWriteSteam>
	""
		| rectangle bytes |
		rectangle := 5 @ 5 extent: 5 @ 5.
		bytes := ByteArray new writing marshaling put: rectangle; conclusion.
		bytes reading marshaling get
	"
	^ObjectWriteStream on: self
! !
!Xtreams::WriteStream methodsFor:'interpreting'!

marshaling: aMarshaler
	"Marshaling streams are used to encode arbitrary smalltalk objects into a sequence of bytes suitable for binary storage or transport. The format of the binary encoding is defined by an ObjectMarshaler and is identified by particular version ID. Custom marshaling schemes can be derived by subclassing ObjectMarshaler. Custom schemes must declare their own (unique) version ID. This method allows to employ a cusomt marshaler instead of the default one (STST2.0).
	A marshaling write stream encodes objects into a binary destination stream.
	""	aMarshaler	<ObjectMarshaler>	implements custom marshaling format
		^			<ObjectWriteSteam>
	""
		| rectangle bytes |
		rectangle := 5 @ 5 extent: 5 @ 5.
		bytes := (ByteArray new writing marshaling: ObjectMarshaler new) put: rectangle; conclusion.
		bytes reading marshaling get
	"
	^ObjectWriteStream on: self marshaler: aMarshaler
! !
!Xtreams::WriteStream methodsFor:'transforming'!

monitoring: aNotificationBlock every: aNotificationInterval
	"Monitor the through-put of the receiver."
	"	aNotificationBlock <BlockClosure>	the block to execute when notifying
		aNotificationInterval <Duration>	how often to notify
		^<PositionWriteSubstream>
	"

	| previousPosition timer start notifyBlock monitoring notifyProcess notifyFinished |

	start := Time microsecondClock.
	previousPosition := 0.
	monitoring := nil.
	timer := nil.
	notifyFinished := false.

	notifyBlock := [
		aNotificationBlock cull: monitoring position cull: monitoring position - previousPosition cull: Time microsecondClock - start.
		previousPosition := monitoring position].

	notifyProcess := nil.
	notifyProcess := [
		[notifyBlock value. notifyFinished] whileFalse: [notifyProcess suspend]] newProcess.
	notifyProcess priority: ((Processor activeProcess priority + 1) min: 99).

	monitoring := self closing: [
		timer stop.
		notifyProcess resume.
		notifyFinished := true.
		notifyProcess resume.
		self close].

	timer := Timer every: aNotificationInterval resume: notifyProcess.
	^monitoring
! !
!Xtreams::WriteStream methodsFor:'transforming'!

pairing
	"Transform a stream of elements in to a stream of associations between even+odd elements of the stream. This expects the stream to have an even number of elements"

	^self transforming: [:in :out | out put: (Association key: in get value: in get)]
! !
!Xtreams::WriteStream methodsFor:'transforming'!

positioning
	"If necessary add positioning layer. Note that positiong layer employs buffering to implement the positioning ability. The default buffering strategy will grow the buffer up to the full size of the underlying stream if not released. Consequently other Buffer types might be more suitable for specific circumstances, e.g. if only last n elements need to be buffered, a fixed size RingBuffer can be substitued with #buffer: accessor."
	"       ^       <WriteStream>   a positionable read stream
	""
		[ :x | Transcript space; print: x ] writing positioning write: (1 to: 10); -- 5; write: (11 to: 15); close
	"
	^self isPositionable
		ifTrue: [self]
		ifFalse:        [PositionWriteStream on: self]
! !
!Xtreams::WriteStream methodsFor:'transforming'!

rejecting: aBlock
	"Filters written elements using aBlock. aBlock has the same form and semantics as the #reject: block on collections."
	"	aBlock	<BlockClosure>	usual #reject: style block used to filter the elements passing through
		^<TransformWriteStream>"
	"
		(Array new writing rejecting: [ :e | e odd ]) write: (1 to: 10); conclusion
	"
	^self selecting: [:each | (aBlock cull: each) not]
! !
!Xtreams::WriteStream methodsFor:'transforming'!

selecting: aBlock
	"Filters written elements using aBlock. aBlock has the same form and semantics as the #select: block on collections."
	"	aBlock	<BlockClosure>	usual #select: style block used to filter the elements passing through
		^<TransformWriteStream>"
	"
		(Array new writing selecting: [ :e | e odd ]) write: (1 to: 10); conclusion
	"
	^self transforming: [:input :output |
		| value |
		[value := input get.
		aBlock cull: value] whileFalse.
		output put: value]
! !
!Xtreams::WriteStream methodsFor:'transforming'!

transforming: aBlock
	"This is the most general form of transform stream. The block receives two streams, a virtual stream of written elements (input) and the destination (output). The block can read arbitrary number of elements from input (including none) and write arbitrary number of elements into the output (including none). The block will be invoked as many times as necessary to consume any written elements, or until an Incomplete is raised by the destination.
	Note that if the #contentSpecies of the destination doesn't fit the input of the transformation, the #contentsSpecies of the transform stream has to be set explicitly.
	""	aBlock	<BlockClosure>	binary transformation block that reads elements from input (first argument) and writes elements into output (second argument)
		^<TransformWriteStream>
	""	Convert text into a stream of words
		(Array new writing transforming: [ :in :out || word char |
			word := String new writing.
			[	[  (char := in get) = Character space ] whileFalse: [ word put: char ].
			] ensure: [ out put: (word close; destination) ] ]
		)	write: 'hello world!! bye world!!';
			close;
			terminal
	""	Convert a hex-string into a byte array (2 characters per byte)
		| c2d |
		c2d := [ :char | ('0123456789abcdef' indexOf: char) - 1 ].
		(ByteArray new writing transforming: [ :in :out |
			out put: (c2d value: in get) * 16 + (c2d value: in get) ]
		)	contentsSpecies: String;
			write: '0123456789abcdef';
			close;
			terminal
	"
	^TransformWriteStream on: self block: aBlock
! !