"
COPYRIGHT (c) 1993 by Claus Gittinger
All Rights Reserved
This software is furnished under a license and may be used
only in accordance with the terms of that license and with the
inclusion of the above copyright notice. This software may not
be provided or otherwise made available to, or used by, any
other person. No title to or ownership of the software is
hereby transferred.
"
"ProcessorScheduler - the scheduler for smalltalk processes (threads).

instance variables (as used by the methods visible in this file):
quiescentProcessLists   Array indexed by priority; each slot is nil or a
                        LinkedList holding the runnable processes of that priority
zombie                  thread id of a process which terminated itself; the thread
                        is destroyed by the next process which gets control
                        (see terminate: and threadSwitch:)
activeProcess           the process which is currently executing
currentPriority         the priority of activeProcess
readFds readSemaphores readChecks
                        parallel arrays: a file descriptor, the semaphore to be
                        signalled on input and an optional check / action block
writeFds writeSemaphores writeChecks
                        set up like the read arrays; of these, only writeFds is
                        used (it is passed to the select) by the methods visible here
timeouts timeoutActions timeoutSemaphores
                        parallel arrays: trigger time (millisecond time), a block
                        to be evaluated or a semaphore to be signalled
idleActions             nil or a collection of blocks evaluated when nothing is runnable
anyTimeouts             true, if some timeout may be pending
dispatching             true, once dispatchLoop has been entered
timeoutProcess          not referenced by the methods visible here

class variables:
KnownProcesses          WeakArray of the processes which have an underlying thread
KnownProcessIds         the thread ids belonging to the slots of KnownProcesses
PureEventDriven         true, if running without process support
UserSchedulingPriority  priority of normal interactive processing (set to 8)
TimingPriority          priority at which timing takes place (set to 16)
"
Object subclass:#ProcessorScheduler
instanceVariableNames:'quiescentProcessLists
zombie
activeProcess currentPriority
readFds readSemaphores readChecks
writeFds writeSemaphores writeChecks
timeouts timeoutActions timeoutSemaphores
idleActions anyTimeouts dispatching
timeoutProcess'
classVariableNames:'KnownProcesses KnownProcessIds
PureEventDriven
UserSchedulingPriority TimingPriority'
poolDictionaries:''
category:'Kernel-Processes'
!
ProcessorScheduler comment:'
COPYRIGHT (c) 1993 by Claus Gittinger
All Rights Reserved
'!
"declare the global Processor with a nil value; the class-side initialize
of ProcessorScheduler binds the one-and-only scheduler instance to it"
Smalltalk at:#Processor put:nil!
!ProcessorScheduler class methodsFor:'documentation'!
copyright
"documentation-only method: its body consists solely of the copyright notice below"
"
COPYRIGHT (c) 1993 by Claus Gittinger
All Rights Reserved
This software is furnished under a license and may be used
only in accordance with the terms of that license and with the
inclusion of the above copyright notice. This software may not
be provided or otherwise made available to, or used by, any
other person. No title to or ownership of the software is
hereby transferred.
"
!
version
"documentation-only method: the comment below carries the revision control header of this file"
"
$Header: /cvs/stx/stx/libbasic/ProcessorScheduler.st,v 1.17 1994-06-02 16:21:23 claus Exp $
"
!
documentation
"
This class has only one instance, which is bound to the global
'Processor'. It is responsible for scheduling among the smalltalk
processes (threads; not to be confused with heavyweight unix processes).

The single instance is created by the class-side initialize; sending
new to the class always answers that same instance.
"
! !
!ProcessorScheduler class methodsFor:'initialization'!
initialize
    "one-time class setup: create the process registry, the single
     scheduler instance (bound to the global Processor) and define the
     priority values."

    KnownProcesses isNil ifTrue:[
        (KnownProcesses := WeakArray new:10) watcher:self.
        KnownProcessIds := OrderedCollection new.

        "the ObjectMemory notifies us when returning from a snapshot"
        ObjectMemory addDependent:self
    ].

    "there is exactly one processor"
    Processor isNil ifTrue:[Processor := self new].

    "configurations without thread support run purely event driven"
    (PureEventDriven := self threadsAvailable not) ifTrue:[
        'no process support - running event driven' errorPrintNewline
    ].

    UserSchedulingPriority := 8.
    TimingPriority := 16.
!
update:something
    "dependency notification from the ObjectMemory; only the
     #returnFromSnapshot notification is of interest here"

    something ~~ #returnFromSnapshot ifTrue:[^ self].
    self reinstallProcesses
!
reinstallProcesses
    "invoked after a snapshot has been loaded. Restarting the remembered
     processes is not implemented; for every still existing process with a
     non-zero id, a message is printed on the error output."

    KnownProcesses do:[:aProcess |
        (aProcess notNil and:[aProcess id ~~ 0]) ifTrue:[
            'process restart not implemented' errorPrintNewline
        ]
    ]
! !
!ProcessorScheduler class methodsFor:'instance creation'!
new
    "answer the one-and-only scheduler; it is created and initialized
     on the first request and bound to the global Processor"

    ^ Processor isNil
        ifTrue:[Processor := self basicNew initialize]
        ifFalse:[Processor]
! !
!ProcessorScheduler class methodsFor:'instance release'!
informDispose
    "notification from the WeakArray: some Process has been garbage
     collected. For every emptied slot which still has a thread id
     recorded, destroy the underlying thread and clear the id."

    |threadId count "{ Class: SmallInteger }"|

    count := KnownProcessIds size.
    1 to:count do:[:slot |
        ((KnownProcesses at:slot) isNil
         and:[(threadId := KnownProcessIds at:slot) notNil]) ifTrue:[
            Transcript showCr:('terminate thread (no longer refd) ', threadId printString).
            self threadDestroy:threadId.
            KnownProcessIds at:slot put:nil.
        ]
    ]
