author | Martin Kobetic |
Sun, 17 Nov 2013 00:22:31 -0500 | |
changeset 144 | e193a6772be4 |
parent 109 | 9587e2df7029 |
permissions | -rw-r--r-- |
9 | 1 |
"{ Package: 'stx:goodies/xtreams/terminals' }" |
2 |
||
3 |
"{ NameSpace: Xtreams }" |
|
4 |
||
5 |
ReadStream subclass:#BlockClosureGenerateStream |
|
6 |
instanceVariableNames:'process current contentsSpecies writing readingSemaphore |
|
7 |
writingSemaphore closed' |
|
8 |
classVariableNames:'' |
|
9 |
poolDictionaries:'' |
|
25
02e7c3b6f63c
added XtreamsPool to fix DefaultBufferSize; set proper category names
mkobetic
parents:
9
diff
changeset
|
10 |
category:'Xtreams-Terminals' |
9 | 11 |
! |
12 |
||
13 |
BlockClosureGenerateStream comment:'Read stream on a one argument block, evaluates the block once for the life time of the stream. Once the block finishes execution, the stream is closed. The block is expected to write to the block argument, which will block exectuion until elements are read from the stream. |
|
14 |
{{{ |
|
15 |
"In this example, we have a -hard- loop, not using collection protocol, which will only run one element at a time." |
|
16 |
[:out | 1 to: 10 do: [:i | out put: i]] reading read: 5. |
|
17 |
}}} |
|
18 |
{{{ |
|
19 |
"Fibonacci" |
|
20 |
[:out | | a b x | |
|
21 |
a := 0. b := 1. |
|
22 |
[out put: a. |
|
23 |
x := a. |
|
24 |
a := b. |
|
25 |
b := b + x] repeat] reading ++ 500; get. |
|
26 |
}}} |
|
27 |
||
28 |
Instance Variables |
|
109 | 29 |
current <Object> the current element in the stream |
30 |
closed <Boolean> true if the block has finished execution |
|
31 |
contentsSpecies <Class> species for collections of elements of this stream |
|
32 |
process <Process> the process executing the block |
|
33 |
readingSemaphore <Semaphore> attempts to read will wait on this |
|
34 |
writingSemaphore <Semaphore> attempts to write will wait on this, signaled when an attempt to read is performed |
|
9 | 35 |
|
36 |
' |
|
37 |
! |
|
38 |
||
39 |
||
40 |
!BlockClosureGenerateStream methodsFor:'accessing'! |
|
41 |
||
42 |
get |
|
43 |
writingSemaphore signal. |
|
44 |
closed ifTrue: [Incomplete zero raise]. |
|
45 |
readingSemaphore wait. |
|
46 |
^current |
|
47 |
! |
|
48 |
||
49 |
read: anInteger into: aSequenceableCollection at: startIndex |
|
50 |
| count | |
|
51 |
count := 0. |
|
52 |
[[count < anInteger] whileTrue: |
|
53 |
[aSequenceableCollection at: startIndex + count put: self get. |
|
54 |
count := count + 1]] |
|
55 |
on: Incomplete do: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise]. |
|
56 |
^anInteger |
|
57 |
! ! |
|
58 |
||
59 |
!BlockClosureGenerateStream methodsFor:'initialize-release'! |
|
60 |
||
61 |
close |
|
62 |
closed := true. |
|
63 |
process terminate. |
|
64 |
readingSemaphore signal |
|
65 |
! |
|
66 |
||
67 |
contentsSpecies |
|
68 |
^contentsSpecies |
|
69 |
! |
|
70 |
||
71 |
contentsSpecies: aClass |
|
72 |
contentsSpecies := aClass |
|
73 |
! |
|
74 |
||
75 |
on: aBlockClosure |
|
76 |
super on: aBlockClosure. |
|
77 |
contentsSpecies := Array. |
|
78 |
readingSemaphore := Semaphore new. |
|
79 |
writingSemaphore := Semaphore new. |
|
80 |
closed := false. |
|
81 |
process := |
|
82 |
[aBlockClosure value: [:in | |
|
83 |
writingSemaphore wait. |
|
84 |
current := in. |
|
85 |
readingSemaphore signal] writing. |
|
86 |
closed := true] fork |
|
87 |
! ! |
|
88 |
||
89 |
!BlockClosureGenerateStream class methodsFor:'documentation'! |
|
90 |
||
109 | 91 |
version_HG |
92 |
||
93 |
^ '$Changeset: <not expanded> $' |
|
9 | 94 |
! ! |
109 | 95 |