From: Blackboard Technology Group
Subject: Adding/Removing Streams from a Broadcast Stream
Date: 
Message-ID: <7lb2c4$2fq$1@rmc1.crocker.com>
I have a set of clients can connect/disconnect to a producer of streaming
output data.  A broadcast stream is the natural mechanism for this, but
doesn't have an add or remove stream function.  Making a new broadcast
stream every time a client comes/goes is an option (by adding/deleting
from the BROADCAST-STREAM-STREAMS of the current stream and using a
synonym-stream for indirection), but I was wondering if there is a
better approach...

Thanks for any advice!

From: David Bakhash
Subject: Re: Adding/Removing Streams from a Broadcast Stream
Date: 
Message-ID: <cxjyah2353b.fsf@acs5.bu.edu>
······@rmc1.crocker.com (Blackboard Technology Group) writes:

> I have a set of clients can connect/disconnect to a producer of streaming
> output data.  A broadcast stream is the natural mechanism for this, but
> doesn't have an add or remove stream function.  Making a new broadcast
> stream every time a client comes/goes is an option (by adding/deleting
> from the BROADCAST-STREAM-STREAMS of the current stream and using a
> synonym-stream for indirection), but I was wondering if there is a
> better approach...

I think you'd better make your own data structure to hold those streams.  If
you are using them again and again, in an application where streams are being
opened here, closed there, over and over, and you're gonna keep having to
modify your broadcast-streams, then I would say not to bother with them.  a
simple mapc or loop over the streams would probably not be too much less
efficient than making a broadcast stream and then writing to each of them.  I
think that the CL guys were on the right track when they made broadcast
streams, but the API for them is basically non-existent.

dave
From: Erik Naggum
Subject: Re: Adding/Removing Streams from a Broadcast Stream
Date: 
Message-ID: <3139716696746976@naggum.no>
* ······@rmc1.crocker.com (Blackboard Technology Group)
| I have a set of clients can connect/disconnect to a producer of streaming
| output data.  A broadcast stream is the natural mechanism for this, but
| doesn't have an add or remove stream function.  Making a new broadcast
| stream every time a client comes/goes is an option (by adding/deleting
| from the BROADCAST-STREAM-STREAMS of the current stream and using a
| synonym-stream for indirection), but I was wondering if there is a better
| approach...

  there are two issues involved in this decision: (1) what happens when
  there's an error on the stream (such as being closed by the other side)?
  and (2) does a stream get deleted from the broadcast list when closed?

  first, BROADCAST-STREAM-STREAMS must be SETF'able:

(defsetf broadcast-stream-streams (stream) (new-streams)
  `(setf (slot-value ,stream 'indir-list) ,new-streams))

  (another option is to ascertain that the list BROADCAST-STREAM-STREAMS
  returns is the actual list of streams, always has at least one element,
  and PUSH and DROP on the REST of that list, instead.)

(defun broadcast-stream-subscribe (broadcast-stream stream)
  "Subscribe STREAM to BROADCAST-STREAM."
  (unless (member stream (broadcast-stream-streams broadcast-stream))
    (push stream (broadcast-stream-streams broadcast-stream))
    (push broadcast-stream (getf (stream-property-list stream) 'subscriptions))))

(defun broadcast-stream-unsubscribe (broadcast-stream stream)
  "Unsubscribe STREAM from BROADCAST-STREAM."
  (when (member stream (broadcast-stream-streams broadcast-stream))
    (drop stream (broadcast-stream-streams broadcast-stream))
    (drop broadcast-stream (getf (stream-property-list stream) 'subscriptions))))

(defun broadcast-stream-unsubscribe-all (stream)
  "Unsubscribe STREAM from all the BROADCAST streams it subscribes to."
  (dolist (broadcast-stream (copy-list (getf (stream-property-list stream) 'subscriptions)))
    (broadcast-stream-unsubscribe broadcast-stream stream)))

  this cleans up after dead sockets and discards input.  it is normally
  called on output to the socket, but if that is insufficient, call it
  periodically, like once an hour.

(defun broadcast-stream-sanitize (broadcast-stream)
  "Remove stray input and dead streams from broadcast streams."
  (dolist (stream (copy-list (broadcast-stream-streams broadcast-stream)))
    (when (input-stream-p stream)
      (clear-input stream)
      (when (eq stream (read-char-no-hang stream nil stream))
	(broadcast-stream-unsubscribe-all stream)
	(ignore-errors (close stream :abort t))))))

  using stream classes, let's get out of some serious troble:

(defclass sanitizing-broadcast-stream (broadcast-stream)
  ())

(defmethod stream-force-output ((stream sanitizing-broadcast-stream))
  (broadcast-stream-sanitize stream)
  (call-next-method))

  I find it useful to force output of lines.

(defmethod stream-terpri ((stream sanitizing-broadcast-stream))
  (call-next-method)
  (stream-force-output stream))

  this does not address the error-handling for socket errors, which I have
  taken care of by modifying the underlying filesystem-level read and write
  operations not to signal errors, but call an error handler that may tell
  it to ignore the error selectively on error type, stream in question, etc.

  oh, DROP is the reverse of PUSH:

(defmacro drop (object place &rest keys &key key test test-not &environment environment)
  "Drop a particular OBJECT from list in PLACE.  (Intended as counterpart to PUSH.)"
  (declare (ignore key test test-not))
  (multiple-value-bind (vars vals store-vars writer reader)
      (get-setf-expansion place environment)
    (let ((evaled-value (gensym))
	  (store-var (first store-vars)))
      (if (cdr store-vars)
	`(let* ((,evaled-value ,object)
		,@(mapcar #'list vars vals))
	   (multiple-value-bind ,store-vars ,reader
	     (setq ,store-var (delete ,evaled-value ,store-var :count 1 ,@keys))
	     ,writer))
	`(let* ((,evaled-value ,object)
		,@(mapcar #'list vars vals)
		(,store-var (delete ,evaled-value ,reader :count 1 ,@keys)))
	   ,writer)))))

#:Erik
-- 
@1999-07-22T00:37:33Z -- pi billion seconds since the turn of the century