! !
!ProcessorScheduler class methodsFor:'queries'!
isPureEventDriven
    "answer true, if the system runs in pure event driven mode.
     ST/X can run either with or without processes. Without them, there is
     conceptually a single process which handles all outside events and
     timeouts. This has some drawbacks (the Debugger is ugly), but allows
     a fully portable ST/X without any assembler support - i.e. quick
     portability.
     The flag is set automatically by the class initialization if the
     runtime system does not support threads; it can also be changed
     manually via pureEventDriven / processDriven (from the rc-file)."

    ^ PureEventDriven
!
pureEventDriven
    "switch to pure event driven mode (no processes, one single dispatch loop)"

    PureEventDriven := true
!
processDriven
    "switch to process driven mode, i.e. clear the pure-event flag"

    PureEventDriven := false
! !
!ProcessorScheduler class methodsFor:'primitive process primitives'!
threadsAvailable
"return true, if the runtime system supports threads (i.e. processes);
false otherwise.
The answer is whatever the VM function __threadsAvailable() returns;
the class-side initialize uses its negation to set PureEventDriven."
%{ /* NOCONTEXT */
extern OBJ __threadsAvailable();
RETURN (__threadsAvailable());
%}
!
threadInterrupt:id with:aBlock
"make the process evaluate aBlock when it awakes the next time.
id is the thread id of the target process. The request is handed to the
VM (__threadInterrupt) only if id is a SmallInteger and aBlock is a Block;
any other argument combination is silently ignored."
%{ /* NOCONTEXT */
if (_isSmallInteger(id) && __isBlock(aBlock)) {
__threadInterrupt(_intVal(id), aBlock);
}
%}
!
threadCreate:aBlock
"physical creation of a process executing aBlock.
(warning: low level entry, no administration done).
On success, the id of the new thread is returned as a SmallInteger.
If the VM answers a thread id of 0, the allocationFailureSignal of the
ObjectMemory is raised and (if that returns) nil is returned."
%{ /* NOCONTEXT */
int tid;
extern int __threadCreate();
tid = __threadCreate(aBlock, 0 /* stackSize no longer needed */);
/* a tid of 0 falls through into the smalltalk code below */
if (tid != 0) {
RETURN ( _MKSMALLINT(tid));
}
%}
.
"
arrive here, if creation of process in VM failed.
(no memory for process)
"
ObjectMemory allocationFailureSignal raise.
^ nil
!
threadDestroy:id
"physical destroy other process ...
(warning: low level entry, no administration done).
id is the thread id; the request is handed to the VM (__threadDestroy)
only if id is a SmallInteger - anything else (nil for example) is
silently ignored."
%{ /* NOCONTEXT */
if (_isSmallInteger(id)) {
__threadDestroy(_intVal(id));
}
%}
! !
!ProcessorScheduler methodsFor:'primitive process primitives'!
threadSwitch:aProcess
"continue execution in aProcess.
(warning: low level entry, no administration is done here).
Nothing is done if aProcess is nil or already the active process.
Otherwise aProcess is marked #active, becomes the activeProcess and the
VM is asked to switch to its thread. The code after the primitive is
executed when the calling process gets control again - or immediately,
if the switch failed."
|id pri ok oldProcess oldPri p|
aProcess isNil ifTrue:[^ self].
aProcess == activeProcess ifTrue:[^ self].
"remember the current state, to be able to restore it if the switch fails"
oldProcess := activeProcess.
oldPri := currentPriority.
id := aProcess id.
pri := aProcess priority.
aProcess state:#active.
"no interrupts now - activeProcess has already been changed
(dont add any message sends here)"
activeProcess := aProcess.
currentPriority := pri.
%{
extern OBJ __threadSwitch();
/* NOTE(review): id is converted without a SmallInteger check - callers must not pass a dead process (nil id) */
ok = __threadSwitch(__context, _intVal(id));
%}.
"time passes ...
... here again"
ok ifFalse:[
"
switch failed for some reason -
restore the previous active process and priority and
destroy the bad process (except for the process with id 0)
"
p := activeProcess.
activeProcess := oldProcess.
currentPriority := oldPri.
p id ~~ 0 ifTrue:[
p state:#suspended.
p terminate.
]
].
"if a process terminated itself (see terminate:), its thread id was left
in zombie - destroy that thread now, from within another process"
zombie notNil ifTrue:[
self class threadDestroy:zombie.
zombie := nil
]
!
scheduleForInterrupt:aProcess
    "arrange for aProcess to evaluate its pushed interrupt block(s) when
     it runs the next time, and make it runnable.
     Ignored for nil and for the currently active process."

    (aProcess isNil or:[aProcess == activeProcess]) ifTrue:[^ self].

    self class threadInterrupt:(aProcess id) with:[aProcess interrupt].

    "the process must be runnable to ever see the interrupt"
    aProcess resume
! !
!ProcessorScheduler methodsFor:'constants'!
lowestPriority
    "answer the smallest priority value a process can have.
     This is a fixed value - do not change it."

    ^ 1
!
highestPriority
    "answer the highest priority value processes can have.
     This has to stay below schedulingPriority - otherwise the scheduler
     could be blocked."

    ^ 30
!
schedulingPriority
    "answer the priority at which the scheduler itself runs.
     This has to stay above highestPriority - otherwise the scheduler
     could be blocked."

    ^ 31
!
userInterruptPriority
    "ST80 compatibility only - currently not used"

    ^ 24
