author: joe
date: Tue, 19 Mar 2013 23:58:40 -0400
changeset: 116 (fa5b4c9f582d)
parent: 111 (44ac233b2f83)
permissions: -rw-r--r--
"{ Package: 'stx:goodies/xtreams/transforms' }"

"{ NameSpace: Xtreams }"

ReadStream subclass:#TransformReadStream
    instanceVariableNames:'buffer bufferWriting sourceAtEnd block closeBlock'
    classVariableNames:''
    "changeset 111 (44ac233b2f83), joe, parent 97: * removed namespace from pool references and stray extension methods"
    poolDictionaries:'XtreamsPool'
    "changeset 27 (2cc5a8a3ca14), mkobetic, parent 10: added XtreamsPool to fix DefaultBufferSize; set proper category names"
    category:'Xtreams-Transforms'
!

TransformReadStream comment:'Transform read stream provides the most expressive form of transformation. The transformation is described by a binary block that is given two arguments, @input and @output. Both @input and @output are streams themselves. The block can read an arbitrary number of elements from @input (including none) and write an arbitrary number of elements into @output (including none). The block will be invoked as many times as necessary to produce the required number of elements, or until an Incomplete is raised. Consequently, if the block handles Incomplete from the input, it has to raise another Incomplete at some point, otherwise the stream will never end.

Note that if the #contentsSpecies of the source doesn''t fit the output of the transformation, the #contentsSpecies of the transform stream has to be set explicitly. The #contentsSpecies determines the type of collection employed by the internal buffer that is used as the storage for the virtual output stream of the transformation block. In some cases it might be desirable to tweak the buffering strategy of the virtual output stream. In that case the buffer of the stream can be set manually to any kind of Buffer that matches the requirements of the transformation.
The closing behavior of the stream can be customized through the closeBlock. The default closeBlock simply propagates #close to the source as with any other transform stream.

Instance Variables
    buffer    <Buffer> holds the contents of the virtual output stream
    bufferWriting    <BufferWriteStream> the virtual output stream that is passed to the transformation block as the second argument
    sourceAtEnd    <Boolean> flag indicating that the source has signalled an Incomplete
    block    <BlockClosure> binary transformation block that reads elements from input (first argument) and writes elements into output (second argument)
    closeBlock    <BlockClosure> binary block invoked in response to the #close message, allows customizing the close behavior

'
!
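
The block protocol described in the class comment can be sketched as follows. This is a minimal workspace example, not part of the class: only #on:block: is defined in this file, while #reading, #get, #put: and #rest are assumed to come from the Xtreams core package. Depending on the dialect, the class name may also need its namespace qualifier.

```smalltalk
"Sketch only: assumes the Xtreams core protocol (#reading, #get, #put:, #rest)."
| doubled |
doubled := TransformReadStream
    on: #(1 2 3) reading
    block: [:input :output |
        "Read one element and write one element per invocation.
         When the source is exhausted, #get raises Incomplete,
         which ends the transform stream."
        output put: input get * 2].
doubled rest
```

The block never tests for the end of the source itself; it relies on the Incomplete raised by the input to terminate the stream, as the class comment describes.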
|

!TransformReadStream class methodsFor:'instance creation'!

on: aReadStream block: block
    ^self new on: aReadStream block: block
! !
|
!TransformReadStream methodsFor:'accessing'!

block
    ^block
!

buffer
    ^buffer
!

closeBlock
    ^closeBlock
!

get
    (self fillBufferIfRequired: 1) ifFalse: [Incomplete zero raise].
    ^buffer get
!
|
read: anInteger into: aSequenceableCollection at: startIndex
    | count read |
    anInteger isZero ifTrue: [^0].
    count := 0.
    [count < anInteger] whileTrue:
        [(self fillBufferIfRequired: anInteger - count) ifFalse: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise].
        read := (anInteger - count) min: buffer readSize.
        buffer read: read into: aSequenceableCollection at: startIndex + count.
        count := count + read].
    ^anInteger
! !
|
!TransformReadStream methodsFor:'initialize-release'!

buffer: aBuffer
    buffer recycle.
    buffer := aBuffer.
    bufferWriting := buffer writing
!

close
    buffer ifNil: [^self].
    sourceAtEnd := true.
    closeBlock cull: source cull: self.
    buffer recycle.
    buffer := nil
!

closeBlock: anObject
    closeBlock := anObject
!
|
contentsSpecies
    ^buffer contentsSpecies
!

contentsSpecies: aClass

    self buffer: (buffer class new: buffer cacheSize class: aClass)
!

on: aReadStream block: aBlock

    self on: aReadStream.
    block := aBlock.
    closeBlock := [ :sourceStream | sourceStream close].
    sourceAtEnd := false.
    buffer := ElasticBuffer new: DefaultBufferSize class: aReadStream contentsSpecies.
    bufferWriting := buffer writing
! !
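
The two customization points in this category, #contentsSpecies: and #closeBlock:, might be used as in the sketch below. It is an illustration under assumptions: #reading, #get, #put: and #rest are taken from the Xtreams core package, and Character>>asInteger is assumed to be available in the dialect. The source yields characters while the block produces integers, so the buffer species is switched to Array before reading.

```smalltalk
"Sketch only: assumes the Xtreams core protocol and Character>>asInteger."
| source codes |
source := 'abc' reading.
codes := TransformReadStream
    on: source
    block: [:input :output | output put: input get asInteger].
"The source species (String) cannot hold integers, so set it explicitly."
codes contentsSpecies: Array.
"Replace the default closeBlock so that closing the transform
 does not propagate #close to the source."
codes closeBlock: [:sourceStream :transformStream | nil].
codes rest.
codes close
```

Because #close evaluates the closeBlock with cull:cull:, a block taking zero, one (the source) or two (the source and the transform stream) arguments is accepted.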
|
!TransformReadStream methodsFor:'printing'!

streamingPrintOn: aStream
    super streamingPrintOn: aStream.
    aStream
        space;
        write: '#';
        write: block method homeMethod selector;
        write: ' buffered: ';
        print: buffer readSize.
    buffer readSize isZero ifTrue: [^self].
    aStream
        cr; tab;
        print: buffer contentsFuture
! !
|
!TransformReadStream methodsFor:'private'!

fillBuffer: anInteger
    "
    We have exhausted our buffer, so fill it up again from our inner stream. Do not read more than is available from the inner stream or more than was originally requested. But try to be as greedy as possible, since filling our buffer in bulk is faster than re-filling it one object at a time. Also, cap it at @bufferSize, because if we're reading from in-memory or on-disk sources, all the data will be immediately available, which could be gigabytes' worth. We don't want to pull all of that into our buffer.

    The semantics of our block are interesting. We give it both an input and an output stream. The block is invoked repeatedly until it has produced at least @anInteger elements, the buffer has no more space to write, or an Incomplete is raised. It does not need to read from the input stream, but if it does and that stream raises Incomplete, that marks us as end of stream.
    "
    | count available |
    count := 0.
    [[available := buffer readSize.
        block value: source value: bufferWriting.
        count := count + (buffer readSize - available).
        count >= anInteger ifTrue: [^self].
        buffer hasSpaceToWrite ifFalse: [^self]]
            repeat]
        on: Incomplete do: [:ex | ^sourceAtEnd := true].
!
|
fillBufferIfRequired: anInteger
    "
    If we have any data available in our buffer, return true. If not, attempt to fill up our buffer and return true if that worked. Try not to fill the buffer more than was originally requested (@anInteger), but we make sure we only fill up to what's available from our inner stream. We don't guarantee we'll fill our buffer up to @anInteger's worth, just that we'll attempt to fill the buffer.
    "
    buffer ifNil: [^false].
    buffer hasDataToRead ifTrue: [^true].
    sourceAtEnd ifTrue: [^false].
    self fillBuffer: anInteger.
    ^buffer hasDataToRead
! !
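
Since the loop in #fillBuffer: simply invokes the block again when an invocation writes nothing, a block can act as a filter by consuming input without producing output. The sketch below illustrates this under the assumption that #reading, #get, #put: and #rest are provided by the Xtreams core package; it is a workspace example, not part of the class.

```smalltalk
"Sketch only: assumes the Xtreams core protocol (#reading, #get, #put:, #rest)."
| evens |
evens := TransformReadStream
    on: (1 to: 10) reading
    block: [:input :output | | each |
        each := input get.
        "Odd elements are consumed but nothing is written for them;
         the block is invoked again until output is produced
         or the input raises Incomplete."
        each even ifTrue: [output put: each]].
evens rest
```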
|
!TransformReadStream class methodsFor:'documentation'!

version_HG

    ^ '$Changeset: <not expanded> $'
!

version_SVN
    ^ '$Id$'
! !