core/Xtreams__RingBuffer.st
author Martin Kobetic
Sun, 17 Nov 2013 00:22:31 -0500
changeset 144 e193a6772be4
parent 109 9587e2df7029
permissions -rw-r--r--
merging

"{ Package: 'stx:goodies/xtreams/core' }"

"{ NameSpace: Xtreams }"

"Class definition: RingBuffer is a subclass of Buffer. It adds a single instance variable, dataPosition, and imports the XtreamsPool pool dictionary."
Buffer subclass:#RingBuffer
	instanceVariableNames:'dataPosition'
	classVariableNames:''
	poolDictionaries:'XtreamsPool'
	category:'Xtreams-Core'
!

RingBuffer comment:'RingBuffer is a specialized buffer that has a fixed-size cache. The cache is treated as a ring: when the end of the cache is reached, reading and writing continue from the beginning of the cache. If you write more data than the cache can hold, the oldest data is overwritten and lost, even if it has not been read yet. The readPosition and writePosition are always relative to the dataPosition, which is a hidden implementation detail.

Instance Variables
	dataPosition    <ArithmeticValue>       the position in our cache where our data starts

'
!


!RingBuffer methodsFor:'accessing'!

contentsFuture
	"Answer a new collection of (self readSize) elements holding the data from the read position onward, copied out of the ring in linear order. Answer an empty collection when the cache is empty."
	| buffer read tail |
	cache isEmpty ifTrue: [^self contentsSpecies new].
	"Zero-based cache index of the element at the read position."
	read := (dataPosition + readPosition) \\ cache size.
	"Number of requested elements located before the physical end of the cache."
	tail := (cache size - read) min: self readSize.

	buffer := self contentsSpecies withSize: self readSize.
	buffer replaceFrom: 1 to: tail with: cache startingAt: read + 1.
	"The remainder wrapped around to the front of the cache. The stop index must be the end of the result; the former (readSize - tail) truncated or skipped the wrapped part."
	buffer replaceFrom: tail + 1 to: buffer size with: cache startingAt: 1.
	^buffer
!

contentsPast
	"Answer a new collection of (self writePosition) elements, copied out of the ring in linear order starting at dataPosition. Answer an empty collection when the cache is empty."
	| result beforeWrap |
	cache isEmpty ifTrue: [^self contentsSpecies new].
	result := self contentsSpecies withSize: self writePosition.

	"Elements located between dataPosition and the physical end of the cache."
	beforeWrap := self writePosition min: cache size - dataPosition.
	result
		replaceFrom: 1
		to: beforeWrap
		with: cache
		startingAt: dataPosition + 1.
	"Whatever is left comes from the front of the cache."
	result
		replaceFrom: beforeWrap + 1
		to: result size
		with: cache
		startingAt: 1.
	^result
! !

!RingBuffer methodsFor:'accessing - reading'!

get
	"Answer the element at the read position and advance the read position by one. Raise Incomplete when the read position has reached dataLength."
	| slot |
	readPosition = dataLength ifTrue: [Incomplete zero raise].
	"One-based cache index of the element to answer."
	slot := (dataPosition + readPosition) \\ cache size + 1.
	readPosition := readPosition + 1.
	^cache at: slot
!

read: anInteger into: aSequenceableCollection at: startIndex
	"Copy up to anInteger elements from the read position into aSequenceableCollection, beginning at startIndex, and advance the read position by the number copied. Raise Incomplete when fewer than anInteger elements were available. Answer anInteger."
	| available origin beforeWrap |

	available := (dataLength - readPosition) min: anInteger.
	"Zero-based cache index of the read position."
	origin := (dataPosition + readPosition) \\ cache size.
	"Portion that can be taken before hitting the physical end of the cache."
	beforeWrap := available min: cache size - origin.

	aSequenceableCollection
		replaceFrom: startIndex
		to: startIndex + beforeWrap - 1
		with: cache
		startingAt: origin + 1.
	"Remaining portion, taken from the front of the cache."
	aSequenceableCollection
		replaceFrom: startIndex + beforeWrap
		to: startIndex + available - 1
		with: cache
		startingAt: 1.
	readPosition := readPosition + available.

	available < anInteger
		ifTrue: [(Incomplete on: aSequenceableCollection count: available at: startIndex) raise].
	^anInteger
! !

!RingBuffer methodsFor:'accessing - writing'!