!
timingPriority
    "answer the priority at which all timing (messageTally, delay etc.)
     takes place; the value is defined by the class initialization"

    ^ TimingPriority
!
userSchedulingPriority
    "answer the priority at which all normal (interactive) user processing
     takes place; the value is defined by the class initialization"

    ^ UserSchedulingPriority
!
userBackgroundPriority
    "ST80 compatibility only - currently not used"

    ^ 6
!
systemBackgroundPriority
    "ST80 compatibility only - currently not used"

    ^ 4
!
lowIOPriority
    "ST80 compatibility only - currently not used.
     (claus: it is an open question whether this value is ok)"

    ^ 2
! !
!ProcessorScheduler methodsFor:'private initializing'!
initialize
    "set up the one-and-only ProcessorScheduler: empty run queues, empty
     io- and timeout tables and the handcrafted scheduler process"

    |schedulerList|

    "one run queue slot per priority, up to the priority of the scheduler"
    quiescentProcessLists := Array new:(self schedulingPriority).

    "io tables - each starts with a single free slot"
    readFds := Array with:nil.
    readChecks := Array with:nil.
    readSemaphores := Array with:nil.
    writeFds := Array with:nil.
    writeChecks := Array with:nil.
    writeSemaphores := Array with:nil.

    "timeout tables - also with a single free slot"
    timeouts := Array with:nil.
    timeoutSemaphores := Array with:nil.
    timeoutActions := Array with:nil.
    anyTimeouts := false.

    dispatching := false.

    "
     handcraft the first (dispatcher-) process - this one will never
     block, but go into a select if there is nothing to do.
     Also, it has a prio of max+1 - thus, it comes first when looking
     for a runnable process.
    "
    activeProcess := Process new.
    activeProcess setId:0; name:'scheduler'; state:#run.

    currentPriority := self schedulingPriority.
    activeProcess setPriority:currentPriority.

    schedulerList := LinkedList new.
    schedulerList add:activeProcess.
    quiescentProcessLists at:currentPriority put:schedulerList.

    "io- and timer interrupts are to be delivered to the scheduler"
    ObjectMemory ioInterruptHandler:self.
    ObjectMemory timerInterruptHandler:self.
!
reInitialize
    "all previous stuff is obsolete - each object should reinstall itself
     upon restart.
     Brings the scheduler into the same state as a freshly initialized one
     and, in addition, recreates the (class-side) registry of known
     processes. This includes the timeout tables and the zombie thread id:
     entries left over from the previous session refer to semaphores,
     blocks and thread ids which are no longer valid."

    |l|

    KnownProcesses := WeakArray new:5.
    KnownProcesses watcher:self class.
    KnownProcessIds := OrderedCollection new.

    "for now (cannot snapin processes)"
    quiescentProcessLists := Array new:self schedulingPriority.

    readFds := Array with:nil.
    readChecks := Array with:nil.
    readSemaphores := Array with:nil.
    writeFds := Array with:nil.
    writeChecks := Array with:nil.
    writeSemaphores := Array with:nil.

    "forget all pending timeouts (as done in initialize)"
    timeouts := Array with:nil.
    timeoutSemaphores := Array with:nil.
    timeoutActions := Array with:nil.
    anyTimeouts := false.

    "a thread id left over from the old session must not be destroyed
     by the next threadSwitch:"
    zombie := nil.

    dispatching := false.

    "handcraft the first (dispatcher-) process - this one will never
     block, but go into a select if there is nothing to do.
     Also it has a prio of max+1"
    activeProcess := Process new.
    activeProcess setId:0.
    activeProcess name:'scheduler'.
    activeProcess state:#run.

    currentPriority := self schedulingPriority.
    activeProcess setPriority:currentPriority.

    l := LinkedList new.
    l add:activeProcess.
    quiescentProcessLists at:currentPriority put:l.

    ObjectMemory ioInterruptHandler:self.
    ObjectMemory timerInterruptHandler:self.
! !
!ProcessorScheduler methodsFor:'private'!
remember:aProcess
    "register aProcess (weakly) together with its thread id, so that the
     underlying thread can be destroyed when the process object is garbage
     collected. A free slot is reused if there is one (destroying a thread
     which may still be recorded for it); otherwise, the tables are grown."

    |biggerRegistry registrySize staleId
     slot   "{ Class: SmallInteger }"
     nIds   "{ Class: SmallInteger }" |

    OperatingSystem blockInterrupts.

    "first choice: reuse the slot of a collected process"
    nIds := KnownProcessIds size.
    1 to:nIds do:[:i |
        (KnownProcesses at:i) isNil ifTrue:[
            staleId := KnownProcessIds at:i.
            staleId notNil ifTrue:[
                self class threadDestroy:staleId.
            ].
            KnownProcesses at:i put:aProcess.
            KnownProcessIds at:i put:aProcess id.
            OperatingSystem unblockInterrupts.
            ^ self
        ]
    ].

    "no free slot - append behind the last used one"
    slot := nIds + 1.
    KnownProcessIds grow:slot.
    KnownProcessIds at:slot put:aProcess id.

    registrySize := KnownProcesses size.
    slot > registrySize ifTrue:[
        "the weak registry is full too - double its size"
        biggerRegistry := WeakArray new:(registrySize * 2).
        biggerRegistry watcher:self class.
        biggerRegistry replaceFrom:1 with:KnownProcesses.
        KnownProcesses := biggerRegistry
    ].
    KnownProcesses at:slot put:aProcess.
    OperatingSystem unblockInterrupts.
