Semaphore.st
author Claus Gittinger <cg@exept.de>
Tue, 09 Jul 2019 20:55:17 +0200
changeset 24417 03b083548da2
parent 24392 aa300a0e1b6c
child 25268 9b31493cf516
permissions -rw-r--r--
#REFACTORING by exept class: Smalltalk class changed: #recursiveInstallAutoloadedClassesFrom:rememberIn:maxLevels:noAutoload:packageTop:showSplashInLevels: Transcript showCR:(... bindWith:...) -> Transcript showCR:... with:...

"{ Encoding: utf8 }"

"
 COPYRIGHT (c) 1993 by Claus Gittinger
	      All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
"{ Package: 'stx:libbasic' }"

"{ NameSpace: Smalltalk }"

Object subclass:#Semaphore
	instanceVariableNames:'count waitingProcesses lastOwnerId name owner'
	classVariableNames:''
	poolDictionaries:''
	category:'Kernel-Processes'
!

!Semaphore class methodsFor:'documentation'!

copyright
"
 COPYRIGHT (c) 1993 by Claus Gittinger
	      All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
!

documentation
"
    Semaphores are used to synchronize processes providing a nonBusy wait
    mechanism. A process can wait for the availability of some resource by
    performing a Semaphore>>wait, which will suspend the process until the
    resource becomes available. Signalling is done by (another process performing)
    Semaphore>>signal.
    If the resource has been already available before the wait, no suspending is
    done, but the resource immediately allocated.
    The resource internally keeps a count, the number of times the resource can be
    allocated. If the semaphore is created with a count greater than 1, the sema
    can be waited-upon that many times without blocking.
    On the other hand, if initialized with a negative count, the semaphore
    must be signalled that many times more often in order for a wait to not block.
    In other words: whenever the semaphore has a count greater than zero,
    the wait operation will proceed. Otherwise, it will block until the count has
    been incremented by signal operations to be greater than zero.

    There are also semaphores for mutual access to a critical region
    (Semaphore>>forMutualExclusion and Semaphore>>critical:).

    Additional protocol is provided for oneShot semaphores,
    (#signalOnce) and for conditional signalling (#signalIf).

    You can also attach semaphores to external events (such as I/O arrival or
    timer events).
    This is done by telling the Processor to signal the semaphore
    under some condition.
    See 'Processor>>signal:afterSeconds:', 'Processor>>signal:onInput:' etc.

    See examples in doc/coding (found in the CodingExamples-nameSpace).

    Warning/Note/Hint:
	a Semaphore-forMutualExclusion does NEVER allow for the critical
	region to be entered twice - NOT EVEN by the same process.
	That means, that a recursive attempt to enter that section leads
	to a deadlock.
	Use a RecursionLock instead, to avoid this.

    Hint:
	now (Jul2002), Semaphores now allow for a negative count; this allows for
	a sync-point to be implemented easily (i.e. to wait for multiple other processes
	to arrive at a sync-point).
	See examples.


    [instance variables:]
	count                   <SmallInteger>          the number of waits, that will go through
							without blocking.
							Incremented on #signal; decremented on #wait.

	waitingProcesses        <OrderedCollection>     waiting processes - will be served first
							come first served when signalled.

	lastOwnerId             <SmallInteger>          a debugging aid: set when count drops
							to zero to the current processes id.
							Helps in finding deadlocks.

	name                    <String>                a debugging aid: an optional userFriendly
							name; helps to identify a semaphore easier.

    [see also:]
	SemaphoreSet RecursionLock Monitor
	SharedQueue Delay
	Process ProcessorScheduler

    [author:]
	Claus Gittinger
"
!

examples
"
    two processes synchronizing on a sema:
                                                        [exBegin]
        |sema thread1 thread2|

        sema := Semaphore new.

        thread1 := [
                        Transcript showCR:'here is thread 1; now waiting ...'.
                        sema wait.
                        Transcript showCR:'here is thread 1 again.'.
                   ] newProcess.

        thread2 := [
                        Transcript showCR:'here is thread 2; delaying a bit ...'.
                        Delay waitForSeconds:5.
                        Transcript showCR:'here is thread 2 again; now signalling the sema'.
                        sema signal.
                        Transcript showCR:'here is thread 2 after the signalling.'.
                  ] newProcess.

        thread1 priority:7.
        thread2 priority:6.

        thread1 resume.
        thread2 resume.
                                                        [exEnd]

    semaphore for critical regions:
                                                        [exBegin]
        |accessLock|

        accessLock := Semaphore forMutualExclusion.

        [
            5 timesRepeat:[
                Delay waitForSeconds:2.
                accessLock critical:[
                    Transcript showCR:'thread1 in critical region'.
                    Delay waitForSeconds:1.
                    Transcript showCR:'thread1 leaving critical region'.
                ].
            ]
        ] forkAt:5.

        [
            5 timesRepeat:[
                Delay waitForSeconds:1.
                accessLock critical:[
                    Transcript showCR:'thread2 in critical region'.
                    Delay waitForSeconds:2.
                    Transcript showCR:'thread2 leaving critical region'.
                ].
            ]
        ] forkAt:4.
                                                        [exEnd]

    a deadlock due to recursive enter of a critical region:
                                                        [exBegin]
        |accessLock block|

        accessLock := Semaphore forMutualExclusion.

        block := [:arg |
                    Transcript showCR:'about to enter'.
                    accessLock critical:[
                        Transcript showCR:'entered - doing action'.
                        arg value
                    ].
                    Transcript showCR:'left region'.
                 ].

        block value:[].                 'this works'.
        block value:[block value:[] ].  'this deadlocks'.
                                                        [exEnd]

    Avoid the deadlock by using a RecursionLock instead:
                                                        [exBegin]
        |accessLock block|

        accessLock := RecursionLock new.

        block := [:arg |
                    Transcript showCR:'about to enter'.
                    accessLock critical:[
                        Transcript showCR:'entered - doing action'.
                        arg value
                    ].
                    Transcript showCR:'left region'.
                 ].

        block value:[].                 'this works'.
        block value:[block value:[] ].  'this deadlocks'.
                                                        [exEnd]


   Wait for multiple processes to arrive at a sync-point:
                                                        [exBegin]
        |syncSema proceedSema thread1 thread2 thread3|

        syncSema := Semaphore new.
        syncSema setCount:(1-3).
        proceedSema := Semaphore new.

        thread1 := [
                        Transcript showCR:'here is thread 1; now busy ...'.
                        Delay waitForSeconds:(2 + (Random nextIntegerBetween:2 and:4)).
                        Transcript showCR:'here is thread 1 again - now syncing.'.
                        syncSema signal.
                        Transcript showCR:'thread 1 is waiting for all others...'.
                        proceedSema wait.
                        Transcript showCR:'thread 1 done.'.
                   ] newProcess.

        thread2 := [
                        Transcript showCR:'here is thread 2; now busy ...'.
                        Delay waitForSeconds:(3 + (Random nextIntegerBetween:2 and:4)).
                        Transcript showCR:'here is thread 2 again - now syncing.'.
                        syncSema signal.
                        Transcript showCR:'thread 2 is waiting for all others...'.
                        proceedSema wait.
                        Transcript showCR:'thread 2 done.'.
                  ] newProcess.

        thread3 := [
                        Transcript showCR:'here is thread 3; now busy ...'.
                        Delay waitForSeconds:(4 + (Random nextIntegerBetween:2 and:4)).
                        Transcript showCR:'here is thread 3 again - now syncing.'.
                        syncSema signal.
                        Transcript showCR:'thread 3 is waiting for all others...'.
                        proceedSema wait.
                        Transcript showCR:'thread 3 done.'.
                  ] newProcess.

        thread1 priority:7.
        thread2 priority:6.
        thread3 priority:9.

        thread1 resume.
        thread2 resume.
        thread3 resume.

        Transcript showCR:'main thread: now waiting for other threads...'.
        syncSema wait.
        Transcript showCR:'main thread: all other threads at syncPoint.'.
        Delay waitForSeconds:2.
        Transcript showCR:'main thread: now let them proceed...'.
        proceedSema signalForAll.
        Transcript showCR:'main thread: done.'.
                                                        [exEnd]

   waitWithTimeout:0 can also be used to conditionally acquire the semaphore
   i.e. only acquire it if it is available.
     |s|

     s := Semaphore new.
     [
          (s waitWithTimeout:0) notNil ifTrue:[
                Transcript showCR:'process1 got the sema'.
                Delay waitForSeconds:1.
                Transcript showCR:'process1 signals sema'.
                s signal.
          ] ifFalse:[
                Transcript showCR:'process1 has NOT got the sema'.
          ].
     ] fork.
     [
          (s waitWithTimeout:0) notNil ifTrue:[
                Transcript showCR:'process2 got the sema'.
                Delay waitForSeconds:1.
                Transcript showCR:'process2 signals sema'.
                s signal.
          ] ifFalse:[
                Transcript showCR:'process2 has NOT got the sema'.
          ]
     ] fork.
     s signal.
     Delay waitForSeconds:0.5.
     Transcript showCR:'master waits for sema'.
     s wait.
     Transcript showCR:'master got the sema'.
"
! !

!Semaphore class methodsFor:'instance creation'!

cleanup
    "an emergency helper: manually signal all semaphores which were held by a now dead process.
     Can only (;-?) happen, if a semaphore-holding process was hard terminated
     (i.e. no ensure handling happened), and semas remain in a bad state."

    self allSubInstancesDo:[:sema |
        |lastOwner|

        (sema count == 0
         and:[(lastOwner := sema lastOwner) notNil
         and:[lastOwner isDead]]) ifTrue:[
            sema signal
        ]
    ]

    "Modified: / 20-02-2017 / 15:58:34 / stefan"
!

forMutualExclusion
    "create & return a new semaphore which allows exactly one process to
     wait on it without blocking. This type of semaphore is used
     for mutual exclusion from critical regions (see #critical:).
     Also see RecursionLock, to avoid deadlock in case of recursive entered
     critical regions."

    ^ self basicNew setCount:1 name:'criticalRegionSema'

    "Modified: / 18-02-2017 / 17:27:14 / stefan"
    "Modified: / 09-08-2017 / 11:50:15 / cg"
!

name:aString
    "create & return a new semaphore which blocks until a signal is sent"

    ^ self basicNew setCount:0 name:aString

    "Created: / 09-08-2017 / 11:49:18 / cg"
!

new
    "create & return a new semaphore which blocks until a signal is sent"

    ^ self basicNew setCount:0

    "Modified: / 20-02-2017 / 10:26:04 / stefan"
!

new:n
    "create & return a new semaphore which allows n waits before
     blocking"

    ^ self basicNew setCount:n

    "Modified: / 18-02-2017 / 17:27:26 / stefan"
! !

!Semaphore methodsFor:'Compatibility-Squeak'!

isSignaled
    ^ self wouldBlock not
!

waitTimeoutMSecs:milliSeconds
    ^ self waitWithTimeoutMs:milliSeconds state:#wait

    "Modified: / 24-07-2017 / 21:18:01 / cg"
!

waitTimeoutSeconds:seconds
    ^ self waitWithTimeout:seconds
! !

!Semaphore methodsFor:'accessing'!

owner
    "an optional reference to someone who owns this semaphore,
     typically a shared queue or a windowgroup or similar.
     This has no semantic meaning and is only used to support debugging"
     
    ^ owner
!

owner:something
    "an optional reference to someone who owns this semaphore,
     typically a shared queue or a windowgroup or similar.
     This has no semantic meaning and is only used to support debugging"

    owner := something.
! !

!Semaphore methodsFor:'printing & storing'!

displayOn:aGCOrStream
    "return a string to display the receiver - include the
     count for your convenience"

    "/ what a kludge - Dolphin and Squeak mean: printOn: a stream;
    "/ old ST80 means: draw-yourself on a GC.
    (aGCOrStream isStream) ifFalse:[
        ^ super displayOn:aGCOrStream
    ].
    aGCOrStream
        nextPutAll:self className;
        nextPut:$(.
    count printOn:aGCOrStream.
    aGCOrStream nextPutAll:' name: '.
    (name ? 'unnamed') printOn:aGCOrStream.
    aGCOrStream nextPut:$).

    "Modified: / 28-06-1997 / 16:21:09 / cg"
    "Modified (comment): / 22-02-2017 / 16:47:54 / cg"
    "Modified: / 28-06-2019 / 09:12:00 / Claus Gittinger"
!

name
    "return the semaphore's userFriendly name (only used by semaphore- and process monitors)"

    ^ name

    "Modified (comment): / 23-02-2017 / 16:54:32 / cg"
    "Modified (comment): / 27-05-2019 / 17:31:52 / Claus Gittinger"
!

name:aString
    "set the semaphore's userFriendly name (only used by semaphore- and process monitors)"

    name := aString

    "Modified (comment): / 23-02-2017 / 16:54:35 / cg"
    "Modified (comment): / 27-05-2019 / 17:31:44 / Claus Gittinger"
! !

!Semaphore methodsFor:'private'!

addWaitingProcess:aProcess
    "add aProcess to the list of waiting processes.
     all processes are ordered first-come-first-serve.

     NOTE: must be called with blocked interrupts"

    waitingProcesses isNil ifTrue:[
        "for now - assume that there is probably only one waiter"
        waitingProcesses := Array with:aProcess
    ] ifFalse:[
        waitingProcesses isArray ifTrue:[
            "add 2 to reserve space for additional waiters"
            waitingProcesses := (OrderedCollection new:waitingProcesses size + 2)
                                    addAll:waitingProcesses;
                                    yourself.
        ].
        waitingProcesses add:aProcess.
    ].

"/    "Sort, so that higher priority process are resumed first.
"/    Processes having the same priority are ordered first-come-first-serve."
"/
"/    |priority insertIndex|
"/
"/    (waitingProcesses size == 0
"/     or:[(priority := aProcess priority) <= waitingProcesses last priority]) ifTrue:[
"/        waitingProcesses add:aProcess.
"/        ^ self.
"/    ].
"/
"/    insertIndex := waitingProcesses findFirst:[:process| process priority < priority].
"/    waitingProcesses add:aProcess beforeIndex:insertIndex.
!

removeWaitingProcess:aProcess
    "remove aProcess from the list of waiting processes
     NO action if it is not in the list.

     NOTE: must be called with blocked interrupts"

    |nWaiting|

    nWaiting := waitingProcesses size.
    nWaiting == 0 ifTrue:[^ self].

    nWaiting == 1 ifTrue:[
	(waitingProcesses at:1) == aProcess ifTrue:[
	    waitingProcesses := nil.
	].
	^ self.
    ].
    waitingProcesses removeIdentical:aProcess ifAbsent:[].
!

wakeupWaiters
    "remove all waiting processes from the list of waiting processes
     and resume them.
     Answer true, if a higher priority process became ready, false if not.
     NOTE: Must be called when known that waitingProcesses is nonNil and
           also with blocked interrupts.
           Must not perform an operation that causes a reschedule."

    |processes anyDead needsReschedule|

    processes := waitingProcesses.
    "/ do not set to nil - a waiting process may be suspended and will not be resumed by #makeRunnable: ...
    "/    waitingProcesses := nil.

    needsReschedule := anyDead := false.
    processes do:[:eachProcess |
        (Processor makeRunnable:eachProcess) notNil ifTrue:[
            "higher priority process became ready"
            needsReschedule := true.
        ] ifFalse:[
            "if process is nil or dead (or for other reasons) makeRunnable returns false.
             So check here."
            (eachProcess isNil or:[eachProcess isDead]) ifTrue:[
                "printing to Transcript might not be a good idea while interrupts are blocked"
                'Semaphore>>wakeupWaiters: removing a dead process: ' infoPrint. eachProcess infoPrintCR.
                anyDead := true.
            ].
        ].
    ].
    anyDead ifTrue:[
        "interrupts are already blocked by sender"
        waitingProcesses := processes reject:[:p | p isNil or:[p isDead]]
    ].

    ^ needsReschedule.

    "Modified: / 20-02-2017 / 11:34:42 / stefan"
    "Modified (format): / 24-07-2017 / 18:03:47 / cg"
! !

!Semaphore methodsFor:'private-accessing'!

clear
    "clear the semaphore's count"

    count := 0

    "Modified (comment): / 23-02-2017 / 16:54:28 / cg"
!

initSignals
    "set the count of the semaphore to zero.
     provided for ST-80 compatibility."

    count := 0

    "Created: 17.2.1997 / 11:31:19 / cg"
!

setCount:n
    "set the count of the semaphore;
     that's the number of possible waits, without blocking"

    waitingProcesses := nil.
    count := n

    "Modified: 17.2.1997 / 11:36:40 / cg"
!

setCount:n name:aString
    "set the count of the semaphore;
     that's the number of possible waits, without blocking"

    waitingProcesses := nil.
    count := n.
    name := aString.

    "Created: / 09-08-2017 / 11:48:00 / cg"
! !

!Semaphore methodsFor:'queries'!

count
    "return the number of 'already-counted' trigger events.
     That's the number of waits which will succeed without blocking"

    ^ count

    "Created: 23.1.1997 / 02:55:58 / cg"
!

lastOwner
    "return the last owning process or nil
     (the one which counted to zero).
     May be very useful in debugging deadLock situations"

    ^ Processor processWithId:lastOwnerId.

    "Created: / 11-08-2011 / 14:35:36 / cg"
!

lastOwnerId
    "return the processId of the last owning process
     (the one which counted to zero).
     May be very useful in debugging deadLock situations"

    ^ lastOwnerId

    "Created: / 24-01-1997 / 23:09:33 / cg"
!

numberOfWaitingProcesses
    "return the number of processes waiting on the receiver"

    ^ waitingProcesses size

    "Created: 3.5.1996 / 18:06:27 / cg"
!

waitingProcesses
    "return the processes waiting on the receiver"

    ^ waitingProcesses ? #()

    "Created: 18.7.1996 / 20:53:33 / cg"
! !

!Semaphore methodsFor:'semaphoreSet interface'!

checkAndAddWaitingProcess:process
    "interface for SemaphoreSet.
     If the semaphore is available, decrement it and return true.
     Otherwise register our process to be wakened up once the semaphore is available
     and return false.
     ATTENTION: this must be invoked with OperatingSystem-interrupts-blocked.
    "

    count > 0 ifTrue:[
	count := count - 1.
	count == 0 ifTrue:[
	    lastOwnerId := Processor activeProcessId.
	].
	^ true
    ].
    (waitingProcesses notNil and:[(waitingProcesses includesIdentical:process)]) ifFalse:[
	self addWaitingProcess:process.
    ].
    ^ false

    "Modified: / 14-12-1995 / 10:32:17 / stefan"
    "Modified: / 11-08-2011 / 14:36:20 / cg"
! !

!Semaphore methodsFor:'signaling'!

signal
    "waking up the highest prio waiter."

    |wasBlocked needsReschedule|

    wasBlocked := OperatingSystem blockInterrupts.
    count := count + 1.
    needsReschedule := waitingProcesses notEmptyOrNil and:[self wakeupWaiters].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

    needsReschedule ifTrue:[
        "now, schedule the highest priority process"
        Processor reschedule.
    ].

    "Modified: / 04-02-1998 / 21:01:07 / cg"
    "Modified: / 24-07-2017 / 11:44:15 / stefan"
!

signal:anInteger
    "increment semaphore by anInteger waking up the highest prio waiters."

    |wasBlocked needsReschedule|

    wasBlocked := OperatingSystem blockInterrupts.
    count := count + anInteger.
    needsReschedule := waitingProcesses notEmptyOrNil and:[self wakeupWaiters].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

    needsReschedule ifTrue:[
        "now, schedule the highest priority process"
        Processor reschedule.
    ].

    "Created: / 22-02-2017 / 14:38:33 / stefan"
    "Modified: / 24-07-2017 / 11:44:36 / stefan"
!

signalForAll
    "signal the semaphore for all waiters.
     This can be used for process synchronization, if multiple processes are
     waiting for a common event."

    |wasBlocked needsReschedule|

    waitingProcesses notEmptyOrNil ifTrue:[
        needsReschedule := false.

        wasBlocked := OperatingSystem blockInterrupts.
        "first, make them all runnable, but do not schedule
         (in case one has higher prio and goes into a wait immediately again.)"
        waitingProcesses notEmptyOrNil ifTrue:[
            needsReschedule := self wakeupWaiters.
            "wakeupWaites may have removed dead processes from waitingProcesses!!"
            count := count + waitingProcesses size.
        ].
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

        needsReschedule ifTrue:[
            "now, schedule the highest priority process"
            Processor reschedule.
        ].
    ]

    "Modified: / 05-02-1998 / 10:40:26 / cg"
    "Modified: / 24-07-2017 / 11:44:49 / stefan"
!

signalIf
    "signal the semaphore, but only if being waited upon.
     This can be used for one-shot semaphores (i.e. not remembering
     previous signals)"

    |wasBlocked needsReschedule|

    waitingProcesses notNil ifTrue:[
        needsReschedule := false.

        wasBlocked := OperatingSystem blockInterrupts.
        waitingProcesses notEmptyOrNil ifTrue:[
            count := count + 1.
            needsReschedule := self wakeupWaiters.
        ].
        wasBlocked ifFalse:[
            OperatingSystem unblockInterrupts.
        ].
        needsReschedule ifTrue:[
            "now, schedule the highest priority process"
            Processor reschedule.
        ].
    ]

    "Modified: / 28-02-1996 / 21:23:57 / cg"
    "Modified: / 24-07-2017 / 11:45:20 / stefan"
!

signalIfWithReturn
    "signal the semaphore, but only if being waited upon.
     This can be used for one-shot semaphores (i.e. not remembering
     previous signals).
     Answer true if a reschedule is needed, false if not."

    |wasBlocked needsReschedule|

    needsReschedule := false.
    waitingProcesses notNil ifTrue:[
        wasBlocked := OperatingSystem blockInterrupts.
        waitingProcesses notEmptyOrNil ifTrue:[
            count := count + 1.
            needsReschedule := self wakeupWaiters.
        ].
        wasBlocked ifFalse:[
            OperatingSystem unblockInterrupts.
        ].
    ].
    ^ needsReschedule.

    "Created: / 19-02-2017 / 18:09:23 / stefan"
    "Modified: / 24-07-2017 / 11:45:25 / stefan"
!

signalOnce
    "wakeup waiters - but only once.
     I.e. if the semaphore has already been signaled, this is ignored."

    |wasBlocked needsReschedule|

    count <= 0 ifTrue:[
        needsReschedule := false.

        wasBlocked := OperatingSystem blockInterrupts.
        "/ check again - now interrupts are blocked.
        count <= 0 ifTrue:[
            count := count + 1.
            needsReschedule := (count == 1
                                and:[waitingProcesses notEmptyOrNil 
                                and:[self wakeupWaiters]]).
        ].
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

        needsReschedule ifTrue:[
            "now, schedule the highest priority process"
            Processor reschedule.
        ].
    ].

    "Modified: / 28-02-1996 / 21:24:08 / cg"
    "Modified: / 24-07-2017 / 11:46:07 / stefan"
!

signalOnceWithoutReschedule
    "wakeup waiters - but only once.
     I.e. if the semaphore has already been signaled, this is ignored."

    |wasBlocked needsReschedule|

    count <= 0 ifTrue:[
        wasBlocked := OperatingSystem blockInterrupts.
        "/ check again - now interrupts are blocked.
        count <= 0 ifTrue:[
            count < 0 ifTrue:[
                'oops - bad sema count' infoPrintCR.
                count := 0.
            ].
            
            count := count + 1.
            needsReschedule := (count == 1
                                and:[waitingProcesses notEmptyOrNil 
                                and:[self wakeupWaiters]]).
        ].
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

        ^ needsReschedule
    ].
    ^ false

    "Created: / 25-07-2017 / 11:26:23 / cg"
! !

!Semaphore methodsFor:'testing'!

isEmpty
    "ST80 compatibility - return true if there are no waiters"

    ^ waitingProcesses isEmptyOrNil

    "Created: / 03-05-1996 / 18:06:27 / cg"
    "Modified: / 18-06-1998 / 16:07:38 / cg"
    "Modified: / 24-07-2017 / 11:43:35 / stefan"
!

wouldBlock
    "return true, if the receiver would block the activeProcess
     if a wait was performed. False otherwise.
     Attention: if asked without some global lock (blockedInterrupts),
     the returned value may be outdated right away."

    ^ count <= 0
! !

!Semaphore methodsFor:'waiting'!

consume
    "consume the resource without waiting.
     This works even if the count is 0 (count may become negative).
     Answer the new count afterwards"

    ^ self consume:1
!

consume:n
    "consume the resource n times without waiting.
     This works even if the count is 0 (count may become negative).
     Answer the new count afterwards"

    |wasBlocked|

    wasBlocked := OperatingSystem blockInterrupts.
    count := count - n.
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

    ^ count
!

consumeIfPossible
    "if the semaphore is currently free,
     acquire it, lock it and return true.
     Otherwise, do not wait, but return false immediately."

    |wasBlocked|

    wasBlocked := OperatingSystem blockInterrupts.
    count <= 0 ifTrue:[
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
        ^ false.
    ].    

    "if we come here, we have acquired the semaphore"
    count := count - 1.
    count == 0 ifTrue:[
        lastOwnerId := Processor activeProcessId.
    ].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
    ^ true

    "Created: / 31-08-2017 / 23:05:19 / cg"
!

critical:aBlock
    "evaluate aBlock as a critical region; the receiver must be
     created using Semaphore>>forMutualExclusion"

    |retVal wasBlocked needsReschedule gotSema|

    wasBlocked := OperatingSystem blockInterrupts.

    count > 0 ifTrue:[
        "/ inlined common case when no wait is required - ugly kludge but helpful
        count := count - 1.
        count == 0 ifTrue:[
            lastOwnerId := Processor activeProcessId.
        ].
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
        retVal := aBlock ifCurtailed:[self signal].
    ] ifFalse:[
        "have to wait for Semaphore availability"
        retVal := [
            gotSema := self wait.
            wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            aBlock value.
        ] ifCurtailed:[
            wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            gotSema notNil ifTrue:[self signal].
        ].
    ].

    "this is inlined code from #signal"
    OperatingSystem blockInterrupts.
    count := count + 1.
    needsReschedule := waitingProcesses notEmptyOrNil and:[self wakeupWaiters].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

    needsReschedule ifTrue:[
        "now, schedule the highest priority process"
        Processor reschedule.
    ].
    "end inlined code from #signal"

    ^ retVal.

    "
      the example below is stupid (it should use a SharedQueue,
      or at least a Queue with critical regions).
      Anyhow, it demonstrates how two processes lock each other
      from accessing coll at the same time

     |sema coll|

     sema := Semaphore forMutualExclusion.
     coll := OrderedCollection new:10.

     [
        1 to:1000 do:[:i |
            sema critical:[
                coll addLast:i.
                (Delay forSeconds:0.1) wait.
            ]
        ]
     ] forkAt:4.

     [
        1 to:1000 do:[:i |
            sema critical:[
                coll removeFirst.
                (Delay forSeconds:0.1) wait.
            ]
        ]
     ] forkAt:4.
    "

    "Modified: / 11-08-2011 / 14:36:30 / cg"
    "Modified: / 24-07-2017 / 11:42:47 / stefan"
!

critical:aBlock ifBlocking:blockingBlock
    "like critical:, but do not block if the lock cannot be acquired.
     Instead, return the value of the second argument, blockingBlock."

    ^ self critical:aBlock timeoutMs:0 ifBlocking:blockingBlock.
!

critical:aBlock timeoutMs:timeoutMs ifBlocking:blockingBlock
    "like critical:, but do not block if the lock cannot be acquired 
     within timeoutMs milliseconds.
     Instead, return the value of blockingBlock."

    |retVal wasBlocked needsReschedule gotSema|

    wasBlocked := OperatingSystem blockInterrupts.

    count > 0 ifTrue:[
        "/ inlined common case when no wait is required - ugly kludge but helpful
        count := count - 1.
        count == 0 ifTrue:[
            lastOwnerId := Processor activeProcessId.
        ].
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
        retVal := aBlock ifCurtailed:[self signal].
        gotSema := true.
    ] ifFalse:[
        "have to wait for Semaphore availability"
        timeoutMs ~= 0 ifTrue:[
            retVal := [
                gotSema := self waitWithTimeoutMs:timeoutMs state:#wait.
                gotSema notNil ifTrue:[
                    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
                    aBlock value.
                ].
            ] ifCurtailed:[
                wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
                gotSema notNil ifTrue:[self signal].
            ].
        ].
    ].

    gotSema notNil ifTrue:[
        "this is inlined code from #signal"
        OperatingSystem blockInterrupts.
        count := count + 1.
        needsReschedule := waitingProcesses notEmptyOrNil and:[self wakeupWaiters].
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

        needsReschedule ifTrue:[
            "now, schedule the highest priority process"
            Processor reschedule.
        ].
        "end inlined code from #signal"
    ] ifFalse:[
        wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
        retVal := blockingBlock value.
    ].

    ^ retVal.

    "Created: / 18-02-2017 / 22:15:25 / stefan"
    "Modified: / 24-07-2017 / 11:43:19 / stefan"
    "Modified: / 24-07-2017 / 21:17:47 / cg"
!

wait
    "wait for the semaphore"

    |activeProcess wasBlocked|

    wasBlocked := OperatingSystem blockInterrupts.
    count <= 0 ifTrue:[
        activeProcess := Processor activeProcess.
        "
         need a while-loop here, since more than one process may
         wait for it and another one may also wake up.
         Thus, the count is not always non-zero after returning from
         suspend.
        "
        [
            self addWaitingProcess:activeProcess.
            "
             for some more descriptive info in processMonitor ...
             ... set the state to #wait (instead of #suspend)
            "
            [
                activeProcess suspendWithState:#wait
            ] ifCurtailed:[
                "interrupts are not blocked when entered through Processor>>#interruptActive"
                OperatingSystem blockInterrupts.
                self removeWaitingProcess:activeProcess.
                wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            ].
            self removeWaitingProcess:activeProcess.

            count <= 0
        ] whileTrue.
    ].

    "if we come here, we have acquired the semaphore"
    count := count - 1.
    count == 0 ifTrue:[
        lastOwnerId := Processor activeProcessId.
    ].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

    "Modified: / 13-12-1995 / 13:26:33 / stefan"
    "Modified (format): / 21-02-2017 / 15:29:51 / stefan"
    "Modified: / 24-07-2017 / 23:04:09 / cg"
!

waitUncounted
    "wait for the semaphore; do not consume the resource
     (i.e. do not count down)"

    |activeProcess wasBlocked|

    count > 0 ifTrue:[
        ^ self
    ].
    activeProcess := Processor activeProcess.

    wasBlocked := OperatingSystem blockInterrupts.
    "
     need a while-loop here, since more than one process may
     wait for it and another one may also wake up.
     Thus, the count is not always non-zero after returning from
     suspend.
    "
    [count <= 0] whileTrue:[
        self addWaitingProcess:activeProcess.
        "
         for some more descriptive info in processMonitor ...
         ... set the state to #wait (instead of #suspend)
        "
        [
            activeProcess suspendWithState:#wait
        ] ifCurtailed:[
            "interrupts are not blocked when entered through Processor>>#interruptActive"
            OperatingSystem blockInterrupts.
            self removeWaitingProcess:activeProcess.
            wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
        ].
        self removeWaitingProcess:activeProcess.
    ].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].

    "Modified: / 13-12-1995 / 13:26:49 / stefan"
    "Modified: / 24-07-2017 / 23:04:33 / cg"
!

waitUncountedWithTimeout:secondsOrNilOrTimeDuration
    "wait for the semaphore, but abort the wait after some time (seconds).
     return the receiver if the semaphore triggered normal, nil if we return
     due to a timeout.
     The argument may be a time duration or the number of seconds as integer
     or float (i.e. use 0.1 for a 100ms timeout).
     With zero timeout, this can be used to poll a semaphore (returning
     the receiver if the semaphore is available, nil if not).
     However, polling is not the intended use of semaphores, though.
     If seconds is nil, wait without timeout."

    <resource: #skipInDebuggersWalkBack>

    |millis|

    secondsOrNilOrTimeDuration notNil ifTrue:[
        secondsOrNilOrTimeDuration isNumber ifTrue:[
            millis := (secondsOrNilOrTimeDuration * 1000) asInteger.
        ] ifFalse:[
            "a TimeDuration"
            millis := secondsOrNilOrTimeDuration asTruncatedMilliseconds.
        ].
    ].

    ^ self waitUncountedWithTimeoutMs:millis state:#wait.

    "Created: / 15-04-2019 / 12:12:52 / Stefan Vogel"
    "Modified (comment): / 25-06-2019 / 14:25:52 / Claus Gittinger"
!

waitUncountedWithTimeoutMs:milliSecondsOrNil
    "wait for the semaphore; do not consume the resource
     (i.e. do not count down).
     Abort the wait after some time.
     return the receiver if the semaphore triggered normal, nil if we return
     due to a timeout.
     With zero timeout, this can be used to poll a semaphore (returning
     the receiver if the semaphore is available, nil if not).
     However, polling is not the intended use of semaphores, though.
     If milliSecondsOrNil is nil, wait without timeout."

    <resource: #skipInDebuggersWalkBack>

    self waitUncountedWithTimeoutMs:milliSecondsOrNil state:#wait

    "Modified: / 13-12-1995 / 13:27:24 / stefan"
    "Modified: / 24-07-2017 / 21:53:57 / cg"
    "Modified: / 30-05-2018 / 13:57:16 / Claus Gittinger"
    "Modified (comment): / 25-06-2019 / 14:24:26 / Claus Gittinger"
!

waitUncountedWithTimeoutMs:milliSecondsOrNil state:newStateSymbol
    "wait for the semaphore; do not consume the resource
     (i.e. do not count down).
     Abort the wait after some time.
     return the receiver if the semaphore triggered normal, nil if we return
     due to a timeout.
     With zero timeout, this can be used to poll a semaphore 
     (returning the receiver if the semaphore is available, nil if not).
     However, polling is not the intended use of semaphores, though.
     If milliSecondsOrNil is nil, wait without timeout.
     The stateSymbol argument is purely for the ProcessMonitor, to present a nicer
     threadState (#wait instead of #suspend)"

    <resource: #skipInDebuggersWalkBack>

    |activeProcess timeoutOccurred wasBlocked timeoutBlock endTime dueTime currentDelta maxMilliseconds|

    count > 0 ifTrue:[
        ^ self
    ].

    wasBlocked := OperatingSystem blockInterrupts.

    count <= 0 ifTrue:[
        "with zero-timeout, this is a poll"
        milliSecondsOrNil == 0 ifTrue:[
            wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            ^ nil
        ].

        activeProcess := Processor activeProcess.
        timeoutOccurred := false.

        milliSecondsOrNil notNil ifTrue:[
            "Wait with timeout: calculate the end-time"
            maxMilliseconds := SmallInteger maxVal // 4.
            currentDelta := milliSecondsOrNil.
            currentDelta > maxMilliseconds ifTrue:[
                "NOTE: the microsecondTime is increasing monotonically,
                       while millisecondTime is wrapping at 16r1fffffff.
                       So use the microsecondTime to check when we are finished"
                dueTime := OperatingSystem getMicrosecondTime + (currentDelta * 1000).
                currentDelta := maxMilliseconds.
            ].
            endTime := OperatingSystem 
                        millisecondTimeAdd:OperatingSystem getMillisecondTime 
                        and:currentDelta.

            timeoutBlock := [
                    timeoutOccurred := true.
                    timeoutBlock:= nil.
                    Processor resume:activeProcess.
                ].
            Processor addTimedBlock:timeoutBlock for:activeProcess atMilliseconds:endTime.
        ].

        "
         need a while-loop here, since more than one process may
         wait for it and another one may also wake up.
         Thus, the count is not always non-zero after returning from
         suspend.
        "
        [
            self addWaitingProcess:activeProcess.

            "
             for some more descriptive info in processMonitor ...
             ... set the state to #wait (instead of #suspend)
            "
            [
                "sleep until resumed..."
                activeProcess suspendWithState:newStateSymbol.
            ] ifCurtailed:[
                "interrupts are not blocked when entered through Processor>>#interruptActive"
                OperatingSystem blockInterrupts.
                timeoutBlock notNil ifTrue:[
                    Processor removeTimedBlock:timeoutBlock.
                    timeoutBlock := nil.
                ].
                self removeWaitingProcess:activeProcess.
                wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            ].

            self removeWaitingProcess:activeProcess.
            timeoutOccurred ifTrue:[
                (dueTime notNil
                 and:[(currentDelta := dueTime - OperatingSystem getMicrosecondTime) > 0]) ifTrue:[
                    "there is still some time left"
                    timeoutOccurred := false.
                    currentDelta := (currentDelta // 1000) min:maxMilliseconds.
                    endTime := OperatingSystem 
                                millisecondTimeAdd:OperatingSystem getMillisecondTime 
                                and:currentDelta.
                    Processor addTimedBlock:timeoutBlock for:activeProcess atMilliseconds:endTime.
                ] ifFalse:[
                    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
                    ^ nil
                ].
            ].

            count <= 0
        ] whileTrue.

        timeoutBlock notNil ifTrue:[
            Processor removeTimedBlock:timeoutBlock.
            timeoutBlock := nil.
        ].
    ].

    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
    ^ self

    "Created: / 24-07-2017 / 21:53:23 / cg"
    "Modified: / 24-07-2017 / 23:06:00 / cg"
    "Modified: / 02-08-2017 / 14:16:08 / stefan"
    "Modified: / 30-05-2018 / 13:57:11 / Claus Gittinger"
    "Modified (comment): / 25-06-2019 / 14:24:18 / Claus Gittinger"
!

waitWithTimeout:secondsOrNilOrTimeDuration
    "wait for the semaphore, but abort the wait after some time (seconds).
     return the receiver if the semaphore triggered normal, nil if we return
     due to a timeout.
     
     The argument may be a time duration or the number of seconds as integer
     or float (i.e. use 0.1 for a 100ms timeout).
     With zero timeout, this can be used to poll a semaphore (returning
     the receiver if the semaphore is available, nil if not).
     However, polling is not the intended use of semaphores, though.
     If the argument is nil, wait without timeout (forever)."

    <resource: #skipInDebuggersWalkBack>

    |millis|

    secondsOrNilOrTimeDuration notNil ifTrue:[
        secondsOrNilOrTimeDuration isNumber ifTrue:[
            millis := (secondsOrNilOrTimeDuration * 1000) asInteger.
        ] ifFalse:[
            "a TimeDuration"
            millis := secondsOrNilOrTimeDuration asTruncatedMilliseconds.
        ].
    ].

    ^ self waitWithTimeoutMs:millis state:#wait.

    "Modified: / 21-02-2017 / 14:49:08 / stefan"
    "Modified: / 24-07-2017 / 21:15:39 / cg"
    "Modified: / 30-05-2018 / 13:57:33 / Claus Gittinger"
    "Modified: / 24-07-2018 / 16:28:36 / Stefan Vogel"
    "Modified (comment): / 25-06-2019 / 14:21:43 / Claus Gittinger"
!

waitWithTimeoutMs:milliSecondsOrNil
    "wait for the semaphore, but abort the wait after some time.
     return the receiver if the semaphore triggered normal, nil if we return
     due to a timeout.
     With zero timeout, this can be used to poll a semaphore (returning
     the receiver if the semaphore is available, nil if not).
     However, polling is not the intended use of semaphores, though.
     If milliSeconds is nil, wait without timeout."

    ^ self waitWithTimeoutMs:milliSecondsOrNil state:#wait

    "Modified: / 21-02-2017 / 15:21:30 / stefan"
    "Modified: / 24-07-2017 / 21:15:25 / cg"
    "Modified (format): / 25-06-2019 / 14:26:12 / Claus Gittinger"
!

waitWithTimeoutMs:milliSecondsOrNil state:waitStateSymbol
    "wait for the semaphore, but abort the wait after some time.
     return the receiver if the semaphore triggered normal, nil if we return
     due to a timeout.
     With zero timeout, this can be used to poll a semaphore (returning
     the receiver if the semaphore is available, nil if not).
     However, polling is not the intended use of semaphores, though.
     If milliSecondsOrNil is nil, wait without timeout.

     THIS IS A COPY of #waitWithTimeoutMs - the only difference is setting waitStateSymbol.
     waitStateSymbol is the state the process is set to while waiting - normally #wait."

    |activeProcess timeoutOccurred wasBlocked timeoutBlock endTime currentDelta dueTime maxMilliseconds|

    wasBlocked := OperatingSystem blockInterrupts.

    count <= 0 ifTrue:[
        "with zero-timeout, this is a poll"
        milliSecondsOrNil == 0 ifTrue:[
            wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            ^ nil
        ].

        activeProcess := Processor activeProcess.
        timeoutOccurred := false.

        milliSecondsOrNil notNil ifTrue:[
            "Wait with timeout: calculate the end-time"
            maxMilliseconds := SmallInteger maxVal // 4.
            currentDelta := milliSecondsOrNil.
            currentDelta > maxMilliseconds ifTrue:[
                "NOTE: the microsecondTime is increasing monotonically,
                       while millisecondTime is wrapping at 16r1fffffff.
                       So use the microsecondTime to check when we are finished"
                dueTime := OperatingSystem getMicrosecondTime + (currentDelta * 1000).
                currentDelta := maxMilliseconds.
            ].
            endTime := OperatingSystem 
                        millisecondTimeAdd:OperatingSystem getMillisecondTime 
                        and:currentDelta.

            timeoutBlock := [
                    timeoutOccurred := true.
                    timeoutBlock:= nil.
                    Processor resume:activeProcess.
                ].
            Processor addTimedBlock:timeoutBlock for:activeProcess atMilliseconds:endTime.
        ].

        "
         need a while-loop here, since more than one process may
         wait for it and another one may also wake up.
         Thus, the count is not always non-zero after returning from
         suspend.
        "
        [
            self addWaitingProcess:activeProcess.

            "
             for some more descriptive info in processMonitor ...
             ... set the state to waitStateSymbol (instead of #suspend)
            "
            [
                "sleep until resumed..."
                activeProcess suspendWithState:waitStateSymbol.
            ] ifCurtailed:[
                "interrupts are not blocked when entered through Processor>>#interruptActive"
                OperatingSystem blockInterrupts.
                timeoutBlock notNil ifTrue:[
                    Processor removeTimedBlock:timeoutBlock.
                    timeoutBlock := nil.
                ].
                self removeWaitingProcess:activeProcess.
                wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
            ].

            self removeWaitingProcess:activeProcess.
            timeoutOccurred ifTrue:[
                (dueTime notNil
                 and:[(currentDelta := dueTime - OperatingSystem getMicrosecondTime) > 0]) ifTrue:[
                    "there is still some time left"
                    timeoutOccurred := false.
                    currentDelta := (currentDelta // 1000) min:maxMilliseconds.
                    endTime := OperatingSystem 
                                millisecondTimeAdd:OperatingSystem getMillisecondTime 
                                and:currentDelta.
                    Processor addTimedBlock:timeoutBlock for:activeProcess atMilliseconds:endTime.
                ] ifFalse:[
                    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
                    ^ nil
                ].
            ].

            count <= 0
        ] whileTrue.

        timeoutBlock notNil ifTrue:[
            Processor removeTimedBlock:timeoutBlock.
            timeoutBlock := nil.
        ].
    ].

    "if we come here, we have acquired the semaphore"
    count := count - 1.
    count == 0 ifTrue:[
        lastOwnerId := Processor activeProcessId.
    ].
    wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
    ^ self

    "Modified: / 24-07-2017 / 23:04:57 / cg"
    "Modified: / 02-08-2017 / 14:16:48 / stefan"
    "Modified (comment): / 25-06-2019 / 14:23:42 / Claus Gittinger"
! !

!Semaphore class methodsFor:'documentation'!

version
    ^ '$Header$'
!

version_CVS
    ^ '$Header$'
! !