InternalPipeStream.st
author Stefan Vogel <sv@exept.de>
Wed, 08 Apr 2020 19:02:35 +0200
changeset 5473 de911f462862
parent 5255 aed43d6cf187
permissions -rw-r--r--
*** empty log message ***

"{ Encoding: utf8 }"

"
 COPYRIGHT (c) 2002 by Claus Gittinger
              All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
"{ Package: 'stx:libbasic2' }"

"{ NameSpace: Smalltalk }"

Stream subclass:#InternalPipeStream
	instanceVariableNames:'queue closed contentsSpecies'
	classVariableNames:''
	poolDictionaries:''
	category:'Streams'
!

!InternalPipeStream class methodsFor:'documentation'!

copyright
"
 COPYRIGHT (c) 2002 by Claus Gittinger
              All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
!

documentation
"
    not useful on its own, but can be used to talk to a vt100
    terminal view ...
    
    See example.
"
!

examples
"
                                                                [exBegin]
    |p|

    p := InternalPipeStream new.
    [
        10 timesRepeat:[
            p nextPutLine:'hello'
        ].
    ] fork.

    [
        10 timesRepeat:[
            Transcript showCR:p nextLine
        ].
    ] fork.
                                                                [exEnd]

                                                                [exBegin]
    |userInput elizasOutput top terminal|

    userInput    := InternalPipeStream new.
    elizasOutput := InternalPipeStream new.

    top := StandardSystemView new.
    terminal := VT100TerminalView openOnInput: userInput output:elizasOutput in:top.

    top extent:(terminal preferredExtent).
    top label:'The doctor is in'.
    top iconLabel:'doctor'.
    top open.
    top waitUntilVisible.
    top onChangeEvaluate:[:what :aParameter :changedObject | what == #destroyed ifTrue:[userInput close]].

    terminal translateNLToCRNL:true.
    terminal inputTranslateCRToNL:true.
    terminal localEcho:true.

    elizasOutput nextPutLine:'Hi, I am Eliza'.
    elizasOutput nextPutLine:'What is your problem (type end to finish conversation) ?'.
    elizasOutput nextPutLine:''.
    elizasOutput nextPutAll:'>'.

    [top realized] whileTrue:[
        |line answer matchingRule matches what|

        ((userInput readWaitWithTimeout:1) not and:[top realized]) ifTrue:[
            line := userInput nextLine.
            ((line isEmptyOrNil and:[userInput atEnd]) or:[ #('quit' 'exit' 'end' 'bye') includes:line ]) ifTrue:[
                top destroy.
                ^ self
            ].
            (matches := line subExpressionsInRegex:'I am (.+)' caseSensitive:false) size == 1 ifTrue:[
                answer := #( 
                            'Why are you %1?'
                            'What makes you think that you are %1?'
                            'Who told you, that you are %1?'
                          ) atRandom bindWithArguments:matches
            ] ifFalse:[
                answer := 'Tell me more.'.
            ].
            elizasOutput nextPutLine:answer.
            elizasOutput nextPutAll:'>'.
        ].
    ].
                                                                [exEnd]
"
! !

!InternalPipeStream class methodsFor:'instance creation'!

new
    ^ self basicNew initialize
! !

!InternalPipeStream methodsFor:'accessing'!

close
    "if there is any partner waiting at either side of the queue,
     tell it that the pipe is no longer active.
     (readers will read an EOF condition, writers will get a write error).
     Either side may close the internal pipe."
     
    closed := true.
    queue readSemaphore signalForAll
!

isOpen
    ^ closed not
!

size
    ^ queue size
! !

!InternalPipeStream methodsFor:'initialization'!

contentsSpecies:aCollectionClass
    "by default, I will return a String of elements, if reading multiple elements.
     However, you may change this to eg. an array, if desired"

    contentsSpecies := aCollectionClass.
!

initialize
    queue := SharedQueue new.
    closed := false.
! !

!InternalPipeStream methodsFor:'queries'!

atEnd
    ^ closed and:[queue isEmpty]
!

contentsSpecies
    ^ contentsSpecies ? String
! !

!InternalPipeStream methodsFor:'reading'!

next
    "return the next element from the stream (might block until something is written)"

    (closed and:[queue isEmpty]) ifTrue:[^ self pastEndRead].
    ^ queue next
!

nextAvailableBytes:nMax into:aBuffer startingAt:startIndex
    |n idx ch|

    (closed and:[queue isEmpty and:[self pastEndRead isNil]]) ifTrue:[^ 0].

    n := 0.
    idx := startIndex.
    [n <= nMax] whileTrue:[
        ch := queue nextIfEmpty:[^ n ].
        aBuffer at:idx put:ch.
        idx := idx + 1.
        n := n + 1
    ].
    ^ n
!

readWaitWithTimeoutMs:millis
    "suspend the current process, until the receiver
     becomes ready for reading or a timeout (in milliseconds) expired.
     If data is already available, return immediate.
     With nil millis, wait forever.
     Return true if a timeout occurred (i.e. false, if data is available).
     The other threads are not affected by the wait."

    (closed and:[queue isEmpty]) ifTrue:[^ false].
    ^ queue readWaitWithTimeoutMs:millis
! !

!InternalPipeStream methodsFor:'synchronization'!

readWait
    queue readSemaphore wait
!

writeWaitWithTimeoutMs:timeout
    queue writeSemaphore waitWithTimeoutMs:timeout
! !

!InternalPipeStream methodsFor:'writing'!

nextPut:anObject
    "write an element (might wakeup readers).
     Answer the argument"

    closed ifTrue:[ self errorNotOpen].
    queue nextPut:anObject.
    ^ anObject

    "
     |s|
     s := InternalPipeStream new.
     s nextPut:$a.
     s nextPut:$b.
     s nextPut:$c.
    "
! !

!InternalPipeStream class methodsFor:'documentation'!

version
    ^ '$Header$'
!

version_CVS
    ^ '$Header$'
! !