!
unRemember:aProcess
    "remove aProcess and its thread id from the registry; the dispose
     processing will no longer consider it. Unknown processes are ignored."

    |slot|

    slot := KnownProcesses identityIndexOf:aProcess.
    slot == 0 ifTrue:[^ self].

    KnownProcessIds at:slot put:nil.
    KnownProcesses at:slot put:nil.
! !
!ProcessorScheduler methodsFor:'process creation'!
newProcessFor:aBlock
    "create and answer a new process which will execute aBlock, or nil if
     the thread could not be created. The new process gets the priority of
     the currently running process and is registered for later disposal.
     It is not scheduled - a Process>>resume is needed to start it."

    |threadId newProcess|

    threadId := self class threadCreate:aBlock.
    threadId isNil ifTrue:[
        "
         this may happen, if the VM does not support more processes,
         or if it ran out of memory, when allocating internal data
         structures
        "
        self error:'cannot create new Process'.
        ^ nil
    ].

    newProcess := Process new.
    newProcess
        setId:threadId;
        startBlock:aBlock;
        state:#light;                   "meaning: has no stack yet"
        setPriority:currentPriority.

    self remember:newProcess.
    ^ newProcess
! !
!ProcessorScheduler methodsFor:'scheduling'!
reschedule
    "give control to the first process of the highest priority run queue.
     Since the scheduler process itself is always runnable, a candidate is
     normally found. Empty run queues found on the way are cleared out."

    |runList candidate|

    self schedulingPriority to:1 by:-1 do:[:prio |
        runList := quiescentProcessLists at:prio.
        runList notNil ifTrue:[
            candidate := runList first.
            candidate isNil ifTrue:[
                quiescentProcessLists at:prio put:nil
            ] ifFalse:[
                "the running one is no longer the active one"
                activeProcess state == #active ifTrue:[
                    activeProcess state:#run.
                ].
                ^ self threadSwitch:candidate
            ]
        ]
    ].

    "
     no process to run - this 'cannot' happen
     (well, not quite: it may happen if the scheduler process is
      suspended - which btw. should be avoided, since noone is there
      to schedule processes then)
    "
    self halt:'fatal dispatcher should never be suspended'.

    "try to repair by just resuming ..."
    activeProcess resume
!
yield
    "give other processes of the current priority a chance to run:
     the head of the current run queue is removed, the active process is
     appended at the end and control is passed to the new head.
     Nothing happens if the queue holds one single process."

    |runList|

    OperatingSystem blockInterrupts.

    runList := quiescentProcessLists at:currentPriority.
    runList isNil ifTrue:[
        OperatingSystem unblockInterrupts.
        'oops - nil runnable list' errorPrintNewline.
        ^ self
    ].

    "if the running process is the only one, there is nobody to yield to"
    runList size == 1 ifTrue:[
        OperatingSystem unblockInterrupts.
        ^ self
    ].

    "rotate the queue ..."
    runList removeFirst.
    runList addLast:activeProcess.
    OperatingSystem unblockInterrupts.

    "... and continue in the process which is now in front"
    activeProcess state:#run.
    self threadSwitch:runList first.
!
suspend:aProcess
    "take aProcess out of the run queue of its priority. If it is the
     currently running process, control is passed to another one.
     A nil argument is reported via error:; a dead process or one which is
     not in its run queue is reported on the error output and ignored."

    |prio runList oldState|

    aProcess isNil ifTrue:[self error:'nil suspend'. ^ self].
    aProcess id isNil ifTrue:['bad suspend: already dead' errorPrintNewline. ^ self].

    OperatingSystem blockInterrupts.

    prio := aProcess priority.
    runList := quiescentProcessLists at:prio.
    runList isNil ifTrue:[
        OperatingSystem unblockInterrupts.
        'bad suspend: not running' errorPrintNewline.
        ^ self
    ].

    runList remove:aProcess ifAbsent:[
        OperatingSystem unblockInterrupts.
        'bad suspend: not running' errorPrintNewline.
        ^ self
    ].

    "empty run queues are represented by nil"
    runList isEmpty ifTrue:[
        quiescentProcessLists at:prio put:nil.
        runList := nil
    ].
    OperatingSystem unblockInterrupts.

    "any state other than active/run is left as it is"
    oldState := aProcess state.
    (oldState == #active or:[oldState == #run]) ifTrue:[
        aProcess state:#suspended.
    ].

    aProcess == activeProcess ifFalse:[^ self].

    "we suspended ourself - somebody else has to run now.
     If the own run queue is not empty, its head can be taken directly"
    runList isNil ifTrue:[
        self reschedule
    ] ifFalse:[
        self threadSwitch:runList first
    ]
