InternalPipeStream.st
author Claus Gittinger <cg@exept.de>
Tue, 25 Jun 2019 14:28:51 +0200
changeset 5050 44fa8672d102
parent 4326 5aca213f1a71
child 5141 046c31b9f275
permissions -rw-r--r--
#DOCUMENTATION by cg class: SharedQueue comment/format in: #next #nextWithTimeout:

"
 COPYRIGHT (c) 2002 by Claus Gittinger
              All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
"{ Package: 'stx:libbasic2' }"

"{ NameSpace: Smalltalk }"

Stream subclass:#InternalPipeStream
	instanceVariableNames:'queue closed contentsSpecies'
	classVariableNames:''
	poolDictionaries:''
	category:'Streams'
!

!InternalPipeStream class methodsFor:'documentation'!

copyright
"
 COPYRIGHT (c) 2002 by Claus Gittinger
              All Rights Reserved

 This software is furnished under a license and may be used
 only in accordance with the terms of that license and with the
 inclusion of the above copyright notice.   This software may not
 be provided or otherwise made available to, or used by, any
 other person.  No title to or ownership of the software is
 hereby transferred.
"
!

documentation
"
    not useful on its own, but can be used to talk to a vt100
    terminal view ...
    
    See example.
"
!

examples
"
                                                                [exBegin]
    |p|

    p := InternalPipeStream new.
    [
        10 timesRepeat:[
            p nextPutLine:'hello'
        ].
    ] fork.

    [
        10 timesRepeat:[
            Transcript showCR:p nextLine
        ].
    ] fork.
                                                                [exEnd]

                                                                [exBegin]
    |userInput elizasOutput top terminal|

    userInput    := InternalPipeStream new.
    elizasOutput := InternalPipeStream new.

    top := StandardSystemView new.
    terminal := VT100TerminalView openOnInput: userInput output:elizasOutput in:top.

    top extent:(terminal preferredExtent).
    top label:'The doctor is in'.
    top iconLabel:'doctor'.
    top open.
    top waitUntilVisible.
    top onChangeEvaluate:[:what :aParameter :changedObject | what == #destroyed ifTrue:[userInput close]].

    terminal translateNLToCRNL:true.
    terminal inputTranslateCRToNL:true.
    terminal localEcho:true.

    elizasOutput nextPutLine:'Hi, I am Eliza'.
    elizasOutput nextPutLine:'What is your problem (type end to finish conversation) ?'.
    elizasOutput nextPutLine:''.
    elizasOutput nextPutAll:'>'.

    [top realized] whileTrue:[
        |line answer matchingRule|

        line := userInput nextLine.
        ((line isEmptyOrNil and:[userInput atEnd]) or:[ #('quit' 'exit' 'end' 'bye') includes:line ]) ifTrue:[
            top destroy.
            ^ self
        ].

        answer := 'Tell me more.'.
        elizasOutput nextPutLine:answer.
        elizasOutput nextPutAll:'>'.
    ].
                                                                [exEnd]
"
! !

!InternalPipeStream class methodsFor:'instance creation'!

new
    ^ self basicNew initialize
! !

!InternalPipeStream methodsFor:'accessing'!

atEnd
    ^ closed and:[queue isEmpty]
!

close
    "if there is any partner waiting at either side of the queue,
     tell it that the pipe is no longer active.
     (readers will read an EOF condition, writers will get a write error).
     Either side may close the internal pipe."
     
    closed := true.
    queue readSemaphore signalForAll
!

isOpen
    ^ closed not
!

next
    "return the next element from the stream (might block until something is written)"

    (closed and:[queue isEmpty]) ifTrue:[^ self pastEndRead].
    ^ queue next
!

nextAvailableBytes:nMax into:aBuffer startingAt:startIndex
    |n idx ch|

    (closed and:[queue isEmpty and:[self pastEndRead isNil]]) ifTrue:[^ 0].

    n := 0.
    idx := startIndex.
    [n <= nMax] whileTrue:[
        ch := queue nextIfEmpty:[^ n ].
        aBuffer at:idx put:ch.
        idx := idx + 1.
        n := n + 1
    ].
    ^ n
!

nextPut:anObject
    "write an element (might wakeup readers)"

    closed ifTrue:[ self errorNotOpen].
    queue nextPut:anObject

    "
     |s|
     s := InternalPipeStream new.
     s nextPut:$a.
     s nextPut:$b.
     s nextPut:$c.
    "
!

size
    ^ queue size
! !

!InternalPipeStream methodsFor:'initialization'!

contentsSpecies:aClass
    "by default, I will return a String of elements, if reading multiple elements.
     However, you may change this to eg. an array, if desired"

    contentsSpecies := aClass.
!

initialize
    queue := SharedQueue new.
    closed := false.
! !

!InternalPipeStream methodsFor:'queries'!

contentsSpecies
    ^ contentsSpecies ? String
! !

!InternalPipeStream methodsFor:'synchronization'!

readWait
    queue readSemaphore wait
!

writeWaitWithTimeoutMs:timeout
    queue writeSemaphore waitWithTimeoutMs:timeout
! !

!InternalPipeStream class methodsFor:'documentation'!

version
    ^ '$Header$'
! !