author | mkobetic |
Sun, 15 Jan 2012 02:15:02 +0000 | |
changeset 36 | cf68c4beeb11 |
parent 33 | c6555f32c9fe |
child 44 | 217e67f9f1a3 |
permissions | -rw-r--r-- |
36 | 1 |
'From Smalltalk/X, Version:6.2.1 on 14-01-2012 at 09:12:29 PM' ! |
2 |
||
2 | 3 |
"{ Package: 'stx:goodies/xtreams/core' }" |
4 |
||
5 |
"{ NameSpace: Xtreams }" |
|
6 |
||
7 |
Object subclass:#WriteStream |
|
8 |
instanceVariableNames:'destination' |
|
9 |
classVariableNames:'Backspace Bell CarriageReturn Delete DoubleQuote Escape FormFeed |
|
10 |
LineFeed Quote Space Tab VerticalTab' |
|
36 | 11 |
poolDictionaries:'XtreamsPool' |
20
51de794993c3
added XtreamsPool to fix DefaultBufferSize; set proper category names
mkobetic
parents:
7
diff
changeset
|
12 |
category:'Xtreams-Core' |
2 | 13 |
! |
14 |
||
15 |
WriteStream comment:'Abstract superclass of all write streams; defines the API. |
|
16 |
||
17 |
Write streams are created by sending #writing to a concrete resource (a.k.a terminal), such as a Collection, SocketAccessor, Filename, etc. |
|
18 |
||
19 |
String new writing write: ''testing''; close; terminal |
|
20 |
||
21 |
Transform write streams are created through one of the messages in the ''transforming'' protocol sent to other write streams. |
|
22 |
||
23 |
(String new writing collecting: #asUppercase) write: ''testing''; close; terminal |
|
24 |
||
25 |
Subclasses must implement the following messages: |
|
26 |
#write:from:at: |
|
27 |
#contentsSpecies |
|
28 |
||
29 |
Instance Variables |
|
30 |
destination <Object> a stream or "terminal" consuming written elements |
|
31 |
||
32 |
Shared Variables |
|
33 |
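Backspace <String> single backspace character |

34 |
Bell <String> single bell character (code 7) |

35 |
CarriageReturn <String> single carriage-return character (code 13) |

36 |
Delete <String> single delete character (code 127) |

37 |
DoubleQuote <String> single double-quote character |

38 |
Escape <String> single escape character (code 27) |

39 |
FormFeed <String> single new-page (form feed) character |

40 |
LineFeed <String> single line-feed character |

41 |
Quote <String> single quote character |

42 |
Space <String> single space character |

43 |
Tab <String> single tab character |

44 |
VerticalTab <String> single vertical-tab character (code 11) |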
|
45 |
||
46 |
' |
|
47 |
! |
|
48 |
||
49 |
||
50 |
!WriteStream class methodsFor:'instance creation'! |
|
51 |
||
52 |
on: aDestination |
|
53 |
^self new on: aDestination |
|
54 |
! ! |
|
55 |
||
56 |
!WriteStream class methodsFor:'class initialization'! |
|
57 |
||
58 |
initialize |
|
59 |
Backspace := String with: Character backspace. |
|
60 |
Bell := String with: (Character value: 7). |
|
61 |
CarriageReturn := String with: (Character value: 13). |
|
62 |
Delete := String with: (Character value: 127). |
|
63 |
DoubleQuote := String with: $". |
|
64 |
Escape := String with: (Character value: 27). |
|
65 |
FormFeed := String with: Character newPage. |
|
66 |
LineFeed := String with: Character lf. |
|
67 |
Quote := String with: $'. |
|
68 |
Space := String with: Character space. |
|
69 |
Tab := String with: Character tab. |
|
70 |
VerticalTab := String with: (Character value: 11) |
|
71 |
! ! |
|
72 |
||
73 |
!WriteStream methodsFor:'accessing'! |
|
74 |
||
75 |
conclusion |
|
76 |
"Close the stream and return the object at the bottom of the stream." |
|
77 |
" ^<Collection | Buffer | IOAccessor | BlockClosure> " |
|
78 |
self close. |
|
79 |
^self terminal |
|
80 |
! |
|
81 |
||
82 |
destination |
|
83 |
||
84 |
^destination |
|
85 |
! |
|
86 |
||
87 |
insert: aStreamable |
|
88 |
"Insert aStreamable into self at current position." |
|
89 |
" aStreamable <SequenceableCollection | ReadStream | Buffer> the source to write in to the destination |
|
90 |
^<Integer> the number of elements written to the destination" |
|
91 |
" |
|
92 |
' World!!' copy writing insert: 'Hello' reading; -= 0; close; destination |
|
93 |
" |
|
94 |
^aStreamable streamingInsertInto: self |
|
95 |
! |
|
96 |
||
97 |
insert: anInteger from: aStreamable |
|
98 |
"Insert anIntegers worth of elements from aStreamable into self at current position." |
|
99 |
" anInteger <Integer> the number of elements to insert |
|
100 |
aStreamable <ReadStream | SequenceableCollection | Buffer > the source to write into the destination |
|
101 |
startIndex <Integer> the index into aSequenceableCollection to start writing from |
|
102 |
^<Integer> number of elements inserted |
|
103 |
"" |
|
104 |
' World!!' copy writing insert: 5 from: 'Hello Underworld!!' reading; -= 0; close; destination |
|
105 |
" |
|
106 |
aStreamable streamingInsert: anInteger into: self. |
|
107 |
^anInteger |
|
108 |
! |
|
109 |
||
110 |
insert: anInteger from: aSequenceableCollection at: startIndex |
|
111 |
"Insert anIntegers worth of elements from aSequenceableCollection starting at startIndex into self at current position." |
|
112 |
" anInteger <Integer> the number of elements to insert |
|
113 |
aSequenceableCollection <SequenceableCollection> the source to write into the destination |
|
114 |
startIndex <Integer> the index into aSequenceableCollection to start writing from |
|
115 |
^<Integer> number of elements inserted |
|
116 |
"" |
|
117 |
' World!!' copy writing insert: 5 from: 'Hello' at: 1; -= 0; close; destination |
|
118 |
" |
|
119 |
self write: anInteger from: aSequenceableCollection at: startIndex. |
|
120 |
^anInteger |
|
121 |
! |
|
122 |
||
123 |
put: anObject |
|
124 |
"Write anObject into self." |
|
125 |
" anObject <Object> the object to write in to the destination |
|
126 |
^ <Object> the object that was written to the destination |
|
127 |
"" |
|
128 |
String new writing put: $h; close; destination |
|
129 |
" |
|
130 |
| cache | |
|
131 |
cache := self contentsSpecies newRecycled: 1. |
|
132 |
cache at: 1 put: anObject. |
|
133 |
self write: 1 from: cache at: 1. |
|
134 |
cache recycle. |
|
135 |
^anObject |
|
136 |
! |
|
137 |
||
138 |
terminal
	"Answer the object at the very bottom of the stream stack: while the destination is itself a WriteStream, keep asking it for its own terminal."
	"	^<Collection | Buffer | IOAccessor | BlockClosure>
	"
	(destination isKindOf: WriteStream) ifFalse: [ ^destination ].
	^destination terminal
!
|
146 |
||
147 |
write: aStreamable |
|
148 |
"Write aStreamable into self." |
|
149 |
" aStreamable <SequenceableCollection | ReadStream | Buffer> the source to write in to the destination |
|
150 |
^<Integer> the number of elements written to the destination" |
|
151 |
" |
|
152 |
String new writing write: 'Hello' reading; close; destination |
|
153 |
" |
|
154 |
^aStreamable streamingWriteInto: self |
|
155 |
! |
|
156 |
||
157 |
write: anInteger from: aStreamable |
|
158 |
"Write anInteger's worth of elements from aStreamable into self." |
|
159 |
" anInteger <Integer> the number of elements to write |
|
160 |
aStreamable <SequenceableCollection | ReadStream | Buffer> the source to write in to the destination |
|
161 |
^<Integer> number of elements written |
|
162 |
"" |
|
163 |
String new writing write: 3 from: 'Hello' reading; close; destination |
|
164 |
" |
|
165 |
^aStreamable streamingWrite: anInteger into: self |
|
166 |
! |
|
167 |
||
168 |
write: anInteger from: aSequenceableCollection at: startIndex |
|
169 |
"Write anIntegers worth of elements from aSequenceableCollection starting at startIndex into self." |
|
170 |
" anInteger <Integer> the number of elements to write |
|
171 |
aSequenceableCollection <SequenceableCollection> the source to write in to the destination |
|
172 |
startIndex <Integer> the index into aSequenceableCollection to start writing from |
|
173 |
^<Integer> number of elements written |
|
174 |
"" |
|
175 |
String new writing write: 3 from: 'Hello' at: 2; close; destination |
|
176 |
" |
|
177 |
^self subclassResponsibility |
|
178 |
! ! |
|
179 |
||
180 |
!WriteStream methodsFor:'characters'! |
|
181 |
||
182 |
backspace |
|
183 |
self write: Backspace |
|
184 |
! |
|
185 |
||
186 |
bell |
|
187 |
self write: Bell |
|
188 |
! |
|
189 |
||
190 |
cr |
|
191 |
self write: CarriageReturn |
|
192 |
! |
|
193 |
||
194 |
delete |
|
195 |
self write: Delete |
|
196 |
! |
|
197 |
||
198 |
escape |
|
199 |
self write: Escape |
|
200 |
! |
|
201 |
||
202 |
ff |
|
203 |
self write: FormFeed |
|
204 |
! |
|
205 |
||
206 |
lf |
|
207 |
self write: LineFeed |
|
208 |
! |
|
209 |
||
210 |
print: anObject |
|
211 |
anObject streamingPrintOn: self |
|
212 |
! |
|
213 |
||
214 |
q |
|
215 |
self write: Quote |
|
216 |
! |
|
217 |
||
218 |
qq
	"Write a double-quote character (the DoubleQuote shared variable) into the stream. Companion of #q, which writes a single quote."
	self write: DoubleQuote
!
|
221 |
||
222 |
space |
|
223 |
self write: Space |
|
224 |
! |
|
225 |
||
226 |
space: anInteger |
|
227 |
anInteger timesRepeat: [self space] |
|
228 |
! |
|
229 |
||
230 |
tab |
|
231 |
self write: Tab |
|
232 |
! |
|
233 |
||
234 |
tab: anInteger |
|
235 |
anInteger timesRepeat: [self tab] |
|
236 |
! |
|
237 |
||
238 |
vtab |
|
239 |
self write: VerticalTab |
|
240 |
! ! |
|
241 |
||
242 |
!WriteStream methodsFor:'converting'! |
|
243 |
||
244 |
writing |
|
245 |
^[:object | self nextPut: object] writing |
|
246 |
contentsSpecies: self contentsSpecies; |
|
247 |
yourself |
|
248 |
! ! |
|
249 |
||
250 |
!WriteStream methodsFor:'initialize-release'! |
|
251 |
||
252 |
close |
|
253 |
"Close the destination from any more writes." |
|
254 |
||
255 |
self flush. |
|
256 |
destination close |
|
257 |
! |
|
258 |
||
259 |
contentsSpecies |
|
260 |
"The class of collection that is able to hold the kind of elements that this stream consumes." |
|
261 |
" ^ <Class> collection class |
|
262 |
" |
|
263 |
^self subclassResponsibility |
|
264 |
! |
|
265 |
||
266 |
flush |
|
267 |
"Make sure all the previously written elements are pushed down into the destination." |
|
268 |
destination flush |
|
269 |
! |
|
270 |
||
271 |
on: aDestination |
|
272 |
destination := aDestination |
|
273 |
! ! |
|
274 |
||
36 | 275 |
!WriteStream methodsFor:'interpreting'! |
276 |
||
277 |
interpreting: type |
|
278 |
"Converts consumed elements into bytes of pre-configured (primitive) CType, e.g. float, long etc. The type of the written elements must match the CType and the underlying destination must be binary. |
|
279 |
"" type <Symbol> identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations) |
|
280 |
^ <InterpretedWriteStream> |
|
281 |
"" |
|
282 |
| doubles bytes | |
|
283 |
doubles := [ Random new next ] reading. |
|
284 |
bytes := (ByteArray new writing interpreting: #double) |
|
285 |
write: 10 from: doubles; |
|
286 |
close; |
|
287 |
terminal. |
|
288 |
(bytes reading interpreting: #double) read: 10 |
|
289 |
" |
|
290 |
^self interpreting: type cacheSize: 1 |
|
291 |
! |
|
292 |
||
293 |
interpreting: type cacheSize: size |
|
294 |
"Converts consumed elements into bytes of pre-configured (primitive) CType, e.g. float, long etc. The type of the written elements must match the CType and the underlying destination must be binary. |
|
295 |
"" type <Symbol> identifies a (primitive) CType, e.g. #float, #long (mapped via Interpretations) |
|
296 |
size <Integer> requested buffer size (in number of elements) |
|
297 |
^ <InterpretedWriteStream> |
|
298 |
"" |
|
299 |
| doubles bytes | |
|
300 |
doubles := [ Random new next ] reading. |
|
301 |
bytes := (ByteArray new writing interpreting: #double cacheSize: 10) |
|
302 |
write: 10 from: doubles; |
|
303 |
close; |
|
304 |
terminal. |
|
305 |
(bytes reading interpreting: #double) read: 10 |
|
306 |
" |
|
307 |
^InterpretedWriteStream on: self type: type cacheSize: size |
|
308 |
! |
|
309 |
||
310 |
interpreting: writer size: byteSize
	"Converts objects into bytes in a binary destination according to the provided @writer block. The block is evaluated with an instance of InterpretedBytes, an index and the object to write into the bytes. Uses a cache of a single element; send #interpreting:size:cacheSize: to request a larger one."
	"	writer	<BlockClosure>	block evaluated with the InterpretedBytes, an index and the object to write
		byteSize	<Integer>	byte size of an element
		^ <InterpretedWriteStream>
	""
		| points bytes |
		points := Random new reading transforming: [ :in :out | out put: in get @ in get ].
		bytes := (ByteArray new writing interpreting: [ :b :i :o | (b floatAt: i put: o x) @ (b floatAt: i + 4 put: o y) ] size: 8 )
			write: 10 from: points;
			close;
			terminal.
		(bytes reading interpreting: [ :b :i | (b floatAt: i) @ (b floatAt: i + 4) ] size: 8 cacheSize: 5) read: 5
	"
	"Delegate to the general variant, the same way #interpreting: delegates to #interpreting:cacheSize:, so the stream is constructed in one place only."
	^self interpreting: writer size: byteSize cacheSize: 1
!
|
326 |
||
327 |
interpreting: writer size: byteSize cacheSize: cacheSize |
|
328 |
"Converts objects into bytes in a binary destination according to provided @writer block. The block is evaluated with an instance of InterpretedBytes an index and object to write into the bytes. |
|
329 |
"" writer <BlockClosure> block evaluated with the InterpretedBytes, an index and the object to write |
|
330 |
byteSize <Integer> byte size of an element |
|
331 |
cacheSize <Integer> requested cache size (in number of elements) |
|
332 |
^ <InterpretedWriteStream> |
|
333 |
"" |
|
334 |
| points bytes | |
|
335 |
points := Random new reading transforming: [ :in :out | out put: in get @ in get ]. |
|
336 |
bytes := (ByteArray new writing interpreting: [ :b :i :o | (b floatAt: i put: o x) @ (b floatAt: i + 4 put: o y) ] size: 8 ) |
|
337 |
write: 10 from: points; |
|
338 |
close; |
|
339 |
terminal. |
|
340 |
(bytes reading interpreting: [ :b :i | (b floatAt: i) @ (b floatAt: i + 4) ] size: 8 cacheSize: 5) read: 5 |
|
341 |
" |
|
342 |
^InterpretedWriteStream on: self bytesPerElement: byteSize contentsSpecies: Array operation: writer cacheSize: cacheSize |
|
343 |
! |
|
344 |
||
345 |
marshaling |
|
346 |
"Marshaling streams are used to encode arbitrary smalltalk objects into a sequence of bytes suitable for binary storage or transport. The format of the binary encoding is defined by an ObjectMarshaler and is identified by particular version ID. |
|
347 |
A marshaling write stream encodes objects into a binary destination stream. |
|
348 |
"" ^ <ObjectWriteStream> |
|
349 |
"" |
|
350 |
| rectangle bytes | |
|
351 |
rectangle := 5 @ 5 extent: 5 @ 5. |
|
352 |
bytes := ByteArray new writing marshaling put: rectangle; conclusion. |
|
353 |
bytes reading marshaling get |
|
354 |
" |
|
355 |
^ObjectWriteStream on: self |
|
356 |
! |
|
357 |
||
358 |
marshaling: aMarshaler |
|
359 |
"Marshaling streams are used to encode arbitrary smalltalk objects into a sequence of bytes suitable for binary storage or transport. The format of the binary encoding is defined by an ObjectMarshaler and is identified by particular version ID. Custom marshaling schemes can be derived by subclassing ObjectMarshaler. Custom schemes must declare their own (unique) version ID. This method allows employing a custom marshaler instead of the default one (STST2.0). |
|
360 |
A marshaling write stream encodes objects into a binary destination stream. |
|
361 |
"" aMarshaler <ObjectMarshaler> implements custom marshaling format |
|
362 |
^ <ObjectWriteStream> |
|
363 |
"" |
|
364 |
| rectangle bytes | |
|
365 |
rectangle := 5 @ 5 extent: 5 @ 5. |
|
366 |
bytes := (ByteArray new writing marshaling: ObjectMarshaler new) put: rectangle; conclusion. |
|
367 |
bytes reading marshaling get |
|
368 |
" |
|
369 |
^ObjectWriteStream on: self marshaler: aMarshaler |
|
370 |
! ! |
|
2 | 371 |
|
372 |
!WriteStream methodsFor:'printing'! |
|
373 |
||
374 |
printOn: aStream
	"Print the receiver's own description (as produced by #streamingPrintOn:) on aStream, followed by a line end and the printed form of the destination."
	| description |
	description := String new writing.
	self streamingPrintOn: description.
	aStream
		nextPutAll: description conclusion;
		cr.
	destination printOn: aStream
!
|
382 |
||
383 |
streamingPrintOn: aStream |
|
384 |
aStream write: self class name |
|
385 |
! ! |
|
386 |
||
387 |
!WriteStream methodsFor:'private'! |
|
388 |
||
36 | 389 |
nextPut: anObject |
390 |
"This is here for compatibility with the existing StreamEncoders so that they can be re-used with transformation streams for encoding." |
|
391 |
self put: anObject. |
|
392 |
^anObject |
|
393 |
! |
|
394 |
||
395 |
nextPutAll: aCollection |
|
396 |
"This is here for compatibility with the existing StreamEncoders so that they can be re-used with transformation streams for encoding." |
|
397 |
self write: aCollection. |
|
398 |
^aCollection |
|
399 |
! |
|
400 |
||
2 | 401 |
streamingInsert: anInteger from: aReadStream |
29 | 402 |
| cache count | |
403 |
cache := self contentsSpecies newRecycled: (anInteger max: DefaultBufferSize). |
|
404 |
count := [aReadStream read: anInteger into: cache at: 1. anInteger] on: Incomplete do: [ :ex | ex count ]. |
|
405 |
self insert: count from: cache at: 1. |
|
406 |
cache recycle. |
|
407 |
count < anInteger ifTrue: [(Incomplete count: count) raise] |
|
2 | 408 |
! |
409 |
||
410 |
streamingInsertFrom: aReadStream |
|
29 | 411 |
| count cache | |
412 |
count := 0. |
|
413 |
cache := self contentsSpecies newRecycled: DefaultBufferSize. |
|
414 |
[[aReadStream read: cache size into: cache at: 1] on: Incomplete do: [:exception | |
|
415 |
self insert: exception. |
|
416 |
cache recycle. |
|
417 |
^count + exception count]. |
|
418 |
self insert: cache size from: cache at: 1. |
|
419 |
count := count + cache size] repeat |
|
2 | 420 |
! |
421 |
||
422 |
streamingWrite: anInteger from: aReadStream
	"Copy anInteger elements from aReadStream into the receiver, going through a recycled buffer requested with DefaultBufferSize. Answers anInteger when everything was copied; if the source signals Incomplete first, whatever it delivered is still written and Incomplete is raised with the number of elements actually written."
	| buffer remaining exhausted transferred |
	buffer := self contentsSpecies newRecycled: DefaultBufferSize.
	remaining := anInteger.
	exhausted := false.
	[ exhausted or: [ remaining <= 0 ] ] whileFalse: [
		"Read at most one buffer's worth; a short read marks the source as exhausted."
		transferred := [ aReadStream read: (buffer size min: remaining) into: buffer at: 1 ]
			on: Incomplete
			do: [ :incomplete | exhausted := true. incomplete count ].
		self write: transferred from: buffer at: 1.
		remaining := remaining - transferred ].
	buffer recycle.
	remaining > 0 ifTrue: [ (Incomplete count: anInteger - remaining) raise ].
	^anInteger
!
434 |
||
435 |
streamingWriteFrom: aReadStream |
|
29 | 436 |
| count cache | |
437 |
count := 0. |
|
438 |
cache := self contentsSpecies newRecycled: DefaultBufferSize. |
|
439 |
[[aReadStream read: cache size into: cache at: 1] on: Incomplete do: [:exception | |
|
440 |
self write: exception. |
|
441 |
cache recycle. |
|
442 |
^count + exception count]. |
|
443 |
self write: cache size from: cache at: 1. |
|
444 |
count := count + cache size] repeat |
|
2 | 445 |
! ! |
446 |
||
447 |
!WriteStream methodsFor:'seeking'! |
|
448 |
||
449 |
++ anInteger |
|
450 |
"Seek forward by anInteger elements. The stream must be positionable." |
|
451 |
" anInteger <Integer> the number of elements to go forward by. |
|
452 |
^<Integer> the number of elements actually skipped |
|
453 |
" |
|
454 |
" |
|
455 |
'Hello Would' copy writing ++ 6; write: 'World'; close; destination |
|
456 |
" |
|
457 |
"Subclasses should reimplement this method if the stream is positionable." |
|
458 |
self isPositionable |
|
459 |
ifFalse: [Incomplete zero raise] |
|
460 |
ifTrue: [self subclassResponsibility] |
|
461 |
! |
|
462 |
||
463 |
+= anInteger |
|
464 |
"Seek from the start of the stream by anInteger elements. The stream must be positionable." |
|
465 |
" anInteger <Integer> The position, counted from the start of the stream, to seek to." |
|
466 |
" |
|
467 |
String new writing write: 'Hello Would'; += 6; write: 'World'; close; destination |
|
468 |
" |
|
469 |
^self position: anInteger |
|
470 |
! |
|
471 |
||
472 |
-- anInteger |
|
473 |
"Seek backward by anInteger elements. The stream must be positionable." |
|
474 |
" anInteger <Integer> The number of elements to go back by." |
|
475 |
" |
|
476 |
String new writing write: 'helio'; -- 2; write: 'lo'; close; destination |
|
477 |
" |
|
478 |
"Subclasses should reimplement this method if the stream is positionable." |
|
479 |
self isPositionable |
|
480 |
ifFalse: [self error: 'This stream is not positionable.'] |
|
481 |
ifTrue: [self subclassResponsibility] |
|
482 |
! |
|
483 |
||
484 |
-= anInteger
	"Position the stream anInteger elements before its end. The stream must be positionable. When the stream is shorter than anInteger, it is positioned at the start and Incomplete is raised with the number of elements that could be skipped."
	"	anInteger	<Integer>	The number of elements to go back by.
		^<Integer>	the number of elements actually skipped"
	"
		'Hello Would' copy writing -= 3; write: 'rld'; close; terminal
	"
	| reachable |
	reachable := anInteger min: self length.
	self position: self length - reachable.
	^reachable = anInteger
		ifTrue: [ anInteger ]
		ifFalse: [ (Incomplete count: reachable) raise ]
!
|
497 |
||
498 |
available |
|
499 |
"Return the number of elements from the current position to the end of the stream. The stream must be positionable." |
|
500 |
" ^ <Integer> the number of elements available" |
|
501 |
" |
|
502 |
String new writing write: 'Hello World'; -- 5; available |
|
503 |
" |
|
504 |
^self length - self position |
|
505 |
! |
|
506 |
||
507 |
explore: aBlock |
|
508 |
" Explore the stream within the block but return to where we started when the block completes. The stream must be positionable." |
|
509 |
" aBlock <BlockClosure> defines the exploration activity |
|
510 |
^ <Object> result of aBlock" |
|
511 |
" |
|
512 |
String new writing explore: [ :s | s write: 'Hello' ]; write: 'World'; close; destination |
|
513 |
" |
|
514 |
| position | |
|
515 |
position := self position. |
|
516 |
^[aBlock cull: self] ensure: [self position: position] |
|
517 |
! |
|
518 |
||
519 |
length |
|
520 |
"Return total length of the stream. The stream must be positionable." |
|
521 |
" ^ <Integer> the total number of elements in the stream. (position + available)" |
|
522 |
" |
|
523 |
'Hello World' copy writing ++ 5; length |
|
524 |
" |
|
525 |
"Subclasses should reimplement this method if the stream is positionable." |
|
526 |
^self isPositionable |
|
527 |
ifFalse: [self error: 'This stream is not positionable.'] |
|
528 |
ifTrue: [self subclassResponsibility] |
|
529 |
! |
|
530 |
||
531 |
position |
|
532 |
"Return current position of the stream. The stream must be positionable." |
|
533 |
" ^ <Integer> current position of the stream." |
|
534 |
" |
|
535 |
'Hello World' copy writing -= 5; position |
|
536 |
" |
|
537 |
"Subclasses should reimplement this method if the stream is positionable." |
|
538 |
^self isPositionable |
|
539 |
ifFalse: [self error: 'This stream is not positionable.'] |
|
540 |
ifTrue: [self subclassResponsibility] |
|
541 |
! |
|
542 |
||
543 |
position: anInteger |
|
544 |
"Change current position of the stream to anInteger. The stream must be positionable." |
|
545 |
" anInteger <Integer> current position of the stream." |
|
546 |
" |
|
547 |
'Hello Would' copy writing position: 6; write: 'World'; close; destination |
|
548 |
" |
|
549 |
"Subclasses should reimplement this method if the stream is positionable." |
|
550 |
self isPositionable |
|
551 |
ifFalse: [self error: 'This stream is not positionable.'] |
|
552 |
ifTrue: [self subclassResponsibility] |
|
553 |
! ! |
|
554 |
||
36 | 555 |
!WriteStream methodsFor:'substreaming'! |
556 |
||
557 |
closing: aBlock |
|
558 |
^(PositionWriteSubstream on: self) |
|
559 |
closeBlock: aBlock; |
|
560 |
yourself |
|
561 |
! |
|
562 |
||
563 |
ending: aMatchable |
|
564 |
"Creates a substream that will end when aMatchable finds a match in the content passing through. aMatchable is either |
|
565 |
* a block that is evaluated with each element - the stream ends when the block returns true |
|
566 |
* a collection that is matched against the last elements written - the stream ends when the collection matches |
|
567 |
* any other object - the stream ends when an equal object is written into the stream" |
|
568 |
" aMatchable <BlockClosure | Collection | Object> the substream ending criteria |
|
569 |
^<TransformWriteStream> |
|
570 |
"" |
|
571 |
| stream slice | |
|
572 |
stream := String new writing. |
|
573 |
slice := stream ending: $j. |
|
574 |
[ slice write: 'abcdefghijklmnopqrstuvxyz' ] on: Incomplete do: []. |
|
575 |
stream conclusion |
|
576 |
"" |
|
577 |
| stream slice | |
|
578 |
stream := String new writing. |
|
579 |
slice := stream ending: 'mno'. |
|
580 |
[ slice write: 'abcdefghijklmnopqrstuvxyz' ] on: Incomplete do: []. |
|
581 |
stream conclusion |
|
582 |
"" |
|
583 |
| stream slice | |
|
584 |
stream := String new writing. |
|
585 |
slice := stream ending: [ :e | 'gmt' includes: e ]. |
|
586 |
[ slice write: 'abcdefghijklmnopqrstuvxyz' ] on: Incomplete do: []. |
|
587 |
stream conclusion |
|
588 |
" |
|
589 |
^self ending: aMatchable inclusive: false |
|
590 |
! |
|
591 |
||
592 |
ending: aMatchable inclusive: inclusive |
|
593 |
"Creates a substream that will end when aMatchable finds a match in the content passing through. aMatchable is either |
|
594 |
* a block that is evaluated with each element - the stream ends when the block returns true |
|
595 |
* a collection that is matched against the last elements written - the stream ends when the collection matches |
|
596 |
* any other object - the stream ends when an equal object is written into the stream" |
|
597 |
" aMatchable <BlockClosure | Collection | Object> the substream ending criteria |
|
598 |
inclusive <Boolean> should the matched elements be included in the stream contents or not |
|
599 |
^<TransformWriteStream> |
|
600 |
"" |
|
601 |
| stream slice | |
|
602 |
stream := String new writing. |
|
603 |
slice := stream ending: $j inclusive: true. |
|
604 |
[ slice write: 'abcdefghijklmnopqrstuvxyz' ] on: Incomplete do: []. |
|
605 |
stream conclusion |
|
606 |
"" |
|
607 |
| stream slice | |
|
608 |
stream := String new writing. |
|
609 |
slice := stream ending: 'mno' inclusive: true. |
|
610 |
[ slice write: 'abcdefghijklmnopqrstuvxyz' ] on: Incomplete do: []. |
|
611 |
stream conclusion |
|
612 |
"" |
|
613 |
| stream slice | |
|
614 |
stream := String new writing. |
|
615 |
slice := stream ending: [ :e | 'gmt' includes: e ] inclusive: true. |
|
616 |
[ slice write: 'abcdefghijklmnopqrstuvxyz' ] on: Incomplete do: []. |
|
617 |
stream conclusion |
|
618 |
" |
|
619 |
^aMatchable streamingWriteMatching: self inclusive: inclusive |
|
620 |
! |
|
621 |
||
622 |
limiting: limit |
|
623 |
"Create a substream that will allow at most @limit number of elements written into the destination." |
|
624 |
" limit <Integer> maximum number of elements that can be written into destination |
|
625 |
^<LimitWriteSubstream>" |
|
626 |
" |
|
627 |
| stream slice | |
|
628 |
stream := String new writing. |
|
629 |
slice := stream limiting: 5. |
|
630 |
[ slice write: 'abcdefghi' ] on: Incomplete do: []. |
|
631 |
stream conclusion |
|
632 |
" |
|
633 |
||
634 |
^LimitWriteSubstream on: self limit: limit |
|
635 |
! |
|
636 |
||
637 |
slicing |
|
638 |
"From a writable stream, return a readable stream that acts as a prototype factory for the writable stream." |
|
639 |
" ^<ReadStream>" |
|
640 |
"(destination limiting: 10) slicing" |
|
641 |
||
642 |
| substream | |
|
643 |
substream := nil. |
|
644 |
^[substream == nil ifFalse: |
|
645 |
[substream substreamClosed ifFalse: [substream close]. |
|
646 |
substream subseekend. |
|
647 |
substream destinationAtEnd ifTrue: [Incomplete zero raise]]. |
|
648 |
substream := self copy] |
|
649 |
reading |
|
650 |
closeBlock: [destination close]; |
|
651 |
yourself |
|
652 |
! |
|
653 |
||
654 |
stitching |
|
655 |
^self error: 'You can only stitch a read stream, however that read stream can return write streams and in so doing, you will create a stitching write stream.' |
|
656 |
! ! |
|
2 | 657 |
|
658 |
!WriteStream methodsFor:'testing'! |
|
659 |
||
660 |
isPositionable |
|
661 |
"Can this stream be positioned. Positionable streams come with extra API: #position, #position:, etc." |
|
662 |
||
663 |
^false |
|
664 |
! |
|
665 |
||
666 |
isReadable |
|
667 |
^false |
|
668 |
! |
|
669 |
||
670 |
isWritable |
|
671 |
^true |
|
672 |
! ! |
|
673 |
||
674 |
!WriteStream methodsFor:'transforming'! |
|
675 |
||
36 | 676 |
buffering: bufferSize |
677 |
"Delays committing its content to its underlying stream until it has reached a certain size, #flush is sent, or the stream is closed." |
|
678 |
" bufferSize <Integer> The size of the buffer to start with. |
|
679 |
^<BufferedWriteStream>" |
|
680 |
" |
|
681 |
(ByteArray new writing buffering: 5) |
|
682 |
write: (ByteArray withAll: (1 to: 11)); |
|
683 |
conclusion |
|
684 |
" |
|
685 |
^BufferedWriteStream on: self bufferSize: bufferSize |
|
686 |
! |
|
687 |
||
688 |
collecting: aBlock |
|
689 |
"Transform each written element using #collect: style block." |
|
690 |
" aBlock <BlockClosure> a #collect: style block used to transform each element |
|
691 |
^<CollectWriteStream> |
|
692 |
"" |
|
693 |
(Array new writing collecting: [ :e | e * e ]) write: (1 to: 5); conclusion |
|
694 |
"" |
|
695 |
(String new writing collecting: [ :e | e asCharacter ]) write: (65 to: 90); conclusion |
|
696 |
" |
|
697 |
^CollectWriteStream on: self block: aBlock |
|
698 |
! |
|
699 |
||
700 |
depairing |
|
701 |
"Transform a stream of associations in to a stream of elements made up of the key and value association components." |
|
702 |
||
703 |
^self transforming: [:in :out | |
|
704 |
| association | |
|
705 |
association := in get. |
|
706 |
out put: association key. |
|
707 |
out put: association value] |
|
708 |
! |
|
709 |
||
710 |
doing: aBlock |
|
711 |
"Perform an action with each passing element using #do: style block." |
|
712 |
" aBlock <BlockClosure> a #do: style block invoked with each element as it passes through the stream |
|
713 |
^<CollectWriteStream> |
|
714 |
"" |
|
715 |
(Array new writing doing: [ :e | Transcript space; print: e * e ]) write: (1 to: 10); conclusion |
|
716 |
" |
|
717 |
^self collecting: [:each | (aBlock value: each). each] |
|
718 |
! |
|
719 |
||
720 |
duplicating: aWriteStream |
|
721 |
"Duplicate all the contents written into @aWriteStream" |
|
722 |
" aWriteStream <WriteStream> a stream to copy into |
|
723 |
^<DuplicateWriteStream> |
|
724 |
"" |
|
725 |
| original copy | |
|
726 |
original := Array new writing. |
|
727 |
copy := ByteArray new writing. |
|
728 |
(original duplicating: copy) write: (0 to: 15). |
|
729 |
original conclusion -> copy conclusion |
|
730 |
" |
|
731 |
^DuplicateWriteStream on: self duplicate: aWriteStream |
|
732 |
! |
|
733 |
||
734 |
encoding: anEncoding |
|
735 |
"Transform characters into bytes using @anEncoding such as #utf8 or #ascii, etc. Any encoding supported by StreamEncoder is allowed. |
|
736 |
The encoding stream also performs automatic conversion of CRs into the native line-end convention of the underlying platform, |
|
737 |
unless set into a different line-end convention mode" |
|
738 |
" anEncoding <Symbol> encoding identifier recognized by StreamEncoder class>>new: |
|
739 |
^<EncodeWriteStream> |
|
740 |
"" |
|
741 |
(ByteArray new writing encoding: #ascii) write: 'abcdefghi'; conclusion |
|
742 |
"" |
|
743 |
(ByteArray new writing encoding: #ascii) write: 'Hello\World' withCRs; conclusion |
|
744 |
"" |
|
745 |
(ByteArray new writing encoding: #ascii) setLineEndCRLF; write: 'Hello\World' withCRs; conclusion |
|
746 |
" |
|
747 |
^EncodeWriteStream on: self encoding: anEncoding |
|
748 |
! |
|
749 |
||
750 |
encodingBase64 |

751 |
"Encodes bytes into characters of base-64 encoding. |

752 |
Emits final padding characters ($=) as required, when the stream is closed. Bytes are taken in groups of up to 3 (read into a reusable 3-byte cache); a group of n bytes produces n + 1 characters of the base-64 alphabet, and a short group is followed by 3 - n padding characters before Incomplete is raised. A read that yields no bytes raises Incomplete without producing output. The transform stream is given a RingBuffer over a 3-byte ByteArray as its buffer." |

753 |
" ^<TransformWriteStream>" |

754 |
" |

755 |
String new writing encodingBase64 write: (ByteArray withAll: (1 to: 20)); conclusion |

756 |
" |

757 |
| map cache | |

758 |
map := [ :i | 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/' at: i + 1 ]. |

759 |
cache := ByteArray new: 3. |

760 |
^(self transforming: [ :in :out | | count block shift | |

761 |
count := [ in read: 3 into: cache at: 1. 3 ] on: Incomplete do: [ :incomplete | incomplete count]. |

762 |
count isZero ifTrue: [ Incomplete zero raise ]. |

763 |
block := (1 to: count) inject: 0 into: [ :total :byte | (total bitShift: 8) + (cache at: byte)]. |

764 |
shift := count * -8. |

765 |
1 to: count + 1 do: [:i | out put: (map value: ((block bitShift: (shift + (i * 6))) bitAnd: 63))]. |

766 |
count < 3 ifTrue: [ |

767 |
3 - count timesRepeat: [ out put: $= ]. |

768 |
(Incomplete count: count) raise]]) |

769 |
buffer: (RingBuffer on: (ByteArray new: 3)); |

770 |
yourself |

771 |
! |
|
772 |
||
773 |
encodingHex |

774 |
"Converts written hex characters into bytes: each pair of characters (high nibble first, letters matched case-insensitively) becomes one byte put into the destination. NOTE(review): characters that are not hex digits are not validated here - confirm that callers only write valid hex." |

775 |
" ^<TransformWriteStream>" |

776 |
" |

777 |
ByteArray new writing encodingHex write: '010203fdfeff'; terminal |

778 |
" |

779 |
| c2i | |

780 |
c2i := [ :c | ('0123456789abcdef' indexOf: c asLowercase) - 1 ]. |

781 |
^(self transforming: [ :in :out | |

782 |
out put: ((c2i value: in get) bitShift: 4) + (c2i value: in get) ]) |

783 |
contentsSpecies: ByteString; |

784 |
yourself |

785 |
! |
|
786 |
||
787 |
injecting: initialObject into: aBlock |

788 |
"Accumulates a running value combined with each passing element using the binary aBlock. aBlock takes the result of the last evaluation and the next element as arguments. Notable difference from the collection analog is that the streaming variant is a stream of all the intermediate values of the running value. The running value is kept in the local nextObject, which the block handed to #collecting: reassigns (and answers) on every element." |

789 |
" initialObject <Object> initial value used as the previous result for the evaluation of the first element |

790 |
aBlock <BlockClosure> binary block combining the value of each element with previous result of its evaluation |

791 |
^<CollectingWriteStream>" |

792 |
" |

793 |
(Array new writing injecting: 0 into: [ :total :each | each + total ]) write: (1 to: 10); conclusion |

794 |
" |

795 |
| nextObject | |

796 |
nextObject := initialObject. |

797 |
^self collecting: [:each | nextObject := aBlock cull: nextObject cull: each] |

798 |
! |
|
799 |
||
800 |
monitoring: aNotificationBlock every: aNotificationInterval |

801 |
"Monitor the through-put of the receiver. Answers the result of sending #closing: to the receiver. Notifications are evaluated in a separate process, created one priority level above the active process (capped at 99) and handed to a Timer together with aNotificationInterval. When the answered stream is closed, the closing block stops the timer, resumes the notification process twice (the second time with notifyFinished set, so that its loop ends) and then closes the receiver." |

802 |
" aNotificationBlock <BlockClosure> the block to execute when notifying; it is evaluated with cull:cull:cull: on the current position of the monitoring stream, the position change since the previous notification, and the Time microsecondClock difference since monitoring was set up |

803 |
aNotificationInterval <Duration> how often to notify |

804 |
^<PositionWriteSubstream> |

805 |
" |

806 |
||
807 |
| previousPosition timer start notifyBlock monitoring notifyProcess notifyFinished | |

808 |
||
809 |
start := Time microsecondClock. |

810 |
previousPosition := 0. |

811 |
monitoring := nil. |

812 |
timer := nil. |

813 |
notifyFinished := false. |

814 |
||
815 |
notifyBlock := [ |

816 |
aNotificationBlock cull: monitoring position cull: monitoring position - previousPosition cull: Time microsecondClock - start. |

817 |
previousPosition := monitoring position]. |

818 |
||
819 |
notifyProcess := nil. |

820 |
notifyProcess := [ |

821 |
[notifyBlock value. notifyFinished] whileFalse: [notifyProcess suspend]] newProcess. |

822 |
notifyProcess priority: ((Processor activeProcess priority + 1) min: 99). |

823 |
||
824 |
monitoring := self closing: [ |

825 |
timer stop. |

826 |
notifyProcess resume. |

827 |
notifyFinished := true. |

828 |
notifyProcess resume. |

829 |
self close]. |

830 |
||
831 |
timer := Timer every: aNotificationInterval resume: notifyProcess. |

832 |
^monitoring |

833 |
! |
|
834 |
||
835 |
pairing |

836 |
"Transform a stream of elements into a stream of associations between even+odd elements of the stream. Of each consecutive pair, the element read first becomes the key and the element read second becomes the value. This expects the stream to have an even number of elements" |
" ^<TransformWriteStream>"

837 |
||
838 |
^self transforming: [:in :out | out put: (Association key: in get value: in get)] |

839 |
! |
|
840 |
||
2 | 841 |
positioning |
842 |
"If necessary add positioning layer. Note that positioning layer employs buffering to implement the positioning ability. The default buffering strategy will grow the buffer up to the full size of the underlying stream if not released. Consequently other Buffer types might be more suitable for specific circumstances, e.g. if only last n elements need to be buffered, a fixed size RingBuffer can be substituted with #buffer: accessor." |

843 |
" ^ <WriteStream> a positionable write stream; the receiver itself when it is already positionable, otherwise a PositionWriteStream on the receiver |

844 |
"" |

845 |
[ :x | Transcript space; print: x ] writing positioning write: (1 to: 10); -- 5; write: (11 to: 15); close |

846 |
" |

847 |
^self isPositionable |

848 |
ifTrue: [self] |

849 |
ifFalse: [PositionWriteStream on: self] |

36 | 850 |
! |
851 |
||
852 |
rejecting: aBlock |

853 |
"Filters written elements using aBlock. aBlock has the same form and semantics as the #reject: block on collections. Implemented by delegating to #selecting: with a block that negates the result of aBlock." |

854 |
" aBlock <BlockClosure> usual #reject: style block used to filter the elements passing through |

855 |
^<TransformWriteStream>" |

856 |
" |

857 |
(Array new writing rejecting: [ :e | e odd ]) write: (1 to: 10); conclusion |

858 |
" |

859 |
^self selecting: [:each | (aBlock cull: each) not] |

860 |
! |
|
861 |
||
862 |
selecting: aBlock |

863 |
"Filters written elements using aBlock. aBlock has the same form and semantics as the #select: block on collections. Each run of the transformation block keeps getting elements from its input until aBlock answers true for one of them, and then puts that single element to its output." |

864 |
" aBlock <BlockClosure> usual #select: style block used to filter the elements passing through |

865 |
^<TransformWriteStream>" |

866 |
" |

867 |
(Array new writing selecting: [ :e | e odd ]) write: (1 to: 10); conclusion |

868 |
" |

869 |
^self transforming: [:input :output | |

870 |
| value | |

871 |
[value := input get. |

872 |
aBlock cull: value] whileFalse. |

873 |
output put: value] |

874 |
! |
|
875 |
||
876 |
transforming: aBlock |

877 |
"This is the most general form of transform stream. The block receives two streams, a virtual stream of written elements (input) and the destination (output). The block can read arbitrary number of elements from input (including none) and write arbitrary number of elements into the output (including none). The block will be invoked as many times as necessary to consume any written elements, or until an Incomplete is raised by the destination. |

878 |
Note that if the #contentsSpecies of the destination doesn't fit the input of the transformation, the #contentsSpecies of the transform stream has to be set explicitly. |

879 |
"" aBlock <BlockClosure> binary transformation block that reads elements from input (first argument) and writes elements into output (second argument) |

880 |
^<TransformWriteStream> |

881 |
"" Convert text into a stream of words |

882 |
(Array new writing transforming: [ :in :out || word char | |

883 |
word := String new writing. |

884 |
[ [ (char := in get) = Character space ] whileFalse: [ word put: char ]. |

885 |
] ensure: [ out put: (word close; destination) ] ] |

886 |
) write: 'hello world!! bye world!!'; |

887 |
close; |

888 |
terminal |

889 |
"" Convert a hex-string into a byte array (2 characters per byte) |

890 |
| c2d | |

891 |
c2d := [ :char | ('0123456789abcdef' indexOf: char) - 1 ]. |

892 |
(ByteArray new writing transforming: [ :in :out | |

893 |
out put: (c2d value: in get) * 16 + (c2d value: in get) ] |

894 |
) contentsSpecies: String; |

895 |
write: '0123456789abcdef'; |

896 |
close; |

897 |
terminal |

898 |
" |

899 |
^TransformWriteStream on: self block: aBlock |

2 | 900 |
! ! |
901 |
||
902 |
!WriteStream class methodsFor:'documentation'! |
|
903 |
||
904 |
version_SVN |

905 |
"Answers the version identification string of this class. The literal holds the Id keyword placeholder, presumably expanded by the version control system on checkout - confirm."
^ '$Id$' |

906 |
! ! |
|
907 |
||
908 |
"Sends #initialize to the class when this file is filed in. NOTE(review): presumably this sets up the character class variables declared in the class definition (Backspace, Bell, ...) - confirm against the class-side initialize method."
WriteStream initialize! |