SharedQueue.st
author Claus Gittinger <cg@exept.de>
Sat, 02 May 2020 21:40:13 +0200
changeset 5476 7355a4b11cb6
parent 5453 0ecf0fc76b40
permissions -rw-r--r--
#FEATURE by cg class: Socket class added: #newTCPclientToHost:port:domain:domainOrder:withTimeout: changed: #newTCPclientToHost:port:domain:withTimeout:

"{ Encoding: utf8 }"

"
 COPYRIGHT (c) 1993 by Claus Gittinger
	      All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
"{ Package: 'stx:libbasic2' }"

"{ NameSpace: Smalltalk }"

"SharedQueue: a Queue whose access methods are synchronized for use from multiple processes.
 Instance variables (as set up in #init:):
    dataAvailable   Semaphore (named 'shared q-read'); signalled whenever an element has been added
    spaceAvailable  Semaphore (named 'shared q-write'), created with the queue's size;
                    signalled whenever an element has been removed
    accessLock      RecursionLock; guards modifications of the inherited queue state"
Queue subclass:#SharedQueue
	instanceVariableNames:'dataAvailable spaceAvailable accessLock'
	classVariableNames:''
	poolDictionaries:''
	category:'Kernel-Processes'
!

!SharedQueue class methodsFor:'documentation'!

copyright
"
 COPYRIGHT (c) 1993 by Claus Gittinger
	      All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
!

documentation
"
    SharedQueues provide a safe mechanism for processes to communicate.
    They are basically Queues, with added secure access to the internals,
    allowing use from multiple processes (i.e. the access methods use
    critical regions to protect against confusion due to a process
    switch within a modification).

    Also, sharedQueues can be used for synchronization, since a reading
    process will be blocked when attempting to read an empty queue, while
    a writer will be blocked when attempting to write into a full queue.
    For nonBlocking read, use #isEmpty; for nonBlocking write, use #isFull
    (see also #nextOrNil, #nextIfEmpty: and #nonBlockingNextPut:).

    Be warned:
        if a reader process wants to add elements to the sharedqueue in its
        read-loop, it may block if the queue is full, leading to a deadlock.
        The reason is that the sharedQueue's size is fixed, and any write is blocked
        if the queue is full.
        For these situations, please use an UnlimitedSharedQueue, which grows in that
        particular situation.

    See samples in doc/coding.

    [author:]
        Claus Gittinger

    [see also:]
        SharedCollection
        UnlimitedSharedQueue
        Queue
        Semaphore
        Process
        CodingExamples::SharedQueueExamples
"
!

examples
"
   stress test:
   ten writers each push 10000 unique sequence numbers through their own queue
   to a matching reader; every number must be received exactly once.
                                                                    [exBegin]
    |queues readers writers seqNumber accessLock accessLock2
     numbersStillToReceive|

    seqNumber := 1.
    accessLock := Semaphore forMutualExclusion.
    accessLock2 := Semaphore forMutualExclusion.

    numbersStillToReceive := BooleanArray new:100000 withAll:true.

    queues := (1 to:10) collect:[:i | SharedQueue new].
    readers := (1 to:10) collect:[:i |
                                    [   |num|
                                        10000 timesRepeat:[
                                            num := (queues at:i) next.
                                            accessLock2 critical:[
                                                (numbersStillToReceive at:num) ifFalse:[
                                                    self halt:(num printString , ' received twice')
                                                ] ifTrue:[
                                                    numbersStillToReceive at:num put:false.
                                                ].
                                            ].
                                            'num printCR.'.
                                        ].
                                    ] fork
                                 ].
    writers := (1 to:10) collect:[:i |
                                    [   |num|

                                        10000 timesRepeat:[
                                            accessLock critical:[
                                                num := seqNumber.
                                                seqNumber := seqNumber + 1.
                                            ].
                                            (queues at:i) nextPut:num.
                                        ]
                                    ] fork
                                 ].

    readers do:[:aReader | aReader waitUntilTerminated].

    ' any left ? '.
    (numbersStillToReceive includes:true) ifTrue:[
        self halt:'oops - not all numbers received'
    ]
                                                                    [exEnd]

   deadlock example:
   here, a read process tries to write into the queue it is reading from !!
                                                                    [exBegin]
    |queue reader writer|

    queue := SharedQueue new:10.
    reader :=
        [
            |num|

            [ (num := queue next) ~~ #EOF] whileTrue:[
                'here is the bad code: writing into the queue !!'.
                num == 30 ifTrue:[
                    Transcript showCR:'xxx'.
                    queue nextPut:'bad1'.
                    queue nextPut:'bad2'.
                ].
                Transcript showCR:num.
                Delay waitForSeconds:0.01.
            ].
        ] fork.

    writer :=
        [
            1 to:60 do:[:seqNr |
                queue nextPut:seqNr.
            ].
            queue nextPut:#EOF.
        ] fork.

    reader waitUntilTerminated.
    writer waitUntilTerminated.
                                                                        [exEnd]
"
! !

!SharedQueue methodsFor:'accessing'!

remove:anElement ifAbsent:exceptionalValue
    "remove anElement from the queue (using the inherited remove:ifAbsent:)
     and answer the value returned by that; do not wait.
     If there is no such element, answer the value of exceptionalValue instead
     and leave both semaphores untouched.
     After a successful removal, signal space-availability to writers."

    |retVal noSuchElement|

    noSuchElement := false.
    accessLock critical:[
        retVal := super remove:anElement ifAbsent:[noSuchElement := true].
        noSuchElement ifFalse:[
            "/ adjust the read-semaphore while still holding the lock
            "/ (as done in removeIdentical:ifAbsent: and nextIfEmpty:),
            "/ so that no other process can see the semaphore count
            "/ and the queue contents out of step once the lock is released
            dataAvailable consume.
        ].
    ].
    noSuchElement ifTrue:[
        ^ exceptionalValue value.
    ].
    spaceAvailable signal.
    ^ retVal.

    "Created: / 22-02-2017 / 14:53:13 / stefan"
    "Modified: / 22-02-2017 / 16:50:49 / stefan"
!

removeAll
    "flush the queue: discard all elements without waiting,
     while holding the access lock.
     The read-semaphore is cleared, and for every unit it counted before,
     space-availability is signalled to writers afterwards.
     This can be used to flush queues in multi-process applications,
     when cleanup is required."

    |numPending|

    accessLock critical:[
        "/ fetch and reset the semaphore count in one uninterruptable step
        [
            numPending := dataAvailable count.
            dataAvailable clear.
        ] valueUninterruptably.
        super removeAll.
    ].
    numPending timesRepeat:[
        spaceAvailable signal
    ].
!

removeIdentical:anElement ifAbsent:exceptionalValue
    "remove anElement from the queue (using the inherited removeIdentical:ifAbsent:)
     and answer the value returned by that; do not wait.
     If there is no such element, answer the value of exceptionalValue instead.
     On success, the read-semaphore is adjusted inside the critical region
     and space-availability is signalled to writers afterwards."

    |removed found|

    found := true.
    accessLock critical:[
        removed := super removeIdentical:anElement ifAbsent:[found := false].
        found ifTrue:[
            dataAvailable consume.
        ].
    ].
    found ifFalse:[
        ^ exceptionalValue value.
    ].
    spaceAvailable signal.
    ^ removed.

    "Modified: / 22-02-2017 / 17:05:10 / stefan"
!

removeLast
    "remove and answer the last value in the queue; if it is empty,
     wait until something is put into the receiver.
     When the datum has been removed, signal space-availability to
     writers"

    |lastElement|

    dataAvailable wait.
    lastElement := accessLock critical:[
        super removeLast
    ].
    spaceAvailable signal.
    ^ lastElement.

    "Modified: / 22-02-2017 / 16:50:39 / stefan"
! !

!SharedQueue methodsFor:'accessing-internals'!

accessLock
    "return the lock which is used internally to synchronize access
     to the queue's contents (a RecursionLock, created in #init:)"

    ^ accessLock

    "Modified (comment): / 22-05-2017 / 12:08:49 / mawalch"
!

readSemaphore
    "return the semaphore which is signalled when data is available
     for reading (i.e. whenever an element has been added)."

    ^ dataAvailable

    "Modified: 16.12.1995 / 13:47:11 / cg"
!

readWaitWithTimeoutMs:ms
    "wait on the read-semaphore for data to become available, with ms as timeout;
     no element is removed from the queue by this method.
     Return true if a timeout occurred (i.e. false, if data is available).
     NOTE(review): the uncounted wait presumably leaves the semaphore's count
     unchanged - confirm against Semaphore >> waitUncountedWithTimeoutMs:"

    ^ (dataAvailable waitUncountedWithTimeoutMs:ms) isNil.
!

superNextPut:anObject
    "private; to allow subclasses to call the basic nextPut (w.o. synchronization).
     Invokes the inherited nextPut: directly: no semaphore is touched and
     the access lock is not acquired here (see #commonWriteWith: for the
     synchronized caller)."

    super nextPut:anObject.

    "Modified: / 22-02-2017 / 16:34:11 / stefan"
!

superNextPutFirst:anObject
    "private; to allow subclasses to call the basic nextPutFirst (w.o. synchronization).
     Invokes the inherited nextPutFirst: directly: no semaphore is touched and
     the access lock is not acquired here (see #commonWriteWith: for the
     synchronized caller)."

    super nextPutFirst:anObject.

    "Modified: / 22-02-2017 / 16:34:16 / stefan"
!

withAccessLockedDo:aBlock
    "evaluate aBlock inside the access lock's critical region,
     i.e. while access via next/nextPut is blocked.
     The value of aBlock is not returned; the receiver is answered."

    accessLock critical:aBlock
!

writeSemaphore
    "return the semaphore which is signalled when the queue has space
     for writing (i.e. whenever an element has been removed)."

    ^ spaceAvailable

    "Modified: 16.12.1995 / 13:47:07 / cg"
! !

!SharedQueue methodsFor:'accessing-reading'!

next
    "return the next value in the queue; if it is empty, wait until
     something is put into the receiver.
     When the datum has been removed, signal space-availability to writers"

    |retVal|

    "/ need a loop here, in case someone else took the element in-between the
    "/ wait and the accessLock-critical; in that case nil is fetched,
    "/ and we wait on the read-semaphore again
    [
        dataAvailable wait.
        retVal := accessLock critical:[super nextOrNil].
    ] doWhile:[retVal isNil].
    spaceAvailable signal.

    ^ retVal.

    "Modified: / 22-02-2017 / 14:40:32 / stefan"
    "Modified (comment): / 25-06-2019 / 14:28:44 / Claus Gittinger"
!

nextIfEmpty:exceptionBlock
    "remove and answer the next value in the queue without waiting;
     if the queue is empty, answer the value of exceptionBlock instead.
     The emptiness check, the adjustment of the read-semaphore and the
     removal all happen inside one critical region.
     When a datum has been removed, signal space-availability to writers"

    |element wasEmpty|

    element := accessLock critical:[
        (wasEmpty := self isEmpty) ifFalse:[
            dataAvailable consume.
            super nextOrNil
        ].
    ].
    wasEmpty ifTrue:[
        ^ exceptionBlock value
    ].
    spaceAvailable signal.
    ^ element.

    "Modified: / 22-02-2017 / 17:03:23 / stefan"
    "Modified (comment): / 24-05-2018 / 09:23:48 / Claus Gittinger"
!

nextOrNil
    "remove and answer the next value in the queue without waiting;
     answer nil if the queue is empty.
     When a datum has been removed, signal space-availability to writers"

    ^ self nextIfEmpty:[nil]

    "Created: / 31-05-2007 / 15:09:33 / cg"
    "Modified (comment): / 25-06-2019 / 14:22:33 / Claus Gittinger"
!

nextWithTimeout:secondsOrTimeDurationOrNil
    "return the next value in the queue; if it is empty, wait until
     something is put into the receiver.
     When the datum has been removed, signal space-availability to writers.
     Answer nil if a timeout occurs.

     The argument is passed on to the read-semaphore's waitWithTimeout:.
     It may be a time duration or the number of seconds as integer
     or float (i.e. use 0.1 for a 100ms timeout).
     With zero timeout, this can be used to poll the queue (answering
     the next element if one is available, nil if not).
     However, polling is not the intended use of a sharedQueue.
     If the argument is nil, wait without timeout (forever).

     NOTE(review): in contrast to #next, there is no retry loop here;
     whatever the inherited nextOrNil answers after a successful wait is returned
     (presumably nil, if another process emptied the queue in-between),
     and space-availability is signalled in any case."

    |retVal|

    (dataAvailable waitWithTimeout:secondsOrTimeDurationOrNil) isNil ifTrue:[
        ^ nil
    ].
    retVal := accessLock critical:[super nextOrNil].
    spaceAvailable signal.

    ^ retVal.

    "Modified: / 22-02-2017 / 14:35:09 / stefan"
    "Modified (format): / 25-06-2019 / 14:28:27 / Claus Gittinger"
!

peek
    "answer the value of the inherited peek (presumably the next element,
     which is left in the queue).
     If the queue is empty, first do an uncounted wait on the read-semaphore.
     NOTE(review): the access lock is not acquired here, neither for the
     emptiness check nor for the inherited peek."

    self isEmpty ifTrue:[
        dataAvailable waitUncounted.
    ].
    ^ super peek
! !

!SharedQueue methodsFor:'accessing-writing'!

nextPut:anObject
    "enter anObject to the end of the queue;
     Wait for available space, if the queue is full.
     After the put, signal availability of a datum to readers.
     Answer anObject"

    self commonWriteWith:[self superNextPut:anObject].
    ^ anObject

    "Modified (comment): / 22-02-2017 / 15:18:36 / stefan"
!

nextPutFirst:anObject
    "insert anObject at the beginning of the queue;
     Wait for available space, if the queue is full.
     After the put, signal availability of a datum to readers.
     Insertion at the beginning may be useful to add hi-prio elements (for example, in a job-scheduler).
     Notice: there is no explicit return here, so the receiver is answered
     (whereas nextPut: answers anObject)"

    self commonWriteWith:[self superNextPutFirst:anObject].

    "Modified (comment): / 22-02-2017 / 15:18:42 / stefan"
!

nonBlockingNextPut:aValue
    "add data to the queue, but do not block if the queue is full.
     Inside the critical region, the write-semaphore is consumed (instead of
     being waited for), the inherited nextPut: is invoked and availability
     of a datum is signalled to readers. Answers the receiver.
     NOTE(review): what happens if the queue is already full depends on the
     inherited nextPut: - to be confirmed against Queue."

    accessLock critical:[
        spaceAvailable consume.
        super nextPut:aValue.
        dataAvailable signal.
    ].

    "Created: / 19-02-2020 / 17:34:36 / Stefan Vogel"
! !

!SharedQueue methodsFor:'enumerating'!

do:aBlock
    "evaluate aBlock for each element in the queue (using the inherited do:);
     the access lock is held during the whole enumeration"

    accessLock critical:[
        super do:aBlock
    ].

    "Modified: / 22-02-2017 / 14:54:45 / stefan"
!

reverseDo:aBlock
    "evaluate aBlock for each element in the queue, using the inherited reverseDo:
     (i.e. presumably in reverse order);
     the access lock is held during the whole enumeration"

    accessLock critical:[
        super reverseDo:aBlock
    ].

    "Created: / 22-02-2017 / 14:54:22 / stefan"
! !

!SharedQueue methodsFor:'initialization'!

init:size
    "set up the receiver for size entries:
     after the inherited initialization, create the access lock,
     the read-semaphore and the write-semaphore (the latter created with size);
     both semaphores get the receiver as owner"

    super init:size.
    accessLock := RecursionLock new.
    (dataAvailable := Semaphore name:'shared q-read') owner:self.
    (spaceAvailable := (Semaphore new:size) name:'shared q-write') owner:self.

    "Modified: / 09-08-2017 / 11:59:38 / cg"
! !

!SharedQueue methodsFor:'private'!

commonWriteWith:aBlock
    "common code for nextPut / nextPutFirst;
     wait for available space, if the queue is full.
     Then evaluate aBlock (which does the actual insertion) inside the
     critical region and, still inside it, signal availability of a datum to readers."

    "/ the wait is done before the lock is acquired,
    "/ so the access lock is not held while waiting for space
    spaceAvailable wait.
    accessLock critical:[
        aBlock value.
        dataAvailable signal.
    ].

    "Modified (comment): / 22-02-2017 / 15:18:03 / stefan"
! !

!SharedQueue class methodsFor:'documentation'!

version
    "answer the version-control header string of this class
     (the Header keyword is presumably expanded by the version control system)"

    ^ '$Header$'
!

version_CVS
    "answer the CVS header string of this class
     (the Header keyword is presumably expanded by CVS)"

    ^ '$Header$'
! !