From jerry@strobe.ATC.Olivetti.Com Thu Jan  4 04:08:31 EST 1996
Article: 4266 of alt.sources
Path: news.math.psu.edu!chi-news.cic.net!newsfeed.internetmci.com!in2.uu.net!brighton.openmarket.com!decwrl!olivea!strobe!jerry
From: jerry@strobe.ATC.Olivetti.Com (Jerry Aguirre)
Newsgroups: news.software.nntp,alt.sources
Subject: Improved patch to add streaming NNTP to INN1.4
Followup-To: news.software.nntp
Date: 4 Jan 1996 01:11:19 GMT
Organization: Olivetti ATC; Sunnyvale, CA USA
Lines: 1787
Message-ID: <4cf9fn$pja@olivea.ATC.Olivetti.Com>
NNTP-Posting-Host: strobe.atc.olivetti.com
Keywords: NNTP streaming INN usenet
Xref: news.math.psu.edu news.software.nntp:16971 alt.sources:4266

Appended is an improved patch to support streaming NNTP under
INN1.4.  The prevous patch was prone to a CPU intensive loop
when coming out of a pause.  This caused delayed response to
ctlinnd commands as one symptom.

Streaming NNTP provides higher thruput than standard NNTP by
buffering several transactions on the comunication channel.
This eliminates the idle time when standard NNTP would be
waiting for a response to each transfer.

				Jerry Aguirre
===Begin Streaming NNTP Diffs===
*** innd/chan.c.-stream	Sun Jun 11 15:20:19 1995
--- innd/chan.c	Tue Dec  5 14:23:01 1995
***************
*** 117,122 ****
--- 117,126 ----
      cp->In = in;
      cp->Out = out;
      cp->Tracing = Tracing;
+     cp->Sendid.Size=0;
+     cp->Rest=0;
+     cp->SaveUsed=0;
+     cp->Lastch=0;
  
      /* Make the descriptor close-on-exec and non-blocking. */
      CloseOnExec(fd, TRUE);
***************
*** 210,215 ****
--- 214,223 ----
      if (cp->Out.Size > BIG_BUFFER) {
  	cp->Out.Size = 0;
  	DISPOSE(cp->Out.Data);
+     }
+     if (cp->Sendid.Size) {
+ 	cp->Sendid.Size = 0;
+ 	DISPOSE(cp->Sendid.Data);
      }
  }
  
*** innd/innd.h.-stream	Sun Jun 11 15:20:24 1995
--- innd/innd.h	Tue Dec  5 14:21:32 1995
***************
*** 148,153 ****
--- 148,157 ----
      BUFFER		In;
      BUFFER		Out;
      BOOL		Tracing;
+     BUFFER		Sendid;
+     int			Lastch;
+     int			Rest;
+     int			SaveUsed;
  } CHANNEL;
  
  
*** doc/innxmit.8.-stream	Wed Jul 26 17:12:20 1995
--- doc/innxmit.8	Tue Aug  1 17:41:59 1995
***************
*** 11,16 ****
--- 11,19 ----
  .B \-a
  ]
  [
+ .B \-c
+ ]
+ [
  .B \-d
  ]
  [
***************
*** 20,25 ****
--- 23,31 ----
  .B \-r
  ]
  [
+ .B \-s
+ ]
+ [
  .BI \-t " timeout"
  ]
  [
***************
*** 150,155 ****
--- 156,179 ----
  The ``\-A'' flag may be used to specify an alternate spool directory to
  use if the article is not found; this would normally be an NFS-mounted
  spool directory of a master server with longer expiration times.
+ .PP
+ .I Innxmit
+ will attempt to negotiate a streaming mode extension of the NNTP
+ protocol with the server at connect time.
+ If successful it will use a slightly different protocol that enhances
+ throughput.
+ If the server does not recognize the streaming mode negotiation
+ .I innxmit
+ will revert to normal NNTP transfer mode.
+ Use the ``\-s'' flag to disable the attempt to negotiate the streaming
+ mode extension.
+ In streaming mode a check of each message ID is still made to avoid sending
+ articles already on the server.
+ The ``\-c'' flag will, if streaming mode is supported,
+ result in sending articles without checking.
+ This results in slightly greater throughput and may be appropriate when
+ it is known that the site could not already have the articles such as in
+ the case of a "leaf" site.
  .SH HISTORY
  Written by Rich $alz <rsalz@uunet.uu.net> for InterNetNews.
  .de R$
*** backends/innxmit.c.-stream	Tue Aug  1 11:45:12 1995
--- backends/innxmit.c	Wed Jan  3 16:28:17 1996
***************
*** 1,6 ****
--- 1,7 ----
  /*  $Revision: 1.14 $
  **
  **  Transmit articles to remote site.
+ **  Modified for NNTP streaming: 3Jan96 Jerry Aguirre
  */
  #include "configdata.h"
  #include <stdio.h>
***************
*** 39,45 ****
--- 40,85 ----
  
  #define OUTPUT_BUFFER_SIZE	(16 * 1024)
  
+ /* Streaming extensions to NNTP.  This extension removes the lock-step
+ ** limitation of conventional NNTP.  Article transfer is several times
+ ** faster.  Negotiated and falls back to old mode if receiver refuses.
+ */
  
+ /* max number of articles that can be streamed ahead */
+ #define STNBUF 32
+ 
+ /* Send "takethis" without "check" if this many articles were
+ ** accepted in a row.
+ */
+ #define STNC 16
+ 
+ /* typical number of articles to stream  */
+ /* must be able to fopen this many articles */
+ #define STNBUFL (STNBUF/2)
+ 
+ /* number of retries before requeueing to disk */
+ #define STNRETRY 5
+ 
+ struct stbufs {		/* for each article we are procesing */
+ 	char *st_fname;		/* file name */
+ 	char *st_id;		/* message ID */
+ 	int   st_retry;		/* retry count */
+ 	int   st_age;		/* age count */
+ 	QIOSTATE *st_qp;	/* IO to read article contents */
+ 	int   st_hash;		/* hash value to speed searches */
+ };
+ static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */
+ static int stnq;	/* current number of active entries in stbuf */
+ static long stnofail;	/* Count of consecutive successful sends */
+ 
+ static int TryStream = TRUE;	/* Should attempt stream negotation? */
+ static int CanStream = FALSE;	/* Result of stream negotation */
+ static int DoCheck   = TRUE;	/* Should check before takethis? */
+ static char modestream[] = "mode stream";
+ static long retries = 0;
+ 
+ 
+ 
  /*
  ** Syslog formats - collected together so they remain consistent
  */
***************
*** 51,56 ****
--- 91,99 ----
  STATIC char	CANT_AUTHENTICATE[] = "%s authenticate failed %s";
  STATIC char	IHAVE_FAIL[] = "%s ihave failed %s";
  
+ STATIC char	CANT_FINDIT[] = "%s can't find %s";
+ STATIC char	CANT_PARSEIT[] = "%s can't parse ID %s";
+ STATIC char	UNEXPECTED[] = "%s unexpected response code %s";
  
  /*
  **  Global variables.
***************
*** 83,88 ****
--- 126,133 ----
  STATIC unsigned long	STAToffered;
  STATIC unsigned long	STATrefused;
  STATIC unsigned long	STATrejected;
+ STATIC char		*AltSpool;
+ STATIC char		*AltPath;
  
  
  /*
***************
*** 171,189 ****
  {
      int		i;
  
      i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
      REMbuffptr = REMbuffer;
      return i < 0 ? FALSE : TRUE;
  }
  
  
  /*
  **  Send a line to the server, adding the dot escape and \r\n.
  */
  STATIC BOOL
! REMwrite(p, i)
      register char	*p;
      register int	i;
  {
      static char		HDR[] = "Content-Transfer-Encoding:";
      static char		COD[] =
--- 216,335 ----
  {
      int		i;
  
+     if (REMbuffptr == REMbuffer) return TRUE; /* nothing buffered */
      i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
      REMbuffptr = REMbuffer;
      return i < 0 ? FALSE : TRUE;
  }
  
+ /*
+ **  Return index to entry matching this message ID.  Else return -1.
+ **  The hash is to speed up the search.
+ **  the protocol.
+ */
+ STATIC int
+ stindex(MessageID, hash)
+     char		*MessageID;
+     int hash;
+ {
+     register int i;
  
+     for (i = 0; i < STNBUF; i++) { /* linear search for ID */
+ 	if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
+ 	 && (stbuf[i].st_hash == hash)) {
+ 	    register int n;
+ 
+ 	    if (strcasecmp(MessageID, stbuf[i].st_id)) continue;
+ 
+ 	    /* left of '@' is case sensitive */
+ 	    for (n = 0; (MessageID[n] != '@') && (MessageID[n] != '\0'); n++) ;
+ 	    if (strncmp(MessageID, stbuf[i].st_id, n)) continue;
+ 	    else break;	/* found a match */
+ 	}
+     }
+     if (i >= STNBUF) i = -1;  /* no match found ? */
+     return (i);
+ }
+ 
+ /* stidhash(): calculate a hash value for message IDs to speed comparisons */
+ STATIC int
+ stidhash(MessageID)
+     char                *MessageID;
+ {
+     register char *p;
+     register int hash;
+ 
+     hash = 0;
+     for (p = MessageID + 1; *p && (*p != '>'); p++) {
+ 	hash <<= 1;
+ 	if (isascii(*p) && isupper(*p)) {
+ 	    hash += tolower(*p);
+ 	} else {
+ 	    hash += *p;
+ 	}
+     }
+     return hash;
+ }
+ 
+ /* stalloc(): save path, ID, and qp into one of the streaming mode entries */
+ STATIC int
+ stalloc(Article, MessageID, qp, hash)
+     char		*Article;
+     char		*MessageID;
+     QIOSTATE		*qp;
+     int			hash;
+ {
+     register int i;
+ 
+     for (i = 0; i < STNBUF; i++) {
+ 	if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break;
+     }
+     if (i >= STNBUF) { /* stnq says not full but can not find unused */
+ 	syslog(L_ERROR, "stalloc: Internal error");
+ 	return (-1);
+     }
+     if ((int)strlen(Article) >= SPOOLNAMEBUFF) {
+ 	syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF);
+ 	return (TRUE);
+     }
+     /* allocate buffers on first use.
+     ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
+     ** If ID is ever longer than NNTP_STRLEN then other code would break.
+     */
+     if (!stbuf[i].st_fname) stbuf[i].st_fname = NEW(char, SPOOLNAMEBUFF);
+     if (!stbuf[i].st_id) stbuf[i].st_id = NEW(char, NNTP_STRLEN);
+     (void)strcpy(stbuf[i].st_fname, Article);
+     (void)strcpy(stbuf[i].st_id, MessageID);
+     stbuf[i].st_qp = qp;
+     stbuf[i].st_hash = hash;
+     stbuf[i].st_retry = 0;
+     stbuf[i].st_age = 0;
+     stnq++;
+     return i;
+ }
+ 
+ /* strel(): release for reuse one of the streaming mode entries */
+ STATIC void
+ strel(i)
+     int		i;
+ {
+ 	if (stbuf[i].st_qp) {
+ 	    QIOclose(stbuf[i].st_qp);
+ 	    stbuf[i].st_qp = 0;
+ 	}
+ 	if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0';
+ 	if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0';
+ 	stnq--;
+ }
+ 
  /*
  **  Send a line to the server, adding the dot escape and \r\n.
  */
  STATIC BOOL
