transforms/Xtreams__TransformWriteStream.st
author Jan Vrany <jan.vrany@fit.cvut.cz>
Wed, 01 Feb 2012 00:34:28 +0000
changeset 97 2a7827f4dce2
parent 72 d16c7d84d4a8
child 111 44ac233b2f83
permissions -rw-r--r--
pool name fixes

"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

WriteStream subclass:#TransformWriteStream
	instanceVariableNames:'buffer block closeBlock process incompleteCount readReady
		writeReady closeReady'
	classVariableNames:''
	poolDictionaries:'Xtreams::XtreamsPool'
	category:'Xtreams-Transforms'
!

TransformWriteStream comment:'Transform write stream provides the most expressive form of transformation. The transformation is described by a binary block that is given two arguments, @input and @output. The @input is a virtual stream of elements written into the stream. The @output is the destination stream under the transform stream. The block can read arbitrary amount of elements from @input (including none) and write arbitrary amount of elements into @output (including none). The block will be invoked as many times as necessary to consume everything written into the stream, or until an Incomplete is raised by the destination.

The closing behavior of the stream can be customized through the closeBlock. The default closeBlock simply propagates #close to the destination as with any other transform stream.

In some cases it might be desirable to tweak the buffering strategy of the virtual input stream. In that case the buffer of the stream can be set manually to any kind of Buffer that matches the requirements of the transformation.

From the point of view of the API, the TransformWriteStream is very much like the TransformReadStream. Notably any valid transform block should work the same way on either read or write stream without modification. However to preserve the invariants and expressivity of the transform block, the implementation is vastly different. Primarily it is necessary to convert all the writes into a virtual stream of written elements that can be passed into the transform block as the input stream. Consequently the transformation itself needs to be suspended if there weren''t enough elements written yet, to complete an iteration of the transform block. Therefore it needs to run in its own process. Any writes get redirected into an internal buffer and a background process repeatedly invokes the transform block to drain the contents of the buffer and produces output into the destination. Obviously buffer access has to be synchronized between any writing threads and the background process. The readReady/writeReady semaphores work in a lock-step mode, to interleave the background buffer draining with any writes.

Instance Variables
	buffer	<Buffer> holds the contents of the virtual input stream
	block	<BlockClosure> binary transformation block that reads elements from input (first argument) and writes elements into output (second argument)
	closeBlock	<BlockClosure> binary block invoked in response to the #close message, allows customizing the close behavior
	process	<Process> background process that runs the transformation block
	incompleteCount	<Integer> indicates that the transformation raised or received Incomplete and how many elements were actually consumed by the transformation block so that we can reraise Incomplete with correct count in the client thread
	readReady	<Semaphore> signals to the background process that elements were written into the buffer
	writeReady	<Semaphore> gates any writes into the stream, making sure background process is not draining the buffer at the same time
	closeReady	<Semaphore> signals back to the user thread that the background process finished draining the buffer and the stream is properly closed, so the #close call can return

'
!


!TransformWriteStream class methodsFor:'instance creation'!

on: aWriteStream block: block
	^self new on: aWriteStream block: block
! !

!TransformWriteStream methodsFor:'accessing'!

block
	^block
!

buffer
	^buffer
!

closeBlock
	^closeBlock
!

insert: anInteger from: aSequenceableCollection at: startIndex
	| count amount |
	anInteger isZero ifTrue: [^0].

	count := 0.
	[count < anInteger] whileTrue:
		[amount := anInteger - count.
		buffer hasFixedWriteSpace ifTrue: [amount := amount min: buffer cacheSize].
		buffer insert: amount from: aSequenceableCollection at: startIndex + count.

		"There is now data in the buffer for the drain to read"
		readReady signal.
		writeReady wait.
		incompleteCount == nil ifFalse: [(Incomplete count: count + incompleteCount) raise].
		count := count + amount ].
	^anInteger
!

put: anObject
	incompleteCount == nil ifFalse: [Incomplete zero raise].

	buffer put: anObject.

	"There is now data in the buffer for the drain to read"
	readReady signal.
	writeReady wait
!

write: anInteger from: aSequenceableCollection at: startIndex
	| count amount |
	anInteger isZero ifTrue: [^0].

	count := 0.
	[count < anInteger] whileTrue:
		[amount := anInteger - count.
		buffer hasFixedWriteSpace ifTrue: [amount := amount min: buffer cacheSize].
		buffer write: amount from: aSequenceableCollection at: startIndex + count.
		
		"There is now data in the buffer for the drain to read"
		readReady signal.
		writeReady wait.
		incompleteCount == nil ifFalse: [(Incomplete count: count + incompleteCount) raise].
		count := count + amount ].
	^anInteger
! !

!TransformWriteStream methodsFor:'initialize-release'!

buffer: aBuffer
	buffer := aBuffer.
	self drainBuffer
!

close
	buffer ifNil: [^self].
	"Write nothing to the buffer, but signal that there is data to read, causing the drain to throw Incomplete"
	readReady signal.

	"Wait for the drain to finish up completely. Rearm the closeReady incase somebody calls #close again."
	closeReady wait.
	closeReady signal.

	closeBlock cull: destination cull: self.

	buffer recycle.
	buffer := nil
!

closeBlock: anObject
	closeBlock := anObject
!

contentsSpecies
	^buffer contentsSpecies
!

contentsSpecies: aClass

	self buffer: (buffer class new: buffer cacheSize class: aClass)
!

on: aStreamable block: aBlock
	super on: aStreamable.
	block := aBlock.
	incompleteCount := nil.
	closeBlock := [ :destinationStream | destinationStream close].
	buffer := ElasticBuffer new: DefaultBufferSize class: aStreamable contentsSpecies.
	self drainBuffer
! !

!TransformWriteStream methodsFor:'printing'!

streamingPrintOn: aStream
	super streamingPrintOn: aStream.
	aStream
		space;
		write: '#';
		write: block method homeMethod selector;
		write: ' buffered: ';
		print: buffer writeSize.
	buffer writeSize isZero ifTrue: [^self].
	aStream
		cr; tab;
		print: buffer contentsPast
! !

!TransformWriteStream methodsFor:'private'!

drainBuffer
	| reading |
	process ifNotNil: [process terminate].
	incompleteCount == nil ifFalse: [^self].

	closeReady := Semaphore new.
	readReady := Semaphore new.
	writeReady := Semaphore new.
	reading := buffer reading transforming: [:in :out |
		| count |
		"Wait for the main process to have written to the buffer"
		readReady wait.

		"If nothing was written to the buffer, this indicates we're closing, we raise Incomplete"
		(count := buffer readSize) isZero ifTrue: [Incomplete zero raise].
		out write: count from: in.

		"Allow the main process to write to the buffer"
		writeReady signal].
	process := [
		[[block value: reading value: destination] repeat] on: Incomplete do: [].
		incompleteCount := reading buffer readPosition.

		"Signal that we're closed so that an attempt to write or close will immediately complete"
		writeReady signal.
		closeReady signal]
			newProcess.
	process resume
! !

!TransformWriteStream class methodsFor:'documentation'!

version_SVN
    ^ '$Id$'
! !