author | Jan Vrany <jan.vrany@fit.cvut.cz> |
Wed, 01 Feb 2012 08:54:33 +0000 | |
changeset 105 | ba761aff18ac |
parent 74 | 752e2d88fe73 |
child 109 | 9587e2df7029 |
permissions | -rw-r--r-- |
2 | 1 |
"{ Package: 'stx:goodies/xtreams/core' }" |
2 |
||
3 |
"{ NameSpace: Xtreams }" |
|
4 |
||
5 |
Object subclass:#Buffer |
|
6 |
instanceVariableNames:'cache readPosition writePosition dataLength' |
|
7 |
classVariableNames:'' |
|
8 |
poolDictionaries:'' |
|
20
51de794993c3
added XtreamsPool to fix DefaultBufferSize; set proper category names
mkobetic
parents:
7
diff
changeset
|
9 |
category:'Xtreams-Core' |
2 | 10 |
! |
11 |
||
12 |
Buffer comment:'Buffer implements a buffering API over an in-memory cache. The cache will grow as required to fit new data in to the buffer. Two positions are kept, the read position and write position, allowing flexible usage of the buffer. |
|
13 |
||
14 |
API: |
|
15 |
read:into:startingAt: -- reads data from the cache and moves readPosition forward |
|
16 |
write:into:startingAt: -- writes data into the cache and moves writePosition forward |
|
17 |
readSkip:/writeSkip: -- moves either readPosition or writePosition forward without changing the cache |
|
18 |
readPosition/readPosition:/writePosition/writePosition: -- accessors to the read/write positions |
|
19 |
clear -- remove all the data from the cache |
|
20 |
trim -- remove all the read data and all the empty space for writing in to from the cache |
|
21 |
||
22 |
Instance Variables |
|
23 |
cache <SequenceableCollection> the cache for our buffer |
|
24 |
dataLength <ArithmeticValue> the amount of data in our cache, not the size of the cache |
|
25 |
readPosition <ArithmeticValue> the position within our data that we''re reading from (0..dataLength) |
|
26 |
writePosition <ArithmeticValue> the position within our data that we''re writing from (0..dataLength) |
|
27 |
||
28 |
' |
|
29 |
! |
|
30 |
||
31 |
||
32 |
!Buffer class methodsFor:'instance creation'! |
|
33 |
||
34 |
new: anInteger class: aClass |
|
35 |
^self on: (aClass newRecycled: anInteger) |
|
36 |
! |
|
37 |
||
38 |
on: aSequenceableCollection |
|
39 |
^self new on: aSequenceableCollection |
|
40 |
! ! |
|
41 |
||
42 |
!Buffer methodsFor:'accessing'! |
|
43 |
||
44 |
activeSize |
|
45 |
"The number of elements in the buffer that can be explored. This is <= cacheSize" |
|
46 |
||
47 |
^dataLength |
|
48 |
! |
|
49 |
||
50 |
cache |
|
51 |
^cache |
|
52 |
! |
|
53 |
||
54 |
cacheSize |
|
55 |
"The size of the cache, which is >= dataLength" |
|
56 |
||
57 |
^cache size |
|
58 |
! |
|
59 |
||
60 |
contentsFuture |
|
61 |
^cache copyFrom: readPosition + 1 to: readPosition + self readSize |
|
62 |
! |
|
63 |
||
64 |
contentsPast |
|
65 |
^cache copyFrom: 1 to: writePosition |
|
66 |
! |
|
67 |
||
68 |
inactiveSize |
|
69 |
"Free space to write in to" |
|
70 |
||
71 |
^cache size - dataLength |
|
72 |
! |
|
73 |
||
74 |
readSize |
|
75 |
"The number of elements available to read" |
|
76 |
||
77 |
^dataLength - readPosition |
|
78 |
! |
|
79 |
||
80 |
writeSize |
|
81 |
"The number of available slots to write in to, which might overwrite elements in dataLength; writeSize >= inactiveSize" |
|
82 |
||
83 |
^(cache size - writePosition + readPosition) min: cache size |
|
84 |
! ! |
|
85 |
||
86 |
!Buffer methodsFor:'accessing - reading'! |
|
87 |
||
88 |
get |
|
89 |
dataLength = readPosition ifTrue: [Incomplete zero raise]. |
|
90 |
readPosition := readPosition + 1. |
|
91 |
^cache at: readPosition |
|
92 |
! |
|
93 |
||
94 |
read: anInteger into: aSequenceableCollection at: startIndex |
|
95 |
| count | |
|
96 |
count := anInteger min: (dataLength - readPosition). |
|
97 |
aSequenceableCollection replaceFrom: startIndex to: startIndex + count - 1 with: cache startingAt: readPosition + 1. |
|
98 |
readPosition := readPosition + count. |
|
99 |
count < anInteger ifTrue: [(Incomplete on: aSequenceableCollection count: count at: startIndex) raise]. |
|
100 |
^anInteger |
|
101 |
! |
|
102 |
||
103 |
readPosition |
|
104 |
^readPosition |
|
105 |
! |
|
106 |
||
107 |
readPosition: aPosition |
|
108 |
readPosition := (aPosition max: 0) min: dataLength |
|
109 |
! |
|
110 |
||
111 |
readSkip: anInteger |
|
112 |
| old delta | |
|
113 |
old := readPosition. |
|
114 |
self readPosition: old + anInteger. |
|
115 |
delta := readPosition - old. |
|
116 |
^delta |
|
117 |
! ! |
|
118 |
||
119 |
!Buffer methodsFor:'accessing - writing'! |
|
120 |
||
121 |
insert: aStreamable |
|
122 |
||
123 |
^aStreamable streamingInsertInto: self |
|
124 |
! |
|
125 |
||
126 |
insert: anInteger from: aStreamable |
|
127 |
||
128 |
^aStreamable streamingInsert: anInteger into: self |
|
129 |
! |
|
130 |
||
131 |
insert: anInteger from: aSequenceableCollection at: startIndex |
|
132 |
||
133 |
dataLength + anInteger <= cache size ifFalse: [self growBy: anInteger]. |
|
134 |
||
135 |
cache replaceFrom: writePosition + anInteger + 1 to: dataLength + anInteger with: cache startingAt: writePosition + 1. |
|
136 |
cache replaceFrom: writePosition + 1 to: writePosition + anInteger with: aSequenceableCollection startingAt: startIndex. |
|
137 |
writePosition := writePosition + anInteger. |
|
138 |
dataLength := dataLength + anInteger. |
|
139 |
^anInteger |
|
140 |
! |
|
141 |
||
142 |
put: anObject |
|
143 |
||
144 |
writePosition < cache size ifFalse: [self growBy: 1]. |
|
145 |
||
146 |
cache at: writePosition + 1 put: anObject. |
|
147 |
writePosition := writePosition + 1. |
|
148 |
writePosition > dataLength ifTrue: [dataLength := writePosition]. |
|
149 |
^anObject |
|
150 |
! |
|
151 |
||
152 |
write: aStreamable |
|
153 |
||
154 |
^aStreamable streamingWriteInto: self |
|
155 |
! |
|
156 |
||
157 |
write: anInteger from: aStreamable |
|
158 |
||
159 |
^aStreamable streamingWrite: anInteger into: self |
|
160 |
! |
|
161 |
||
162 |
write: anInteger from: aSequenceableCollection at: startIndex |
|
163 |
||
164 |
writePosition + anInteger <= cache size ifFalse: [self growBy: anInteger]. |
|
165 |
self privateWrite: anInteger from: aSequenceableCollection at: startIndex. |
|
166 |
^anInteger |
|
167 |
! |
|
168 |
||
169 |
writePosition |
|
170 |
^writePosition |
|
171 |
! |
|
172 |
||
173 |
writePosition: aPosition |
|
174 |
^writePosition := (aPosition max: 0) min: dataLength |
|
175 |
! |
|
176 |
||
177 |
writeSkip: anInteger |
|
178 |
| old delta | |
|
179 |
old := writePosition. |
|
180 |
self writePosition: old + anInteger. |
|
181 |
delta := writePosition - old. |
|
182 |
^delta |
|
183 |
! ! |
|
184 |
||
185 |
!Buffer methodsFor:'compressing'! |
|
186 |
||
187 |
clear |
|
188 |
self clearCache. |
|
189 |
readPosition := writePosition := dataLength := 0 |
|
190 |
! |
|
191 |
||
192 |
trim |
|
193 |
cache := cache copyFrom: readPosition + 1 to: writePosition. |
|
194 |
readPosition := 0. |
|
195 |
dataLength := writePosition := cache size |
|
196 |
! ! |
|
197 |
||
198 |
||
199 |
!Buffer methodsFor:'initialize-release'! |
|
200 |
||
201 |
close |
|
202 |
dataLength := writePosition |
|
203 |
! |
|
204 |
||
205 |
contentsSpecies |
|
206 |
^cache species |
|
207 |
! |
|
208 |
||
209 |
on: aSequenceableCollection |
|
210 |
cache := aSequenceableCollection. |
|
211 |
dataLength := 0. |
|
212 |
readPosition := 0. |
|
213 |
writePosition := 0 |
|
214 |
! |
|
215 |
||
216 |
recycle |
|
217 |
||
218 |
cache recycle |
|
219 |
! ! |
|
220 |
||
221 |
!Buffer methodsFor:'printing'! |
|
222 |
||
223 |
printOn: aStream |
|
224 |
| stream | |
|
225 |
stream := String new writing. |
|
226 |
self streamingPrintOn: stream. |
|
227 |
aStream nextPutAll: stream conclusion |
|
228 |
! |
|
229 |
||
230 |
streamingPrintOn: aStream |
|
231 |
aStream write: self class name; |
|
232 |
space; write: 'data: '; print: dataLength; |
|
233 |
space; write: 'read: '; print: self readPosition; |
|
234 |
space; write: 'write: '; print: self writePosition |
|
235 |
! ! |
|
236 |
||
237 |
!Buffer methodsFor:'private'! |
|
238 |
||
239 |
clearCache |
|
240 |
cache := cache copyEmpty: 0 |
|
241 |
! |
|
242 |
||
243 |
growBy: anInteger |
|
244 |
| replacement | |
|
245 |
replacement := cache copyEmpty: (cache size + anInteger) * 2. |
|
246 |
replacement replaceFrom: 1 to: cache size with: cache startingAt: 1. |
|
247 |
cache := replacement |
|
248 |
! |
|
249 |
||
250 |
privateWrite: anInteger from: aSequenceableCollection at: startIndex |
|
251 |
" Ensure we're only doing with (writePosition + anInteger) <= cache size " |
|
252 |
(writePosition + anInteger) <= cache size ifFalse: [self error: 'invalid privateWrite. Use the #write: protocol instead.']. |
|
253 |
||
254 |
cache replaceFrom: writePosition + 1 to: writePosition + anInteger with: aSequenceableCollection startingAt: startIndex. |
|
255 |
writePosition := writePosition + anInteger. |
|
256 |
writePosition > dataLength ifTrue: [dataLength := writePosition] |
|
257 |
! |
|
258 |
||
259 |
streamingInsert: anInteger into: aWriteStream |
|
260 |
| count | |
|
261 |
count := (writePosition - readPosition) min: anInteger. |
|
262 |
count <= 0 ifTrue: [^0]. |
|
263 |
aWriteStream insert: count from: cache at: readPosition + 1. |
|
264 |
self readSkip: count |
|
265 |
! |
|
266 |
||
267 |
streamingInsertInto: aWriteStream |
|
268 |
| count | |
|
269 |
count := writePosition - readPosition. |
|
270 |
count <= 0 ifTrue: [^0]. |
|
271 |
self streamingInsert: count into: aWriteStream. |
|
272 |
^count |
|
273 |
! |
|
274 |
||
275 |
streamingWrite: anInteger from: aReadStream |
|
276 |
"Implementing this method would allow the buffer to be treated like a stream sometimes. Currently this is not implemented, make a buffer stream on your buffer instead." |
|
277 |
self error: 'not yet implemented' |
|
278 |
! |
|
279 |
||
280 |
streamingWrite: anInteger into: aWriteStream |
|
281 |
| count | |
|
282 |
count := (writePosition - readPosition) min: anInteger. |
|
283 |
count <= 0 ifTrue: [^0]. |
|
284 |
aWriteStream write: count from: cache at: readPosition + 1. |
|
285 |
self readSkip: count. |
|
286 |
^count |
|
287 |
! |
|
288 |
||
289 |
streamingWriteFrom: aReadStream |
|
290 |
"Implementing this method would allow the buffer to be treated like a stream sometimes. Currently this is not implemented, make a buffer stream on your buffer instead." |
|
291 |
self error: 'not yet implemented' |
|
292 |
! |
|
293 |
||
294 |
streamingWriteInto: aWriteStream |
|
295 |
| count | |
|
296 |
count := writePosition - readPosition. |
|
297 |
count <= 0 ifTrue: [^0]. |
|
298 |
self streamingWrite: count into: aWriteStream. |
|
299 |
^count |
|
300 |
! ! |
|
301 |
||
302 |
!Buffer methodsFor:'testing'! |
|
303 |
||
304 |
hasDataToRead |
|
305 |
^readPosition < dataLength |
|
306 |
! |
|
307 |
||
308 |
hasFixedWriteSpace |
|
309 |
^false |
|
310 |
! |
|
311 |
||
312 |
hasSpaceToWrite |
|
313 |
^true |
|
314 |
! ! |
|
315 |
||
316 |
!Buffer class methodsFor:'documentation'! |
|
317 |
||
318 |
version_SVN |
|
319 |
^ '$Id$' |
|
320 |
! ! |