author | Jan Vrany <jan.vrany@fit.cvut.cz> |
Wed, 01 Feb 2012 08:54:33 +0000 | |
changeset 105 | ba761aff18ac |
parent 97 | 2a7827f4dce2 |
child 111 | 44ac233b2f83 |
permissions | -rw-r--r-- |
10 | 1 |
"{ Package: 'stx:goodies/xtreams/transforms' }" |
2 |
||
3 |
"{ NameSpace: Xtreams }" |
|
4 |
||
5 |
ReadStream subclass:#CollectReadStream |
|
6 |
instanceVariableNames:'block cache contentsSpecies direct' |
|
7 |
classVariableNames:'' |
|
97 | 8 |
poolDictionaries:'Xtreams::XtreamsPool' |
27
2cc5a8a3ca14
added XtreamsPool to fix DefaultBufferSize; set proper category names
mkobetic
parents:
10
diff
changeset
|
9 |
category:'Xtreams-Transforms' |
10 | 10 |
! |
11 |
||
12 |
CollectReadStream comment:'Converts elements being read using the provided conversion block. When the contentSpecies of the source doesn''t match the desired contentSpecies of this stream, we optimize bulk reads by reading into an internal buffer. |
|
13 |
||
14 |
Instance Variables |
|
15 |
block <BlockClosure> collecting block (same style as collect: blocks) |
|
16 |
cache <SequenceableCollection | nil> caches elements before transformation when direct is false |
|
17 |
contentsSpecies <Class> species for collections of elements of this stream |
|
18 |
direct <Boolean> are we using an intermediate buffer (when source''s contentSpecies doesn''t match ours) |
|
19 |
||
20 |
' |
|
21 |
! |
|
22 |
||
23 |
||
24 |
!CollectReadStream class methodsFor:'instance creation'! |
|
25 |
||
26 |
on: aReadStream block: block |
|
27 |
^self new on: aReadStream block: block |
|
28 |
! ! |
|
29 |
||
30 |
!CollectReadStream methodsFor:'accessing'! |
|
31 |
||
32 |
block |
|
33 |
^block |
|
34 |
! |
|
35 |
||
36 |
get |
|
37 |
^block value: source get |
|
38 |
! |
|
39 |
||
40 |
read: anInteger into: aSequenceableCollection at: startIndex |
|
41 |
direct |
|
42 |
ifTrue: [self directRead: anInteger into: aSequenceableCollection at: startIndex] |
|
43 |
ifFalse: [self bufferRead: anInteger into: aSequenceableCollection at: startIndex]. |
|
44 |
^anInteger |
|
45 |
! ! |
|
46 |
||
47 |
!CollectReadStream methodsFor:'initialize-release'! |
|
48 |
||
49 |
close |
|
50 |
super close. |
|
51 |
cache ifNotNil: [ cache recycle ] |
|
52 |
! |
|
53 |
||
54 |
contentsSpecies |
|
55 |
^contentsSpecies |
|
56 |
! |
|
57 |
||
58 |
contentsSpecies: aClass |
|
59 |
contentsSpecies := aClass. |
|
60 |
cache ifNotNil: [ cache recycle ]. |
|
61 |
cache := (direct := contentsSpecies == source contentsSpecies) |
|
62 |
ifTrue: [nil] |
|
63 |
ifFalse: [ |
|
64 |
source contentsSpecies newRecycled: ( |
|
65 |
(cache notNil and: [ cache size > 0 ]) |
|
66 |
ifTrue: [cache size] |
|
67 |
ifFalse: [DefaultBufferSize])] |
|
68 |
! |
|
69 |
||
70 |
on: aSource block: aBlock |
|
71 |
self on: aSource. |
|
72 |
block := aBlock. |
|
73 |
contentsSpecies := aSource contentsSpecies. |
|
74 |
direct := true |
|
75 |
! ! |
|
76 |
||
77 |
!CollectReadStream methodsFor:'private'! |
|
78 |
||
79 |
bufferRead: anInteger into: aSequenceableCollection at: startIndex |
|
80 |
| amount count read | |
|
81 |
count := 0. |
|
82 |
[count < anInteger] whileTrue: |
|
83 |
[amount := (anInteger - count) min: cache size. |
|
84 |
read := [source read: amount into: cache at: 1. amount] on: Incomplete do: [ :ex | ex count ]. |
|
85 |
1 to: read do: [:index | aSequenceableCollection at: count + startIndex + index - 1 put: (block value: (cache at: index))]. |
|
86 |
count := count + read. |
|
87 |
read < amount ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise]] |
|
88 |
! |
|
89 |
||
90 |
directRead: anInteger into: aSequenceableCollection at: startIndex |
|
91 |
| count | |
|
92 |
count := [source read: anInteger into: aSequenceableCollection at: startIndex. anInteger] on: Incomplete do: [ :ex | ex count ]. |
|
93 |
startIndex to: startIndex + count - 1 do: [:index | aSequenceableCollection at: index put: (block value: (aSequenceableCollection at: index))]. |
|
94 |
count < anInteger ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise] |
|
95 |
! ! |
|
96 |
||
97 |
!CollectReadStream methodsFor:'seeking'! |
|
98 |
||
99 |
++ anInteger |
|
100 |
^source ++ anInteger |
|
101 |
! |
|
102 |
||
103 |
-- anInteger |
|
104 |
^source -- anInteger |
|
105 |
! |
|
106 |
||
107 |
length |
|
108 |
^source length |
|
109 |
! |
|
110 |
||
111 |
position |
|
112 |
^source position |
|
113 |
! |
|
114 |
||
115 |
position: anInteger |
|
116 |
^source position: anInteger |
|
117 |
! ! |
|
118 |
||
119 |
!CollectReadStream methodsFor:'testing'! |
|
120 |
||
121 |
isPositionable |
|
122 |
^source isPositionable |
|
123 |
! ! |
|
124 |
||
125 |
!CollectReadStream class methodsFor:'documentation'! |
|
126 |
||
127 |
version_SVN |
|
128 |
^ '$Id$' |
|
129 |
! ! |