!
resume:aProcess
    "set aProcess runnable - if its prio is higher than the currently
     running prio, switch to it immediately.
     Ignored for nil, for the active process, for dead processes (nil id)
     and for processes which are already in their run queue.
     A process which was made runnable but has to wait for the processor
     is marked #run - the state used everywhere else in this class for
     runnable-but-not-active processes (it is in the run queue, so it is
     not suspended)."

    |l pri|

    aProcess == activeProcess ifTrue:[^ self].
    aProcess isNil ifTrue:[^ self].

    "ignore, if process is already dead"
    aProcess id isNil ifTrue:[^ self].

    OperatingSystem blockInterrupts.

    pri := aProcess priority.
    l := quiescentProcessLists at:pri.
    l isNil ifTrue:[
        "first runnable process at that priority - create the run queue"
        l := LinkedList new.
        quiescentProcessLists at:pri put:l
    ] ifFalse:[
        "if already running, ignore"
        (l includes:aProcess) ifTrue:[
            OperatingSystem unblockInterrupts.
            ^ self
        ]
    ].
    l addLast:aProcess.
    OperatingSystem unblockInterrupts.

    (pri > currentPriority) ifTrue:[
        "higher prio: it preempts the running process
         (threadSwitch: marks it #active)"
        activeProcess state:#run.
        self threadSwitch:aProcess
    ] ifFalse:[
        "same or lower prio: it is runnable, but has to wait for its turn"
        aProcess state:#run
    ]
!
processTermination
    "the active process returned from its startup block without having
     been terminated explicitly - terminate it now and pass control to
     another process"

    self terminate:activeProcess.
    self reschedule
!
terminate:aProcess
"terminate aProcess. If its not the current process, its simply
removed from its list and destroyed. Otherwise, a switch is forced
and the process is destroyed by the next running process.
A nil argument and a process whose id is already nil (dead) are ignored.
The exitAction of the process (if any) is evaluated before the
underlying thread is destroyed."
|pri id l|
aProcess isNil ifTrue:[^ self].
id := aProcess id.
id isNil ifTrue:[^ self]. "already dead"
"a nil id marks the process as dead; the thread id is kept in the local"
aProcess setId:nil.
aProcess startBlock:nil.
OperatingSystem blockInterrupts.
"remove the process from the runnable list"
pri := aProcess priority.
l := quiescentProcessLists at:pri.
(l notNil and:[l includes:aProcess]) ifTrue:[
l remove:aProcess.
"empty run queues are represented by nil"
l isEmpty ifTrue:[quiescentProcessLists at:pri put:nil].
].
OperatingSystem unblockInterrupts.
"evaluate the exit action (with interrupts enabled again) and forget it"
aProcess exitAction notNil ifTrue:[
aProcess exitAction value.
aProcess exitAction:nil
].
aProcess state:#dead.
aProcess == activeProcess ifTrue:[
"
hard case - its the currently running process
we must have the next active process destroy this one
(we cannot destroy the chair we are sitting on ... :-)
threadSwitch: destroys the thread recorded in zombie
"
zombie := id.
self unRemember:aProcess.
self reschedule.
^ self
].
"easy case - some other process; its thread can be destroyed right away"
self class threadDestroy:id.
self unRemember:aProcess.
^ self
!
terminateActive
    "terminate the currently running process (i.e. the caller itself)"

    self terminate:activeProcess
!
changePriority:newPrio for:aProcess
    "change the priority of aProcess to newPrio.
     If the process is currently runnable, it is moved from the run queue
     of its old priority to the end of the run queue of the new one;
     otherwise only its priority is changed.
     If the active process lowers its own priority, or if another process
     is raised above the current priority, a process switch takes place."

    |oldList newList oldPrio|

    oldPrio := aProcess priority.
    oldPrio == newPrio ifTrue:[^ self].

    OperatingSystem blockInterrupts.

    aProcess setPriority:newPrio.

    oldList := quiescentProcessLists at:oldPrio.

    "the run queue slot is nil, if no process is runnable at the old
     priority (empty queues are stored as nil) - this has to be checked
     before asking the list; otherwise an error would be raised here
     while interrupts are blocked"
    (oldList isNil or:[(oldList includes:aProcess) not]) ifTrue:[
        "not runnable - changing the priority was all there is to do"
        OperatingSystem unblockInterrupts.
        ^ self
    ].

    oldList remove:aProcess.
    oldList isEmpty ifTrue:[quiescentProcessLists at:oldPrio put:nil].

    newList := quiescentProcessLists at:newPrio.
    newList isNil ifTrue:[
        newList := LinkedList new.
        quiescentProcessLists at:newPrio put:newList
    ].
    newList addLast:aProcess.

    OperatingSystem unblockInterrupts.

    "if its the current process lowering its prio
     or another one raising, we have to reschedule"
    aProcess == activeProcess ifTrue:[
        currentPriority := newPrio.
        newPrio < oldPrio ifTrue:[
            self reschedule.
        ]
    ] ifFalse:[
        newPrio > currentPriority ifTrue:[
            activeProcess state:#run.
            self threadSwitch:aProcess.
        ]
    ]
! !
!ProcessorScheduler methodsFor:'accessing'!
currentPriority
    "answer the priority of the process which is currently running"

    ^ currentPriority

    "
     Processor currentPriority
    "
!
activePriority
    "answer the priority of the process which is currently running.
     Provided for GNU-ST compatibility; same as currentPriority"

    ^ currentPriority
!
activeProcess
    "answer the process which is currently running"

    ^ activeProcess

    "
     Processor activeProcess
    "
! !
!ProcessorScheduler methodsFor:'queries'!
highestPriorityRunnableProcess
    "answer the first process of the highest priority non-empty run queue,
     or nil if there is none. The search starts at highestPriority, so the
     run queue of the scheduler itself is not looked at."

    |runList candidate top "{ Class: SmallInteger }" |

    top := self highestPriority.
    top to:1 by:-1 do:[:prio |
        runList := quiescentProcessLists at:prio.
        runList notNil ifTrue:[
            (candidate := runList first) notNil ifTrue:[^ candidate].

            "an empty list - clear it out on the fly"
            quiescentProcessLists at:prio put:nil
        ]
    ].
    ^ nil
! !
!ProcessorScheduler methodsFor:'dispatching'!
dispatchLoop
    "the main loop of the scheduler process: dispatch forever.
     A nested invocation (while already dispatching) returns immediately."

    dispatching == true ifFalse:[
        dispatching := true.
        [true] whileTrue:[
            self dispatch
        ]
    ]
!
dispatch
"central dispatch, handling timeouts and switching to the highest
prio runnable process.
One round consists of: evaluating due timeouts, polling the check blocks,
and then either waiting for an event (if no process is runnable) or
switching to the highest prio runnable process. For processes below
UserSchedulingPriority, io interrupts or a timer are set up before the
switch, to get control back."
|any millis pri p nActions "{ Class: SmallInteger }" |
"handle all timeout actions"
anyTimeouts ifTrue:[
self evaluateTimeouts
].
"first do a quick check using checkActions - this is needed for
devices like X-connection, where some events might be in the event
queue, so a select does not always help"
"NOTE(review): any is assigned, but never read in this method;
the block-local action is not used either"
any := false.
nActions := readChecks size.
1 to:nActions do:[:index |
|checkBlock sema action|
checkBlock := readChecks at:index.
(checkBlock notNil and:[checkBlock value]) ifTrue:[
sema := readSemaphores at:index.
sema notNil ifTrue:[
sema signalOnce.
].
any := true.
]
].
"now, someone might be runnable:"
p := self highestPriorityRunnableProcess.
p isNil ifTrue:[
"no one runnable, hard wait for event or timeout"
self waitForEventOrTimeout.
^ self
].
pri := p priority.
"want to give control to another process p.
If the switched-to processes priority is lower than the
userSchedulingPriority, we have to make certain, that the
next input or timer will bring us back for a reschedule.
This is done by enabling ioInterrupts for all file descriptors.
If ioInterrupts are not available, we schedule a timer interrupt
to interrupt us after 1/20 of a second - effectively polling
the filedescriptors. - which is very bad, since low prio processes
will be hurt in performance - dont let benchmarks run with low prio ...
Higher prio processes must suspend, same prio ones must yield to
get back control"
"uncommenting this will make timeouts interrupt the current process
(i.e. run at TimingPrio); if commented, they run at UserSchedulingPrio.
this will all change, when timeouts are removed and all is process driven
"
"
pri < TimingPriority ifTrue:[
anyTimeouts ifTrue:[
millis := self timeToNextTimeout.
millis == 0 ifTrue:[^ self].
]
].
"
pri < UserSchedulingPriority ifTrue:[
"comment out this if above is uncommented"
anyTimeouts ifTrue:[
millis := self timeToNextTimeout.
"a timeout is already due - handle it in the next round instead of switching"
millis == 0 ifTrue:[^ self].
].
OperatingSystem supportsIOInterrupts ifTrue:[
readFds do:[:fd |
fd notNil ifTrue:[
OperatingSystem enableIOInterruptsOn:fd
].
].
] ifFalse:[
"no io interrupts: poll - come back after 50ms at the latest"
millis notNil ifTrue:[
millis := millis min:50
] ifFalse:[
millis := 50
]
]
].
"millis is still nil here, if no timer is needed"
millis notNil ifTrue:[
"schedule a clock interrupt"
OperatingSystem enableTimer:millis rounded.
].
"now let the process run - will come back here by reschedule
from ioInterrupt or timerInterrupt ... (running at max+1)"
activeProcess state:#run.
self threadSwitch:p.
"... when we arrive here, we are back on stage"
millis notNil ifTrue:[
"a timer was scheduled above: cancel it and poll for input without waiting"
OperatingSystem disableTimer.
self checkForInputWithTimeout:0.
]
! !
!ProcessorScheduler methodsFor:'waiting'!
ioInterrupt
    "io interrupt handler: data arrived while some process was running;
     a reschedule brings the dispatcher back into play"

    self reschedule
!
timerInterrupt
    "timer interrupt handler: the timer expired while some process was
     running; a reschedule brings the dispatcher back into play"

    self reschedule
!
timeToNextTimeout
    "answer the time (in milliseconds) until the next timeout is due;
     0 if some timeout is already due, nil if there is no timeout at all"

    |dueTime now delta minDelta count "{ Class: SmallInteger }"|

    "there are usually only a few timeouts, so a linear search will do.
     With many of them, a sorted list of deltas (as in the Unix kernel)
     would be the better choice"

    count := timeouts size.
    1 to:count do:[:slot |
        dueTime := timeouts at:slot.
        dueTime notNil ifTrue:[
            "the current time is fetched once, when the first timeout is seen"
            minDelta isNil ifTrue:[
                now := OperatingSystem getMillisecondTime.
            ].
            (OperatingSystem millisecondTime:dueTime isAfter:now) ifFalse:[^ 0].

            delta := OperatingSystem millisecondTimeDeltaBetween:dueTime and:now.
            minDelta := minDelta isNil
                            ifTrue:[delta]
                            ifFalse:[minDelta min:delta]
        ]
    ].
    ^ minDelta
!
waitForEventOrTimeout
"entered when no process is runnable - wait for either input on
any file descriptors to arrive or a timeout to happen.
If it makes sense, do some background garbage collection.
The idle actions are a leftover from previous ST/X releases and will
vanish.
Returns early (without waiting) whenever there may be something to do
for the dispatcher: a timeout is due, idle actions have been evaluated
or the input check answered true."
|millis limit doingGC|
doingGC := true.
"loop as long as incremental GC steps are worth doing"
[doingGC] whileTrue:[
anyTimeouts ifTrue:[
millis := self timeToNextTimeout.
(millis notNil and:[millis <= 0]) ifTrue:[
^ self "oops - hurry up checking"
].
].
"if its worth doing, collect a bit of garbage"
limit := ObjectMemory incrementalGCLimit.
doingGC := limit notNil and:[ObjectMemory oldSpaceAllocatedSinceLastGC > limit].
doingGC ifTrue:[
ObjectMemory gcStep.
].
"then do idle actions"
(idleActions size ~~ 0) ifTrue:[
idleActions do:[:aBlock |
aBlock value.
].
^ self "go back checking"
].
"between GC steps, poll for input without waiting"
doingGC ifTrue:[
(self checkForInputWithTimeout:0) ifTrue:[
^ self "go back checking"
]
]
].
(self checkForInputWithTimeout:0) ifTrue:[
^ self "go back checking"
].
"no, really nothing to do - simply wait"
OperatingSystem supportsSelect ifFalse:[
"if select is not usable (original note: on an SCO system the
select always waited 1 sec), we delay a bit and
return - effectively polling in 50ms cycles
"
OperatingSystem millisecondDelay:50.
^ self
].
"wait until the next timeout is due; without any timeout, the wait
is limited to 9999 ms"
millis isNil ifTrue:[
millis := 9999.
] ifFalse:[
millis := millis rounded
].
self checkForInputWithTimeout:millis
!
checkForInputWithTimeout:millis
    "wait up to millis milliseconds for input on any of the registered
     file descriptors (0 means: just poll).
     If input arrived on a registered read descriptor, its semaphore is
     signalled or - if it has no semaphore - its action block is evaluated.
     Answer true if something was triggered (i.e. the dispatcher should go
     back checking for runnable processes), false otherwise.
     Signalling a semaphore counts as triggered too: a process may have
     become runnable by that, and callers which test the answer
     (waitForEventOrTimeout) must not continue with idle work or another wait.
     Descriptors which are not found in readFds are not dispatched here."

    |fd index sema action|

    fd := OperatingSystem selectOnAnyReadable:readFds writable:writeFds error:nil withTimeOut:millis.
    fd notNil ifTrue:[
        index := readFds indexOf:fd.
        index ~~ 0 ifTrue:[
            sema := readSemaphores at:index.
            sema notNil ifTrue:[
                sema signalOnce.
                ^ true
            ].

            "no semaphore - pure event support: evaluate the action block"
            action := readChecks at:index.
            action notNil ifTrue:[
                action value.
                ^ true
            ]
        ]
    ].
    ^ false