insert: anInteger from: aSequenceableCollection at: startIndex
	"Insert anInteger elements of aSequenceableCollection (beginning at startIndex) at the write position. The data between writePosition and dataLength is saved to a temporary collection, the new elements are written, the saved data is written back after them, and the write position is then set to just after the inserted elements. Answer anInteger."

	| tail write temp length position |
	"Number of elements located after the write position that must be preserved."
	length := dataLength - writePosition.
	"Zero-based cache index of the write position, as used by the other accessors. The former one-based value (+ 1 before the modulo) made tail one element short, pulled a wrong element from the front of the cache, and produced source index 0 when the write position was on the last cache slot."
	write := (dataPosition + writePosition) \\ cache size.
	"Portion of the preserved data located before the physical end of the cache."
	tail := (cache size - write) min: length.
	temp := self contentsSpecies newRecycled: (length max: DefaultBufferSize).
	temp replaceFrom: 1 to: tail with: cache startingAt: write + 1.
	temp replaceFrom: tail + 1 to: length with: cache startingAt: 1.
	"Forget the saved elements so the writes below append them again after the inserted data."
	dataLength := dataLength - length.
	self write: anInteger from: aSequenceableCollection at: startIndex.
	"Remember where the inserted data ends; the write position is restored to this point."
	position := self writePosition.
	self write: length from: temp.
	self writePosition: position.
	temp recycle.
	^anInteger
!

put: anObject
	"Store anObject at the write position, then advance the write position by one (capped at the cache size) and extend dataLength if the write position moved past it. Answer anObject."
	| slot |

	"One-based cache index for the write position, computed before any wrapping adjustment."
	slot := (dataPosition + writePosition) \\ cache size + 1.
	cache at: slot put: anObject.

	"Shift dataPosition and readPosition if this element did not fit."
	self privateWrapDataPosition: 1.
	writePosition := cache size min: writePosition + 1.
	dataLength := dataLength max: writePosition.
	^anObject
!

write: anInteger from: aSequenceableCollection at: startIndex
	"Write anInteger elements of aSequenceableCollection, beginning at startIndex. Requests smaller than the cache go through privateWrite:from:at:. Larger or equal requests replace the whole cache with the last (cache size) elements of the request and reset all positions. Answer anInteger."
	| capacity |

	capacity := cache size.
	anInteger < capacity ifTrue: [
		self privateWrite: anInteger from: aSequenceableCollection at: startIndex.
		^anInteger].

	"Only the final capacity elements can survive in the ring, so store just those."
	readPosition := 0.
	dataPosition := 0.
	writePosition := capacity.
	dataLength := capacity.
	cache
		replaceFrom: 1
		to: capacity
		with: aSequenceableCollection
		startingAt: startIndex + anInteger - capacity.
	^anInteger
! !

!RingBuffer methodsFor:'compressing'!

clear
	"Perform the inherited clear, then reset dataPosition to zero so the data starts at the first cache slot again."
	super clear.
	dataPosition := 0
!

trim
	"Replace the cache with a copy of the region between the read and the write position, then reset the positions so that the data starts at the first slot and the new cache is exactly full."
	| read write |
	"Zero-based cache indices of the read and write positions."
	read := (dataPosition + readPosition) \\ cache size.
	write := (dataPosition + writePosition) \\ cache size.
	read <= write
		ifTrue: [
			"NOTE(review): the stop index write + 1 includes the slot at the write position itself, and a completely full ring (read = write) collapses to one element. This looks like an off-by-one, but the intended result for read = write is unclear from this file, so the branch is left unchanged - confirm against Buffer trim."
			cache := cache copyFrom: read + 1 to: write + 1]
		ifFalse:
			[ | replacement readSize |
			"Wrapped case: the data is the tail segment from read to the end of the cache followed by the head segment of write elements at the front of the cache."
			readSize := cache size - read.
			replacement := cache copyEmpty: write + readSize.
			replacement replaceFrom: 1 to: readSize with: cache startingAt: read + 1.
			"The head segment starts at the first cache slot. Copying from write + 1 took the wrong elements and could run past the end of the cache."
			replacement replaceFrom: readSize + 1 to: replacement size with: cache startingAt: 1.
			cache := replacement].
	dataPosition := readPosition := 0.
	dataLength := writePosition := cache size
! !

!RingBuffer methodsFor:'initialize-release'!

on: aSequenceableCollection
	"Perform the inherited initialization with aSequenceableCollection, then start with dataPosition at zero."
	super on: aSequenceableCollection.
	dataPosition := 0
! !

!RingBuffer methodsFor:'private'!

clearCache
	"Intentionally does nothing. NOTE(review): presumably overrides an inherited hook that would otherwise wipe the cache contents - confirm against Buffer."
!

growBy: anInteger
	"Growing is not supported by this class; signal that through shouldNotImplement."
	self shouldNotImplement
!

