author | Jan Vrany <jan.vrany@fit.cvut.cz> |
Wed, 01 Feb 2012 00:34:28 +0000 | |
changeset 97 | 2a7827f4dce2 |
parent 72 | d16c7d84d4a8 |
child 111 | 44ac233b2f83 |
permissions | -rw-r--r-- |
10 | 1 |
"{ Package: 'stx:goodies/xtreams/transforms' }" |
2 |
||
3 |
"{ NameSpace: Xtreams }" |
|
4 |
||
5 |
WriteStream subclass:#TransformWriteStream |
|
6 |
instanceVariableNames:'buffer block closeBlock process incompleteCount readReady |
|
7 |
writeReady closeReady' |
|
8 |
classVariableNames:'' |
|
97 | 9 |
poolDictionaries:'Xtreams::XtreamsPool' |
27
2cc5a8a3ca14
added XtreamsPool to fix DefaultBufferSize; set proper category names
mkobetic
parents:
10
diff
changeset
|
10 |
category:'Xtreams-Transforms' |
10 | 11 |
! |
12 |
||
13 |
TransformWriteStream comment:'Transform write stream provides the most expressive form of transformation. The transformation is described by a binary block that is given two arguments, @input and @output. The @input is a virtual stream of elements written into the stream. The @output is the destination stream under the transform stream. The block can read arbitrary amount of elements from @input (including none) and write arbitrary amount of elements into @output (including none). The block will be invoked as many times as necessary to consume everything written into the stream, or until an Incomplete is raised by the destination. |
|
14 |
||
15 |
The closing behavior of the stream can be customized through the closeBlock. The default closeBlock simply propagates #close to the destination as with any other transform stream. |
|
16 |
||
17 |
In some cases it might be desirable to tweak the buffering strategy of the virtual input stream. In that case the buffer of the stream can be set manually to any kind of Buffer that matches the requirements of the transformation. |
|
18 |
||
19 |
From the point of view of the API, the TransformWriteStream is very much like the TransformReadStream. Notably any valid transform block should work the same way on either read or write stream without modification. However to preserve the invariants and expressivity of the transform block, the implementation is vastly different. Primarily it is necessary to convert all the writes into a virtual stream of written elements that can be passed into the transform block as the input stream. Consequently the transformation itself needs to be suspended if there weren''t enough elements written yet, to complete an iteration of the transform block. Therefore it needs to run in its own process. Any writes get redirected into an internal buffer and a background process repeatedly invokes the transform block to drain the contents of the buffer and produces output into the destination. Obviously buffer access has to be synchronized between any writing threads and the background process. The readReady/writeReady semaphores work in a lock-step mode, to interleave the background buffer draining with any writes. |
|
20 |
||
21 |
Instance Variables |
|
22 |
buffer <Buffer> holds the contents of the virtual input stream |
|
23 |
block <BlockClosure> binary transformation block that reads elements from input (first argument) and writes elements into output (second argument) |
|
24 |
closeBlock <BlockClosure> binary block invoked in response to the #close message, allows customizing the close behavior |
|
25 |
process <Process> background process that runs the transformation block |
|
26 |
incompleteCount <Integer> indicates that the transformation raised or received Incomplete and how many elements were actually consumed by the transformation block so that we can reraise Incomplete with correct count in the client thread |
|
27 |
readReady <Semaphore> signals to the background process that elements were written into the buffer |
|
28 |
writeReady <Semaphore> gates any writes into the stream, making sure background process is not draining the buffer at the same time |
|
29 |
closeReady <Semaphore> signals back to the user thread that the background process finished draining the buffer and the stream is properly closed, so the #close call can return |
|
30 |
||
31 |
' |
|
32 |
! |
|
33 |
||
34 |
||
35 |
!TransformWriteStream class methodsFor:'instance creation'! |
|
36 |
||
37 |
on: aWriteStream block: block |
|
38 |
^self new on: aWriteStream block: block |
|
39 |
! ! |
|
40 |
||
41 |
!TransformWriteStream methodsFor:'accessing'! |
|
42 |
||
43 |
block |
|
44 |
^block |
|
45 |
! |
|
46 |
||
47 |
buffer |
|
48 |
^buffer |
|
49 |
! |
|
50 |
||
51 |
closeBlock |
|
52 |
^closeBlock |
|
53 |
! |
|
54 |
||
55 |
insert: anInteger from: aSequenceableCollection at: startIndex |
|
56 |
| count amount | |
|
57 |
anInteger isZero ifTrue: [^0]. |
|
58 |
||
59 |
count := 0. |
|
60 |
[count < anInteger] whileTrue: |
|
61 |
[amount := anInteger - count. |
|
62 |
buffer hasFixedWriteSpace ifTrue: [amount := amount min: buffer cacheSize]. |
|
63 |
buffer insert: amount from: aSequenceableCollection at: startIndex + count. |
|
64 |
||
65 |
"There is now data in the buffer for the drain to read" |
|
66 |
readReady signal. |
|
67 |
writeReady wait. |
|
68 |
incompleteCount == nil ifFalse: [(Incomplete count: count + incompleteCount) raise]. |
|
69 |
count := count + amount ]. |
|
70 |
^anInteger |
|
71 |
! |
|
72 |
||
73 |
put: anObject |
|
74 |
incompleteCount == nil ifFalse: [Incomplete zero raise]. |
|
75 |
||
76 |
buffer put: anObject. |
|
77 |
||
78 |
"There is now data in the buffer for the drain to read" |
|
79 |
readReady signal. |
|
80 |
writeReady wait |
|
81 |
! |
|
82 |
||
83 |
write: anInteger from: aSequenceableCollection at: startIndex |
|
84 |
| count amount | |
|
85 |
anInteger isZero ifTrue: [^0]. |
|
86 |
||
87 |
count := 0. |
|
88 |
[count < anInteger] whileTrue: |
|
89 |
[amount := anInteger - count. |
|
90 |
buffer hasFixedWriteSpace ifTrue: [amount := amount min: buffer cacheSize]. |
|
91 |
buffer write: amount from: aSequenceableCollection at: startIndex + count. |
|
92 |
||
93 |
"There is now data in the buffer for the drain to read" |
|
94 |
readReady signal. |
|
95 |
writeReady wait. |
|
96 |
incompleteCount == nil ifFalse: [(Incomplete count: count + incompleteCount) raise]. |
|
97 |
count := count + amount ]. |
|
98 |
^anInteger |
|
99 |
! ! |
|
100 |
||
101 |
!TransformWriteStream methodsFor:'initialize-release'! |
|
102 |
||
103 |
buffer: aBuffer |
|
104 |
buffer := aBuffer. |
|
105 |
self drainBuffer |
|
106 |
! |
|
107 |
||
108 |
close |
|
109 |
buffer ifNil: [^self]. |
|
110 |
"Write nothing to the buffer, but signal that there is data to read, causing the drain to throw Incomplete" |
|
111 |
readReady signal. |
|
112 |
||
113 |
"Wait for the drain to finish up completely. Rearm the closeReady incase somebody calls #close again." |
|
114 |
closeReady wait. |
|
115 |
closeReady signal. |
|
116 |
||
117 |
closeBlock cull: destination cull: self. |
|
118 |
||
119 |
buffer recycle. |
|
120 |
buffer := nil |
|
121 |
! |
|
122 |
||
123 |
closeBlock: anObject |
|
124 |
closeBlock := anObject |
|
125 |
! |
|
126 |
||
127 |
contentsSpecies |
|
128 |
^buffer contentsSpecies |
|
129 |
! |
|
130 |
||
131 |
contentsSpecies: aClass |
|
132 |
||
133 |
self buffer: (buffer class new: buffer cacheSize class: aClass) |
|
134 |
! |
|
135 |
||
136 |
on: aStreamable block: aBlock |
|
137 |
super on: aStreamable. |
|
138 |
block := aBlock. |
|
139 |
incompleteCount := nil. |
|
140 |
closeBlock := [ :destinationStream | destinationStream close]. |
|
141 |
buffer := ElasticBuffer new: DefaultBufferSize class: aStreamable contentsSpecies. |
|
142 |
self drainBuffer |
|
143 |
! ! |
|
144 |
||
145 |
!TransformWriteStream methodsFor:'printing'! |
|
146 |
||
147 |
streamingPrintOn: aStream |
|
148 |
super streamingPrintOn: aStream. |
|
149 |
aStream |
|
150 |
space; |
|
151 |
write: '#'; |
|
152 |
write: block method homeMethod selector; |
|
153 |
write: ' buffered: '; |
|
154 |
print: buffer writeSize. |
|
155 |
buffer writeSize isZero ifTrue: [^self]. |
|
156 |
aStream |
|
157 |
cr; tab; |
|
158 |
print: buffer contentsPast |
|
159 |
! ! |
|
160 |
||
161 |
!TransformWriteStream methodsFor:'private'! |
|
162 |
||
163 |
drainBuffer |
|
164 |
| reading | |
|
165 |
process ifNotNil: [process terminate]. |
|
166 |
incompleteCount == nil ifFalse: [^self]. |
|
167 |
||
168 |
closeReady := Semaphore new. |
|
169 |
readReady := Semaphore new. |
|
170 |
writeReady := Semaphore new. |
|
171 |
reading := buffer reading transforming: [:in :out | |
|
172 |
| count | |
|
173 |
"Wait for the main process to have written to the buffer" |
|
174 |
readReady wait. |
|
175 |
||
176 |
"If nothing was written to the buffer, this indicates we're closing, we raise Incomplete" |
|
177 |
(count := buffer readSize) isZero ifTrue: [Incomplete zero raise]. |
|
178 |
out write: count from: in. |
|
179 |
||
180 |
"Allow the main process to write to the buffer" |
|
181 |
writeReady signal]. |
|
182 |
process := [ |
|
183 |
[[block value: reading value: destination] repeat] on: Incomplete do: []. |
|
184 |
incompleteCount := reading buffer readPosition. |
|
185 |
||
186 |
"Signal that we're closed so that an attempt to write or close will immediately complete" |
|
187 |
writeReady signal. |
|
188 |
closeReady signal] |
|
189 |
newProcess. |
|
190 |
process resume |
|
191 |
! ! |
|
192 |
||
193 |
!TransformWriteStream class methodsFor:'documentation'! |
|
194 |
||
195 |
version_SVN |
|
196 |
^ '$Id$' |
|
197 |
! ! |