!
evaluateTimeouts
    "process all timeouts which are due: signal their semaphore or
     evaluate their block, and free the table slot. The anyTimeouts flag
     is recomputed from the entries which remain pending."

    |now dueTime sema action pending count "{ Class: SmallInteger }"|

    anyTimeouts ifFalse:[^ self].

    "the blocks are collected first and evaluated afterwards - this
     avoids trouble with timeouts which are added by those blocks"

    now := OperatingSystem getMillisecondTime.
    pending := nil.
    count := timeouts size.
    anyTimeouts := false.

    1 to:count do:[:slot |
        dueTime := timeouts at:slot.
        dueTime notNil ifTrue:[
            (OperatingSystem millisecondTime:dueTime isAfter:now) ifTrue:[
                "not yet due - remains pending"
                anyTimeouts := true
            ] ifFalse:[
                "this one is due"
                sema := timeoutSemaphores at:slot.
                sema notNil ifTrue:[
                    sema signalOnce.
                    timeoutSemaphores at:slot put:nil
                ] ifFalse:[
                    "no semaphore - pure event support"
                    action := timeoutActions at:slot.
                    action notNil ifTrue:[
                        pending isNil ifTrue:[
                            pending := OrderedCollection new
                        ].
                        pending add:action.
                        timeoutActions at:slot put:nil
                    ]
                ].
                timeouts at:slot put:nil.
            ]
        ]
    ].

    pending notNil ifTrue:[
        pending do:[:each | each value]
    ]
