SharedQueue.st
author Stefan Vogel <sv@exept.de>
Tue, 31 Oct 2000 19:37:23 +0100
changeset 916 63fddba933d6
parent 845 39e962f58eb3
child 917 df608391baa5
permissions -rw-r--r--
Fix bug with two reader and two writer processes. Code is now simpler and faster.

"
 COPYRIGHT (c) 1993 by Claus Gittinger
	      All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"

"{ Package: 'stx:libbasic2' }"

"SharedQueue extends Queue with two instance variables:
   dataAvailable   - set up in #init: as a Semaphore; readers wait on it,
                     writers signal it after every put.
   spaceAvailable  - set up in #init: as a Semaphore created with the queue
                     size; writers wait on it, readers signal it after
                     every removal."
Queue subclass:#SharedQueue
	instanceVariableNames:'dataAvailable spaceAvailable'
	classVariableNames:''
	poolDictionaries:''
	category:'Kernel-Processes'
!

!SharedQueue class methodsFor:'documentation'!

copyright
"
 COPYRIGHT (c) 1993 by Claus Gittinger
	      All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
!

documentation
"
    SharedQueues provide a safe mechanism for processes to communicate.
    They are basically Queues whose access methods are guarded by two
    semaphores:

        dataAvailable   - readers (#next, #removeLast) wait on it before
                          taking an element; #nextPut: signals it after
                          storing an element.
        spaceAvailable  - #nextPut: waits on it before storing an element;
                          readers signal it after taking an element.
                          It is created with the queue size (see #init:).

    Therefore, sharedQueues can also be used for synchronization: a reading
    process is blocked when attempting to read an empty queue, while
    a writer is blocked when attempting to write into a full queue.
    For nonBlocking read, use #isEmpty; for nonBlocking write, use #isFull.

    NOTE(review): the access methods in this class only wait on and signal
    the two semaphores; no additional mutual-exclusion lock around the
    inherited Queue operations is visible here. Confirm whether Queue
    protects its own internals when several readers or several writers
    are active at the same time.

    See samples in doc/coding.

    [author:]
        Claus Gittinger

    [see also:]
        Semaphore
        Process
        CodingExamples::SharedQueueExamples
"
!

examples
"
    Stress test with ten queues, each served by one reader process and
    one writer process.
    The writers draw 100000 distinct sequence numbers in total (10000 per
    writer, protected by accessLock) and put them into their queue.
    Each reader takes 10000 numbers from its queue and clears the matching
    flag in numbersStillToReceive (protected by accessLock2); a flag which
    is already cleared means that a number arrived twice.
    After all readers have terminated, any flag which is still set means
    that a number was lost.

    |queues readers writers seqNumber accessLock accessLock2
     numbersStillToReceive|

    seqNumber := 1.
    accessLock := Semaphore forMutualExclusion.
    accessLock2 := Semaphore forMutualExclusion.

    numbersStillToReceive := BooleanArray new:100000 withAll:true.

    queues := (1 to:10) collect:[:i | SharedQueue new].
    readers := (1 to:10) collect:[:i |
                                    [   |num|
                                        10000 timesRepeat:[
                                            num := (queues at:i) next.
                                            accessLock2 critical:[
                                                (numbersStillToReceive at:num) ifFalse:[
                                                    self halt:(num printString , ' received twice')
                                                ] ifTrue:[
                                                    numbersStillToReceive at:num put:false.
                                                ].
                                            ].
                                            'num printCR.'.
                                        ].
                                    ] fork
                                 ].
    writers := (1 to:10) collect:[:i |
                                    [   |num|

                                        10000 timesRepeat:[
                                            accessLock critical:[
                                                num := seqNumber.
                                                seqNumber := seqNumber + 1.
                                            ].
                                            (queues at:i) nextPut:num.
                                        ]
                                    ] fork
                                 ].

    readers do:[:aReader | aReader waitUntilTerminated].

    ' any left ? '.
    (numbersStillToReceive includes:true) ifTrue:[
        self halt:'oops - not all numbers received'
    ]
"
! !

!SharedQueue methodsFor:'accessing'!

next
    "answer the next value from the queue.
     If the queue is empty, the calling process is suspended until
     some other process has put a value into the receiver.
     Once the value has been taken out, one unit of free space is
     signalled to the writers."

    |datum|

    "blocks while there is nothing to read"
    dataAvailable wait.
    datum := super next.
    "one more slot is free now"
    spaceAvailable signal.
    ^ datum!

nextPut:anObject
    "enter anObject into the queue; wait for available space, if
     the queue is full. After the put, signal availability of a datum
     to readers.
     Returns anObject."

    "blocks until a reader (or #removeAll) has signalled free space"
    spaceAvailable wait.
    super nextPut:anObject.
    "wake up one reader waiting in #next or #removeLast"
    dataAvailable signal.
    ^ anObject.!

readSemaphore
    "return the semaphore which is signalled when data is available
     for reading.
     This is the semaphore on which #next and #removeLast wait, and
     which #nextPut: signals after every put."

    ^ dataAvailable

    "Modified: 16.12.1995 / 13:47:11 / cg"
!

removeAll
    "flush the queue: remove all elements without ever blocking.
     First, every pending data-available signal is consumed by polling
     the read semaphore with a zero timeout; then the elements are
     removed, and finally free space is signalled to the writers once
     for every signal which was consumed.
     This can be used to flush queues in multi-process applications,
     when cleanup is required."

    |drained|

    drained := 0.
    "a nil answer from the poll means: nothing more pending"
    [(dataAvailable waitWithTimeout:0) isNil] whileFalse:[
        drained := drained + 1.
    ].
    super removeAll.
    drained timesRepeat:[spaceAvailable signal]!

removeLast
    "answer the last value from the queue.
     If the queue is empty, the calling process is suspended until
     some other process has put a value into the receiver.
     Once the value has been taken out, one unit of free space is
     signalled to the writers."

    |datum|

    "blocks while there is nothing to read"
    dataAvailable wait.
    datum := super removeLast.
    "one more slot is free now"
    spaceAvailable signal.
    ^ datum!

writeSemaphore
    "return the semaphore which is signalled when the queue has space
     for writing.
     This is the semaphore on which #nextPut: waits, and which #next,
     #removeLast and #removeAll signal after removing elements."

    ^ spaceAvailable

    "Modified: 16.12.1995 / 13:47:07 / cg"
! !

!SharedQueue methodsFor:'initialization'!

init:size
    "set up the receiver for a capacity of size entries.
     In addition to the inherited setup, two semaphores are created:
     the read semaphore, created without an initial count argument,
     and the write semaphore, created with size as argument, so that
     writers can put size elements before having to wait."

    |readSema writeSema|

    super init:size.

    readSema := Semaphore new.
    writeSema := Semaphore new:size.

    "the assignments take the answer of #name:, exactly as a
     combined create-and-name expression would"
    dataAvailable := readSema name:'shared q-read'.
    spaceAvailable := writeSema name:'shared q-write'

    "Modified: 25.1.1997 / 00:19:45 / cg"! !

!SharedQueue class methodsFor:'documentation'!

version
    "return the revision header string of this source file
     (a CVS $Header keyword, expanded by the version control system)"

    ^ '$Header: /cvs/stx/stx/libbasic2/SharedQueue.st,v 1.22 2000-10-31 18:37:23 stefan Exp $'
! !