# HG changeset patch # User Claus Gittinger # Date 1358990234 -3600 # Node ID e0e5b1d689465b90e0a8d8036d900748e56d7633 # Parent 36e6a080007fa6e346a1fb901656baebc1141698 initial checkin diff -r 36e6a080007f -r e0e5b1d68946 ActiveObject.st --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ActiveObject.st Thu Jan 24 02:17:14 2013 +0100 @@ -0,0 +1,381 @@ +" + COPYRIGHT (c) 2013 by eXept Software AG + All Rights Reserved + + This software is furnished under a license and may be used + only in accordance with the terms of that license and with the + inclusion of the above copyright notice. This software may not + be provided or otherwise made available to, or used by, any + other person. No title to or ownership of the software is + hereby transferred. +" +"{ Package: 'stx:libbasic2' }" + +Object subclass:#ActiveObject + instanceVariableNames:'process messageQueue' + classVariableNames:'AccessLock' + poolDictionaries:'' + category:'Kernel-Processes' +! + +Message subclass:#MessageAndResponse + instanceVariableNames:'action ok result exception ready' + classVariableNames:'' + poolDictionaries:'' + privateIn:ActiveObject +! + +Lookup subclass:#RedirectingLookup + instanceVariableNames:'' + classVariableNames:'Instance' + poolDictionaries:'' + privateIn:ActiveObject +! + +!ActiveObject class methodsFor:'documentation'! + +copyright +" + COPYRIGHT (c) 2013 by eXept Software AG + All Rights Reserved + + This software is furnished under a license and may be used + only in accordance with the terms of that license and with the + inclusion of the above copyright notice. This software may not + be provided or otherwise made available to, or used by, any + other person. No title to or ownership of the software is + hereby transferred. +" +! + +documentation +" + an active object executes incoming messages in a serialized, synchronous manner, + enqueuing incoming messages, executing them one after the other, and returning results to the caller. + Messages are synchronous: the caller is blocked while I perform my duty. + Exceptions during execution are sent back to the caller. + This is an abstract framework class, to be subclassed for real workers. + + The following is a not-yet-working experiment, using lookup objects to redirect automatcally into a synchronizing + method. This does not work yet. + + Notice the use of the lookup object here: all incoming messages from a process other than my worker-process + itself are forwarded to the #doesNotUnderstand: method. There is no need to inherit from nil, and subclasses can use + any messages they like locally, without them being enqueued. + + For now, define methods which have to be synchronized by defining them as: + methodX + self synchronized:[ + ... + do something + ... + ] + + [Author:] + Claus Gittinger +" +! + +examples +" + normally, one would subclass ActiveObject and put protocol into it; + here, for the example, an anon Printer is defined. It is slow printing to the Transcript for a demo. + The interesting thing is the error handling which is demonstrated in printWithError: + Any exception inside the worker object is returned back and raised in the active-object's client, + not in the worker (take a look at the call-chain, when running the example below). + + + |workerClass worker| + + workerClass := ActiveObject + subclass:#Printer + instanceVariableNames:'' + classVariableNames:'' + poolDictionaries:'' + category:nil + inEnvironment:nil. + + workerClass compile:' +print:aLine + self synchronized:[. + aLine do:[:ch | + Transcript show:ch. + Delay waitForSeconds:0.2. + ]. + Transcript cr. + ] +'. + + workerClass compile:' +printWithError:aLine + self synchronized:[. + aLine do:[:ch | + Transcript show:ch. + ch == $l ifTrue:[ self foo ]. + Delay waitForSeconds:0.2. + ]. + Transcript cr. + ] +'. + + worker := workerClass new. + 'now ready for printing'. + worker printWithError:'abcdef'. + worker printWithError:'hello world'. +" +! ! + +!ActiveObject class methodsFor:'initialization'! + +initialize + AccessLock := Semaphore forMutualExclusion. + "/ self lookupObject: RedirectingLookup new + self lookupObject: nil +! ! + +!ActiveObject class methodsFor:'instance creation'! + +new + |newObject| + + + newObject := self basicNew initialize. + ^ newObject. +! ! + +!ActiveObject methodsFor:'accessing'! + +process + ^ process +! ! + +!ActiveObject methodsFor:'message handling'! + +messageHandlingLoop + |here| + + [ true ] whileTrue:[ + |paket| + + "/ Transcript showCR:'A: await message...'. + (messageQueue readWaitWithTimeoutMs:30000) ifTrue:[ + "/ inactive for 30 seconds - exit + AccessLock critical:[ + messageQueue isEmpty ifTrue:[ + "/ Transcript showCR:'A: inactive - stop worker'. + process := nil. + messageQueue := nil. + ]. + "exit the worker process loop" + ^ self + ]. + ]. + + paket := messageQueue next. + "/ Transcript showCR:'A: got message: ', paket selector. + Exception handle:[:ex | + |ex2 c c2| + + "create a copy of the sender chain, up to this frame here + (cannot pass the original exception chain, because that one will be unwound, + thereby clearing the senders)" + + ex2 := ex shallowCopy. + c := ex2 suspendedContext. + c2 := c shallowCopy. + ex2 suspendedContext: c2. + [ c sender ~~ here ] whileTrue:[ + c2 setSender:(c sender shallowCopy). + c2 := c2 sender. + c := c sender + ]. + paket ok: false. + paket exception: ex2. + ] do:[ + |rslt| + + here := thisContext. + rslt := paket action value. "<- here, an exception may occur" + paket ok: true. + paket result:rslt. + ]. + "/ Transcript showCR:'send reply.'. + paket ready signal + ]. +! + +sendAction: aBlock + |paket sema k| + + process isNil ifTrue:[ + AccessLock critical:[ + process isNil ifTrue:[ + "/ Transcript showCR:'starting worker thread'. + messageQueue := SharedQueue new. + process := [ + [ + self messageHandlingLoop + ] ensure:[ + process := nil + ] + ] fork. + ] + ] + ]. + + paket := MessageAndResponse new. + paket action: aBlock. + paket ready: (sema := Semaphore new). + + messageQueue nextPut: paket. + sema wait. + + "arrive here, when done with message" + paket ok ifTrue:[ + ^ paket result + ]. + "/ exception + + "/ patch the sender chain by tacking my calling chain to the end of the exception chain (from the worker), + "/ so the debugger shows a picture as if the error happened here + k := paket exception suspendedContext. + [ k sender notNil ] whileTrue:[ k := k sender ]. + k setSender: thisContext sender sender. "/ hide sendAction and synchronized frames + paket exception doRaise. "/ reraise, now in my context +! + +sendSelector: selector arguments: arguments + |paket sema| + + process isNil ifTrue:[ + AccessLock critical:[ + process isNil ifTrue:[ + "/ Transcript showCR:'starting worker thread'. + messageQueue := SharedQueue new. + process := [ + [ + self messageHandlingLoop + ] ensure:[ + process := nil + ] + ] fork. + ] + ] + ]. + + paket := MessageAndResponse new. + paket selector: selector. + paket arguments: arguments. + paket ready: (sema := Semaphore new). + "/ Transcript showCR:'send message...'. + messageQueue nextPut: paket. + sema wait. + "/ Transcript showCR:'got reply...'. + "arrive here, when done with message" + paket ok ifTrue:[ + ^ paket result + ]. + "/ exception + paket exception raiseSignal. "/ reraise, now in my context +! + +synchronized:aBlock + Processor activeProcess == process ifTrue:[ + ^ aBlock value + ]. + + ^ self sendAction:aBlock. +! + +synchronizedMethod + "marks calling methods as being handled via the queue" + + |retVal| + + Processor activeProcess == process ifTrue:[ + ^ self + ]. + + retVal := self sendSelector:(thisContext sender selector) arguments:(thisContext sender args). + thisContext sender return:retVal. +! ! + +!ActiveObject::MessageAndResponse methodsFor:'accessing'! + +action + ^ action +! + +action:something + action := something. +! + +exception + ^ exception +! + +exception:something + exception := something. +! + +ok + ^ ok +! + +ok:something + ok := something. +! + +ready + ^ ready +! + +ready:something + ready := something. +! + +result + ^ result +! + +result:something + result := something. +! ! + +!ActiveObject::RedirectingLookup class methodsFor:'instance creation'! + +new + Instance isNil ifTrue: [Instance := self basicNew]. + ^ Instance +! ! + +!ActiveObject::RedirectingLookup methodsFor:'lookup'! + +lookupMethodForSelector:selector directedTo:initialSearchClass for:aReceiver withArguments:argArrayOrNil from:sendingContext + "redircet everything into the receiver's doesNotUnderstand method" + +Transcript show:'l '; showCR:aReceiver class name. + (aReceiver isNil "fake call from lookupMethodFor: - sigh" + or:[ + "insider messages are handled as usual" + "to avoid recursive lookup error..." + ((ActiveObject compiledMethodAt:#process) valueWithReceiver: aReceiver arguments:#()) == Processor activeProcess] + ) ifTrue:[ + ^ super lookupMethodForSelector:selector directedTo:initialSearchClass for:aReceiver withArguments:argArrayOrNil from:sendingContext + ]. + "messages from the outside are forwarded to #doesNotUnderstand:" + ^ aReceiver compiledMethodAt:#doesNotUnderstand. +! ! + +!ActiveObject class methodsFor:'documentation'! + +version + ^ '$Header: /cvs/stx/stx/libbasic2/ActiveObject.st,v 1.1 2013-01-24 01:17:14 cg Exp $' +! + +version_CVS + ^ '$Header: /cvs/stx/stx/libbasic2/ActiveObject.st,v 1.1 2013-01-24 01:17:14 cg Exp $' +! ! + + +ActiveObject initialize!