author | Jan Vrany <jan.vrany@fit.cvut.cz> |
Wed, 20 Mar 2013 12:03:03 +0000 | |
changeset 120 | 2ffa5dddf5e8 |
parent 111 | 44ac233b2f83 |
permissions | -rw-r--r-- |
10 | 1 |
"{ Package: 'stx:goodies/xtreams/transforms' }" |
2 |
||
3 |
"{ NameSpace: Xtreams }" |
|
4 |
||
5 |
ReadStream subclass:#CollectReadStream |
|
6 |
instanceVariableNames:'block cache contentsSpecies direct' |
|
7 |
classVariableNames:'' |
|
111
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
8 |
poolDictionaries:'XtreamsPool' |
27
2cc5a8a3ca14
added XtreamsPool to fix DefaultBufferSize; set proper category names
mkobetic
parents:
10
diff
changeset
|
9 |
category:'Xtreams-Transforms' |
10 | 10 |
! |
11 |
||
12 |
CollectReadStream comment:'Converts elements being read using the provided conversion block. When the contentSpecies of the source doesn''t match the desired contentSpecies of this stream, we optimize bulk reads by reading into an internal buffer. |
|
13 |
||
14 |
Instance Variables |
|
15 |
block <BlockClosure> collecting block (same style as collect: blocks) |
|
16 |
cache <SequenceableCollection | nil> caches elements before transformation when direct is false |
|
17 |
contentsSpecies <Class> species for collections of elements of this stream |
|
18 |
direct <Boolean> are we using an intermediate buffer (when source''s contentSpecies doesn''t match ours) |
|
19 |
||
20 |
' |
|
21 |
! |
|
22 |
||
23 |
||
24 |
!CollectReadStream class methodsFor:'instance creation'! |
|
25 |
||
26 |
on: aReadStream block: block |
|
27 |
^self new on: aReadStream block: block |
|
28 |
! ! |
|
29 |
||
30 |
!CollectReadStream methodsFor:'accessing'! |
|
31 |
||
32 |
block |
|
33 |
^block |
|
34 |
! |
|
35 |
||
36 |
get |
|
37 |
^block value: source get |
|
38 |
! |
|
39 |
||
40 |
read: anInteger into: aSequenceableCollection at: startIndex |
|
41 |
direct |
|
42 |
ifTrue: [self directRead: anInteger into: aSequenceableCollection at: startIndex] |
|
43 |
ifFalse: [self bufferRead: anInteger into: aSequenceableCollection at: startIndex]. |
|
44 |
^anInteger |
|
45 |
! ! |
|
46 |
||
47 |
!CollectReadStream methodsFor:'initialize-release'! |
|
48 |
||
49 |
close |
|
50 |
super close. |
|
51 |
cache ifNotNil: [ cache recycle ] |
|
52 |
! |
|
53 |
||
54 |
contentsSpecies |
|
55 |
^contentsSpecies |
|
56 |
! |
|
57 |
||
58 |
contentsSpecies: aClass |
|
59 |
contentsSpecies := aClass. |
|
60 |
cache ifNotNil: [ cache recycle ]. |
|
61 |
cache := (direct := contentsSpecies == source contentsSpecies) |
|
62 |
ifTrue: [nil] |
|
63 |
ifFalse: [ |
|
64 |
source contentsSpecies newRecycled: ( |
|
65 |
(cache notNil and: [ cache size > 0 ]) |
|
66 |
ifTrue: [cache size] |
|
67 |
ifFalse: [DefaultBufferSize])] |
|
68 |
! |
|
69 |
||
70 |
on: aSource block: aBlock |
|
71 |
self on: aSource. |
|
72 |
block := aBlock. |
|
73 |
contentsSpecies := aSource contentsSpecies. |
|
74 |
direct := true |
|
75 |
! ! |
|
76 |
||
77 |
!CollectReadStream methodsFor:'private'! |
|
78 |
||
79 |
bufferRead: anInteger into: aSequenceableCollection at: startIndex |
|
80 |
| amount count read | |
|
81 |
count := 0. |
|
82 |
[count < anInteger] whileTrue: |
|
83 |
[amount := (anInteger - count) min: cache size. |
|
84 |
read := [source read: amount into: cache at: 1. amount] on: Incomplete do: [ :ex | ex count ]. |
|
85 |
1 to: read do: [:index | aSequenceableCollection at: count + startIndex + index - 1 put: (block value: (cache at: index))]. |
|
86 |
count := count + read. |
|
87 |
read < amount ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise]] |
|
88 |
! |
|
89 |
||
90 |
directRead: anInteger into: aSequenceableCollection at: startIndex |
|
91 |
| count | |
|
92 |
count := [source read: anInteger into: aSequenceableCollection at: startIndex. anInteger] on: Incomplete do: [ :ex | ex count ]. |
|
93 |
startIndex to: startIndex + count - 1 do: [:index | aSequenceableCollection at: index put: (block value: (aSequenceableCollection at: index))]. |
|
94 |
count < anInteger ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise] |
|
95 |
! ! |
|
96 |
||
97 |
!CollectReadStream methodsFor:'seeking'! |
|
98 |
||
99 |
++ anInteger |
|
100 |
^source ++ anInteger |
|
101 |
! |
|
102 |
||
103 |
-- anInteger |
|
104 |
^source -- anInteger |
|
105 |
! |
|
106 |
||
107 |
length |
|
108 |
^source length |
|
109 |
! |
|
110 |
||
111 |
position |
|
112 |
^source position |
|
113 |
! |
|
114 |
||
115 |
position: anInteger |
|
116 |
^source position: anInteger |
|
117 |
! ! |
|
118 |
||
119 |
!CollectReadStream methodsFor:'testing'! |
|
120 |
||
121 |
isPositionable |
|
122 |
^source isPositionable |
|
123 |
! ! |
|
124 |
||
125 |
!CollectReadStream class methodsFor:'documentation'! |
|
126 |
||
111
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
127 |
version_HG |
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
128 |
|
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
129 |
^ '$Changeset: <not expanded> $' |
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
130 |
! |
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
131 |
|
10 | 132 |
version_SVN |
133 |
^ '$Id$' |
|
134 |
! ! |
|
111
44ac233b2f83
* removed namespace from pool references and stray extension methods
joe
parents:
97
diff
changeset
|
135 |