! REMwrite(p, i, escdot)
      register char	*p;
      register int	i;
+     register BOOL	escdot;
  {
      static char		HDR[] = "Content-Transfer-Encoding:";
      static char		COD[] =
***************
*** 212,218 ****
          }
  
      /* Dot escape, text of the line, line terminator. */
!     if (*p == '.')
  	*REMbuffptr++ = '.';
      if (i > MEMCPY_THRESHOLD) {
  	(void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i);
--- 358,364 ----
          }
  
      /* Dot escape, text of the line, line terminator. */
!     if (escdot && (*p == '.'))
  	*REMbuffptr++ = '.';
      if (i > MEMCPY_THRESHOLD) {
  	(void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i);
***************
*** 304,310 ****
      double		systime;
  
      if (!Purging) {
! 	(void)REMwrite(QUIT, STRLEN(QUIT));
  	(void)REMflush();
      }
      (void)GetTimeInfo(&Now);
--- 450,456 ----
      double		systime;
  
      if (!Purging) {
! 	(void)REMwrite(QUIT, STRLEN(QUIT), FALSE);
  	(void)REMflush();
      }
      (void)GetTimeInfo(&Now);
***************
*** 325,330 ****
--- 471,478 ----
      syslog(L_NOTICE, STAT1,
  	REMhost, STAToffered, STATaccepted, STATrefused, STATrejected);
      syslog(L_NOTICE, STAT2, REMhost, usertime, systime, STATend - STATbegin);
+     if (retries)
+ 	syslog(L_NOTICE, "%s %lu Streaming retries", REMhost, retries);
  
      if (BATCHfp != NULL && unlink(BATCHtemp) < 0 && errno != ENOENT)
  	(void)fprintf(stderr, "Can't remove \"%s\", %s\n",
***************
*** 342,348 ****
  CloseAndRename()
  {
      /* Close the files, rename the temporary. */
!     QIOclose(BATCHqp);
      if (ferror(BATCHfp)
       || fflush(BATCHfp) == EOF
       || fclose(BATCHfp) == EOF) {
--- 490,499 ----
  CloseAndRename()
  {
      /* Close the files, rename the temporary. */
!     if (BATCHqp) {
! 	QIOclose(BATCHqp);
! 	BATCHqp = NULL;
!     }
      if (ferror(BATCHfp)
       || fflush(BATCHfp) == EOF
       || fclose(BATCHfp) == EOF) {
***************
*** 411,419 ****
      }
  
      (void)fprintf(stderr, "Rewriting batch file and exiting.\n");
      Requeue(Article, MessageID);
  
!     for ( ; ; ) {
  	if ((p = QIOread(BATCHqp)) == NULL) {
  	    if (QIOtoolong(BATCHqp)) {
  		(void)fprintf(stderr, "Skipping long line in \"%s\".\n",
--- 562,584 ----
      }
  
      (void)fprintf(stderr, "Rewriting batch file and exiting.\n");
+     if (CanStream) {	/* streaming mode has a buffer of articles */
+ 	register int i;
+ 
+ 	for (i = 0; i < STNBUF; i++) {    /* requeue unacknowledged articles */
+ 	    if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
+ 		if (Debug)
+ 		    (void)fprintf(stderr, "stbuf[%d]= %s, %s\n",
+ 			    i, stbuf[i].st_fname, stbuf[i].st_id);
+ 		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
+ 		if (Article == stbuf[i].st_fname) Article = NULL;
+ 		strel(i); /* release entry */
+ 	    }
+ 	}
+     }
      Requeue(Article, MessageID);
  
!     for ( ; BATCHqp; ) {
  	if ((p = QIOread(BATCHqp)) == NULL) {
  	    if (QIOtoolong(BATCHqp)) {
  		(void)fprintf(stderr, "Skipping long line in \"%s\".\n",
***************
*** 565,572 ****
--- 730,739 ----
      register char	*p;
      register BOOL	ok;
      register BOOL	InHeaders;
+     long		length;
      char		buff[NNTP_STRLEN];
  
+     length = 0;
      for (InHeaders = TRUE; ; ) {
  	if ((p = QIOread(qp)) == NULL) {
  	    if (QIOtoolong(qp)) {
***************
*** 587,597 ****
  	    InHeaders = FALSE;
  
  	if (InHeaders || MimeArticle == MTnotmime) {
! 	    if (!REMwrite(p, QIOlength(qp))) {
  	        (void)fprintf(stderr, "Can't send \"%s\", %s\n",
  		        Article, strerror(errno));
  	        return FALSE;
  	    }
  	}
  	else {
  	    switch (MimeArticle) {
--- 754,765 ----
  	    InHeaders = FALSE;
  
  	if (InHeaders || MimeArticle == MTnotmime) {
! 	    if (!REMwrite(p, QIOlength(qp), TRUE)) {
  	        (void)fprintf(stderr, "Can't send \"%s\", %s\n",
  		        Article, strerror(errno));
  	        return FALSE;
  	    }
+ 	    length += QIOlength(qp);
  	}
  	else {
  	    switch (MimeArticle) {
***************
*** 601,606 ****
--- 769,775 ----
  		break;
  	    case MTquotedprintable:
  		ok = REMwriteQuoted(p, QIOlength(qp));
+ 		length += QIOlength(qp);
  		break;
  	    }
  	    if (!ok) {
***************
*** 612,637 ****
  	if (GotInterrupt)
  	    Interrupted(Article, MessageID);
      }
!     if (!REMflush()) {
! 	(void)fprintf(stderr, "Can't end \"%s\", %s\n",
  		Article, strerror(errno));
  	return FALSE;
      }
      if (Debug)
! 	(void)fprintf(stderr, "> [ article ]%s\n",
  	     MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)");
- 
-     /* Write the terminator. */
-     if (write(ToServer, TERM, STRLEN(TERM)) != STRLEN(TERM)) {
- 	(void)fprintf(stderr, "Can't end \"%s\", %s\n",
- 		Article, strerror(errno));
- 	return FALSE;
-     }
      if (GotInterrupt)
  	Interrupted(Article, MessageID);
      if (Debug)
  	(void)fprintf(stderr, "> .\n");
  
      /* What did the remote site say? */
      if (!REMread(buff, (int)sizeof buff)) {
  	(void)fprintf(stderr, "No reply after sending \"%s\", %s\n",
--- 781,802 ----
  	if (GotInterrupt)
  	    Interrupted(Article, MessageID);
      }
!     /* Write the terminator. */
!     if (!REMwrite(".", 1, FALSE)) {
! 	(void)fprintf(stderr, "Can't send \"%s\", %s\n",
  		Article, strerror(errno));
  	return FALSE;
      }
      if (Debug)
! 	(void)fprintf(stderr, "> [ article %ld ]%s\n", length,
  	     MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)");
      if (GotInterrupt)
  	Interrupted(Article, MessageID);
      if (Debug)
  	(void)fprintf(stderr, "> .\n");
  
+     if (CanStream) return TRUE;	/* streaming mode does not wait for ACK */
+ 
      /* What did the remote site say? */
      if (!REMread(buff, (int)sizeof buff)) {
  	(void)fprintf(stderr, "No reply after sending \"%s\", %s\n",
***************
*** 773,779 ****
--- 938,1116 ----
  	longjmp(JMPwhere, 1);
  }
  
+ /* check articles in streaming NNTP mode
+ ** return TRUE on failure.
+ */
+ STATIC BOOL
+ check(i)
+     int	i; /* index of stbuf to send check for */
+ {
+     char	buff[NNTP_STRLEN];
  
+     /* send "check <ID>" to the other system */
+     (void)sprintf(buff, "check %s", stbuf[i].st_id);
+     if (!REMwrite(buff, (int)strlen(buff), FALSE)) {
+ 	(void)fprintf(stderr, "Can't check article, %s\n",
+ 		strerror(errno));
+ 	return TRUE;
+     }
+     STAToffered++;
+     if (Debug) {
+ 	if (stbuf[i].st_retry)
+ 	    (void)fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry);
+ 	else
+ 	    (void)fprintf(stderr, "> %s\n", buff);
+     }
+     if (GotInterrupt)
+ 	Interrupted(stbuf[i].st_fname, stbuf[i].st_id);
+ 
+     /* That all.  Response is checked later by strlisten() */
+     return FALSE;
+ }
+ 
+ /* Send article in "takethis <id> streaming NNTP mode.
+ ** return TRUE on failure.
+ */
+ STATIC BOOL
+ takethis(i)
+     int i;	/* index to stbuf to be sent */
+ {
+     char	buff[NNTP_STRLEN];
+ 
+     if (!stbuf[i].st_qp) { /* should already be open but ... */
+ 	/* Open the article. */
+ 	if (!(stbuf[i].st_qp = QIOopen(stbuf[i].st_fname, QIO_BUFFER))) {
+ 	    /* can not open it. Should check AltPath */
+ 	    if (AltPath && (*(stbuf[i].st_fname) != '/')) {
+ 		(void)sprintf(AltPath, "%s/%s", AltSpool, stbuf[i].st_fname);
+ 		stbuf[i].st_qp = QIOopen(AltPath, QIO_BUFFER);
+ 	    }
+ 	    if (!(stbuf[i].st_qp)) {
+ 		strel(i);
+ 		return FALSE; /* Not an error. Could be canceled or expired */
+ 	    }
+ 	}
+     }
+     /* send "takethis <ID>" to the other system */
+     (void)sprintf(buff, "takethis %s", stbuf[i].st_id);
+     if (!REMwrite(buff, (int)strlen(buff), FALSE)) {
+ 	(void)fprintf(stderr, "Can't send takethis <id>, %s\n",
+ 		strerror(errno));
+ 	return TRUE;
+     }
+     if (Debug)
+ 	(void)fprintf(stderr, "> %s\n", buff);
+     if (GotInterrupt)
+ 	Interrupted((char *)0, (char *)0);
+     if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id,
+ 	    stbuf[i].st_qp))
+ 	return TRUE;
+     QIOclose(stbuf[i].st_qp);	/* should not need file again */
+     stbuf[i].st_qp = 0;		/* so close to free descriptor */
+     stbuf[i].st_age = 0;
+     /* That all.  Response is checked later by strlisten() */
+     return FALSE;
+ }
+ 
+ 
+ /* listen for responses.  Process acknowledgments to remove items from
+ ** the queue.  Also sends the articles on request.  Returns TRUE on error.
+ ** return TRUE on failure.
+ */
+ STATIC BOOL
+ strlisten()
+ {
+     int		resp;
+     int		i;
+     char	*id, *p;
+     char	buff[NNTP_STRLEN];
+     int		hash;
+ 
+     while(TRUE) {
+ 	if (!REMread(buff, (int)sizeof buff)) {
+ 	    (void)fprintf(stderr, "No reply to check, %s\n", strerror(errno));
+ 	    return TRUE;
+ 	}
+ 	if (GotInterrupt)
+ 	    Interrupted((char *)0, (char *)0);
+ 	if (Debug)
+ 	    (void)fprintf(stderr, "< %s", buff);
+ 
+ 	/* Parse the reply. */
+ 	resp =  atoi(buff);
+ 	/* Skip the 1XX informational messages */
+ 	if ((resp >= 100) && (resp < 200)) continue;
+ 	switch (resp) { /* first time is to verify it */
+ 	case NNTP_ERR_GOTID_VAL:
+ 	case NNTP_OK_SENDID_VAL:
+ 	case NNTP_OK_RECID_VAL:
+ 	case NNTP_ERR_FAILID_VAL:
+ 	case NNTP_RESENDID_VAL:
+ 	    if (id = strchr(buff, '<')) {
+ 		p = strchr(id, '>');
+ 		if (p) *(p+1) = '\0';
+ 		hash = stidhash(id);
+ 		i = stindex(id, hash);	/* find table entry */
+ 		if (i < 0) { /* should not happen */
+ 		    syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff));
+ 		    return (TRUE); /* can't find it! */
+ 		}
+ 	    } else {
+ 		syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff));
+ 		return (TRUE);
+ 	    }
+ 	    break;
+ 	case NNTP_GOODBYE_VAL:
+ 	    /* Most likely out of space -- no point in continuing. */
+ 	    syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
+ 	    return TRUE;
+ 	    /* NOTREACHED */
+ 	default:
+ 	    syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff));
+ 	    if (Debug)
+ 		(void)fprintf(stderr, "Unknown reply \"%s\"",
+ 						    buff);
+ 	    return (TRUE);
+ 	}
+ 	switch (resp) { /* now we take some action */
+ 	case NNTP_RESENDID_VAL:	/* remote wants it later */
+ 	    /* try again now because time has passed */
+ 	    if (stbuf[i].st_retry < STNRETRY) {
+ 		if (check(i)) return TRUE;
+ 		stbuf[i].st_retry++;
+ 		stbuf[i].st_age = 0;
+ 	    } else { /* requeue to disk for later */
+ 		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
+ 		strel(i); /* release entry */
+ 	    }
+ 	    break;
+ 	case NNTP_ERR_GOTID_VAL:	/* remote doesn't want it */
+ 	    strel(i); /* release entry */
+ 	    STATrefused++;
+ 	    stnofail = 0;
+ 	    break;
+ 		
+ 	case NNTP_OK_SENDID_VAL:	/* remote wants article */
+ 	    if (takethis(i)) return TRUE;
+ 	    stnofail++;
+ 	    break;
+ 
+ 	case NNTP_OK_RECID_VAL:	/* remote received it OK */
+ 	    strel(i); /* release entry */
+ 	    STATaccepted++;
+ 	    break;
+ 		
+ 	case NNTP_ERR_FAILID_VAL:
+ 	    strel(i); /* release entry */
+ 	    STATrejected++;
+ 	    stnofail = 0;
+ 	    break;
+ 	}
+ 	break;
+     }
+     return (FALSE);
+ }
+ 
  /*
  **  Print a usage message and exit.
  */
***************
*** 781,787 ****
  Usage()
  {
      (void)fprintf(stderr,
! 	"Usage: innxmit [-a] [-d] [-M] [-p] [-r] [-S] [-t#] [-T#] host file\n");
      exit(1);
  }
  
--- 1118,1124 ----
  Usage()
  {
      (void)fprintf(stderr,
! 	"Usage: innxmit [-a] [-c] [-d] [-M] [-p] [-r] [-s] [-S] [-t#] [-T#] host file\n");
      exit(1);
  }
  
***************
*** 801,812 ****
      FILE		*From;
      FILE		*To;
      char		buff[NNTP_STRLEN];
-     char		*AltSpool;
      char		*Article;
      char		*ContentEncoding;
      char		*ContentType;
      char		*MessageID;
-     char		*AltPath;
      SIGHANDLER		(*old)();
      unsigned int	ConnectTimeout;
      unsigned int	TotalTimeout;
--- 1138,1147 ----
***************
*** 818,824 ****
      (void)umask(NEWSUMASK);
  
      /* Parse JCL. */
!     while ((i = getopt(ac, av, "A:adMprSt:T:v")) != EOF)
  	switch (i) {
  	default:
  	    Usage();
--- 1153,1159 ----
      (void)umask(NEWSUMASK);
  
      /* Parse JCL. */
!     while ((i = getopt(ac, av, "A:acdMprsSt:T:v")) != EOF)
  	switch (i) {
  	default:
  	    Usage();
***************
*** 830,835 ****
--- 1165,1173 ----
  	case 'a':
  	    AlwaysRewrite = TRUE;
  	    break;
+ 	case 'c':
+ 	    DoCheck = FALSE;
+ 	    break;
  	case 'd':
  	    Debug = TRUE;
  	    break;
***************
*** 843,850 ****
--- 1181,1192 ----
  	case 'r':
  	    DoRequeue = FALSE;
  	    break;
+ 	case 's':
+ 	    TryStream = FALSE;
+ 	    break;
  	case 'S':
  	    Slavish = TRUE;
+ 	    TryStream = FALSE; /* streaming does not support xreplic */
  	    break;
  	case 't':
  	    ConnectTimeout = atoi(optarg);
***************
*** 957,962 ****
--- 1299,1346 ----
  	/* We no longer need standard I/O. */
  	FromServer = fileno(From);
  	ToServer = fileno(To);
+ 
+ 	if (TryStream) {
+ 	    if (!REMwrite(modestream, (int)strlen(modestream), FALSE)) {
+ 		(void)fprintf(stderr, "Can't negotiate %s, %s\n",
+ 			modestream, strerror(errno));
+ 	    }
+ 	    if (Debug)
+ 		(void)fprintf(stderr, ">%s\n", modestream);
+ 	    /* Does he understand mode stream? */
+ 	    if (!REMread(buff, (int)sizeof buff)) {
+ 		(void)fprintf(stderr, "No reply to %s, %s\n",
+ 				modestream, strerror(errno));
+ 	    } else {
+ 		if (Debug)
+ 		    (void)fprintf(stderr, "< %s", buff);
+ 
+ 		/* Parse the reply. */
+ 		switch (atoi(buff)) {
+ 		default:
+ 		    (void)fprintf(stderr, "Unknown reply to \"%s\" -- %s",
+ 			    modestream, buff);
+ 		    CanStream = FALSE;
+ 		    break;
+ 		case NNTP_OK_STREAM_VAL:	/* YES! */
+ 		    CanStream = TRUE;
+ 		    break;
+ 		case NNTP_BAD_COMMAND_VAL: /* normal refusal */
+ 		    CanStream = FALSE;
+ 		    break;
+ 		}
+ 	    }
+ 	    if (CanStream) {
+ 		int i;
+ 
+ 		for (i = 0; i < STNBUF; i++) { /* reset buffers */
+ 		    stbuf[i].st_fname = 0;
+ 		    stbuf[i].st_id = 0;
+ 		    stbuf[i].st_qp = 0;
+ 		}
+ 		stnq = 0;
+ 	    }
+ 	}
      }
  
      /* Set up signal handlers. */
***************
*** 996,1001 ****
--- 1380,1386 ----
  
  	    /* Normal EOF -- we're done. */
  	    QIOclose(BATCHqp);
+ 	    BATCHqp = NULL;
  	    break;
  	}
  
***************
*** 1100,1107 ****
  	    Interrupted(Article, MessageID);
  
  	/* Offer the article. */
  	(void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID);
! 	if (!REMwrite(buff, (int)strlen(buff))) {
  	    (void)fprintf(stderr, "Can't offer article, %s\n",
  		    strerror(errno));
  	    QIOclose(qp);
--- 1485,1553 ----
  	    Interrupted(Article, MessageID);
  
  	/* Offer the article. */
+ 	if (CanStream) {
+ 	    int i;
+ 	    int lim;
+ 	    int hash;
+ 
+ 	    hash = stidhash(MessageID);
+ 	    if (stindex(MessageID, hash) >= 0) { /* skip duplicates in queue */
+ 		if (Debug)
+ 		    (void)fprintf(stderr, "Skipping duplicate ID %s\n",
+ 							    MessageID);
+ 		QIOclose(qp);
+ 		continue;
+ 	    }
+ 	    /* This code tries to optimize by sending a burst of "check"
+ 	     * commands before flushing the buffer.  This should result
+ 	     * in several being sent in one packet reducing the network
+ 	     * overhead.
+ 	     */
+ 	    if (DoCheck && (stnofail < STNC)) lim = STNBUF;
+ 	    else                              lim = STNBUFL;
+ 	    if (stnq >= lim) { /* need to empty a buffer */
+ 		while (stnq >= STNBUFL) { /* or several */
+ 		    if (strlisten()) {
+ 			RequeueRestAndExit(Article, MessageID);
+ 		    }
+ 		}
+ 	    }
+ 	    /* save new article in the buffer */
+ 	    i = stalloc(Article, MessageID, qp, hash);
+ 	    if (i < 0) {
+ 		QIOclose(qp);
+ 		RequeueRestAndExit(Article, MessageID);
+ 	    }
+ 	    if (DoCheck && (stnofail < STNC)) {
+ 		if (check(i)) {
+ 		    RequeueRestAndExit((char *)NULL, (char *)NULL);
+ 		}
+ 	    } else {
+ 		if (takethis(i)) {
+ 		    RequeueRestAndExit((char *)NULL, (char *)NULL);
+ 		}
+ 	    }
+ 	    /* check for need to resend any IDs */
+ 	    for (i = 0; i < STNBUF; i++) {
+ 		if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
+ 		    if (stbuf[i].st_age++ > stnq) {
+ 			/* This should not happen but just in case ... */
+ 			if (stbuf[i].st_retry < STNRETRY) {
+ 			    if (check(i)) return TRUE; /* resend check */
+ 			    retries++;
+ 			    stbuf[i].st_retry++;
+ 			    stbuf[i].st_age = 0;
+ 			} else { /* requeue to disk for later */
+ 			    Requeue(stbuf[i].st_fname, stbuf[i].st_id);
+ 			    strel(i); /* release entry */
+ 			}
+ 		    }
+ 		}
+ 	    }
+ 	    continue; /* next article */
+ 	}
  	(void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID);
! 	if (!REMwrite(buff, (int)strlen(buff), FALSE)) {
  	    (void)fprintf(stderr, "Can't offer article, %s\n",
  		    strerror(errno));
  	    QIOclose(qp);
***************
*** 1154,1159 ****
--- 1600,1612 ----
  	}
  
  	QIOclose(qp);
+     }
+     if (CanStream) { /* need to wait for rest of ACKs */
+ 	while (stnq > 0) {
+ 	    if (strlisten()) {
+ 		RequeueRestAndExit((char *)NULL, (char *)NULL);
+ 	    }
+ 	}
      }
  
      if (BATCHfp != NULL)
*** innd/nc.c.-stream	Sun Jun 11 15:20:40 1995
--- innd/nc.c	Tue Dec  5 15:23:18 1995
***************
*** 6,12 ****
  #include "innd.h"
  #include "dbz.h"
  
- 
  #define BAD_COMMAND_COUNT	10
  #define WIP_CHECK		(1 * 60)
  #define SAVE_AMT		10
--- 6,11 ----
***************
*** 52,57 ****
--- 51,59 ----
  static FUNCTYPE	NCxpath();
  static FUNCTYPE	NCxreplic();
  static FUNCTYPE	NC_unimp();
+ /* new modules for streaming */
+ static FUNCTYPE	NCcheck();
+ static FUNCTYPE	NCtakethis();
  
  STATIC int		NCcount;	/* Number of open connections	*/
  STATIC int		NCwipsize;	/* Size of NCwip array		*/
***************
*** 67,72 ****
--- 69,76 ----
      {	"authinfo",	NCauthinfo },
      {	"help",		NChelp	},
      {	"ihave",	NCihave	},
+     {	"check",	NCcheck	},
+     {	"takethis",	NCtakethis },
      {	"list",		NClist	},
      {	"mode",		NCmode	},
      {	"quit",		NCquit	},
***************
*** 131,136 ****
--- 135,175 ----
  
  
  /*
+ **  Write an NNTP reply message.
+ **  Call only when we will stay in NCreader mode.
+ **  Tries to do the actual write if it will not block.
+ */
+ STATIC void
+ NCwritereply(cp, text)
+     CHANNEL	*cp;
+     char	*text;
+ {
+     register BUFFER	*bp;
+     register int	i;
+ 
+     bp = &cp->Out;
+     i = bp->Left;
+     WCHANappend(cp, text, (int)strlen(text));	/* text in buffer */
+     WCHANappend(cp, NCterm, STRLEN(NCterm));	/* add CR NL to text */
+     if (i == 0) {	/* if only data then try to write directly */
+ 	i = write(cp->fd, &bp->Data[bp->Used], bp->Left);
+ 	if (Tracing || cp->Tracing)
+ 	    syslog(L_TRACE, "%s NCwritereply %d=write(%d, \"%.15s\", %d)",
+ 		CHANname(cp), i, cp->fd,  &bp->Data[bp->Used], bp->Left);
+ 	if (i > 0) bp->Used += i;
+ 	if (bp->Used == bp->Left) bp->Used = bp->Left = 0;
+ 	else i = 0;
+     } else i = 0;
+     if (i <= 0) {	/* write failed, queue it for later */
+ 	RCHANremove(cp);
+ 	WCHANadd(cp);
+     }
+     if (Tracing || cp->Tracing)
+ 	syslog(L_TRACE, "%s > %s", CHANname(cp), text);
+ }
+ 
+ 
+ /*
  **  Tell the NNTP channel to go away.
  */
  STATIC void
***************
*** 188,197 ****
      case OMrunning:
  	wp = &NCwip[cp->fd];
  	response = ARTpost(cp, AmSlave ? &wp->Replic : NULL, wp->MessageID);
! 	if (atoi(response) == NNTP_TOOKIT_VAL)
  	    cp->Received++;
! 	else
  	    cp->Rejected++;
  	cp->Reported++;
  	if (cp->Reported >= NNTP_ACTIVITY_SYNC) {
  	    syslog(L_NOTICE,
--- 227,246 ----
      case OMrunning:
  	wp = &NCwip[cp->fd];
  	response = ARTpost(cp, AmSlave ? &wp->Replic : NULL, wp->MessageID);
! 	if (atoi(response) == NNTP_TOOKIT_VAL) {
  	    cp->Received++;
! 	    if (cp->Sendid.Size > 3) { /* We be streaming */
! 		char buff[4];
! 		(void)sprintf(buff, "%d", NNTP_OK_RECID_VAL);
! 		cp->Sendid.Data[0] = buff[0];
! 		cp->Sendid.Data[1] = buff[1];
! 		cp->Sendid.Data[2] = buff[2];
! 		response = cp->Sendid.Data;
! 	    }
! 	} else {
  	    cp->Rejected++;
+ 	    if (cp->Sendid.Size) response = cp->Sendid.Data;
+ 	}
  	cp->Reported++;
  	if (cp->Reported >= NNTP_ACTIVITY_SYNC) {
  	    syslog(L_NOTICE,
***************
*** 200,206 ****
  		cp->Received, cp->Refused, cp->Rejected);
  	    cp->Reported = 0;
  	}
! 	NCwritetext(cp, response);
  	cp->State = CSgetcmd;
  	break;
  
--- 249,255 ----
  		cp->Received, cp->Refused, cp->Rejected);
  	    cp->Reported = 0;
  	}
! 	NCwritereply(cp, response);
  	cp->State = CSgetcmd;
  	break;
  
***************
*** 525,535 ****
  
      if (HIShavearticle(p)) {
  	cp->Refused++;
! 	NCwritetext(cp, NNTP_HAVEIT);
      }
      else if (NCinprogress(cp, p, &who)) {
  #if	defined(NNTP_RESENDIT_LATER)
! 	NCwritetext(cp, NNTP_RESENDIT_LATER);
  #else
  	/* Somebody else is sending it to us; wait until they're done. */
  	who->Wanted = TRUE;
--- 574,584 ----
  
      if (HIShavearticle(p)) {
  	cp->Refused++;
! 	NCwritereply(cp, NNTP_HAVEIT);
      }
      else if (NCinprogress(cp, p, &who)) {
  #if	defined(NNTP_RESENDIT_LATER)
! 	NCwritereply(cp, NNTP_RESENDIT_LATER);
  #else
  	/* Somebody else is sending it to us; wait until they're done. */
  	who->Wanted = TRUE;
***************
*** 540,546 ****
  #endif	/* defined(NNTP_RESENDIT_LATER) */
      }
      else {
! 	NCwritetext(cp, NNTP_SENDIT);
  	cp->State = CSgetarticle;
      }
  }
--- 589,595 ----
  #endif	/* defined(NNTP_RESENDIT_LATER) */
      }
      else {
! 	NCwritereply(cp, NNTP_SENDIT);
  	cp->State = CSgetarticle;
      }
  }
***************
*** 607,613 ****
  	h = HOnnrpd;
      else if (caseEQ(p, "query"))
  	h = HOnnrqd;
!     else {
  	NCwritetext(cp, NCbadcommand);
  	return;
      }
--- 656,669 ----
  	h = HOnnrpd;
      else if (caseEQ(p, "query"))
  	h = HOnnrqd;
!     else if (caseEQ(p, "stream")) {
! 	char buff[16];
! 	(void)sprintf(buff, "%d StreamOK.", NNTP_OK_STREAM_VAL);
! 	NCwritetext(cp, buff);
! 	syslog(L_NOTICE, "%s NCmode \"mode stream\" received",
! 		CHANname(cp));
! 	return;
!     } else {
  	NCwritetext(cp, NCbadcommand);
  	return;
      }
***************
*** 763,775 ****
      bp->Used = dest - bp->Data;
  }
  
- 
  /*
! **  Read whatever data is available on the channel.  If we got the
  **  full amount (i.e., the command or the whole article) process it.
  */
  STATIC FUNCTYPE
! NCreader(cp)
      register CHANNEL	*cp;
  {
      register char	*p;
--- 819,830 ----
      bp->Used = dest - bp->Data;
  }
  
  /*
! **  Check whatever data is available on the channel.  If we got the
  **  full amount (i.e., the command or the whole article) process it.
  */
  STATIC FUNCTYPE
! NCproc(cp)
      register CHANNEL	*cp;
  {
      register char	*p;
***************
*** 781,948 ****
      char		*av[2];
      int			i;
  
!     /* Read any data that's there; ignore errors (retry next time it's our
!      * turn) and if we got nothing, then it's EOF so mark it closed. */
!     if ((i = CHANreadtext(cp)) < 0) {
! 	if (cp->BadReads++ >= BAD_IO_COUNT) {
! 	    if (NCcount > 0)
! 		NCcount--;
! 	    CHANclose(cp, CHANname(cp));
! 	}
! 	return;
!     }
!     if (i == 0) {
! 	NCcount--;
! 	return;
!     }
! 
!     /* Update timestamp. */
!     wp = &NCwip[cp->fd];
!     wp->Timestamp = Now.time;
! 
      bp = &cp->In;
!     p = &bp->Data[bp->Used];
!     switch (cp->State) {
!     default:
! 	syslog(L_ERROR, "%s internal NCreader state %d",
! 	    CHANname(cp), cp->State);
! 	break;
  
!     case CSgetcmd:
!     case CSgetauth:
! 	/* Did we get the whole command, terminated with "\r\n"? */
! 	if (bp->Used < 2 || p[-2] != '\r' || p[-1] != '\n')
  	    break;
- 	p[-2] = '\0';
- 	bp->Used -= 2;
  
! 	/* Ignore blank lines. */
! 	if (bp->Used == 0)
! 	    break;
! 	if (Tracing || cp->Tracing)
! 	    syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data);
  
! 	/* We got something -- stop sleeping (in case we were). */
! 	SCHANremove(cp);
! 	if (cp->Argument != NULL) {
! 	    DISPOSE(cp->Argument);
! 	    cp->Argument = NULL;
! 	}
  
! 	if (cp->State == CSgetauth) {
! 	    if (caseEQn(bp->Data, "mode", 4))
! 		NCmode(cp);
! 	    else
! 		NCauthinfo(cp);
! 	    break;
! 	}
  
! 	/* Loop through the command table. */
! 	for (p = bp->Data, dp = NCcommands; dp < ENDOF(NCcommands); dp++)
! 	    if (caseEQn(p, dp->Name, dp->Size)) {
! 		(*dp->Function)(cp);
! 		cp->BadCommands = 0;
  		break;
  	    }
! 	if (dp == ENDOF(NCcommands)) {
! 	    NCwritetext(cp, NCbadcommand);
! 	    if (++(cp->BadCommands) >= BAD_COMMAND_COUNT)
! 		cp->State = CSwritegoodbye;
! 	    for (i = 0; (p = NCquietlist[i]) != NULL; i++)
! 		if (caseEQ(p, dp->Name))
  		    break;
! 	    if (p == NULL)
! 		syslog(L_NOTICE, "%s bad_command %s",
! 		    CHANname(cp), MaxLength(bp->Data, bp->Data));
! 	}
! 	break;
  
!     case CSgetarticle:
!     case CSgetrep:
! 	/* Reading an article; look for "\r\n.\r\n" terminator. */
! 	if (!ART_EOF(bp->Used, p)) {
  	    /* Check for the null article. */
! 	    if (bp->Used == 3
! 	     && p[-3] == '.' && p[-2] == '\r' && p[-1] == '\n') {
  		cp->Rejected++;
! 		NCwritetext(cp, NNTP_REJECTIT_EMPTY);
  		cp->State = CSgetcmd;
  		bp->Used = 0;
  
  		/* Clear the work-in-progress entry. */
  		NCclearwip(wp);
  	    }
! 
! 	    /* Check for big articles. */
! 	    if (LargestArticle > SAVE_AMT && bp->Used > LargestArticle) {
! 		/* Make some room, saving only the last few bytes. */
! 		for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 		    p[0] = p[bp->Used - SAVE_AMT];
! 		wp->Size += bp->Used - SAVE_AMT;
! 		bp->Used = SAVE_AMT;
! 		cp->State = CSeatarticle;
  	    }
! 	    break;
! 	}
  
! 	/* Strip article terminator and post the article. */
! 	p[-3] = '\0';
! 	bp->Used -= 3;
! 	SCHANremove(cp);
! 	if (cp->Argument != NULL) {
! 	    DISPOSE(cp->Argument);
! 	    cp->Argument = NULL;
! 	}
! 	NCclean(bp);
! 	NCpostit(cp);
! 	break;
! 
!     case CSeatarticle:
! 	/* Eat the article and then complain that it was too large */
! 	if (ART_EOF(bp->Used, p)) {
! 	    /* Reached the end of the article. */
  	    SCHANremove(cp);
  	    if (cp->Argument != NULL) {
  		DISPOSE(cp->Argument);
  		cp->Argument = NULL;
  	    }
! 	    p = wp->MessageID;
! 	    i = wp->Size + bp->Used;
! 	    syslog(L_ERROR, "%s internal rejecting huge article %s (%d > %d)",
! 		CHANname(cp), p ? p : "(null)", i, LargestArticle);
! 	    (void)sprintf(buff, "%d Article exceeds local limit of %ld bytes",
! 		    NNTP_REJECTIT_VAL, LargestArticle);
! 	    NCwritetext(cp, buff);
  	    cp->State = CSgetcmd;
! 	    cp->Rejected++;
  
! 	    /* Write a local cancel entry so nobody else gives it to us. */
! 	    if (p) {
! 		av[0] = p;
! 		av[1] = NULL;
! 		if ((q = CCcancel(av)) != NULL)
! 		    syslog(L_ERROR, "%s cant cancel %s %s", LogName, av[0], q);
  	    }
  
! 	    /* Clear the work-in-progress entry. */
! 	    NCclearwip(wp);
  
! 	    /* Reset input buffer to the default size; don't let realloc
! 	     * be lazy. */
! 	    DISPOSE(bp->Data);
! 	    bp->Size = START_BUFF_SIZE;
! 	    bp->Used = 0;
! 	    bp->Data = NEW(char, bp->Size);
  	}
! 	else if (bp->Used > 8 * 1024) {
! 	    /* Make some room; save the last few bytes of the article */
! 	    for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 		p[0] = p[bp->Used - SAVE_AMT + 0];
! 	    wp->Size += bp->Used - SAVE_AMT;
! 	    bp->Used = SAVE_AMT;
  	}
! 	break;
      }
  }
  
  
--- 836,1139 ----
      char		*av[2];
      int			i;
  
!     if (Tracing || cp->Tracing)
! 	syslog(L_TRACE, "%s NCproc Used=%d",
! 	    CHANname(cp), cp->In.Used);
!     
      bp = &cp->In;
!     if (bp->Used == 0) return;
  
!     p = &bp->Data[bp->Used];
!     wp = &NCwip[cp->fd];
!     for ( ; ; ) {
! 	cp->Rest = 0;
! 	cp->SaveUsed = bp->Used;
! 	if (Tracing || cp->Tracing)
! 	    if (bp->Used > 15)
! 		syslog(L_TRACE, "%s NCproc state=%d next \"%.15s\"",
! 		    CHANname(cp), cp->State, bp->Data);
! 	switch (cp->State) {
! 	default:
! 	    syslog(L_ERROR, "%s internal NCproc state %d",
! 		CHANname(cp), cp->State);
  	    break;
  
! 	case CSgetcmd:
! 	case CSgetauth:
! 	    /* Did we get the whole command, terminated with "\r\n"? */
! 	    for (i = 0; (i < bp->Used) && (bp->Data[i] != '\n'); i++) ;
! 	    if (i < bp->Used) cp->Rest = bp->Used = ++i;
! 	    else {
! 		cp->Rest = 0;
! 		break;	/* come back later for rest of line */
! 	    }
! 	    if (cp->Rest < 2) break;
! 	    p = &bp->Data[cp->Rest];
! 	    if (p[-2] != '\r' || p[-1] != '\n') { /* probably in an article */
! 		int j;
  
! 		syslog(L_NOTICE, "%s bad_command %s",
! 		    CHANname(cp), MaxLength(bp->Data, bp->Data));
! 		NCwritetext(cp, NCbadcommand);
! 		if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) {
! 		    cp->State = CSwritegoodbye;
! 		    cp->Rest = cp->SaveUsed;
! 		    break;
! 		}
! 		for (j = i + 1; j < cp->SaveUsed; j++)
! 		    if (bp->Data[j] ==  '\n') {
! 			if (bp->Data[j - 1] == '\r') break;
! 			else cp->Rest = bp->Used = j + 1;
! 		    }
! 		break;
! 	    }
! 	    p[-2] = '\0';
! 	    bp->Used -= 2;
  
! 	    /* Ignore blank lines. */
! 	    if (bp->Data[0] == '\0')
! 		break;
! 	    if (Tracing || cp->Tracing)
! 		syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data);
  
! 	    /* We got something -- stop sleeping (in case we were). */
! 	    SCHANremove(cp);
! 	    if (cp->Argument != NULL) {
! 		DISPOSE(cp->Argument);
! 		cp->Argument = NULL;
! 	    }
! 
! 	    if (cp->State == CSgetauth) {
! 		if (caseEQn(bp->Data, "mode", 4))
! 		    NCmode(cp);
! 		else
! 		    NCauthinfo(cp);
  		break;
  	    }
! 
! 	    /* Loop through the command table. */
! 	    for (p = bp->Data, dp = NCcommands; dp < ENDOF(NCcommands); dp++)
! 		if (caseEQn(p, dp->Name, dp->Size)) {
! 		    (*dp->Function)(cp);
! 		    cp->BadCommands = 0;
  		    break;
! 		}
! 	    if (dp == ENDOF(NCcommands)) {
! 		NCwritetext(cp, NCbadcommand);
! 		if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) {
! 		    cp->State = CSwritegoodbye;
! 		    cp->Rest = cp->SaveUsed;
! 		}
! 		for (i = 0; (p = NCquietlist[i]) != NULL; i++)
! 		    if (caseEQ(p, dp->Name))
! 			break;
! 		if (p == NULL)
! 		    syslog(L_NOTICE, "%s bad_command %s",
! 			CHANname(cp), MaxLength(bp->Data, bp->Data));
! 	    }
! 	    break;
  
! 	case CSgetarticle:
! 	case CSgetrep:
  	    /* Check for the null article. */
! 	    if ((bp->Used >= 3) && (bp->Data[0] == '.')
! 	     && (bp->Data[1] == '\r') && (bp->Data[2] == '\n')) {
! 		cp->Rest = 3;	/* null article (canceled?) */
  		cp->Rejected++;
! 		if (cp->Sendid.Size > 3) { /* We be streaming */
! 		    char buff[4];
! 		    (void)sprintf(buff, "%d", NNTP_ERR_FAILID_VAL);
! 		    cp->Sendid.Data[0] = buff[0];
! 		    cp->Sendid.Data[1] = buff[1];
! 		    cp->Sendid.Data[2] = buff[2];
! 		    NCwritereply(cp, cp->Sendid.Data);
! 		}
! 		else NCwritetext(cp, NNTP_REJECTIT_EMPTY);
  		cp->State = CSgetcmd;
  		bp->Used = 0;
  
  		/* Clear the work-in-progress entry. */
  		NCclearwip(wp);
+ 		break;
  	    }
! 	    /* Reading an article; look for "\r\n.\r\n" terminator. */
! 	    if (cp->Lastch > 5) i = cp->Lastch; /* only look at new data */
! 	    else         i = 5;
! 	    for ( ; i <= bp->Used; i++) {
! 		if ((bp->Data[i - 5] == '\r')
! 		 && (bp->Data[i - 4] == '\n')
! 		 && (bp->Data[i - 3] == '.')
! 		 && (bp->Data[i - 2] == '\r')
! 		 && (bp->Data[i - 1] == '\n')) {
! 		    cp->Rest = bp->Used = i;
! 		    p = &bp->Data[i];
! 		    break;
! 		}
  	    }
! 	    cp->Lastch = i;
! 	    if (i > bp->Used) {	/* did not find terminator */
! 		/* Check for big articles. */
! 		if (LargestArticle > SAVE_AMT && bp->Used > LargestArticle) {
! 		    /* Make some room, saving only the last few bytes. */
! 		    for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 			p[0] = p[bp->Used - SAVE_AMT];
! 		    wp->Size += bp->Used - SAVE_AMT;
! 		    bp->Used = SAVE_AMT;
! 		    cp->State = CSeatarticle;
! 		}
! 		cp->Rest = 0;
! 		break;
! 	    }
! 	    if (Mode == OMpaused) { /* defer processing while paused */
! 		cp->Rest = 0;
! 		bp->Used = cp->SaveUsed;
! 		SCHANadd(cp, (time_t)(Now.time + PAUSE_RETRY_TIME),
! 		    (POINTER)&Mode, NCproc, (POINTER)NULL);
! 		return;
! 	    }
  
! 	    /* Strip article terminator and post the article. */
! 	    p[-3] = '\0';
! 	    bp->Used -= 2;
  	    SCHANremove(cp);
  	    if (cp->Argument != NULL) {
  		DISPOSE(cp->Argument);
  		cp->Argument = NULL;
  	    }
! 	    NCclean(bp);
! 	    NCpostit(cp);
  	    cp->State = CSgetcmd;
! 	    break;
  
! 	case CSeatarticle:
! 	    /* Eat the article and then complain that it was too large */
! 	    /* Reading an article; look for "\r\n.\r\n" terminator. */
! 	    if (cp->Lastch > 5) i = cp->Lastch; /* only look at new data */
! 	    else         i = 5;
! 	    for ( ; i <= bp->Used; i++) {
! 		if ((bp->Data[i - 5] == '\r')
! 		 && (bp->Data[i - 4] == '\n')
! 		 && (bp->Data[i - 3] == '.')
! 		 && (bp->Data[i - 2] == '\r')
! 		 && (bp->Data[i - 1] == '\n')) {
! 		    cp->Rest = bp->Used = i;
! 		    p = &bp->Data[i];
! 		    break;
! 		}
  	    }
+ 	    if (i <= bp->Used) {	/* did find terminator */
+ 		/* Reached the end of the article. */
+ 		SCHANremove(cp);
+ 		if (cp->Argument != NULL) {
+ 		    DISPOSE(cp->Argument);
+ 		    cp->Argument = NULL;
+ 		}
+ 		p = wp->MessageID;
+ 		i = wp->Size + bp->Used;
+ 		syslog(L_ERROR, "%s internal rejecting huge article %s (%d > %d)",
+ 		    CHANname(cp), p ? p : "(null)", i, LargestArticle);
+ 		(void)sprintf(buff, "%d Article exceeds local limit of %ld bytes",
+ 			NNTP_REJECTIT_VAL, LargestArticle);
+ 		if (cp->Sendid.Size) NCwritetext(cp, cp->Sendid.Data);
+ 		else NCwritetext(cp, buff);
+ 		cp->State = CSgetcmd;
+ 		cp->Rejected++;
  
! 		/* Write a local cancel entry so nobody else gives it to us. */
! 		if (p) {
! 		    av[0] = p;
! 		    av[1] = NULL;
! 		    if ((q = CCcancel(av)) != NULL)
! 			syslog(L_ERROR, "%s cant cancel %s %s", LogName, av[0], q);
! 		}
  
! 		/* Clear the work-in-progress entry. */
! 		NCclearwip(wp);
! 
! 		/* 
! 		 * only free and allocate the buffer back to
! 		 * START_BUFF_SIZE if there's nothing in the buffer we
! 		 * need to save (i.e., following commands.
! 		 * if there is, then we're probably in streaming mode,
! 		 * so probably not much point in trying to keep the
! 		 * buffers minimal anyway...
! 		 */
! 		if (bp->Used == cp->SaveUsed) {
! 		    /* Reset input buffer to the default size; don't let realloc
! 		     * be lazy. */
! 		    DISPOSE(bp->Data);
! 		    bp->Size = START_BUFF_SIZE;
! 		    bp->Used = 0;
! 		    bp->Data = NEW(char, bp->Size);
! 		    cp->SaveUsed = cp->Rest = cp->Lastch = 0;
! 		}
! 	    }
! 	    else if (bp->Used > 8 * 1024) {
! 		/* Make some room; save the last few bytes of the article */
! 		for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 		    p[0] = p[bp->Used - SAVE_AMT + 0];
! 		wp->Size += bp->Used - SAVE_AMT;
! 		bp->Used = cp->Lastch = SAVE_AMT;
! 		cp->Rest = 0;
! 	    }
! 	    break;
  	}
! 	if (Tracing || cp->Tracing)
! 		syslog(L_TRACE, "%s NCproc Rest=%d Used=%d SaveUsed=%d",
! 		    CHANname(cp), cp->Rest, bp->Used, cp->SaveUsed);
! 	if (cp->Rest > 0) {
! 	    if (cp->Rest < cp->SaveUsed) { /* more commands in buffer */
! 		bp->Used = cp->SaveUsed = cp->SaveUsed - cp->Rest;
! 		/* It would be nice to avoid this copy but that
! 		** would require changes to the bp structure and
! 		** the way it is used.
! 		*/
! 		(void)memcpy((POINTER)bp->Data, (POINTER)&bp->Data[cp->Rest], (SIZE_T)bp->Used);
! 		cp->Rest = cp->Lastch = 0;
! 	    } else {
! 		bp->Used = cp->Lastch = 0;
! 		break;
! 	    }
! 	} else break;
!     }
! }
! 
! 
! /*
! **  Read whatever data is available on the channel.
! **  Call NCproc() to process the data.
! */
! STATIC FUNCTYPE
! NCreader(cp)
!     register CHANNEL	*cp;
! {
!     register WIP	*wp;
!     int			i;
! 
!     if (Tracing || cp->Tracing)
! 	syslog(L_TRACE, "%s NCreader Used=%d",
! 	    CHANname(cp), cp->In.Used);
!     
!     /* Read any data that's there; ignore errors (retry next time it's our
!      * turn) and if we got nothing, then it's EOF so mark it closed. */
!     if ((i = CHANreadtext(cp)) < 0) {
! 	if (cp->BadReads++ >= BAD_IO_COUNT) {
! 	    if (NCcount > 0)
! 		NCcount--;
! 	    CHANclose(cp, CHANname(cp));
  	}
! 	return;
      }
+     if (i == 0) {
+ 	NCcount--;
+ 	return;
+     }
+ 
+     /* Update timestamp. */
+     wp = &NCwip[cp->fd];
+     wp->Timestamp = Now.time;
+ 
+     NCproc(cp);	/* check and process data */
  }
  
  
***************
*** 1058,1061 ****
--- 1249,1356 ----
      cp->BadCommands = 0;
      NCwritetext(cp, NCgreeting);
      return cp;
+ }
+ 
+ 
+ /* These modules support the streaming option to tranfer articles
+ ** faster.
+ */
+ 
+ /*
+ **  The "check" command.  Check the Message-ID, and see if we want the
+ **  article or not.  Stay in command state.
+ */
+ STATIC FUNCTYPE
+ NCcheck(cp)
+     CHANNEL		*cp;
+ {
+     register char	*p;
+     int			msglen;
+     WIP			*who;
+ 
+     if (AmSlave) {
+ 	NCwritetext(cp, NCbadcommand);
+ 	return;
+     }
+ 
+     /* Snip off the Message-ID. */
+     for (p = cp->In.Data; !ISWHITE(*p); p++)
+ 	continue;
+     for ( ; ISWHITE(*p); p++)
+ 	continue;
+     msglen = strlen(p) + 5; /* 3 digits + space + id + null */
+     if (cp->Sendid.Size < msglen) {
+ 	if (cp->Sendid.Size > 0) DISPOSE(cp->Sendid.Data);
+ 	if (msglen > MAXHEADERSIZE) cp->Sendid.Size = msglen;
+ 	else cp->Sendid.Size = MAXHEADERSIZE;
+ 	cp->Sendid.Data = NEW(char, cp->Sendid.Size);
+     }
+     if (!ARTidok(p)) {
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_GOTID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+ 	syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p));
+ 	return;
+     }
+ 
+     if (HIShavearticle(p)) {
+ 	cp->Refused++;
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_GOTID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+     } else if (NCinprogress(cp, p, &who)) {
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_RESENDID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+     } else {
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_OK_SENDID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+     }
+     /* stay in command mode */
+ }
+ 
+ /*
+ **  The "takethis" command.  Article follows.
+ **  Remember <id> for later ack.
+ */
+ STATIC FUNCTYPE
+ NCtakethis(cp)
+     CHANNEL		*cp;
+ {
+     register char	*p;
+     int			msglen;
+     register WIP	*wp;
+     register int	i;
+ 
+     /* Snip off the Message-ID. */
+     for (p = cp->In.Data; !ISWHITE(*p); p++)
+ 	continue;
+     for ( ; ISWHITE(*p); p++)
+ 	continue;
+     if (!ARTidok(p)) {
+ 	syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p));
+     }
+     msglen = strlen(p) + 5; /* 3 digits + space + id + null */
+     if (cp->Sendid.Size < msglen) {
+ 	if (cp->Sendid.Size > 0) DISPOSE(cp->Sendid.Data);
+ 	if (msglen > MAXHEADERSIZE) cp->Sendid.Size = msglen;
+ 	else cp->Sendid.Size = MAXHEADERSIZE;
+ 	cp->Sendid.Data = NEW(char, cp->Sendid.Size);
+     }
+     /* save ID for later NACK or ACK */
+     (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_FAILID_VAL, p);
+ 
+     cp->State = CSgetarticle;
+     /* set wp->MessageID for benefit of later code in NCreader
+      * (especially while in CSeatarticle state)
+      */
+     wp = &NCwip[cp->fd];
+     if (wp->MessageID == NULL) {
+ 	for (i = SIZEOF(NCfreelist); --i >= 0; )
+ 	    if (NCfreelist[i] != NULL) {
+ 		wp->MessageID = NCfreelist[i];
+ 		NCfreelist[i] = NULL;
+ 		break;
+ 	    }
+ 	if (i < 0)
+ 	    wp->MessageID = NEW(char, DBZMAXKEY + 3);
+     }
+     (void)strcpy(wp->MessageID, p);
  }