! !
!ProcessorScheduler methodsFor:'adding / removing'!
enableSemaphore:aSemaphore onInput:aFileDescriptor
    "arrange for aSemaphore to be signalled when input arrives on
     aFileDescriptor (no additional check block)"

    self
        enableSemaphore:aSemaphore
        onInput:aFileDescriptor
        check:nil
!
enableSemaphore:aSemaphore onInput:aFileDescriptor check:aBlock
    "arrange for aSemaphore to be signalled when input arrives on
     aFileDescriptor or when aBlock evaluates to true.
     (the check block is for buffered input, where a select may not see
      data which has already been read into a buffer - as in Xlib).
     Nothing is changed if aFileDescriptor is already registered."

    |slot|

    OperatingSystem blockInterrupts.
    (readFds includes:aFileDescriptor) ifFalse:[
        slot := readFds indexOf:nil.
        slot == 0 ifTrue:[
            "no free slot - grow the three tables in parallel"
            readFds := readFds copyWith:aFileDescriptor.
            readSemaphores := readSemaphores copyWith:aSemaphore.
            readChecks := readChecks copyWith:aBlock.
        ] ifFalse:[
            "reuse the free slot"
            readFds at:slot put:aFileDescriptor.
            readSemaphores at:slot put:aSemaphore.
            readChecks at:slot put:aBlock
        ]
    ].
    OperatingSystem unblockInterrupts.
!
disableSemaphore:aSemaphore
    "stop triggering aSemaphore: its entries in the input tables and in
     the timeout tables (if any) are cleared"

    |slot|

    OperatingSystem blockInterrupts.

    "input side"
    slot := readSemaphores identityIndexOf:aSemaphore.
    slot == 0 ifFalse:[
        readFds at:slot put:nil.
        readSemaphores at:slot put:nil.
        readChecks at:slot put:nil
    ].

    "timeout side"
    slot := timeoutSemaphores identityIndexOf:aSemaphore.
    slot == 0 ifFalse:[
        timeouts at:slot put:nil.
        timeoutSemaphores at:slot put:nil.
        timeoutActions at:slot put:nil.
    ].

    OperatingSystem unblockInterrupts.