privateWrapDataPosition: anInteger
	"Account for anInteger elements being written at the write position. While writePosition + anInteger stays within the cache size nothing changes. Otherwise dataPosition advances by the excess (modulo the cache size) and readPosition moves back by the same amount, but never below zero."
	| excess |

	excess := writePosition + anInteger - cache size.
	excess > 0 ifFalse: [^self].
	dataPosition := (dataPosition + excess) \\ cache size.
	readPosition := 0 max: readPosition - excess
!

privateWrite: anInteger from: aSequenceableCollection at: startIndex
	"Copy anInteger elements of aSequenceableCollection, beginning at startIndex, into the ring at the write position, wrapping to the front of the cache when needed. Afterwards adjust dataPosition, readPosition, writePosition and dataLength. Does nothing unless anInteger is positive; signals an error when anInteger exceeds the cache size."
	| slot beforeWrap |

	anInteger > 0 ifFalse: [^self].
	"Callers must not hand in more than one full cache worth of data."
	anInteger > cache size
		ifTrue: [self error: 'invalid privateWrite. Use the #write: protocol instead.'].

	"Zero-based cache index of the write position."
	slot := (dataPosition + writePosition) \\ cache size.
	"Portion that fits before the physical end of the cache."
	beforeWrap := anInteger min: cache size - slot.

	cache
		replaceFrom: slot + 1
		to: slot + beforeWrap
		with: aSequenceableCollection
		startingAt: startIndex.
	"Remaining portion goes to the front of the cache."
	cache
		replaceFrom: 1
		to: anInteger - beforeWrap
		with: aSequenceableCollection
		startingAt: startIndex + beforeWrap.

	self privateWrapDataPosition: anInteger.
	writePosition := cache size min: writePosition + anInteger.
	dataLength := dataLength max: writePosition
!

streamingInsert: anInteger into: aWriteStream
	"Insert up to anInteger elements from the read position directly into aWriteStream, in at most two pieces (up to the physical end of the cache, then from its front), and skip the read position past them."

	| origin available beforeWrap wrapped |
	available := anInteger min: dataLength - readPosition.
	"Zero-based cache index of the read position."
	origin := (dataPosition + readPosition) \\ cache size.
	beforeWrap := available min: cache size - origin.
	wrapped := available - beforeWrap.
	aWriteStream insert: beforeWrap from: cache at: origin + 1.
	"Nothing wrapped around: only the first piece needs to be skipped."
	wrapped > 0 ifFalse: [^self readSkip: beforeWrap].
	aWriteStream insert: wrapped from: cache at: 1.
	self readSkip: available
!

streamingInsertInto: aWriteStream
	"Insert everything from the read position up to dataLength into aWriteStream via streamingInsert:into:. Answer the number of elements that were pending before the call."

	| pending |
	pending := dataLength - readPosition.
	self
		streamingInsert: pending
		into: aWriteStream.
	^pending
!

streamingWrite: anInteger into: aWriteStream
	"Write up to anInteger elements from the read position directly into aWriteStream, in at most two pieces (up to the physical end of the cache, then from its front), and skip the read position past them. Answer anInteger."

	| origin available beforeWrap wrapped |
	available := anInteger min: dataLength - readPosition.
	"Zero-based cache index of the read position."
	origin := (dataPosition + readPosition) \\ cache size.
	beforeWrap := available min: cache size - origin.
	wrapped := available - beforeWrap.
	aWriteStream write: beforeWrap from: cache at: origin + 1.
	wrapped > 0 ifFalse: [
		"Nothing wrapped around: only the first piece needs to be skipped."
		self readSkip: beforeWrap.
		^anInteger].
	aWriteStream write: wrapped from: cache at: 1.
	self readSkip: available.
	^anInteger
!

streamingWriteInto: aWriteStream
	"Write everything from the read position up to dataLength into aWriteStream via streamingWrite:into:. Answer the number of elements that were pending before the call."

	| pending |
	pending := dataLength - readPosition.
	self
		streamingWrite: pending
		into: aWriteStream.
	^pending
! !

!RingBuffer methodsFor:'testing'!

hasFixedWriteSpace
	"Always answer true. This class refuses to grow (see growBy:)."
	^true
!

hasSpaceToWrite
	"Answer true unless (self writeSize) is zero."
	| remaining |
	remaining := self writeSize.
	^remaining isZero not
! !

!RingBuffer class methodsFor:'documentation'!

version_HG
    "Answer the Mercurial changeset keyword string for this file. The literal is a version-control keyword placeholder and is shown here unexpanded."

    ^ '$Changeset: <not expanded> $'
!

version_SVN
    "Answer the Subversion Id keyword string recorded for this file."
    ^ '$Id: Xtreams__RingBuffer.st 75 2012-01-30 22:33:34Z mkobetic $'
! !