*** include/nntp.h.-stream	Sun Jun 11 15:20:48 1995
--- include/nntp.h	Tue Aug  1 17:42:16 1995
***************
*** 83,88 ****
--- 83,105 ----
  #define NNTP_CANTPOST			"440 Posting not allowed"
  #define NNTP_CANTPOST_VAL		440
  
+ /* new entries for the "streaming" protocol */
+ /* response to "mode stream" else 500 if stream not supported */
+ #define NNTP_OK_STREAM_VAL		203	/* Streaming supported */
+ 
+ /* response to "check <id>".  Must include ID of article.
+ ** Example: "431 <1234@host.domain>"
+ */
+ #define NNTP_OK_SENDID_VAL		238	/* I want article <id> */
+ #define NNTP_RESENDID_VAL		431	/* try <id> again later */
+ #define NNTP_ERR_GOTID_VAL		438	/* Got <id>, don't send */
+ 
+ /* responses to "takethis <id>.  Must include ID of article */
+ #define NNTP_OK_RECID_VAL		239	/* Article <id> received OK */
+ #define NNTP_ERR_FAILID_VAL		439	/* Transfer of <id> failed */
+ 
+ /* End of new entries for the "streaming" protocol */
+ 
  
  /*
  **  The first character of an NNTP reply can be used as a category class.
===END Streaming NNTP Diffs===