!
enableSemaphore:aSemaphore afterSeconds:seconds
    "arrange for aSemaphore to be signalled after the given number of
     seconds; the work is done by the milliseconds variant"

    self
        enableSemaphore:aSemaphore
        afterMilliseconds:(seconds * 1000)
!
enableSemaphore:aSemaphore afterMilliseconds:millis
    "arrange for aSemaphore to be signalled after millis milliseconds.
     If the semaphore is already registered for a timeout, only its
     trigger time is replaced."

    |dueTime slot|

    dueTime := OperatingSystem
                    millisecondTimeAdd:(OperatingSystem getMillisecondTime)
                    and:millis.

    OperatingSystem blockInterrupts.

    slot := timeoutSemaphores identityIndexOf:aSemaphore.
    slot == 0 ifFalse:[
        "already known - just a new trigger time"
        timeouts at:slot put:dueTime
    ] ifTrue:[
        slot := timeouts indexOf:nil.
        slot == 0 ifTrue:[
            "no free slot - grow the three tables in parallel"
            timeoutSemaphores := timeoutSemaphores copyWith:aSemaphore.
            timeouts := timeouts copyWith:dueTime.
            timeoutActions := timeoutActions copyWith:nil.
        ] ifFalse:[
            "reuse the free slot"
            timeoutSemaphores at:slot put:aSemaphore.
            timeouts at:slot put:dueTime.
            timeoutActions at:slot put:nil.
        ].
    ].

    anyTimeouts := true.
    OperatingSystem unblockInterrupts.
! !
!ProcessorScheduler methodsFor:'pure event support'!
enableIOAction:aBlock on:aFileDescriptor
    "half-obsolete event support (will vanish): arrange for aBlock to be
     evaluated when input arrives on aFileDescriptor.
     Nothing is changed if aFileDescriptor is already registered."

    |slot|

    OperatingSystem blockInterrupts.
    (readFds includes:aFileDescriptor) ifFalse:[
        slot := readFds indexOf:nil.
        slot == 0 ifTrue:[
            "no free slot - grow the three tables in parallel"
            readFds := readFds copyWith:aFileDescriptor.
            readChecks := readChecks copyWith:aBlock.
            readSemaphores := readSemaphores copyWith:nil.
        ] ifFalse:[
            "reuse the free slot"
            readFds at:slot put:aFileDescriptor.
            readChecks at:slot put:aBlock.
            readSemaphores at:slot put:nil
        ]
    ].
    OperatingSystem unblockInterrupts.
!
disableFd:aFileDescriptor
    "will vanish: no longer handle input on aFileDescriptor; its slot in
     the input tables is cleared. Unknown descriptors are ignored."

    |slot|

    OperatingSystem blockInterrupts.
    slot := readFds indexOf:aFileDescriptor.
    slot == 0 ifFalse:[
        readFds at:slot put:nil.
        readChecks at:slot put:nil.
        readSemaphores at:slot put:nil
    ].
    OperatingSystem unblockInterrupts.
!
addIdleBlock:aBlock
    "append aBlock to the idle actions, which are evaluated whenever
     nothing else is to be done; the collection is created on demand.
     Will vanish - use low prio processes instead."

    OperatingSystem blockInterrupts.
    idleActions isNil ifTrue:[
        idleActions := OrderedCollection new
    ].
    idleActions add:aBlock.
    OperatingSystem unblockInterrupts.
!
removeIdleBlock:aBlock
    "remove the argument, aBlock from the list of idle-blocks.
     A block which is not in the list is ignored (as the other remove-
     and disable-methods of this class do for unknown arguments);
     a plain remove: would raise an error in that case - and do so
     while interrupts are still blocked.
     Will vanish - use low prio processes instead."

    OperatingSystem blockInterrupts.
    idleActions notNil ifTrue:[
        idleActions remove:aBlock ifAbsent:[nil]
    ].
    OperatingSystem unblockInterrupts.
!
addTimedBlock:aBlock after:delta
    "arrange for aBlock to be evaluated after delta seconds; its entry is
     removed when it is evaluated. If the block is already registered,
     only its trigger time is replaced.
     May vanish: use another process to signal a semaphore periodically."

    |dueTime slot|

    dueTime := OperatingSystem
                    millisecondTimeAdd:(OperatingSystem getMillisecondTime)
                    and:(delta * 1000).

    OperatingSystem blockInterrupts.

    slot := timeoutActions identityIndexOf:aBlock.
    slot == 0 ifFalse:[
        "already known - just a new trigger time"
        timeouts at:slot put:dueTime
    ] ifTrue:[
        slot := timeouts indexOf:nil.
        slot == 0 ifTrue:[
            "no free slot - grow the three tables in parallel"
            timeoutActions := timeoutActions copyWith:aBlock.
            timeouts := timeouts copyWith:dueTime.
            timeoutSemaphores := timeoutSemaphores copyWith:nil.
        ] ifFalse:[
            "reuse the free slot"
            timeoutActions at:slot put:aBlock.
            timeouts at:slot put:dueTime.
            timeoutSemaphores at:slot put:nil
        ].
    ].

    anyTimeouts := true.
    OperatingSystem unblockInterrupts.
!
removeTimedBlock:aBlock
    "cancel the timed evaluation of aBlock; its slot in the timeout
     tables is cleared. Unknown blocks are ignored.
     May vanish: use another process to signal a semaphore periodically."

    |slot|

    OperatingSystem blockInterrupts.
    slot := timeoutActions identityIndexOf:aBlock.
    slot == 0 ifFalse:[
        timeoutActions at:slot put:nil.
        timeouts at:slot put:nil.
        timeoutSemaphores at:slot put:nil.
    ].
    OperatingSystem unblockInterrupts.
! !