InternalPipeStream.st
changeset 3978 e4c47408edb2
parent 3291 facd0cfd0781
child 4326 5aca213f1a71
--- a/InternalPipeStream.st	Thu Jun 30 16:59:13 2016 +0200
+++ b/InternalPipeStream.st	Thu Jun 30 18:18:01 2016 +0200
@@ -1,7 +1,9 @@
 "{ Package: 'stx:libbasic2' }"
 
+"{ NameSpace: Smalltalk }"
+
 Stream subclass:#InternalPipeStream
-	instanceVariableNames:'queue'
+	instanceVariableNames:'queue closed contentsSpecies'
 	classVariableNames:''
 	poolDictionaries:''
 	category:'Streams'
@@ -13,6 +15,7 @@
 "
     not useful on its own, but can be used to talk to a vt100
     terminal view ...
+    
     See example.
 "
 !
@@ -50,13 +53,14 @@
     top iconLabel:'doctor'.
     top open.
     top waitUntilVisible.
+    top onChangeEvaluate:[:what :aParameter :changedObject | what == #destroyed ifTrue:[userInput close]].
 
     terminal translateNLToCRNL:true.
     terminal inputTranslateCRToNL:true.
     terminal localEcho:true.
 
     elizasOutput nextPutLine:'Hi, I am Eliza'.
-    elizasOutput nextPutLine:'What is your problem ?'.
+    elizasOutput nextPutLine:'What is your problem (type end to finish conversation) ?'.
     elizasOutput nextPutLine:''.
     elizasOutput nextPutAll:'>'.
 
@@ -64,7 +68,7 @@
         |line answer matchingRule|
 
         line := userInput nextLine.
-        (#('quit' 'exit' 'end' 'bye') includes:line) ifTrue:[
+        ((line isEmptyOrNil and:[userInput atEnd]) or:[ #('quit' 'exit' 'end' 'bye') includes:line ]) ifTrue:[
             top destroy.
             ^ self
         ].
@@ -86,22 +90,35 @@
 !InternalPipeStream methodsFor:'accessing'!
 
 atEnd
-    ^ false . "/ queue notNil
+    ^ closed and:[queue isEmpty]
 !
 
 close
-    queue := nil
+    "if there is any partner waiting at either side of the queue,
+     tell it that the pipe is no longer active.
+     (readers will read an EOF condition, writers will get a write error).
+     Either side may close the internal pipe."
+     
+    closed := true.
+    queue readSemaphore signalForAll
+!
+
+isOpen
+    ^ closed not
 !
 
 next
     "return the next element from the stream (might block until something is written)"
 
+    (closed and:[queue isEmpty]) ifTrue:[^ self pastEndRead].
     ^ queue next
 !
 
 nextAvailableBytes:nMax into:aBuffer startingAt:startIndex
     |n idx ch|
 
+    (closed and:[queue isEmpty and:[self pastEndRead isNil]]) ifTrue:[^ 0].
+
     n := 0.
     idx := startIndex.
     [n <= nMax] whileTrue:[
@@ -113,10 +130,19 @@
     ^ n
 !
 
-nextPut:something
+nextPut:anObject
     "write an element (might wakeup readers)"
 
-    queue nextPut:something
+    closed ifTrue:[ self errorNotOpen].
+    queue nextPut:anObject
+
+    "
+     |s|
+     s := InternalPipeStream new.
+     s nextPut:$a.
+     s nextPut:$b.
+     s nextPut:$c.
+    "
 !
 
 size
@@ -125,19 +151,37 @@
 
 !InternalPipeStream methodsFor:'initialization'!
 
+contentsSpecies:aClass
+    "by default, I will return a String of elements, if reading multiple elements.
+     However, you may change this to eg. an array, if desired"
+
+    contentsSpecies := aClass.
+!
+
 initialize
     queue := SharedQueue new.
+    closed := false.
+! !
+
+!InternalPipeStream methodsFor:'queries'!
+
+contentsSpecies
+    ^ contentsSpecies ? String
 ! !
 
 !InternalPipeStream methodsFor:'synchronization'!
 
 readWait
     queue readSemaphore wait
+!
+
+writeWaitWithTimeoutMs:timeout
+    queue writeSemaphore waitWithTimeoutMs:timeout
 ! !
 
 !InternalPipeStream class methodsFor:'documentation'!
 
 version
-    ^ '$Header: /cvs/stx/stx/libbasic2/InternalPipeStream.st,v 1.4 2014-06-02 22:16:14 vrany Exp $'
+    ^ '$Header$'
 ! !