"
COPYRIGHT (c) 1993 by Claus Gittinger
All Rights Reserved
This software is furnished under a license and may be used
only in accordance with the terms of that license and with the
inclusion of the above copyright notice. This software may not
be provided or otherwise made available to, or used by, any
other person. No title to or ownership of the software is
hereby transferred.
"
Object subclass:#ProcessorScheduler
instanceVariableNames:'quiescentProcessLists scheduler zombie activeProcess
currentPriority readFdArray readSemaphoreArray readCheckArray
writeFdArray writeSemaphoreArray timeoutArray timeoutActionArray
timeoutProcessArray timeoutSemaphoreArray idleActions anyTimeouts
dispatching interruptedProcess useIOInterrupts gotIOInterrupt
osChildExitActions gotChildSignalInterrupt
exitWhenNoMoreUserProcesses suspendScheduler'
classVariableNames:'KnownProcesses KnownProcessIds PureEventDriven
UserSchedulingPriority UserInterruptPriority TimingPriority
HighestPriority SchedulingPriority MaxNumberOfProcesses
InvalidProcessSignal TimeSliceProcess'
poolDictionaries:''
category:'Kernel-Processes'
!
!ProcessorScheduler class methodsFor:'documentation'!
copyright
"
COPYRIGHT (c) 1993 by Claus Gittinger
All Rights Reserved
This software is furnished under a license and may be used
only in accordance with the terms of that license and with the
inclusion of the above copyright notice. This software may not
be provided or otherwise made available to, or used by, any
other person. No title to or ownership of the software is
hereby transferred.
"
!
documentation
"
This class has only one instance, which is bound to the global
'Processor'. It is responsible for scheduling among the smalltalk
processes (threads; not to be confused with heavyweight Unix processes).
Scheduling is fully done in smalltalk (the always runnable scheduler-
process, running at highest priority does this).
The main VM primitive to support this is found in threadSwitch, which passes
control to another process (usually selected by the scheduler).
Thus it is possible to modify the scheduler's policy and implementation
at the smalltalk level.
(To answer a frequently asked question:
do not add preemptive round-robin here; this can be implemented without
any need to change the scheduler. See goodies/timeslicing.st for how
this is done in a very elegant way).
Notice: Smalltalk/X can (still) be compiled & configured without
process support. This non-process mode is called 'pureEventDriven' mode
and is useful to quickly port ST/X to systems, where these facilities
are either not needed (server applications), or are difficult to
implement (threads require some assembler support functions).
To allow pureEvent mode, kludges are built into some places in the
system, where either a process is forked, or a timeout is used instead
(for examples, see ProcessMonitor or MemoryMonitor).
This pure-event mode may not be supported in the future
(actually, it is no longer maintained in the places where it was present, so do not
run the system without Processes).
[class variables:]
KnownProcesses <WeakArray> all known processes
KnownProcessIds <Collection> and their IDs
PureEventDriven <Boolean> true, if no process support
is available
UserSchedulingPriority <Integer> the priority at which normal
user interfaces run
UserInterruptPriority the priority at which user-interrupt
(Ctrl-C) processing
takes place. Processes with
a greater or equal priority are
not interruptible.
TimingPriority the priority used for timing.
Processes with a greater or
equal priority are not interrupted
by timers.
HighestPriority The highest allowed prio for processes
SchedulingPriority The priority of the scheduler (must
be higher than any other).
MaxNumberOfProcesses if non-nil, no more than this
number of processes are allowed
(for debugging)
most interesting methods:
Processor>>suspend: (see also Process>>suspend)
Processor>>resume: (see also Process>>resume)
Processor>>terminate: (see also Process>>terminate)
Processor>>yield
Processor>>changePriority:for: (see also Process>>priority:)
Processor>>signal:afterSeconds: (see also Delay>>forSeconds:)
Processor>>signal:afterMilliseconds: (see also Delay>>forMilliseconds:)
Processor>>signal:onInput: (see also ExternalStream>>readWait)
Processor>>signal:onOutput: (see also ExternalStream>>writeWait)
Processor>>disableSemaphore:
[see also:]
Process
Delay Semaphore SemaphoreSet SharedQueue
WindowGroup
(``Working with processes'': programming/processes.html)
[author:]
Claus Gittinger
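[usage sketch:]
(illustrative only, not taken from the original sources; assumes Semaphore
and Transcript as found in a regular image. It arranges for the scheduler
to signal a semaphore after a timeout and waits for that signal.)
|sema|
sema := Semaphore new.
Processor signal:sema afterSeconds:2.
sema wait.
Transcript showCR:'woke up after the timeout'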
"
! !
!ProcessorScheduler class methodsFor:'initialization'!
initialize
"class setup: create the one-and-only instance of myself and
setup some priority values."
UserSchedulingPriority := 8.
UserInterruptPriority := 24.
TimingPriority := 16.
SchedulingPriority := 31.
HighestPriority := 30.
InvalidProcessSignal isNil ifTrue:[
InvalidProcessSignal := ErrorSignal newSignalMayProceed:true.
InvalidProcessSignal nameClass:self message:#invalidProcessSignal.
InvalidProcessSignal notifierString:'invalid process'.
].
Processor isNil ifTrue:[
"create the one and only processor"
Processor := self basicNew initialize.
].
"
allow configurations without processes
(but such configurations are no longer distributed)
"
PureEventDriven := self threadsAvailable not.
PureEventDriven ifTrue:[
'Processor [error]: no process support - running event driven' errorPrintCR
].
"Modified: 23.9.1996 / 14:24:50 / stefan"
"Modified: 10.1.1997 / 18:03:03 / cg"
! !
!ProcessorScheduler class methodsFor:'instance creation'!
new
"there is (currently) only one processor ..."
self error:'only one processor is allowed in the system'
! !
!ProcessorScheduler class methodsFor:'Signal constants'!
invalidProcessSignal
^ InvalidProcessSignal
"Created: 23.9.1996 / 13:44:57 / stefan"
! !
!ProcessorScheduler class methodsFor:'instance release'!
update:something with:aParameter from:changedObject
"some Process has been garbage collected
- terminate the underlying thread.
Usually this does not happen; instead, the process terminates itself
by sending #terminate."
|id sz "{ Class: SmallInteger }"|
something == #ElementExpired ifTrue:[
sz := KnownProcessIds size.
1 to:sz do:[:index |
"/ (KnownProcesses at:index) isNil ifTrue:[
(KnownProcesses at:index) == 0 ifTrue:[
id := KnownProcessIds at:index.
id notNil ifTrue:[
'Processor [warning]: terminating thread ' errorPrint.
id errorPrint.
' (no longer refd)' errorPrintCR.
self threadDestroy:id.
KnownProcessIds at:index put:nil.
].
KnownProcesses at:index put:nil.
]
]
]
"Created: 7.1.1997 / 16:45:42 / stefan"
"Modified: 10.1.1997 / 19:10:48 / cg"
! !
!ProcessorScheduler class methodsFor:'primitive process primitives'!
threadCreate:aProcess withId:id
"physical creation of a process.
(warning: low level entry, no administration done).
This may raise an exception, if a VM process could not be created."
MaxNumberOfProcesses notNil ifTrue:[
KnownProcessIds size >= MaxNumberOfProcesses ifTrue:[
(KnownProcessIds count:[:el | el notNil]) >= MaxNumberOfProcesses ifTrue:[
"
the number of processes has reached the (soft) limit.
This limit prevents runaway programs from creating too many
processes. If you continue in the debugger, the process will be
created as usual. If you do not want this, abort or terminate.
"
self error:'too many processes'.
]
]
].
%{
int tid;
extern int __threadCreate();
tid = __threadCreate(aProcess,
0 /* stackSize: no longer needed */,
__isSmallInteger(id) ? __intVal(id) /* assign id */
: -1 /* let VM assign one */ );
if (tid) {
RETURN ( __MKSMALLINT(tid));
}
%}
.
"
arrive here, if creation of process in VM failed.
This may happen, if the VM does not support more processes,
or if it ran out of memory, when allocating internal data
structures.
"
^ ObjectMemory allocationFailureSignal raise.
!
threadDestroy:id
"physical destroy other process ...
(warning: low level entry, no administration done)"
%{ /* NOCONTEXT */
if (__isSmallInteger(id)) {
__threadDestroy(__intVal(id));
}
%}
!
threadInterrupt:id
"make the process evaluate an interrupt. This sets a flag in the VM's
threadSwitcher, to let the process perform a #interrupt when it is set to
run the next time. The process itself can decide how to react to this
interrupt (currently, it looks for interruptBlocks to evaluate)."
%{ /* NOCONTEXT */
if (__isSmallInteger(id)) {
__threadInterrupt(__intVal(id));
}
%}
!
threadsAvailable
"return true, if the runtime system supports threads (i.e. processes);
false otherwise."
%{ /* NOCONTEXT */
RETURN (__threadsAvailable());
%}
! !
!ProcessorScheduler class methodsFor:'queries'!
isPureEventDriven
"this is temporary - (maybe not :-).
you can run ST/X either with or without processes.
Without, there is conceptually a single process handling all
outside events and timeouts. This has some negative implications
(Debugger is ugly), but allows a fully portable ST/X without any
assembler support - i.e. quick portability.
The PureEvent flag will automatically be set if the runtime system
does not support threads - otherwise, it can be set manually
(from rc-file).
"
^ PureEventDriven
!
knownProcesses
"return a collection of all (living) processes in the system"
^ KnownProcesses select:[:p | p notNil and:[p ~~ 0]]
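"
usage sketch (illustrative only; assumes Transcript is available).
Lists every process which is still alive:

ProcessorScheduler knownProcesses do:[:p |
    Transcript showCR:p printString
]
"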
!
maxNumberOfProcesses
"return the limit on the number of processes;
the default is nil (i.e. unlimited)."
^ MaxNumberOfProcesses
!
maxNumberOfProcesses:aNumber
"set the limit on the number of processes.
This helps if you have a program which (by error) creates countless
subprocesses. Without this limit, you may have a hard time finding
this error (and repairing it). If nil (the default), the number of
processes is unlimited."
MaxNumberOfProcesses := aNumber
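"
usage sketch (the value 500 is an arbitrary example, not a recommendation).
Set a soft limit while debugging, query it, then remove it again:

ProcessorScheduler maxNumberOfProcesses:500.
ProcessorScheduler maxNumberOfProcesses.
ProcessorScheduler maxNumberOfProcesses:nil.
"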
!
processDriven
"turn on process driven mode"
PureEventDriven := false
!
pureEventDriven
"turn on pure-event driven mode - no processes, single dispatch loop"
PureEventDriven := true
! !
!ProcessorScheduler methodsFor:'I/O event actions'!
disableFd:aFileDescriptor
"disable block events on aFileDescriptor.
This is a leftover support for pure-event systems and may vanish."
|idx "{Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
idx := readFdArray identityIndexOf:aFileDescriptor startingAt:1.
idx ~~ 0 ifTrue:[
readFdArray at:idx put:nil.
readCheckArray at:idx put:nil.
readSemaphoreArray at:idx put:nil
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
!
enableIOAction:aBlock onInput:aFileDescriptor
"half-obsolete event support: arrange for aBlock to be
evaluated when input on aFileDescriptor arrives.
This is a leftover support for pure-event systems and may vanish."
|idx "{Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
(readFdArray identityIndexOf:aFileDescriptor startingAt:1) == 0 ifTrue:[
idx := readFdArray identityIndexOf:nil startingAt:1.
idx ~~ 0 ifTrue:[
readFdArray at:idx put:aFileDescriptor.
readCheckArray at:idx put:aBlock.
readSemaphoreArray at:idx put:nil
] ifFalse:[
readFdArray := readFdArray copyWith:aFileDescriptor.
readCheckArray := readCheckArray copyWith:aBlock.
readSemaphoreArray := readSemaphoreArray copyWith:nil.
]
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
! !
!ProcessorScheduler methodsFor:'accessing'!
activePriority
"return the priority of the currently running process.
GNU-ST & ST-80 compatibility; this is the same as currentPriority"
^ currentPriority
!
activeProcess
"return the currently running process"
^ activeProcess
"Processor activeProcess"
!
currentPriority
"return the priority of the currently running process"
^ currentPriority
"Processor currentPriority"
!
interruptedProcess
"returns the process which was interrupted by the active one"
^ interruptedProcess
! !
!ProcessorScheduler methodsFor:'background processing'!
addIdleBlock:aBlock
"add the argument, aBlock to the list of idle-actions.
Idle blocks are evaluated whenever no other process is runnable,
and no events are pending.
Use of idle blocks is not recommended; use a low priority process
instead, which has the same effect. Idle blocks are still included
to support background actions in pure-event systems, where no processes
are available.
Support for idle-blocks may vanish."
|wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
idleActions isNil ifTrue:[
idleActions := OrderedCollection new
].
idleActions add:aBlock.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
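"
usage sketch (illustrative only; assumes Transcript, Delay and
Block>>forkAt: as found in a regular ST/X image).

registering and removing an idle block:

|b|
b := [Transcript showCR:'idle'].
Processor addIdleBlock:b.
Processor removeIdleBlock:b.

the recommended alternative, a low priority process:

[
    5 timesRepeat:[
        Transcript showCR:'background work'.
        (Delay forSeconds:1) wait
    ]
] forkAt:(Processor userBackgroundPriority)
"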
!
removeIdleBlock:aBlock
"remove the argument, aBlock from the list of idle-blocks.
Support for idle-blocks may vanish - use low prio processes instead."
|wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
idleActions notNil ifTrue:[
idleActions remove:aBlock ifAbsent:[]
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
! !
!ProcessorScheduler methodsFor:'dispatching'!
dispatch
"It handles timeouts and switches to the highest prio runnable process"
|any millis pri p nActions "{ Class: SmallInteger }"
checkBlock sema|
"
handle all timeout actions
"
anyTimeouts ifTrue:[
self evaluateTimeouts
].
"first do a quick check for semaphores using checkActions - this is needed for
devices like the X-connection, where some events might be in the event
queue. Without these checks, a select might block even though there is work to do
"
any := false.
nActions := readCheckArray size.
1 to:nActions do:[:index |
checkBlock := readCheckArray at:index.
(checkBlock notNil and:[checkBlock value]) ifTrue:[
sema := readSemaphoreArray at:index.
sema notNil ifTrue:[
sema signalOnce.
].
any := true.
]
].
"now, someone might be runnable ..."
p := self highestPriorityRunnableProcess.
p isNil ifTrue:[
"/ no one runnable, hard wait for event or timeout
self waitForEventOrTimeout.
"/ check for OS process termination
gotChildSignalInterrupt ifTrue:[
gotChildSignalInterrupt := false.
self handleChildSignalInterrupt
].
^ self
].
pri := p priority.
"
want to give control to the process p.
If the switched-to process's priority is lower than the
userSchedulingPriority, we have to make certain, that the
next input or timer will bring us back for a reschedule.
This is done by enabling ioInterrupts for all file descriptors.
If ioInterrupts are not available (OS does not support them),
we schedule a timer interrupt to interrupt us after 1/20 of a second
- effectively polling the filedescriptors 20 times a second.
(which is bad, since low prio processes will be hurt in performance)
Therefore, do not let benchmarks run with low prio ...
Higher prio processes must be suspended,
same prio ones must yield or suspend to get back control
"
"
uncommenting this will make timeouts interrupt the current process
(i.e. as if the interrupt runs at TimingPrio);
if left commented, they are handled at UserSchedulingPrio.
this will all change, when timeouts are removed and all is process driven
(a future version will have a process running to handle a timeout queue)
"
"
pri < TimingPriority ifTrue:[
anyTimeouts ifTrue:[
millis := self timeToNextTimeout.
millis == 0 ifTrue:[^ self].
]
].
"
"
if the process to run has a priority lower than UserInterruptPriority,
arrange for an interrupt to occur on I/O.
This is done by enabling IO-signals (if the OS supports them)
or by installing a poll-interrupt after 50ms (if the OS does not).
"
pri < UserInterruptPriority ifTrue:[
"comment out this if above is uncommented"
anyTimeouts ifTrue:[
millis := self timeToNextTimeout.
millis == 0 ifTrue:[^ self].
].
"---"
useIOInterrupts ifTrue:[
readFdArray do:[:fd |
fd notNil ifTrue:[
OperatingSystem enableIOInterruptsOn:fd
].
].
] ifFalse:[
millis notNil ifTrue:[
millis := millis min:50
] ifFalse:[
millis := 50
]
]
].
millis notNil ifTrue:[
"schedule a clock interrupt after millis milliseconds"
OperatingSystem enableTimer:millis rounded.
].
"
now let the process run - will come back here by reschedule
from ioInterrupt or timerInterrupt ... (running at max+1)
"
self threadSwitch:p.
"... when we arrive here, we are back on stage.
Either by an ALARM or IO signal, or by a suspend of another process
"
millis notNil ifTrue:[
OperatingSystem disableTimer.
].
"/ check for OS process termination
gotChildSignalInterrupt ifTrue:[
gotChildSignalInterrupt := false.
self handleChildSignalInterrupt
].
"/ check for new input
(gotIOInterrupt or:[useIOInterrupts not]) ifTrue:[
gotIOInterrupt := false.
self checkForInputWithTimeout:0.
]
"Modified: 12.4.1996 / 10:14:18 / stefan"
"Modified: 9.1.1997 / 16:12:44 / cg"
!
dispatchLoop
"central dispatch loop; the scheduler process is always staying in
this method, looping forever."
|dispatchAction handlerAction ignoredSignals|
"avoid confusion if entered twice"
dispatching == true ifTrue:[^ self].
dispatching := true.
"/ create the relevant blocks & signalSet outside of the
"/ while-loop
"/ (thanks to stefans objectAllocation monitor,
"/ this saves a bit of memory allocation in the scheduler)
dispatchAction := [self dispatch].
handlerAction := [:ex |
('Processor [info]: ignored signal (', ex signal printString, ')') infoPrintCR.
ex return
].
ignoredSignals := SignalSet
with:(Process terminateSignal)
with:AbortSignal.
"/
"/ I made this an extra call to dispatch; this allows recompilation
"/ of the dispatch-handling code in the running system.
"/
[dispatching] whileTrue:[
ignoredSignals handle:handlerAction do:dispatchAction
].
"/ we arrive here in standalone Apps,
"/ when the last process at or above UserSchedulingPriority has died.
"/ regular ST/X stays in above loop forever
'Processor [info]: finish dispatch (no more processes)' infoPrintCR.
"Modified: 23.9.1996 / 14:19:56 / stefan"
"Modified: 10.1.1997 / 19:10:53 / cg"
!
exitWhenNoMoreUserProcesses:aBoolean
exitWhenNoMoreUserProcesses := aBoolean
! !
!ProcessorScheduler methodsFor:'os process handling'!
childSignalInterrupt
"{ Pragma: +returnable }"
"child changed state - switch to scheduler process which will decide
what to do now."
gotChildSignalInterrupt := true.
interruptedProcess := activeProcess.
self threadSwitch:scheduler
"Modified: 12.4.1996 / 10:12:18 / stefan"
!
handleChildSignalInterrupt
"child changed state - execute child termination blocks.
If child is no longer alive, remove action block."
|osProcessStatus blocking wasBlocked|
blocking := OperatingSystem blockingChildProcessWait.
"/ no interrupt processing, to avoid races with monitorPid
wasBlocked := OperatingSystem blockInterrupts.
[
[
osProcessStatus := OperatingSystem childProcessWait:blocking.
osProcessStatus notNil ifTrue:[
|pid action|
pid := osProcessStatus pid.
osProcessStatus stillAlive ifTrue:[
action := osChildExitActions at:pid ifAbsent:[].
] ifFalse:[
action := osChildExitActions removeKey:pid ifAbsent:[].
].
action notNil ifTrue:[
action value:osProcessStatus
].
].
"/ if pollChildProcesses does block, poll only one status change.
"/ we will get another SIGCHLD for other status changes.
osProcessStatus notNil and:[blocking not]
] whileTrue.
"/ if there are no more waiters, disable SIGCHLD handler.
"/ this helps us with synchronous waiters (e.g. pclose),
"/ But they should block SIGCHLD anyway.
osChildExitActions isEmpty ifTrue:[
OperatingSystem disableChildSignalInterrupts.
].
] valueNowOrOnUnwindDo:[
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
]
"Modified: 5.1.1996 / 16:56:11 / stefan"
"Modified: 28.2.1996 / 21:36:31 / cg"
"Created: 12.4.1996 / 10:08:21 / stefan"
!
monitorPid:pid action:aBlock
"add a 1-arg-block that is called when the operating system child process
with pid pid changes state.
The argument for the block is an OSProcessStatus.
"
OperatingSystem sigCHLD ~= 0 ifTrue:[
"/ SIGCHLD is supported,
"/ aBlock will be evaluated, as soon as a SIGCHLD interrupt for pid has been received.
OperatingSystem enableChildSignalInterrupts.
osChildExitActions at:pid put:aBlock
] ifFalse:[
|osProcessStatus|
"/ SIGCHLD is not supported, wait synchronously for the exit
"/ of pid.
[
osProcessStatus := OperatingSystem childProcessWait:true.
osProcessStatus notNil ifTrue:[
(osProcessStatus pid = pid) ifTrue:[
aBlock value:osProcessStatus.
].
osProcessStatus stillAlive
].
] whileTrue.
].
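"
usage sketch (illustrative only; aPid is a hypothetical variable holding the
pid of an OS child process started elsewhere, and Transcript is assumed).
The action block receives an OSProcessStatus, as described above:

Processor
    monitorPid:aPid
    action:[:status |
        status stillAlive ifFalse:[
            Transcript showCR:'child ' , status pid printString , ' has exited'
        ]
    ].

later, to stop monitoring:

Processor unmonitorPid:aPid
"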
"Created: 28.12.1995 / 14:22:10 / stefan"
"Modified: 5.1.1996 / 22:01:06 / stefan"
!
unmonitorPid:pid
"remove a monitor for a child process"
osChildExitActions removeKey:pid ifAbsent:[].
"Created: 12.4.1996 / 19:01:59 / cg"
! !
!ProcessorScheduler methodsFor:'primitive process primitives'!
scheduleForInterrupt:aProcess
"make aProcess evaluate its pushed interrupt block(s)"
self scheduleInterruptActionsOf:aProcess.
aProcess state ~~ #stopped ifTrue:[
"
and, make the process runnable
"
self resume:aProcess
]
"Modified: 17.6.1996 / 14:40:52 / cg"
!
scheduleInterruptActionsOf:aProcess
"make aProcess evaluate its pushed interrupt block(s)
when resumed."
|id|
aProcess isNil ifTrue:[^ self].
aProcess == activeProcess ifTrue:[^ self].
id := aProcess id.
self class threadInterrupt:id.
"Created: 5.3.1996 / 17:25:55 / cg"
!
threadSwitch:aProcess
"continue execution in aProcess.
WARNING: this is a low level entry, no process administration is done here"
|id pri ok oldProcess oldPri p singleStep wasBlocked|
(aProcess isNil or:[aProcess == activeProcess]) ifTrue:[^ self].
wasBlocked := OperatingSystem blockInterrupts.
oldProcess := activeProcess.
oldPri := currentPriority.
id := aProcess id.
pri := aProcess priority.
singleStep := aProcess isSingleStepping.
aProcess state:#active.
oldProcess setStateTo:#run if:#active.
"
no interrupts now - activeProcess has already been changed
(dont add any message sends here)
"
activeProcess := aProcess.
currentPriority := pri.
%{
extern OBJ ___threadSwitch();
if (__isSmallInteger(id)) {
ok = ___threadSwitch(__context, __intVal(id), (singleStep == true) ? 1 : 0);
} else {
ok = false;
}
%}.
"time passes, spent in some other process ...
... and here we are again"
p := activeProcess.
activeProcess := oldProcess.
currentPriority := oldProcess priority.
ok ifFalse:[
"
switch failed for some reason -
destroy the bad process
"
p id ~~ 0 ifTrue:[
'Processor [warning]: problem with process ' errorPrint.
p id errorPrint.
p name notNil ifTrue:[
' (' errorPrint. p name errorPrint. ')' errorPrint.
].
'; hard-terminate it.' errorPrintCR.
'Processor [info]: cleanup may take a while if stack is huge' infoPrintCR.
p state:#suspended.
self terminateNoSignal:p.
]
].
zombie notNil ifTrue:[
self class threadDestroy:zombie.
zombie := nil
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
! !
!ProcessorScheduler methodsFor:'priority constants'!
highIOPriority
"not currently used - for ST80 compatibility only"
^ 16 "claus: is this ok ?"
"Created: 15.11.1996 / 11:42:39 / cg"
!
highestPriority
"return the highest priority value (normal) processes can have."
"must be below schedulingPriority -
otherwise scheduler could be blocked ...
"
^ HighestPriority
!
lowIOPriority
"not currently used - for ST80 compatibility only"
^ 2 "claus: is this ok ?"
!
lowestPriority
"return the lowest priority value"
^ 1 "do not change this - it is not variable"
!
schedulingPriority
"return the priority at which the scheduler runs."
"must be above highestPriority -
otherwise scheduler could be blocked ...
"
^ SchedulingPriority
!
systemBackgroundPriority
"return the priority, at which background system processing
should take place.
Not currently used - for ST80 compatibility only"
^ 4
!
timingPriority
"return the priority, at which all timing takes place (messageTally,
delay etc.)"
^ TimingPriority
!
userBackgroundPriority
"return the priority, at which background user (non-interactive) processing
should take place.
Not currently used - for ST80 compatibility only"
^ 6
!
userInterruptPriority
"return the priority, at which the event scheduler runs - i.e.
all processes running at a lower priority are interruptible by Ctrl-C
or the timer. Processes running at higher prio will not be interrupted."
^ UserInterruptPriority
!
userSchedulingPriority
"return the priority, at which all normal user (interactive) processing
takes place"
^ UserSchedulingPriority
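"
sketch of how the priority constants relate (illustrative only; the
comparisons reflect the values assigned in the class-side initialize method):

Processor lowestPriority < Processor userSchedulingPriority.
Processor userSchedulingPriority < Processor timingPriority.
Processor timingPriority < Processor userInterruptPriority.
Processor userInterruptPriority < Processor highestPriority.
Processor highestPriority < Processor schedulingPriority.
"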
! !
!ProcessorScheduler methodsFor:'private'!
remember:aProcess
"remember aProcess for later disposal (where the underlying
system resources have to be freed)."
|newShadow oldId wasBlocked
oldSize "{ Class: SmallInteger }"
index "{ Class: SmallInteger }"
sz "{ Class: SmallInteger }" |
wasBlocked := OperatingSystem blockInterrupts.
index := 1.
sz := KnownProcessIds size.
[index <= sz] whileTrue:[
(KnownProcesses at:index) isNil ifTrue:[
oldId := KnownProcessIds at:index.
oldId notNil ifTrue:[
self class threadDestroy:oldId.
].
KnownProcesses at:index put:aProcess.
KnownProcessIds at:index put:aProcess id.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
^ self
].
index := index + 1
].
KnownProcessIds grow:index.
KnownProcessIds at:index put:aProcess id.
oldSize := KnownProcesses size.
(index > oldSize) ifTrue:[
newShadow := WeakArray new:(oldSize * 2).
newShadow addDependent:self class.
newShadow replaceFrom:1 with:KnownProcesses.
KnownProcesses := newShadow
].
KnownProcesses at:index put:aProcess.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
"Modified: 7.1.1997 / 16:48:39 / stefan"
!
unRemember:aProcess
"forget aProcess - dispose processing will not consider this one"
|index wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
index := KnownProcesses identityIndexOf:aProcess.
index ~~ 0 ifTrue:[
KnownProcessIds at:index put:nil.
KnownProcesses at:index put:nil.
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
! !
!ProcessorScheduler methodsFor:'private initializing'!
initialize
"initialize the one-and-only ProcessorScheduler"
|nPrios "{ Class: SmallInteger }"
p l|
KnownProcesses isNil ifTrue:[
KnownProcesses := WeakArray new:30.
KnownProcesses addDependent:self class.
KnownProcessIds := OrderedCollection new.
].
"
create a collection with process lists; accessed using the priority as key
"
nPrios := SchedulingPriority.
quiescentProcessLists := Array new:nPrios.
"/ 1 to:nPrios do:[:pri |
"/ quiescentProcessLists at:pri put:(LinkedList new)
"/ ].
readFdArray := Array with:nil.
readCheckArray := Array with:nil.
readSemaphoreArray := Array with:nil.
writeFdArray := Array with:nil.
writeSemaphoreArray := Array with:nil.
timeoutArray := Array with:nil.
timeoutSemaphoreArray := Array with:nil.
timeoutActionArray := Array with:nil.
timeoutProcessArray := Array with:nil.
anyTimeouts := false.
dispatching := false.
exitWhenNoMoreUserProcesses := false. "/ mhmh - how about true ?
useIOInterrupts := OperatingSystem supportsIOInterrupts.
gotIOInterrupt := false.
osChildExitActions := Dictionary new.
gotChildSignalInterrupt := false.
"
handcraft the first (dispatcher-) process - this one will never
block, but go into a select if there is nothing to do.
Also, it has a prio of max+1 - thus, it comes first when looking
for a runnable process.
"
currentPriority := SchedulingPriority.
p := Process new.
p setId:0 state:#run.
p setPriority:currentPriority.
p name:'scheduler'.
scheduler := activeProcess := p.
quiescentProcessLists at:currentPriority put:(l := LinkedList new).
l add:p.
"
let me handle IO and timer interrupts
"
ObjectMemory ioInterruptHandler:self.
ObjectMemory timerInterruptHandler:self.
ObjectMemory childSignalInterruptHandler:self.
"Modified: 29.7.1996 / 12:10:59 / cg"
"Modified: 7.1.1997 / 16:48:26 / stefan"
!
reinitialize
"all previous processes (except those marked as restartable) are made dead
- each object should reinstall its process(s) upon restart;
especially, windowgroups have to.
In contrast to ST-80, restartable processes are restarted at the beginning
NOT continued where they left off. This is a consequence of the portable implementation
of ST/X, since in order to continue a process, we would need to know the
internals of the machine's (and C-compiler's) stack layout.
This was not done, favouring portability over process continuation.
In practice, this is not much of a problem, since in almost every case,
the computation state can be saved in some object, and processing be
restarted from scratch, reinitializing things from this saved state."
|processesToRestart|
"
lay all processes to rest, collect restartable ones
"
processesToRestart := OrderedCollection new.
KnownProcesses do:[:p |
(p notNil and:[p ~~ 0]) ifTrue:[
"how, exactly should this be done ?"
p isRestartable == true ifTrue:[
p nextLink:nil.
processesToRestart add:p
] ifFalse:[
p setId:nil state:#dead
]
].
].
scheduler setId:nil state:#dead.
"
now, start from scratch
"
KnownProcesses := nil.
self initialize.
"
... and restart those that can be.
"
processesToRestart do:[:p |
p imageRestart
]
"Modified: 28.10.1996 / 20:45:54 / cg"
! !
!ProcessorScheduler methodsFor:'process creation'!
newProcessFor:aProcess
"create a physical (VM-) process for aProcess.
Return true if ok, false if something went wrong.
The process is not scheduled; to start it running,
it needs a Process>>resume. Once resumed, the process will later
get control in its #start method."
|id|
id := self class threadCreate:aProcess withId:nil.
id isNil ifTrue:[^ false].
aProcess setId:id state:#light. "meaning: has no stack yet"
self remember:aProcess.
^ true
!
newProcessFor:aProcess withId:idWant
"private entry for Process restart - do not use in your program"
idWant isNil ifTrue:[
^ self newProcessFor:aProcess
].
(self class threadCreate:aProcess withId:idWant) ~~ idWant ifTrue:[
^ false
].
aProcess state:#light. "meaning: has no stack yet"
self remember:aProcess.
^ true
"Modified: 28.10.1996 / 19:14:28 / cg"
! !
!ProcessorScheduler methodsFor:'queries'!
activeProcessIsSystemProcess
"return true if the active process is a system process,
which should not be suspended."
^ self isSystemProcess:activeProcess
"
Processor activeProcessIsSystemProcess
"
!
anyUserProcessAtAll
"return true, if there is any process still running with a
non-zero processGroup (i.e. any non-system process).
This is used to determine if we should stop scheduling
in standAlone applications."
|listArray l prio "{ Class: SmallInteger }" |
prio := HighestPriority.
listArray := quiescentProcessLists.
[prio >= 1] whileTrue:[
l := listArray at:prio.
l notNil ifTrue:[
l do:[:aProcess |
aProcess processGroupId ~~ 0 ifTrue:[
^ true.
]
]
].
prio := prio - 1
].
^ false
"
Processor anyUserProcessAtAll
"
"Modified: 29.7.1996 / 11:49:17 / cg"
!
highestPriorityRunnableProcess
"return the highest prio runnable process"
|listArray l p prio "{ Class: SmallInteger }" |
prio := HighestPriority.
listArray := quiescentProcessLists.
[prio >= 1] whileTrue:[
l := listArray at:prio.
l notNil ifTrue:[
l notEmpty ifTrue:[
p := l first.
"
if it got corrupted somehow ...
"
p id isNil ifTrue:[
'Processor [warning]: process with nil id removed' errorPrintCR.
l removeFirst.
^ nil.
].
^ p
]
].
prio := prio - 1
].
^ nil
"Modified: 10.1.1997 / 18:03:28 / cg"
!
isPureEventDriven
"this is temporary - (maybe not :-).
you can run ST/X either with or without processes.
Without, there is conceptually a single process handling all
outside events and timeouts. This has some negative implications
(Debugger is ugly), but allows a fully portable ST/X without any
assembler support - i.e. quick portability.
The PureEvent flag will automatically be set if the runtime system
does not support threads - otherwise, it can be set manually
(from rc-file).
"
^ PureEventDriven
"Created: 13.4.1996 / 20:31:31 / cg"
!
isSystemProcess:aProcess
"return true if aProcess is a system process,
which should not be suspended/terminated etc.."
(PureEventDriven
or:[aProcess id == 0
or:[(Display notNil and:[Display dispatchProcess == aProcess])
" nameOrId endsWith:'dispatcher' "
]]) ifTrue:[
^ true
].
^ false
"
Processor activeProcessIsSystemProcess
"
"Modified: 13.4.1996 / 20:35:00 / cg"
! !
!ProcessorScheduler methodsFor:'scheduling'!
changePriority:prio for:aProcess
"change the priority of aProcess"
|oldList newList oldPrio newPrio wasBlocked|
oldPrio := aProcess priority.
oldPrio == prio ifTrue:[^ self].
aProcess == scheduler ifTrue:[^ self].
"
check for valid argument
"
newPrio := prio.
newPrio < 1 ifTrue:[
newPrio := 1.
] ifFalse:[
newPrio > HighestPriority ifTrue:[
newPrio := HighestPriority
]
].
[
wasBlocked := OperatingSystem blockInterrupts.
aProcess setPriority:newPrio.
oldList := quiescentProcessLists at:oldPrio.
oldList notNil ifTrue:[
(oldList identityIndexOf:aProcess) ~~ 0 ifTrue:[
oldList remove:aProcess.
newList := quiescentProcessLists at:newPrio.
newList isNil ifTrue:[
quiescentProcessLists at:newPrio put:(newList := LinkedList new).
].
newList addLast:aProcess.
"if it's the current process lowering its prio,
or another one raising its prio above the current one, we have to reschedule"
aProcess == activeProcess ifTrue:[
currentPriority := newPrio.
newPrio < oldPrio ifTrue:[
self threadSwitch:scheduler.
]
] ifFalse:[
newPrio > currentPriority ifTrue:[
self threadSwitch:aProcess.
]
].
].
]
] valueNowOrOnUnwindDo:[
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
]
"Modified: 29.7.1996 / 12:11:57 / cg"
!
interruptActive
"interrupt the current process
- this message is sent by the VM, when a process is about to be switched to,
and that process has the interrupted flag bit set.
Pass the interrupt to the process, which may do whatever it likes with it."
|s|
"/ hide those intermediate scheduler contexts;
"/ the interrupt block should think it was called right
"/ from the originally interrupted context
s := thisContext sender.
s selector == #threadSwitch: ifTrue:[
s := s sender.
s selector == #timerInterrupt ifTrue:[
s := s sender
]
].
activeProcess interruptedIn:s
"Modified: 20.10.1996 / 17:06:48 / cg"
!
processTermination
"sent by VM if the current process finished its startup block
without proper process termination. Lay him to rest now.
This can only happen, if something went wrong in Block>>newProcess,
since the block defined there always terminates itself."
self terminateNoSignal:activeProcess.
self threadSwitch:scheduler
!
reschedule
"switch to the highest prio runnable process.
The scheduler itself is always runnable, so we can do an unconditional switch
to that one. This method is a historical left-over and will vanish."
^ self threadSwitch:scheduler
!
resume:aProcess
"set aProcess runnable -
if its prio is higher than the currently running prio, switch to it."
|l pri wasBlocked|
"ignore, if process is already dead"
(aProcess isNil or:[aProcess id isNil]) ifTrue:[^ self].
aProcess == activeProcess ifTrue:[
"special handling for waiting schedulers"
aProcess == scheduler ifTrue:[
suspendScheduler := false.
].
^ self
].
wasBlocked := OperatingSystem blockInterrupts.
pri := aProcess priority.
l := quiescentProcessLists at:pri.
"if already running, ignore"
l notNil ifTrue:[
(l identityIndexOf:aProcess) ~~ 0 ifTrue:[
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
^ self
]
] ifFalse:[
l := LinkedList new.
quiescentProcessLists at:pri put:l.
].
l addLast:aProcess.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
(pri > currentPriority) ifTrue:[
"
its prio is higher; immediately transfer control to it
"
self threadSwitch:aProcess
] ifFalse:[
"
its prio is lower; it will have to wait for a while ...
"
aProcess state:#run
]
"Modified: 29.7.1996 / 12:07:37 / cg"
!
resumeForSingleSend:aProcess
"like resume, but let the process execute a single send only.
This will be used by the (new, not yet released) debugger
for single stepping."
(aProcess isNil or:[aProcess == activeProcess]) ifTrue:[^ self].
aProcess singleStep:true.
self resume:aProcess
!
suspend:aProcess
"remove the argument, aProcess from the list of runnable processes.
If the process is the current one, reschedule.
Notice:
This method should only be called by Process>>suspend or
Process>>suspendWithState:"
|pri l p wasBlocked|
"
some debugging stuff
"
aProcess isNil ifTrue:[
InvalidProcessSignal raiseWith:aProcess errorString:'PROCESSOR: nil suspend'.
^ self
].
aProcess id isNil ifTrue:[
InvalidProcessSignal raiseWith:aProcess errorString:'PROCESSOR: bad suspend: already dead'.
self threadSwitch:scheduler.
^ self
].
aProcess == scheduler ifTrue:[
"only scheduler may suspend itself"
activeProcess == scheduler ifTrue:[
suspendScheduler := true.
[suspendScheduler] whileTrue:[
self dispatch.
].
^ self
].
InvalidProcessSignal raiseWith:aProcess errorString:'PROCESSOR: scheduler should never be suspended'.
^ self
].
wasBlocked := OperatingSystem blockInterrupts.
pri := aProcess priority.
l := quiescentProcessLists at:pri.
"notice: this is slightly faster than putting the if-code into
the ifAbsent block, because [] is a shared cheap block, created at compile time
"
(l isNil or:[(l remove:aProcess ifAbsent:[]) isNil]) ifTrue:[
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
'Processor [warning]: bad suspend: not on run list' errorPrintCR.
"/ MiniDebugger enterWithMessage:'bad suspend: not on run list'.
self threadSwitch:scheduler.
^ self
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
(aProcess == activeProcess) ifTrue:[
"we can immediately switch sometimes"
l notEmpty ifTrue:[
p := l first
] ifFalse:[
p := scheduler
].
self threadSwitch:p
].
"Modified: 23.9.1996 / 13:49:24 / stefan"
"Modified: 11.1.1997 / 18:23:51 / cg"
!
terminate:aProcess
"terminate aProcess. This is done by sending aProcess the terminateSignal,
which will evaluate any unwind blocks and finally do a hard terminate."
aProcess terminate
!
terminateActive
"terminate the current process (i.e. the running process kills itself).
The active process is sent the terminateSignal so it will evaluate any
unwind blocks and finally do a hard terminate.
This is sent for regular termination and by the VM, if the hard-stack limit
is reached. (i.e. a process did not repair things in a recursionInterrupt and
continued to grow its stack)"
activeProcess terminate
!
terminateActiveNoSignal
"hard terminate the active process, without sending any
terminate signal thus no unwind blocks are evaluated."
self terminateNoSignal:activeProcess
!
terminateNoSignal:aProcess
"hard terminate aProcess without sending the terminate signal, thus
no unwind blocks or exitAction are performed in the process.
If it's not the current process, it is simply removed from its list
and physically destroyed. Otherwise (since we can't take away the chair
we are sitting on), a switch is forced and the process
will be physically destroyed by the next running process.
(see zombie handling)"
|pri id l wasBlocked|
aProcess isNil ifTrue:[^ self].
aProcess == scheduler ifTrue:[
InvalidProcessSignal raiseWith:aProcess errorString:'PROCESSOR: I will not terminate scheduler'.
^ self
].
id := aProcess id.
id isNil ifTrue:[^ self]. "already dead"
aProcess setId:nil state:#dead.
wasBlocked := OperatingSystem blockInterrupts.
"remove the process from the runnable list"
pri := aProcess priority.
l := quiescentProcessLists at:pri.
(l notNil and:[(l identityIndexOf:aProcess) ~~ 0]) ifTrue:[
l remove:aProcess.
l isEmpty ifTrue:[
quiescentProcessLists at:pri put:nil
]
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
aProcess == activeProcess ifTrue:[
"
hard case - it's the currently running process;
we must have the next active process destroy this one
(we cannot destroy the chair we are sitting on ... :-)
"
zombie := id.
self unRemember:aProcess.
self threadSwitch:scheduler.
"not reached"
^ self
].
self class threadDestroy:id.
self unRemember:aProcess.
^ self
"Modified: 29.7.1996 / 11:56:08 / cg"
"Modified: 23.9.1996 / 13:50:24 / stefan"
!
yield
"move the currently running process to the end of the currentList
and reschedule to the first in the list, thus switching to the
next same-prio-process."
|l sz wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
"
debugging consistency check - will be removed later
"
activeProcess priority ~~ currentPriority ifTrue:[
'Processor [warning]: process changed its priority' errorPrintCR.
currentPriority := activeProcess priority.
].
l := quiescentProcessLists at:currentPriority.
sz := l size.
"
debugging consistency checks - will be removed later
"
sz == 0 ifTrue:[
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
'Processor [warning]: empty runnable list' errorPrintCR.
^ self
].
"
check if the running process is not the only one
"
sz ~~ 1 ifTrue:[
"
bring running process to the end
"
l removeFirst.
l addLast:activeProcess.
"
and switch to first in the list
"
self threadSwitch:(l first).
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
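"
a usage sketch (not part of the original source): two processes forked
at the same priority, which take turns by yielding after each step.
The strings 'a' and 'b' and the variable prio are only illustrative;
the actual interleaving depends on what else is runnable at that priority.

|prio|
prio := Processor activeProcess priority.
[3 timesRepeat:[Transcript show:'a'; cr. Processor yield]] forkAt:prio.
[3 timesRepeat:[Transcript show:'b'; cr. Processor yield]] forkAt:prio
"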
"Modified: 10.1.1997 / 18:04:35 / cg"
! !
!ProcessorScheduler methodsFor:'scheduling - preemptive'!
slice
"Give other processes a chance to run: rotate the run list of the
highest priority (below the time slicer's own) which holds more than
one runnable process."
|i "{ Class: SmallInteger }"
list|
i := self highestPriority - 1. "claus: dont slice myself"
[ i > 0 and: [(quiescentProcessLists at: i) size <= 1]] whileTrue: [i := i - 1].
i == 0 ifTrue: [^ self].
list := quiescentProcessLists at:i.
"/ shuffle that list
list addLast:(list removeFirst).
"Modified: 17.1.1997 / 16:38:38 / cg"
!
startTimeSlicing
"start preemptive scheduling"
|interval|
"/ the slicer process is kept in the class variable TimeSliceProcess;
"/ a method-local variable of that name would always be nil here
TimeSliceProcess notNil ifTrue: [^ self].
"/ the original never assigned the slice interval;
"/ 50 (milliseconds) is an assumed placeholder value
interval := 50.
TimeSliceProcess := [
[
[true] whileTrue: [
Delay waitForMilliseconds:interval.
self slice
]
] valueOnUnwindDo:[
TimeSliceProcess := nil
]
] newProcess.
TimeSliceProcess priority:(self highestPriority).
TimeSliceProcess name:'time slicer'.
TimeSliceProcess resume.
"
Processor startTimeSlicing
"
"Created: 17.1.1997 / 16:42:02 / cg"
"Modified: 17.1.1997 / 16:43:36 / cg"
!
stopTimeSlicing
"stop preemptive scheduling"
"/ terminating the slicer runs its unwind block, which clears TimeSliceProcess
TimeSliceProcess notNil ifTrue: [
TimeSliceProcess terminate.
]
"
Processor stopTimeSlicing
"
"Created: 17.1.1997 / 16:43:03 / cg"
"Modified: 17.1.1997 / 16:43:42 / cg"
! !
!ProcessorScheduler methodsFor:'semaphore signalling'!
disableSemaphore:aSemaphore
"disable triggering of a semaphore"
|idx "{ Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
idx := readSemaphoreArray identityIndexOf:aSemaphore startingAt:1.
[idx ~~ 0] whileTrue:[
readFdArray at:idx put:nil.
readSemaphoreArray at:idx put:nil.
readCheckArray at:idx put:nil.
idx := readSemaphoreArray identityIndexOf:aSemaphore startingAt:idx.
].
idx := writeSemaphoreArray identityIndexOf:aSemaphore startingAt:1.
[idx ~~ 0] whileTrue:[
writeFdArray at:idx put:nil.
writeSemaphoreArray at:idx put:nil.
idx := writeSemaphoreArray identityIndexOf:aSemaphore startingAt:idx.
].
idx := timeoutSemaphoreArray identityIndexOf:aSemaphore startingAt:1.
[idx ~~ 0] whileTrue:[
timeoutArray at:idx put:nil.
timeoutSemaphoreArray at:idx put:nil.
timeoutActionArray at:idx put:nil.
timeoutProcessArray at:idx put:nil.
idx := timeoutSemaphoreArray identityIndexOf:aSemaphore startingAt:idx.
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
!
signal:aSemaphore
"arrange for a semaphore to be triggered as soon as possible.
The actual signalling is performed slightly delayed, when the dispatcher
looks for a process to resume.
This is provided as entry for primitive code using __STX_SignalSemaphore().
Normal smalltalk code should send an appropriate message directly
to the semaphore."
|now wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
now := OperatingSystem getMillisecondTime.
self signal:aSemaphore atMilliseconds:now.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
"Created: 24.9.1996 / 10:18:45 / cg"
!
signal:aSemaphore afterMilliseconds:millis
"arrange for a semaphore to be triggered after some milliseconds"
|now then wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
now := OperatingSystem getMillisecondTime.
then := OperatingSystem millisecondTimeAdd:now and:millis rounded.
self signal:aSemaphore atMilliseconds:then.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
!
signal:aSemaphore afterSeconds:seconds
"arrange for a semaphore to be triggered after some seconds"
self signal:aSemaphore afterMilliseconds:(seconds * 1000)
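"
a usage sketch (not part of the original source; the variable name sema
is only illustrative): suspend the calling process for roughly two seconds
by waiting on a semaphore, which the scheduler triggers later.
This assumes a process-based (not pure event driven) system and should
not be evaluated by the scheduler process itself.

|sema|
sema := Semaphore new.
Processor signal:sema afterSeconds:2.
sema wait
"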
!
signal:aSemaphore atMilliseconds:aMillisecondTime
"arrange for a semaphore to be triggered at a specific millisecond time.
If there is already a pending trigger time, the time is changed."
|index "{ Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
index := timeoutSemaphoreArray identityIndexOf:aSemaphore startingAt:1.
index ~~ 0 ifTrue:[
timeoutArray at:index put:aMillisecondTime
] ifFalse:[
index := timeoutArray identityIndexOf:nil startingAt:1.
index ~~ 0 ifTrue:[
timeoutSemaphoreArray at:index put:aSemaphore.
timeoutArray at:index put:aMillisecondTime.
timeoutActionArray at:index put:nil.
timeoutProcessArray at:index put:nil
] ifFalse:[
timeoutSemaphoreArray := timeoutSemaphoreArray copyWith:aSemaphore.
timeoutArray := timeoutArray copyWith:aMillisecondTime.
timeoutActionArray := timeoutActionArray copyWith:nil.
timeoutProcessArray := timeoutProcessArray copyWith:nil
].
].
anyTimeouts := true.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
!
signal:aSemaphore onInput:aFileDescriptor
"arrange for a semaphore to be triggered when input on aFileDescriptor
arrives."
self signal:aSemaphore onInput:aFileDescriptor orCheck:nil
!
signal:aSemaphore onInput:aFileDescriptor orCheck:aBlock
"arrange for a semaphore to be triggered when input on aFileDescriptor
arrives OR the checkBlock (aBlock) evaluates to true.
(checkBlock is used for buffered input, where a select may not detect
data already read into a buffer - as in Xlib)"
|idx "{ Class: SmallInteger }"
wasBlocked|
aFileDescriptor isNil ifTrue:[
'ProcessorScheduler [info]: no fd to select on - polling with checkBlock' infoPrintCR
].
wasBlocked := OperatingSystem blockInterrupts.
(readFdArray identityIndexOf:aFileDescriptor startingAt:1) == 0 ifTrue:[
idx := readFdArray identityIndexOf:nil startingAt:1.
idx ~~ 0 ifTrue:[
readFdArray at:idx put:aFileDescriptor.
readSemaphoreArray at:idx put:aSemaphore.
readCheckArray at:idx put:aBlock
] ifFalse:[
readFdArray := readFdArray copyWith:aFileDescriptor.
readSemaphoreArray := readSemaphoreArray copyWith:aSemaphore.
readCheckArray := readCheckArray copyWith:aBlock.
]
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
"Modified: 10.1.1997 / 15:09:41 / cg"
!
signal:aSemaphore onOutput:aFileDescriptor
"arrange for a semaphore to be triggered when output on aFileDescriptor
is possible. (i.e. can be written without blocking)"
|idx "{ Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
(writeFdArray identityIndexOf:aFileDescriptor startingAt:1) == 0 ifTrue:[
idx := writeFdArray identityIndexOf:nil startingAt:1.
idx ~~ 0 ifTrue:[
writeFdArray at:idx put:aFileDescriptor.
writeSemaphoreArray at:idx put:aSemaphore.
] ifFalse:[
writeFdArray := writeFdArray copyWith:aFileDescriptor.
writeSemaphoreArray := writeSemaphoreArray copyWith:aSemaphore.
]
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
! !
!ProcessorScheduler methodsFor:'timeout handling'!
addTimedBlock:aBlock afterMilliseconds:delta
"add the argument, aBlock to the list of time-scheduled-blocks; to be
evaluated after delta milliseconds. The process which installs this timed
block will be interrupted for execution of the block.
(if it is running, the interrupt will occur in whatever method it is
executing; if it is suspended, it will be resumed).
The block will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
^ self addTimedBlock:aBlock for:activeProcess afterMilliseconds:delta
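"
a usage sketch (not part of the original source; the block and the
variable name id are only illustrative): install a one-shot timed block
and cancel it again via the returned ID before it fires.

|id|
id := Processor
addTimedBlock:[Transcript show:'timeout'; cr]
afterMilliseconds:5000.
Processor removeTimeoutWithID:id
"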
"Modified: 23.9.1996 / 14:33:59 / cg"
!
addTimedBlock:aBlock afterSeconds:delta
"add the argument, aBlock to the list of time-scheduled-blocks; to be
evaluated after delta seconds. The process which installs this timed
block will be interrupted for execution of the block.
(if it is running, the interrupt will occur in whatever method it is
executing; if it is suspended, it will be resumed).
The block will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
^ self addTimedBlock:aBlock for:activeProcess afterMilliseconds:(delta * 1000) rounded
"Modified: 23.9.1996 / 14:34:04 / cg"
!
addTimedBlock:aBlock atMilliseconds:aMillisecondTime
"add the argument, aBlock to the list of time-scheduled-blocks; to be
evaluated when the millisecondClock value passes aMillisecondTime.
The process which installs this timed block will be interrupted for
execution of the block.
(if it is running, the interrupt will occur in whatever method it is
executing; if it is suspended, it will be resumed).
The block will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
^ self addTimedBlock:aBlock for:activeProcess atMilliseconds:aMillisecondTime
"Modified: 23.9.1996 / 14:34:09 / cg"
!
addTimedBlock:aBlock for:aProcess afterMilliseconds:delta
"add the argument, aBlock to the list of time-scheduled-blocks; to be
evaluated after delta milliseconds. The process specified by the argument,
aProcess will be interrupted for execution of the block.
(if it is running, the interrupt will occur in whatever method it is
executing; if it is suspended, it will be resumed).
If aProcess is nil, the block will be evaluated by the scheduler itself
(which is dangerous - the block should not raise any error conditions).
The block will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
|now then wasBlocked id|
wasBlocked := OperatingSystem blockInterrupts.
now := OperatingSystem getMillisecondTime.
then := OperatingSystem millisecondTimeAdd:now and:delta.
id := self addTimedBlock:aBlock for:aProcess atMilliseconds:then.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
^ id
"Modified: 23.9.1996 / 14:34:13 / cg"
!
addTimedBlock:aBlock for:aProcess afterSeconds:delta
"add the argument, aBlock to the list of time-scheduled-blocks; to be
evaluated after delta seconds. aProcess will be interrupted for
execution of the block.
(if it is running, the interrupt will occur in whatever method it is
executing; if it is suspended, it will be resumed).
If aProcess is nil, the block will be evaluated by the scheduler itself
(which is dangerous - the block should not raise any error conditions).
The block will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
^ self addTimedBlock:aBlock for:aProcess afterMilliseconds:(delta * 1000) rounded
"Modified: 23.9.1996 / 14:34:18 / cg"
!
addTimedBlock:aBlock for:aProcess atMilliseconds:aMillisecondTime
"add the argument, aBlock to the list of time-scheduled-blocks; to be
evaluated by aProcess when the millisecondClock value passes
aMillisecondTime.
If that block is already in the timeout list,
its trigger-time is changed.
The process specified by the argument, aProcess will be interrupted
for execution of the block.
If aProcess is nil, the block will be evaluated by the scheduler itself
(which is dangerous - the block should not raise any error conditions).
If the process is active at trigger time, the interrupt will occur in
whatever method it is executing; if suspended at trigger time, it will be
resumed.
The block will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
|index "{ Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
index := timeoutActionArray identityIndexOf:aBlock startingAt:1.
index ~~ 0 ifTrue:[
timeoutArray at:index put:aMillisecondTime
] ifFalse:[
index := timeoutArray indexOf:nil.
index ~~ 0 ifTrue:[
timeoutArray at:index put:aMillisecondTime.
timeoutActionArray at:index put:aBlock.
timeoutSemaphoreArray at:index put:nil.
timeoutProcessArray at:index put:aProcess
] ifFalse:[
timeoutArray := timeoutArray copyWith:aMillisecondTime.
timeoutActionArray := timeoutActionArray copyWith:aBlock.
timeoutSemaphoreArray := timeoutSemaphoreArray copyWith:nil.
timeoutProcessArray := timeoutProcessArray copyWith:aProcess.
index := timeoutArray size.
].
].
anyTimeouts := true.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
^ index
"Modified: 23.9.1996 / 14:34:23 / cg"
!
addTimeoutFunctionCall:anExternalFunction for:aProcess afterMilliseconds:delta with:argument
"prepare for an external function to be called with a single argument
after a delay of delta milliseconds.
If aProcess is nil, the function will be called by the scheduler itself;
otherwise, that process will be interrupted and the function is called
in that process's context.
The callBack will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
|now then wasBlocked id|
wasBlocked := OperatingSystem blockInterrupts.
now := OperatingSystem getMillisecondTime.
then := OperatingSystem millisecondTimeAdd:now and:delta.
id := self
addTimeoutFunctionCall:anExternalFunction
for:aProcess
atMilliseconds:then
with:argument.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
^ id
"Created: 23.9.1996 / 14:28:27 / cg"
"Modified: 23.9.1996 / 14:34:42 / cg"
!
addTimeoutFunctionCall:anExternalFunction for:aProcess atMilliseconds:milliTime with:argument
"prepare for an external function to be called with a single argument
when the millisecond clock passes milliTime.
If aProcess is nil, the function will be called by the scheduler itself;
otherwise, that process will be interrupted and the function is called
in that process's context.
The callBack will be removed from the timed-block list after evaluation
(i.e. it will trigger only once).
Returns an ID, which can be used in #removeTimeoutWithID:"
|action|
action := [anExternalFunction callWith:argument].
^ self
addTimedBlock:action
for:aProcess
atMilliseconds:milliTime.
"Created: 23.9.1996 / 14:29:30 / cg"
"Modified: 23.9.1996 / 14:34:57 / cg"
!
evaluateTimeouts
"walk through the timeouts; signal the semaphores and evaluate the blocks
whose trigger time has been reached"
|sema now aTime block blocksToEvaluate
processes n "{ Class: SmallInteger }"|
anyTimeouts ifFalse:[ ^ self].
"have to collect the blocks first, then evaluate them. This avoids
problems due to newly inserted blocks."
now := OperatingSystem getMillisecondTime.
blocksToEvaluate := nil.
n := timeoutArray size.
anyTimeouts := false.
1 to:n do:[:index |
aTime := timeoutArray at:index.
aTime notNil ifTrue:[
(OperatingSystem millisecondTime:aTime isAfter:now) ifFalse:[
"this one should be triggered"
sema := timeoutSemaphoreArray at:index.
sema notNil ifTrue:[
sema signalOnce.
timeoutSemaphoreArray at:index put:nil
] ifFalse:[
"to support pure-events"
block := timeoutActionArray at:index.
block notNil ifTrue:[
blocksToEvaluate isNil ifTrue:[
blocksToEvaluate := OrderedCollection new:10.
processes := OrderedCollection new:10.
].
blocksToEvaluate add:block.
processes add:(timeoutProcessArray at:index).
timeoutActionArray at:index put:nil.
timeoutProcessArray at:index put:nil.
]
].
timeoutArray at:index put:nil.
] ifTrue:[
anyTimeouts := true
]
]
].
blocksToEvaluate notNil ifTrue:[
blocksToEvaluate keysAndValuesDo:[:index :block |
|p|
p := processes at:index.
(p isNil or:[p == scheduler or:[PureEventDriven]]) ifTrue:[
block value
] ifFalse:[
p interruptWith:block
]
]
]
!
removeTimedBlock:aBlock
"remove the argument, aBlock from the list of time-scheduled-blocks."
|index "{ Class: SmallInteger }"
wasBlocked|
wasBlocked := OperatingSystem blockInterrupts.
index := timeoutActionArray identityIndexOf:aBlock startingAt:1.
(index ~~ 0) ifTrue:[
timeoutArray at:index put:nil.
timeoutActionArray at:index put:nil.
timeoutSemaphoreArray at:index put:nil.
timeoutProcessArray at:index put:nil.
].
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
!
removeTimeoutWithID:anID
"remove the timeOut with anID (as returned by the #addTimedBlock:... methods)
from the list of time-scheduled-blocks."
|index "{ Class: SmallInteger }"
wasBlocked|
index := anID.
(index > 0) ifTrue:[
wasBlocked := OperatingSystem blockInterrupts.
timeoutArray at:index put:nil.
timeoutActionArray at:index put:nil.
timeoutSemaphoreArray at:index put:nil.
timeoutProcessArray at:index put:nil.
wasBlocked ifFalse:[OperatingSystem unblockInterrupts].
]
"Created: 23.9.1996 / 14:32:33 / cg"
"Modified: 23.9.1996 / 14:35:09 / cg"
! !
!ProcessorScheduler methodsFor:'waiting'!
checkForInputWithTimeout:millis
"this is called, when there is absolutely nothing to do;
hard wait for either input to arrive or a timeout to occur."
|fd index sema action wasBlocked|
"/ must enable interrupts, to be able to get out of a
"/ long wait (especially, to handle sigChild in the meantime)
(wasBlocked := OperatingSystem interruptsBlocked) ifTrue:[
OperatingSystem unblockInterrupts.
].
fd := OperatingSystem
selectOnAnyReadable:readFdArray
writable:writeFdArray
exception:nil
withTimeOut:millis.
wasBlocked ifTrue:[
OperatingSystem blockInterrupts.
].
fd isNil ifTrue:[
(OperatingSystem lastErrorSymbol == #EBADF) ifTrue:[
"/ mhmh - one of the fd's given to me is corrupt.
"/ find out which one .... and remove it
OperatingSystem clearLastErrorNumber.
self removeCorruptedFds
]
] ifFalse:[
index := readFdArray indexOf:fd.
index ~~ 0 ifTrue:[
sema := readSemaphoreArray at:index.
sema notNil ifTrue:[
sema signalOnce.
^ true
] ifFalse:[
action := readCheckArray at:index.
action notNil ifTrue:[
action value.
^ true
]
]
]
].
^ false
"Modified: 12.4.1996 / 09:31:22 / stefan"
!
ioInterrupt
"{ Pragma: +returnable }"
"data arrived while waiting - switch to scheduler process which will decide
what to do now.
This method is called by the VM's interrupt handling mechanism.
Notice, that at the time of the message, we are still in the context
of whichever process is currently running."
gotIOInterrupt := true.
interruptedProcess := activeProcess.
self threadSwitch:scheduler
"Modified: 21.12.1995 / 16:17:40 / stefan"
"Modified: 18.10.1996 / 20:36:05 / cg"
!
removeCorruptedFds
"this is sent when select returns an error due to some invalid
fileDescriptor. May happen, if someone does a readWait/writeWait on a
socket connection, which somehow got corrupted
(shutdown by partner, or closed in another thread).
Without special care, all following selects would immediately return with
an #EBADF error, leading to high-frequency polling and a locked up system.
(you could still fix things by interrupting on the console and fixing the
readFdArray/writeFdArray in the debugger)"
readFdArray keysAndValuesDo:[:idx :fd |
|rslt sema|
rslt := OperatingSystem
selectOnAnyReadable:(Array with:fd)
writable:nil
exception:nil
withTimeOut:0.
(rslt isNil and:[OperatingSystem lastErrorSymbol == #EBADF]) ifTrue:[
('Processor [warning]: removing invalid read fileDescriptor: ' , fd printString) errorPrintCR.
readFdArray at:idx put:nil.
OperatingSystem clearLastErrorNumber.
(sema := readSemaphoreArray at:idx) notNil ifTrue:[
readSemaphoreArray at:idx put:nil.
sema signal.
].
]
].
writeFdArray keysAndValuesDo:[:idx :fd |
|rslt sema|
rslt := OperatingSystem
selectOnAnyReadable:nil
writable:(Array with:fd)
exception:nil
withTimeOut:0.
(rslt isNil and:[OperatingSystem lastErrorSymbol == #EBADF]) ifTrue:[
('Processor [warning]: removing invalid write fileDescriptor: ' , fd printString) errorPrintCR.
writeFdArray at:idx put:nil.
OperatingSystem clearLastErrorNumber.
(sema := writeSemaphoreArray at:idx) notNil ifTrue:[
writeSemaphoreArray at:idx put:nil.
sema signal.
].
]
].
"Modified: 12.4.1996 / 09:32:58 / stefan"
"Modified: 10.1.1997 / 18:03:51 / cg"
!
schedulerInterrupt
"forced reschedule - switch to scheduler process which will decide
what to do now."
interruptedProcess := activeProcess.
self threadSwitch:scheduler
!
timeToNextTimeout
"return the delta-T (in millis) to next timeout, or nil if
there is none"
|aTime now delta minDelta n "{ Class: SmallInteger }"|
"find next timeout. since there are usually not many, just search.
If there were many, the list should be kept sorted ... keeping deltas
to next (as in Unix kernel)"
n := timeoutArray size.
1 to:n do:[:index |
aTime := timeoutArray at:index.
aTime notNil ifTrue:[
now isNil ifTrue:[
now := OperatingSystem getMillisecondTime.
].
(OperatingSystem millisecondTime:aTime isAfter:now) ifFalse:[^ 0].
delta := OperatingSystem millisecondTimeDeltaBetween:aTime and:now.
minDelta isNil ifTrue:[
minDelta := delta
] ifFalse:[
minDelta := minDelta min:delta
]
]
].
^ minDelta
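"
a usage sketch (not part of the original source): the answer is nil if
no timeout is pending, and 0 if some timeout is already due.

Processor timeToNextTimeout
"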
!
timerInterrupt
"{ Pragma: +returnable }"
"timer expired while waiting - switch to scheduler process which will decide
what to do now.
This method is called by the VM's interrupt handling mechanism.
Notice, that at the time of the message, we are still in the context
of whichever process is currently running."
interruptedProcess := activeProcess.
self threadSwitch:scheduler
"Modified: 18.10.1996 / 20:35:54 / cg"
!
waitForEventOrTimeout
"entered when no process is runnable - wait for either input on
any file descriptors to arrive or a timeout to happen.
If it makes sense, do some background garbage collection.
The idle actions are a leftover from previous ST/X releases and will
vanish (installing a low-prio process has the same effect)."
|millis doingGC anySema|
doingGC := true.
[doingGC] whileTrue:[
anyTimeouts ifTrue:[
millis := self timeToNextTimeout.
(millis notNil and:[millis <= 0]) ifTrue:[
^ self "oops - hurry up checking"
].
].
"
if it's worth doing, collect a bit of garbage;
but not, if a backgroundCollector is active
"
ObjectMemory backgroundCollectorRunning ifTrue:[
doingGC := false
] ifFalse:[
doingGC := ObjectMemory gcStepIfUseful.
].
"then do idle actions"
(idleActions notNil and:[idleActions size ~~ 0]) ifTrue:[
idleActions do:[:aBlock |
aBlock value.
].
^ self "go back checking"
].
doingGC ifTrue:[
(self checkForInputWithTimeout:0) ifTrue:[
^ self "go back checking"
]
]
].
exitWhenNoMoreUserProcesses ifTrue:[
"/ check if there are any processes at all
"/ stop dispatching if there is none
"/ (and millis is nil, which means that no timeout blocks are present)
"/ and no readSemaphores are present (which means that no one is waiting for input)
"/ and no writeSemaphores are present
anySema := (readSemaphoreArray findFirst:[:sema | sema notNil]) ~~ 0.
anySema ifFalse:[
anySema := (writeSemaphoreArray findFirst:[:sema | sema notNil]) ~~ 0.
].
anySema ifFalse:[
self anyUserProcessAtAll ifFalse:[
dispatching := false.
^ self
]
].
].
"/
"/ absolutely nothing to do - simply wait
"/
OperatingSystem supportsSelect ifFalse:[
"SCO instant ShitStation has a bug here,
waiting always 1 sec in the select - therefore we delay a bit and
return - effectively polling in 50ms cycles
"
(self checkForInputWithTimeout:0) ifTrue:[
^ self "go back checking"
].
OperatingSystem millisecondDelay:50.
^ self
].
millis isNil ifTrue:[
millis := 9999.
] ifFalse:[
millis := millis rounded
].
self checkForInputWithTimeout:millis
"Modified: 14.12.1995 / 13:37:46 / stefan"
"Modified: 18.7.1996 / 20:42:17 / cg"
! !
!ProcessorScheduler class methodsFor:'documentation'!
version
^ '$Header: /cvs/stx/stx/libbasic/ProcessorScheduler.st,v 1.110 1997-01-17 15:44:35 cg Exp $'
! !
ProcessorScheduler initialize!