From jerry@strobe.ATC.Olivetti.Com Wed Jun 28 10:58:19 EDT 1995
Article: 3647 of alt.sources
Path: news.pop.psu.edu!news.cac.psu.edu!howland.reston.ans.net!spool.mu.edu!olivea!strobe!jerry
From: jerry@strobe.ATC.Olivetti.Com (Jerry Aguirre)
Newsgroups: news.software.nntp,alt.sources
Subject: Re: INN streaming patch
Followup-To: news.software.nntp
Date: 28 Jun 1995 02:50:34 GMT
Organization: Olivetti ATC; Cupertino, CA; USA
Lines: 1690
Message-ID: <3sqg1q$l6i@olivea.ATC.Olivetti.Com>
References: <3sn5oi$6ce@olivea.ATC.Olivetti.Com> <3sn6cq$8v8@olivea.atc.olivetti.com>
NNTP-Posting-Host: strobe.atc.olivetti.com
Summary: Faster NNTP transfers
Keywords: INN usenet nntp patch
Xref: news.pop.psu.edu news.software.nntp:11891 alt.sources:3647

[I canceled two earlier postings of this patch due to errors]

Here is an improved patch to modify INN 1.4 to support the
"streaming" extensions to the NNTP protocol.  When a modified
innxmit is sending to a modified innd they negotiate use of
the new protocol.

The new protocol eliminates the effect of RTT on thruput by sending
several articles before waiting for responses.  This improves thruput
especially on certain serial links.

The is the second patch published and should be identified with the
date 27Jun95.  This patch corrects several syntactical problems
experienced when compiling on certain systems.

This patch also protects itself from duplicate IDs in the queue.
Certain types of feeds are prone to generating duplicate entries in
the feed.  These could confuse previous versions as the ID could match
the wrong entry in the internal array.  Duplicate IDs are now
detected and skipped.

For those who the previous patch please note that only innxmit.c is
different from the previous patch.  I have posted a separate diff of the
previous patch to this one.

                                Jerry Aguirre
===BEGIN PATCH===
*** innd/chan.c.orig	Thu Mar 18 13:04:23 1993
--- innd/chan.c	Mon Nov 21 11:12:47 1994
***************
*** 117,122 ****
--- 117,123 ----
      cp->In = in;
      cp->Out = out;
      cp->Tracing = Tracing;
+     cp->Sendid.Size=0;
  
      /* Make the descriptor close-on-exec and non-blocking. */
      CloseOnExec(fd, TRUE);
***************
*** 210,215 ****
--- 211,220 ----
      if (cp->Out.Size > BIG_BUFFER) {
  	cp->Out.Size = 0;
  	DISPOSE(cp->Out.Data);
+     }
+     if (cp->Sendid.Size) {
+ 	cp->Sendid.Size = 0;
+ 	DISPOSE(cp->Sendid.Data);
      }
  }
  
*** innd/innd.h.orig	Thu Mar 18 13:04:26 1993
--- innd/innd.h	Thu Jul 28 18:55:25 1994
***************
*** 148,153 ****
--- 148,154 ----
      BUFFER		In;
      BUFFER		Out;
      BOOL		Tracing;
+     BUFFER		Sendid;
  } CHANNEL;
  
  
*** doc/innxmit.8.orig	Mon Jan 16 13:42:08 1995
--- doc/innxmit.8	Wed Jan 25 17:22:16 1995
***************
*** 11,16 ****
--- 11,19 ----
  .B \-a
  ]
  [
+ .B \-c
+ ]
+ [
  .B \-d
  ]
  [
***************
*** 20,25 ****
--- 23,31 ----
  .B \-r
  ]
  [
+ .B \-s
+ ]
+ [
  .BI \-t " timeout"
  ]
  [
***************
*** 150,155 ****
--- 156,179 ----
  The ``\-A'' flag may be used to specify an alternate spool directory to
  use if the article is not found; this would normally be an NFS-mounted
  spool directory of a master server with longer expiration times.
+ .PP
+ .I Innxmit
+ will attempt to negotiate a streaming mode extension of the NNTP
+ protocol with the server at connect time.
+ If successful it will use a slightly different protocol that enhances
+ throughput.
+ If the server does not recognize the streaming mode negotiation
+ .I innxmit
+ will revert to normal NNTP transfer mode.
+ Use the ``\-s'' flag to disable the attempt to negotiate the streaming
+ mode extension.
+ In streaming mode a check of each message ID is still made to avoid sending
+ articles already on the server.
+ The ``\-c'' flag will, if streaming mode is supported,
+ result in sending articles without checking.
+ This results in slightly greater throughput and may be appropriate when
+ it is known that the site could not already have the articles such as in
+ the case of a "leaf" site.
  .SH HISTORY
  Written by Rich $alz <rsalz@uunet.uu.net> for InterNetNews.
  .de R$
*** backends/innxmit.c.orig	Thu Mar 18 13:03:28 1993
--- backends/innxmit.c	Tue Jun 27 19:39:33 1995
***************
*** 1,6 ****
--- 1,7 ----
  /*  $Revision: 1.14 $
  **
  **  Transmit articles to remote site.
+ **  Modified for NNTP streaming: 27June95 Jerry Aguirre
  */
  #include "configdata.h"
  #include <stdio.h>
***************
*** 39,45 ****
--- 40,84 ----
  
  #define OUTPUT_BUFFER_SIZE	(16 * 1024)
  
+ /* Streaming extensions to NNTP.  This extension removes the lock-step
+ ** limitation of conventional NNTP.  Article transfer is several times
+ ** faster.  Negotiated and falls back to old mode if receiver refuses.
+ */
  
+ /* max number of articles that can be streamed ahead */
+ #define STNBUF 32
+ 
+ /* Send "takethis" without "check" if this many articles were
+ ** accepted in a row.
+ */
+ #define STNC 16
+ 
+ /* typical number of articles to stream  */
+ /* must be able to fopen this many articles */
+ #define STNBUFL (STNBUF/2)
+ 
+ /* number of retries before requeueing to disk */
+ #define STNRETRY 5
+ 
+ struct stbufs {		/* for each article we are procesing */
+ 	char *st_fname;		/* file name */
+ 	char *st_id;		/* message ID */
+ 	int   st_retry;		/* retry count */
+ 	int   st_age;		/* age count */
+ 	QIOSTATE *st_qp;	/* IO to read article contents */
+ };
+ static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */
+ static int stnq;	/* current number of active entries in stbuf */
+ static int stahead;	/* streaming mode "slow start" counter */
+ static long stnofail;	/* Count of consecutive successful sends */
+ 
+ static int TryStream = TRUE;	/* Should attempt stream negotation? */
+ static int CanStream = FALSE;	/* Result of stream negotation */
+ static int DoCheck   = TRUE;	/* Should check before takethis? */
+ static char modestream[] = "mode stream";
+ 
+ 
+ 
  /*
  ** Syslog formats - collected together so they remain consistent
  */
***************
*** 51,56 ****
--- 90,98 ----
  STATIC char	CANT_AUTHENTICATE[] = "%s authenticate failed %s";
  STATIC char	IHAVE_FAIL[] = "%s ihave failed %s";
  
+ STATIC char	CANT_FINDIT[] = "%s can't find %s";
+ STATIC char	CANT_PARSEIT[] = "%s can't parse ID %s";
+ STATIC char	UNEXPECTED[] = "%s unexpected response code %s";
  
  /*
  **  Global variables.
***************
*** 83,88 ****
--- 125,132 ----
  STATIC unsigned long	STAToffered;
  STATIC unsigned long	STATrefused;
  STATIC unsigned long	STATrejected;
+ STATIC char		*AltSpool;
+ STATIC char		*AltPath;
  
  
  /*
***************
*** 171,189 ****
  {
      int		i;
  
      i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
      REMbuffptr = REMbuffer;
      return i < 0 ? FALSE : TRUE;
  }
  
  
  /*
  **  Send a line to the server, adding the dot escape and \r\n.
  */
  STATIC BOOL
! REMwrite(p, i)
      register char	*p;
      register int	i;
  {
      static char		HDR[] = "Content-Transfer-Encoding:";
      static char		COD[] =
--- 215,308 ----
  {
      int		i;
  
+     if (REMbuffptr == REMbuffer) return TRUE; /* nothing buffered */
      i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer));
      REMbuffptr = REMbuffer;
      return i < 0 ? FALSE : TRUE;
  }
  
+ /*
+ **  Return TRUE if the MessageID duplicates an existing entry.
+ **  Duplicates sometime get into the queue and this greatly confuses
+ **  the protocol.
+ */
+ STATIC BOOL
+ stisdup(MessageID)
+     char		*MessageID;
+ {
+     register int i;
  
+     for (i = 0; i < STNBUF; i++) { /* linear search for ID */
+ 	if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
+ 	 && (0 == strcasecmp(MessageID, stbuf[i].st_id))) {
+ 	    register int n;
+ 
+ 	    for (n = 0; (MessageID[n] != '@') && (MessageID[n] != '\0'); n++) ;
+ 	    /* left of '@' is case sensitive */
+ 	    if (strncmp(MessageID, stbuf[i].st_id, n)) continue;
+ 	    else break;	/* found a match */
+ 	}
+     }
+     return (i < STNBUF);	/* true if we found an existing entry */
+ }
+ 
+ /* stalloc(): save path, ID, and qp into one of the streaming mode entries */
+ STATIC int
+ stalloc(Article, MessageID, qp)
+     char		*Article;
+     char		*MessageID;
+     register QIOSTATE	*qp;
+ {
+     register int i;
+ 
+     for (i = 0; i < STNBUF; i++) {
+ 	if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break;
+     }
+     if (i >= STNBUF) { /* stnq says not full but can not find unused */
+ 	syslog(L_ERROR, "stalloc: Internal error");
+ 	return (-1);
+     }
+     if ((int)strlen(Article) >= SPOOLNAMEBUFF) {
+ 	syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF);
+ 	return (TRUE);
+     }
+     /* allocate buffers on first use.
+     ** If filename ever is longer than SPOOLNAMEBUFF then code will abort.
+     ** If ID is ever longer than NNTP_STRLEN then other code would break.
+     */
+     if (!stbuf[i].st_fname) stbuf[i].st_fname = NEW(char, SPOOLNAMEBUFF);
+     if (!stbuf[i].st_id) stbuf[i].st_id = NEW(char, NNTP_STRLEN);
+     (void)strcpy(stbuf[i].st_fname, Article);
+     (void)strcpy(stbuf[i].st_id, MessageID);
+     stbuf[i].st_qp = qp;
+     stbuf[i].st_retry = 0;
+     stbuf[i].st_age = 0;
+     stnq++;
+     return i;
+ }
+ 
+ /* strel(): release for reuse one of the streaming mode entries */
+ STATIC void
+ strel(i)
+     int		i;
+ {
+ 	if (stbuf[i].st_qp) {
+ 	    QIOclose(stbuf[i].st_qp);
+ 	    stbuf[i].st_qp = 0;
+ 	}
+ 	if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0';
+ 	if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0';
+ 	stnq--;
+ }
+ 
  /*
  **  Send a line to the server, adding the dot escape and \r\n.
  */
  STATIC BOOL
! REMwrite(p, i, escdot)
      register char	*p;
      register int	i;
+     register BOOL	escdot;
  {
      static char		HDR[] = "Content-Transfer-Encoding:";
      static char		COD[] =
***************
*** 212,218 ****
          }
  
      /* Dot escape, text of the line, line terminator. */
!     if (*p == '.')
  	*REMbuffptr++ = '.';
      if (i > MEMCPY_THRESHOLD) {
  	(void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i);
--- 331,337 ----
          }
  
      /* Dot escape, text of the line, line terminator. */
!     if (escdot && (*p == '.'))
  	*REMbuffptr++ = '.';
      if (i > MEMCPY_THRESHOLD) {
  	(void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i);
***************
*** 304,310 ****
      double		systime;
  
      if (!Purging) {
! 	(void)REMwrite(QUIT, STRLEN(QUIT));
  	(void)REMflush();
      }
      (void)GetTimeInfo(&Now);
--- 423,429 ----
      double		systime;
  
      if (!Purging) {
! 	(void)REMwrite(QUIT, STRLEN(QUIT), FALSE);
  	(void)REMflush();
      }
      (void)GetTimeInfo(&Now);
***************
*** 342,348 ****
  CloseAndRename()
  {
      /* Close the files, rename the temporary. */
!     QIOclose(BATCHqp);
      if (ferror(BATCHfp)
       || fflush(BATCHfp) == EOF
       || fclose(BATCHfp) == EOF) {
--- 461,470 ----
  CloseAndRename()
  {
      /* Close the files, rename the temporary. */
!     if (BATCHqp) {
! 	QIOclose(BATCHqp);
! 	BATCHqp = NULL;
!     }
      if (ferror(BATCHfp)
       || fflush(BATCHfp) == EOF
       || fclose(BATCHfp) == EOF) {
***************
*** 411,442 ****
      }
  
      (void)fprintf(stderr, "Rewriting batch file and exiting.\n");
      Requeue(Article, MessageID);
  
!     for ( ; ; ) {
! 	if ((p = QIOread(BATCHqp)) == NULL) {
! 	    if (QIOerror(BATCHqp)) {
! 		(void)fprintf(stderr, "Can't read \"%s\", %s\n",
! 			BATCHname, strerror(errno));
! 		ExitWithStats(1);
  	    }
- 	    if (QIOtoolong(BATCHqp)) {
- 		(void)fprintf(stderr, "Skipping long line in \"%s\".\n",
- 			BATCHname);
- 		(void)QIOread(BATCHqp);
- 		continue;
- 	    }
  
! 	    /* Normal EOF. */
! 	    break;
  	}
- 
- 	if (fprintf(BATCHfp, "%s\n", p) == EOF
- 	 || ferror(BATCHfp)) {
- 	    (void)fprintf(stderr, "Can't requeue \"%s\", %s\n",
- 		    p, strerror(errno));
- 	    ExitWithStats(1);
- 	}
      }
  
      CloseAndRename();
--- 533,580 ----
      }
  
      (void)fprintf(stderr, "Rewriting batch file and exiting.\n");
+     if (CanStream) {	/* streaming mode has a buffer of articles */
+ 	register int i;
+ 
+ 	for (i = 0; i < STNBUF; i++) {    /* requeue unacknowledged articles */
+ 	    if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
+ 		if (Debug)
+ 		    (void)fprintf(stderr, "stbuf[%d]= %s, %s\n",
+ 			    i, stbuf[i].st_fname, stbuf[i].st_id);
+ 		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
+ 		if (Article == stbuf[i].st_fname) Article = NULL;
+ 		strel(i); /* release entry */
+ 	    }
+ 	}
+     }
      Requeue(Article, MessageID);
  
!     if (BATCHqp) {
! 	for ( ; ; ) {
! 	    if ((p = QIOread(BATCHqp)) == NULL) {
! 		if (QIOtoolong(BATCHqp)) {
! 		    (void)fprintf(stderr, "Skipping long line in \"%s\".\n",
! 			    BATCHname);
! 		    (void)QIOread(BATCHqp);
! 		    continue;
! 		}
! 		if (QIOerror(BATCHqp)) {
! 		    (void)fprintf(stderr, "Can't read \"%s\", %s\n",
! 			    BATCHname, strerror(errno));
! 		    ExitWithStats(1);
! 		}
! 
! 		/* Normal EOF. */
! 		break;
  	    }
  
! 	    if (fprintf(BATCHfp, "%s\n", p) == EOF
! 	     || ferror(BATCHfp)) {
! 		(void)fprintf(stderr, "Can't requeue \"%s\", %s\n",
! 			p, strerror(errno));
! 		ExitWithStats(1);
! 	    }
  	}
      }
  
      CloseAndRename();
***************
*** 569,584 ****
  
      for (InHeaders = TRUE; ; ) {
  	if ((p = QIOread(qp)) == NULL) {
  	    if (QIOerror(qp)) {
  		(void)fprintf(stderr, "Can't read \"%s\", %s\n",
  			Article, strerror(errno));
  		return FALSE;
  	    }
- 	    if (QIOtoolong(qp)) {
- 		(void)fprintf(stderr, "Line too long in \"%s\"\n", Article);
- 		(void)QIOread(BATCHqp);
- 		continue;
- 	    }
  
  	    /* Normal EOF. */
  	    break;
--- 707,722 ----
  
      for (InHeaders = TRUE; ; ) {
  	if ((p = QIOread(qp)) == NULL) {
+ 	    if (QIOtoolong(qp)) {
+ 		(void)fprintf(stderr, "Line too long in \"%s\"\n", Article);
+ 		(void)QIOread(qp);
+ 		continue;
+ 	    }
  	    if (QIOerror(qp)) {
  		(void)fprintf(stderr, "Can't read \"%s\", %s\n",
  			Article, strerror(errno));
  		return FALSE;
  	    }
  
  	    /* Normal EOF. */
  	    break;
***************
*** 587,593 ****
  	    InHeaders = FALSE;
  
  	if (InHeaders || MimeArticle == MTnotmime) {
! 	    if (!REMwrite(p, QIOlength(qp))) {
  	        (void)fprintf(stderr, "Can't send \"%s\", %s\n",
  		        Article, strerror(errno));
  	        return FALSE;
--- 725,731 ----
  	    InHeaders = FALSE;
  
  	if (InHeaders || MimeArticle == MTnotmime) {
! 	    if (!REMwrite(p, QIOlength(qp), TRUE)) {
  	        (void)fprintf(stderr, "Can't send \"%s\", %s\n",
  		        Article, strerror(errno));
  	        return FALSE;
***************
*** 612,619 ****
  	if (GotInterrupt)
  	    Interrupted(Article, MessageID);
      }
!     if (!REMflush()) {
! 	(void)fprintf(stderr, "Can't end \"%s\", %s\n",
  		Article, strerror(errno));
  	return FALSE;
      }
--- 750,758 ----
  	if (GotInterrupt)
  	    Interrupted(Article, MessageID);
      }
!     /* Write the terminator. */
!     if (!REMwrite(".", 1, FALSE)) {
! 	(void)fprintf(stderr, "Can't send \"%s\", %s\n",
  		Article, strerror(errno));
  	return FALSE;
      }
***************
*** 620,637 ****
      if (Debug)
  	(void)fprintf(stderr, "> [ article ]%s\n",
  	     MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)");
- 
-     /* Write the terminator. */
-     if (write(ToServer, TERM, STRLEN(TERM)) != STRLEN(TERM)) {
- 	(void)fprintf(stderr, "Can't end \"%s\", %s\n",
- 		Article, strerror(errno));
- 	return FALSE;
-     }
      if (GotInterrupt)
  	Interrupted(Article, MessageID);
      if (Debug)
  	(void)fprintf(stderr, "> .\n");
  
      /* What did the remote site say? */
      if (!REMread(buff, (int)sizeof buff)) {
  	(void)fprintf(stderr, "No reply after sending \"%s\", %s\n",
--- 759,771 ----
      if (Debug)
  	(void)fprintf(stderr, "> [ article ]%s\n",
  	     MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)");
      if (GotInterrupt)
  	Interrupted(Article, MessageID);
      if (Debug)
  	(void)fprintf(stderr, "> .\n");
  
+     if (CanStream) return TRUE;	/* streaming mode does not wait for ACK */
+ 
      /* What did the remote site say? */
      if (!REMread(buff, (int)sizeof buff)) {
  	(void)fprintf(stderr, "No reply after sending \"%s\", %s\n",
***************
*** 773,779 ****
--- 907,1095 ----
  	longjmp(JMPwhere, 1);
  }
  
+ /* check articles in streaming NNTP mode
+ ** return TRUE on failure.
+ */
+ STATIC BOOL
+ check(i)
+     int	i; /* index of stbuf to send check for */
+ {
+     char	buff[NNTP_STRLEN];
  
+     /* send "check <ID>" to the other system */
+     (void)sprintf(buff, "check %s", stbuf[i].st_id);
+     if (!REMwrite(buff, (int)strlen(buff), FALSE)) {
+ 	(void)fprintf(stderr, "Can't check article, %s\n",
+ 		strerror(errno));
+ 	return TRUE;
+     }
+     STAToffered++;
+     if (stahead > 0) stahead--;
+     if (Debug) {
+ 	if (stbuf[i].st_retry)
+ 	    (void)fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry);
+ 	else
+ 	    (void)fprintf(stderr, "> %s\n", buff);
+     }
+     if (GotInterrupt)
+ 	Interrupted(stbuf[i].st_fname, stbuf[i].st_id);
+ 
+     /* That all.  Response is checked later by strlisten() */
+     return FALSE;
+ }
+ 
+ /* Send article in "takethis <id> streaming NNTP mode.
+ ** return TRUE on failure.
+ */
+ STATIC BOOL
+ takethis(i)
+     int i;	/* index to stbuf to be sent */
+ {
+     char	buff[NNTP_STRLEN];
+ 
+     if (!stbuf[i].st_qp) { /* should already be open but ... */
+ 	/* Open the article. */
+ 	if (!(stbuf[i].st_qp = QIOopen(stbuf[i].st_fname, QIO_BUFFER))) {
+ 	    /* can not open it. Should check AltPath */
+ 	    if (AltPath && (*(stbuf[i].st_fname) != '/')) {
+ 		(void)sprintf(AltPath, "%s/%s", AltSpool, stbuf[i].st_fname);
+ 		stbuf[i].st_qp = QIOopen(AltPath, QIO_BUFFER);
+ 	    }
+ 	    if (!(stbuf[i].st_qp)) {
+ 		strel(i);
+ 		return FALSE; /* Not an error. Could be canceled or expired */
+ 	    }
+ 	}
+     }
+     /* send "takethis <ID>" to the other system */
+     (void)sprintf(buff, "takethis %s", stbuf[i].st_id);
+     if (!REMwrite(buff, (int)strlen(buff), FALSE)) {
+ 	(void)fprintf(stderr, "Can't send takethis <id>, %s\n",
+ 		strerror(errno));
+ 	return TRUE;
+     }
+     if (Debug)
+ 	(void)fprintf(stderr, "> %s\n", buff);
+     if (GotInterrupt)
+ 	Interrupted((char *)0, (char *)0);
+     if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id,
+ 	    stbuf[i].st_qp))
+ 	return TRUE;
+     QIOclose(stbuf[i].st_qp);	/* should not need file again */
+     stbuf[i].st_qp = 0;		/* so close to free descriptor */
+     stbuf[i].st_age = 0;
+     if (stahead < STNBUFL) stahead++;
+     /* That all.  Response is checked later by strlisten() */
+     return FALSE;
+ }
+ 
+ 
+ /* listen for responses.  Process acknowledgments to remove items from
+ ** the queue.  Also sends the articles on request.  Returns TRUE on error.
+ ** return TRUE on failure.
+ */
+ STATIC BOOL
+ strlisten()
+ {
+     int		resp;
+     int		i;
+     char	*id, *p;
+     char	buff[NNTP_STRLEN];
+ 
+     while(TRUE) {
+ 	if (!REMread(buff, (int)sizeof buff)) {
+ 	    (void)fprintf(stderr, "No reply to check, %s\n", strerror(errno));
+ 	    return TRUE;
+ 	}
+ 	if (GotInterrupt)
+ 	    Interrupted((char *)0, (char *)0);
+ 	if (Debug)
+ 	    (void)fprintf(stderr, "< %s", buff);
+ 
+ 	/* Parse the reply. */
+ 	resp =  atoi(buff);
+ 	/* Skip the 1XX informational messages */
+ 	if ((resp >= 100) && (resp < 200)) continue;
+ 	switch (resp) { /* first time is to verify it */
+ 	case NNTP_ERR_GOTID_VAL:
+ 	case NNTP_OK_SENDID_VAL:
+ 	case NNTP_OK_RECID_VAL:
+ 	case NNTP_ERR_FAILID_VAL:
+ 	case NNTP_RESENDID_VAL:
+ 	    if (id = strchr(buff, '<')) {
+ 		p = strchr(id, '>');
+ 		if (p) *(p+1) = '\0';
+ 		for (i = 0; i < STNBUF; i++) { /* linear search for ID */
+ 		    if ((stbuf[i].st_id) && (stbuf[i].st_id[0])
+ 		     && (0 == strcasecmp(id, stbuf[i].st_id))) {
+ 			register int n;
+ 
+ 			for (n = 0; (id[n] != '@') && (id[n] != '\0'); n++) ;
+ 			/* left of '@' is case sensitive */
+ 			if (strncmp(id, stbuf[i].st_id, n)) continue;
+ 			else break;	/* found a match */
+ 		    }
+ 		}
+ 		if (i >= STNBUF) { /* should not happen */
+ 		    syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff));
+ 		    return (TRUE); /* can't find it! */
+ 		}
+ 	    } else {
+ 		syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff));
+ 		return (TRUE);
+ 	    }
+ 	    break;
+ 	case NNTP_GOODBYE_VAL:
+ 	    /* Most likely out of space -- no point in continuing. */
+ 	    syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff));
+ 	    return TRUE;
+ 	    /* NOTREACHED */
+ 	default:
+ 	    syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff));
+ 	    if (Debug)
+ 		(void)fprintf(stderr, "Unknown reply \"%s\"",
+ 						    buff);
+ 	    return (TRUE);
+ 	}
+ 	switch (resp) { /* now we take some action */
+ 	case NNTP_RESENDID_VAL:	/* remote wants it later */
+ 	    /* try again now because time has passed */
+ 	    if (stbuf[i].st_retry < STNRETRY) {
+ 		if (check(i)) return TRUE;
+ 		stbuf[i].st_retry++;
+ 		stbuf[i].st_age = 0;
+ 	    } else { /* requeue to disk for later */
+ 		Requeue(stbuf[i].st_fname, stbuf[i].st_id);
+ 		strel(i); /* release entry */
+ 	    }
+ 	    break;
+ 	case NNTP_ERR_GOTID_VAL:	/* remote doesn't want it */
+ 	    strel(i); /* release entry */
+ 	    STATrefused++;
+ 	    stnofail = 0;
+ 	    break;
+ 		
+ 	case NNTP_OK_SENDID_VAL:	/* remote wants article */
+ 	    if (takethis(i)) return TRUE;
+ 	    stnofail++;
+ 	    break;
+ 
+ 	case NNTP_OK_RECID_VAL:	/* remote received it OK */
+ 	    strel(i); /* release entry */
+ 	    STATaccepted++;
+ 	    break;
+ 		
+ 	case NNTP_ERR_FAILID_VAL:
+ 	    strel(i); /* release entry */
+ 	    STATrejected++;
+ 	    stnofail = 0;
+ 	    break;
+ 	}
+ 	break;
+     }
+     return (FALSE);
+ }
+ 
  /*
  **  Print a usage message and exit.
  */
***************
*** 781,787 ****
  Usage()
  {
      (void)fprintf(stderr,
! 	"Usage: innxmit [-a] [-d] [-M] [-p] [-r] [-S] [-t#] [-T#] host file\n");
      exit(1);
  }
  
--- 1097,1103 ----
  Usage()
  {
      (void)fprintf(stderr,
! 	"Usage: innxmit [-a] [-c] [-d] [-M] [-p] [-r] [-s] [-S] [-t#] [-T#] host file\n");
      exit(1);
  }
  
***************
*** 801,812 ****
      FILE		*From;
      FILE		*To;
      char		buff[NNTP_STRLEN];
-     char		*AltSpool;
      char		*Article;
      char		*ContentEncoding;
      char		*ContentType;
      char		*MessageID;
-     char		*AltPath;
      SIGHANDLER		(*old)();
      unsigned int	ConnectTimeout;
      unsigned int	TotalTimeout;
--- 1117,1126 ----
***************
*** 818,824 ****
      (void)umask(NEWSUMASK);
  
      /* Parse JCL. */
!     while ((i = getopt(ac, av, "A:adMprSt:T:v")) != EOF)
  	switch (i) {
  	default:
  	    Usage();
--- 1132,1138 ----
      (void)umask(NEWSUMASK);
  
      /* Parse JCL. */
!     while ((i = getopt(ac, av, "A:acdMprsSt:T:v")) != EOF)
  	switch (i) {
  	default:
  	    Usage();
***************
*** 830,835 ****
--- 1144,1152 ----
  	case 'a':
  	    AlwaysRewrite = TRUE;
  	    break;
+ 	case 'c':
+ 	    DoCheck = FALSE;
+ 	    break;
  	case 'd':
  	    Debug = TRUE;
  	    break;
***************
*** 843,848 ****
--- 1160,1168 ----
  	case 'r':
  	    DoRequeue = FALSE;
  	    break;
+ 	case 's':
+ 	    TryStream = FALSE;
+ 	    break;
  	case 'S':
  	    Slavish = TRUE;
  	    break;
***************
*** 957,962 ****
--- 1277,1325 ----
  	/* We no longer need standard I/O. */
  	FromServer = fileno(From);
  	ToServer = fileno(To);
+ 
+ 	if (TryStream) {
+ 	    if (!REMwrite(modestream, (int)strlen(modestream), FALSE)) {
+ 		(void)fprintf(stderr, "Can't negotiate %s, %s\n",
+ 			modestream, strerror(errno));
+ 	    }
+ 	    if (Debug)
+ 		(void)fprintf(stderr, ">%s\n", modestream);
+ 	    /* Does he understand mode stream? */
+ 	    if (!REMread(buff, (int)sizeof buff)) {
+ 		(void)fprintf(stderr, "No reply to %s, %s\n",
+ 				modestream, strerror(errno));
+ 	    } else {
+ 		if (Debug)
+ 		    (void)fprintf(stderr, "< %s", buff);
+ 
+ 		/* Parse the reply. */
+ 		switch (atoi(buff)) {
+ 		default:
+ 		    (void)fprintf(stderr, "Unknown reply to \"%s\" -- %s",
+ 			    modestream, buff);
+ 		    CanStream = FALSE;
+ 		    break;
+ 		case NNTP_OK_STREAM_VAL:	/* YES! */
+ 		    CanStream = TRUE;
+ 		    break;
+ 		case NNTP_BAD_COMMAND_VAL: /* normal refusal */
+ 		    CanStream = FALSE;
+ 		    break;
+ 		}
+ 	    }
+ 	    if (CanStream) {
+ 		int i;
+ 
+ 		for (i = 0; i < STNBUF; i++) { /* reset buffers */
+ 		    stbuf[i].st_fname = 0;
+ 		    stbuf[i].st_id = 0;
+ 		    stbuf[i].st_qp = 0;
+ 		}
+ 		stnq = 0;
+ 		stahead = 0;
+ 	    }
+ 	}
      }
  
      /* Set up signal handlers. */
***************
*** 982,992 ****
  	    Interrupted(Article, MessageID);
  
  	if ((Article = QIOread(BATCHqp)) == NULL) {
- 	    if (QIOerror(BATCHqp)) {
- 		(void)fprintf(stderr, "Can't read \"%s\", %s\n",
- 			BATCHname, strerror(errno));
- 		ExitWithStats(1);
- 	    }
  	    if (QIOtoolong(BATCHqp)) {
  		(void)fprintf(stderr, "Skipping long line in \"%s\"\n",
  			BATCHname);
--- 1345,1350 ----
***************
*** 993,1001 ****
--- 1351,1365 ----
  		(void)QIOread(BATCHqp);
  		continue;
  	    }
+ 	    if (QIOerror(BATCHqp)) {
+ 		(void)fprintf(stderr, "Can't read \"%s\", %s\n",
+ 			BATCHname, strerror(errno));
+ 		ExitWithStats(1);
+ 	    }
  
  	    /* Normal EOF -- we're done. */
  	    QIOclose(BATCHqp);
+ 	    BATCHqp = NULL;
  	    break;
  	}
  
***************
*** 1100,1107 ****
  	    Interrupted(Article, MessageID);
  
  	/* Offer the article. */
  	(void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID);
! 	if (!REMwrite(buff, (int)strlen(buff))) {
  	    (void)fprintf(stderr, "Can't offer article, %s\n",
  		    strerror(errno));
  	    QIOclose(qp);
--- 1464,1519 ----
  	    Interrupted(Article, MessageID);
  
  	/* Offer the article. */
+ 	if (CanStream) {
+ 	    int i;
+ 
+ 	    if (stisdup(MessageID)) {	/* skip duplicates in queue */
+ 		if (Debug)
+ 		    (void)fprintf(stderr, "Skipping duplicate ID %s\n",
+ 							    MessageID);
+ 		QIOclose(qp);
+ 		continue;
+ 	    }
+ 	    while (stnq >= STNBUFL) { /* need to empty a buffer */
+ 		if (strlisten()) {
+ 		    RequeueRestAndExit(Article, MessageID);
+ 		}
+ 		if ((stahead > 0) && (stnq < STNBUF)) break;
+ 	    }
+ 	    /* save new article in the buffer */
+ 	    i = stalloc(Article, MessageID, qp);
+ 	    if (i < 0) {
+ 		QIOclose(qp);
+ 		RequeueRestAndExit(Article, MessageID);
+ 	    }
+ 	    if (DoCheck && (stnofail < STNC)) {
+ 		if (check(i)) {
+ 		    RequeueRestAndExit((char *)NULL, (char *)NULL);
+ 		}
+ 	    } else {
+ 		if (takethis(i)) {
+ 		    RequeueRestAndExit((char *)NULL, (char *)NULL);
+ 		}
+ 	    }
+ 	    for (i = 0; i < STNBUF; i++) {
+ 		if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) {
+ 		    if (stbuf[i].st_age++ > stnq) {
+ 			/* This should not happen but just in case ... */
+ 			if (stbuf[i].st_retry < STNRETRY) {
+ 			    if (check(i)) return TRUE; /* resend check */
+ 			    stbuf[i].st_retry++;
+ 			    stbuf[i].st_age = 0;
+ 			} else { /* requeue to disk for later */
+ 			    Requeue(stbuf[i].st_fname, stbuf[i].st_id);
+ 			    strel(i); /* release entry */
+ 			}
+ 		    }
+ 		}
+ 	    }
+ 	    continue; /* next article */
+ 	}
  	(void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID);
! 	if (!REMwrite(buff, (int)strlen(buff), FALSE)) {
  	    (void)fprintf(stderr, "Can't offer article, %s\n",
  		    strerror(errno));
  	    QIOclose(qp);
***************
*** 1154,1159 ****
--- 1566,1578 ----
  	}
  
  	QIOclose(qp);
+     }
+     if (CanStream) { /* need to wait for rest of ACKs */
+ 	while (stnq > 0) {
+ 	    if (strlisten()) {
+ 		RequeueRestAndExit((char *)NULL, (char *)NULL);
+ 	    }
+ 	}
      }
  
      if (BATCHfp != NULL)
*** innd/nc.c.orig	Thu Mar 18 13:04:28 1993
--- innd/nc.c	Wed Apr 26 16:25:29 1995
***************
*** 6,11 ****
--- 6,13 ----
  #include "innd.h"
  #include "dbz.h"
  
+ /* for debugging the "mode stream" code */
+ #undef STR_DEBUG
  
  #define BAD_COMMAND_COUNT	10
  #define WIP_CHECK		(1 * 60)
***************
*** 52,57 ****
--- 54,62 ----
  static FUNCTYPE	NCxpath();
  static FUNCTYPE	NCxreplic();
  static FUNCTYPE	NC_unimp();
+ /* new modules for streaming */
+ static FUNCTYPE	NCcheck();
+ static FUNCTYPE	NCtakethis();
  
  STATIC int		NCcount;	/* Number of open connections	*/
  STATIC int		NCwipsize;	/* Size of NCwip array		*/
***************
*** 67,72 ****
--- 72,79 ----
      {	"authinfo",	NCauthinfo },
      {	"help",		NChelp	},
      {	"ihave",	NCihave	},
+     {	"check",	NCcheck	},
+     {	"takethis",	NCtakethis },
      {	"list",		NClist	},
      {	"mode",		NCmode	},
      {	"quit",		NCquit	},
***************
*** 131,136 ****
--- 138,179 ----
  
  
  /*
+ **  Write an NNTP reply message.
+ **  Call only when we will stay in NCreader mode.
+ **  Tries to do the actual write if it will not block.
+ */
+ STATIC void
+ NCwritereply(cp, text)
+     CHANNEL	*cp;
+     char	*text;
+ {
+     register BUFFER	*bp;
+     register int	i;
+ 
+     bp = &cp->Out;
+     i = bp->Left;
+     WCHANappend(cp, text, (int)strlen(text));	/* text in buffer */
+     WCHANappend(cp, NCterm, STRLEN(NCterm));	/* add CR NL to text */
+     if (i == 0) {	/* if only data then try to write directly */
+ 	i = write(cp->fd, &bp->Data[bp->Used], bp->Left);
+ #ifdef STR_DEBUG
+ 	syslog(L_NOTICE, "%s NCwritereply %d=write(%d, \"%.15s\", %d)",
+ 	    CHANname(cp), i, cp->fd,  &bp->Data[bp->Used], bp->Left);
+ #endif STR_DEBUG
+ 	if (i > 0) bp->Used += i;
+ 	if (bp->Used == bp->Left) bp->Used = bp->Left = 0;
+ 	else i = 0;
+     } else i = 0;
+     if (i <= 0) {	/* write failed, queue it for later */
+ 	RCHANremove(cp);
+ 	WCHANadd(cp);
+     }
+     if (Tracing || cp->Tracing)
+ 	syslog(L_TRACE, "%s > %s", CHANname(cp), text);
+ }
+ 
+ 
+ /*
  **  Tell the NNTP channel to go away.
  */
  STATIC void
***************
*** 188,197 ****
      case OMrunning:
  	wp = &NCwip[cp->fd];
  	response = ARTpost(cp, AmSlave ? &wp->Replic : NULL, wp->MessageID);
! 	if (atoi(response) == NNTP_TOOKIT_VAL)
  	    cp->Received++;
! 	else
  	    cp->Rejected++;
  	cp->Reported++;
  	if (cp->Reported >= NNTP_ACTIVITY_SYNC) {
  	    syslog(L_NOTICE,
--- 231,250 ----
      case OMrunning:
  	wp = &NCwip[cp->fd];
  	response = ARTpost(cp, AmSlave ? &wp->Replic : NULL, wp->MessageID);
! 	if (atoi(response) == NNTP_TOOKIT_VAL) {
  	    cp->Received++;
! 	    if (cp->Sendid.Size > 3) { /* We be streaming */
! 		char buff[4];
! 		(void)sprintf(buff, "%d", NNTP_OK_RECID_VAL);
! 		cp->Sendid.Data[0] = buff[0];
! 		cp->Sendid.Data[1] = buff[1];
! 		cp->Sendid.Data[2] = buff[2];
! 		response = cp->Sendid.Data;
! 	    }
! 	} else {
  	    cp->Rejected++;
+ 	    if (cp->Sendid.Size) response = cp->Sendid.Data;
+ 	}
  	cp->Reported++;
  	if (cp->Reported >= NNTP_ACTIVITY_SYNC) {
  	    syslog(L_NOTICE,
***************
*** 200,206 ****
  		cp->Received, cp->Refused, cp->Rejected);
  	    cp->Reported = 0;
  	}
! 	NCwritetext(cp, response);
  	cp->State = CSgetcmd;
  	break;
  
--- 253,259 ----
  		cp->Received, cp->Refused, cp->Rejected);
  	    cp->Reported = 0;
  	}
! 	NCwritereply(cp, response);
  	cp->State = CSgetcmd;
  	break;
  
***************
*** 524,534 ****
  
      if (HIShavearticle(p)) {
  	cp->Refused++;
! 	NCwritetext(cp, NNTP_HAVEIT);
      }
      else if (NCinprogress(cp, p, &who)) {
  #if	defined(NNTP_RESENDIT_LATER)
! 	NCwritetext(cp, NNTP_RESENDIT_LATER);
  #else
  	/* Somebody else is sending it to us; wait until they're done. */
  	who->Wanted = TRUE;
--- 577,587 ----
  
      if (HIShavearticle(p)) {
  	cp->Refused++;
! 	NCwritereply(cp, NNTP_HAVEIT);
      }
      else if (NCinprogress(cp, p, &who)) {
  #if	defined(NNTP_RESENDIT_LATER)
! 	NCwritereply(cp, NNTP_RESENDIT_LATER);
  #else
  	/* Somebody else is sending it to us; wait until they're done. */
  	who->Wanted = TRUE;
***************
*** 539,545 ****
  #endif	/* defined(NNTP_RESENDIT_LATER) */
      }
      else {
! 	NCwritetext(cp, NNTP_SENDIT);
  	cp->State = CSgetarticle;
      }
  }
--- 592,598 ----
  #endif	/* defined(NNTP_RESENDIT_LATER) */
      }
      else {
! 	NCwritereply(cp, NNTP_SENDIT);
  	cp->State = CSgetarticle;
      }
  }
***************
*** 606,612 ****
  	h = HOnnrpd;
      else if (caseEQ(p, "query"))
  	h = HOnnrqd;
!     else {
  	NCwritetext(cp, NCbadcommand);
  	return;
      }
--- 659,670 ----
  	h = HOnnrpd;
      else if (caseEQ(p, "query"))
  	h = HOnnrqd;
!     else if (caseEQ(p, "stream")) {
! 	char buff[16];
! 	(void)sprintf(buff, "%d StreamOK.", NNTP_OK_STREAM_VAL);
! 	NCwritetext(cp, buff);
! 	return;
!     } else {
  	NCwritetext(cp, NCbadcommand);
  	return;
      }
***************
*** 779,785 ****
--- 837,849 ----
      char		buff[SMBUF];
      char		*av[2];
      int			i;
+     int			rest;
+     int			SaveUsed;
  
+ #ifdef STR_DEBUG
+     syslog(L_NOTICE, "%s NCreader Used=%d",
+ 	CHANname(cp), cp->In.Used);
+ #endif STR_DEBUG
      /* Read any data that's there; ignore errors (retry next time it's our
       * turn) and if we got nothing, then it's EOF so mark it closed. */
      if ((i = CHANreadtext(cp)) < 0) {
***************
*** 801,946 ****
  
      bp = &cp->In;
      p = &bp->Data[bp->Used];
!     switch (cp->State) {
!     default:
! 	syslog(L_ERROR, "%s internal NCreader state %d",
! 	    CHANname(cp), cp->State);
! 	break;
! 
!     case CSgetcmd:
!     case CSgetauth:
! 	/* Did we get the whole command, terminated with "\r\n"? */
! 	if (bp->Used < 2 || p[-2] != '\r' || p[-1] != '\n')
  	    break;
- 	p[-2] = '\0';
- 	bp->Used -= 2;
  
! 	/* Ignore blank lines. */
! 	if (bp->Used == 0)
! 	    break;
! 	if (Tracing || cp->Tracing)
! 	    syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data);
  
! 	/* We got something -- stop sleeping (in case we were). */
! 	SCHANremove(cp);
! 	if (cp->Argument != NULL) {
! 	    DISPOSE(cp->Argument);
! 	    cp->Argument = NULL;
! 	}
  
! 	if (cp->State == CSgetauth) {
! 	    if (caseEQn(bp->Data, "mode", 4))
! 		NCmode(cp);
! 	    else
! 		NCauthinfo(cp);
! 	    break;
! 	}
  
! 	/* Loop through the command table. */
! 	for (p = bp->Data, dp = NCcommands; dp < ENDOF(NCcommands); dp++)
! 	    if (caseEQn(p, dp->Name, dp->Size)) {
! 		(*dp->Function)(cp);
! 		cp->BadCommands = 0;
  		break;
  	    }
! 	if (dp == ENDOF(NCcommands)) {
! 	    NCwritetext(cp, NCbadcommand);
! 	    if (++(cp->BadCommands) >= BAD_COMMAND_COUNT)
! 		cp->State = CSwritegoodbye;
! 	    for (i = 0; (p = NCquietlist[i]) != NULL; i++)
! 		if (caseEQ(p, dp->Name))
  		    break;
! 	    if (p == NULL)
! 		syslog(L_NOTICE, "%s bad_command %s",
! 		    CHANname(cp), MaxLength(bp->Data, bp->Data));
! 	}
! 	break;
  
!     case CSgetarticle:
!     case CSgetrep:
! 	/* Reading an article; look for "\r\n.\r\n" terminator. */
! 	if (!ART_EOF(bp->Used, p)) {
  	    /* Check for the null article. */
! 	    if (bp->Used == 3
! 	     && p[-3] == '.' && p[-2] == '\r' && p[-1] == '\n') {
  		cp->Rejected++;
! 		NCwritetext(cp, NNTP_REJECTIT_EMPTY);
  		cp->State = CSgetcmd;
  		bp->Used = 0;
  
  		/* Clear the work-in-progress entry. */
  		NCclearwip(wp);
  	    }
! 
! 	    /* Check for big articles. */
! 	    if (LargestArticle > SAVE_AMT && bp->Used > LargestArticle) {
! 		/* Make some room, saving only the last few bytes. */
! 		for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 		    p[0] = p[bp->Used - SAVE_AMT];
! 		wp->Size += bp->Used - SAVE_AMT;
! 		bp->Used = SAVE_AMT;
! 		cp->State = CSeatarticle;
  	    }
! 	    break;
! 	}
  
! 	/* Strip article terminator and post the article. */
! 	p[-3] = '\0';
! 	bp->Used -= 3;
! 	SCHANremove(cp);
! 	if (cp->Argument != NULL) {
! 	    DISPOSE(cp->Argument);
! 	    cp->Argument = NULL;
! 	}
! 	NCclean(bp);
! 	NCpostit(cp);
! 	break;
! 
!     case CSeatarticle:
! 	/* Eat the article and then complain that it was too large */
! 	if (ART_EOF(bp->Used, p)) {
! 	    /* Reached the end of the article. */
  	    SCHANremove(cp);
  	    if (cp->Argument != NULL) {
  		DISPOSE(cp->Argument);
  		cp->Argument = NULL;
  	    }
! 	    p = wp->MessageID;
! 	    i = wp->Size + bp->Used;
! 	    syslog(L_ERROR, "%s internal rejecting huge article %s (%d > %d)",
! 		CHANname(cp), p ? p : "(null)", i, LargestArticle);
! 	    (void)sprintf(buff, "%d Article exceeds local limit of %ld bytes",
! 		    NNTP_REJECTIT_VAL, LargestArticle);
! 	    NCwritetext(cp, buff);
  	    cp->State = CSgetcmd;
! 	    cp->Rejected++;
  
! 	    /* Write a local cancel entry so nobody else gives it to us. */
! 	    if (p) {
! 		av[0] = p;
! 		av[1] = NULL;
! 		if ((q = CCcancel(av)) != NULL)
! 		    syslog(L_ERROR, "%s cant cancel %s %s", LogName, av[0], q);
  	    }
  
! 	    /* Clear the work-in-progress entry. */
! 	    NCclearwip(wp);
  
! 	    /* Reset input buffer to the default size; don't let realloc
! 	     * be lazy. */
! 	    DISPOSE(bp->Data);
! 	    bp->Size = START_BUFF_SIZE;
! 	    bp->Used = 0;
! 	    bp->Data = NEW(char, bp->Size);
  	}
! 	else if (bp->Used > 8 * 1024) {
! 	    /* Make some room; save the last few bytes of the article */
! 	    for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 		p[0] = p[bp->Used - SAVE_AMT + 0];
! 	    wp->Size += bp->Used - SAVE_AMT;
! 	    bp->Used = SAVE_AMT;
! 	}
! 	break;
      }
  }
  
--- 865,1080 ----
  
      bp = &cp->In;
      p = &bp->Data[bp->Used];
!     rest = SaveUsed = bp->Used;
!     for ( ; ; ) {
! #ifdef STR_DEBUG
! 	if (bp->Used > 15)
! 	    syslog(L_NOTICE, "%s NCreader state=%d next \"%.15s\"",
! 		CHANname(cp), cp->State, bp->Data);
! #endif STR_DEBUG
! 	switch (cp->State) {
! 	default:
! 	    syslog(L_ERROR, "%s internal NCreader state %d",
! 		CHANname(cp), cp->State);
  	    break;
  
! 	case CSgetcmd:
! 	case CSgetauth:
! 	    /* Did we get the whole command, terminated with "\r\n"? */
! 	    for (i = 0; (i < bp->Used) && (bp->Data[i] != '\n'); i++) ;
! 	    if (i < bp->Used) rest = bp->Used = ++i;
! 	    else {
! 		rest = 0;
! 		break;	/* come back later for rest of line */
! 	    }
! 	    p = &bp->Data[rest];
! 	    if (rest < 2 || p[-2] != '\r' || p[-1] != '\n')
! 		break;
! 	    p[-2] = '\0';
! 	    bp->Used -= 2;
  
! 	    /* Ignore blank lines. */
! 	    if (bp->Data[0] == '\0')
! 		break;
! 	    if (Tracing || cp->Tracing)
! 		syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data);
  
! 	    /* We got something -- stop sleeping (in case we were). */
! 	    SCHANremove(cp);
! 	    if (cp->Argument != NULL) {
! 		DISPOSE(cp->Argument);
! 		cp->Argument = NULL;
! 	    }
  
! 	    if (cp->State == CSgetauth) {
! 		if (caseEQn(bp->Data, "mode", 4))
! 		    NCmode(cp);
! 		else
! 		    NCauthinfo(cp);
  		break;
  	    }
! 
! 	    /* Loop through the command table. */
! 	    for (p = bp->Data, dp = NCcommands; dp < ENDOF(NCcommands); dp++)
! 		if (caseEQn(p, dp->Name, dp->Size)) {
! 		    (*dp->Function)(cp);
! 		    cp->BadCommands = 0;
  		    break;
! 		}
! 	    if (dp == ENDOF(NCcommands)) {
! 		NCwritetext(cp, NCbadcommand);
! 		if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) {
! 		    cp->State = CSwritegoodbye;
! 		    rest = SaveUsed;
! 		}
! 		for (i = 0; (p = NCquietlist[i]) != NULL; i++)
! 		    if (caseEQ(p, dp->Name))
! 			break;
! 		if (p == NULL)
! 		    syslog(L_NOTICE, "%s bad_command %s",
! 			CHANname(cp), MaxLength(bp->Data, bp->Data));
! 	    }
! 	    break;
  
! 	case CSgetarticle:
! 	case CSgetrep:
  	    /* Check for the null article. */
! 	    if ((bp->Used >= 3) && (bp->Data[0] == '.')
! 	     && (bp->Data[1] == '\r') && (bp->Data[2] == '\n')) {
! 		rest = 3;	/* null article (canceled?) */
  		cp->Rejected++;
! 		if (cp->Sendid.Size > 3) { /* We be streaming */
! 		    char buff[4];
! 		    (void)sprintf(buff, "%d", NNTP_ERR_FAILID_VAL);
! 		    cp->Sendid.Data[0] = buff[0];
! 		    cp->Sendid.Data[1] = buff[1];
! 		    cp->Sendid.Data[2] = buff[2];
! 		    NCwritereply(cp, cp->Sendid.Data);
! 		}
! 		else NCwritetext(cp, NNTP_REJECTIT_EMPTY);
  		cp->State = CSgetcmd;
  		bp->Used = 0;
  
  		/* Clear the work-in-progress entry. */
  		NCclearwip(wp);
+ 		break;
  	    }
! 	    /* Reading an article; look for "\r\n.\r\n" terminator. */
! 	    for (i = 5; i <= bp->Used; i++) {
! 		if ((bp->Data[i - 5] == '\r')
! 		 && (bp->Data[i - 4] == '\n')
! 		 && (bp->Data[i - 3] == '.')
! 		 && (bp->Data[i - 2] == '\r')
! 		 && (bp->Data[i - 1] == '\n')) {
! 		    rest = bp->Used = i;
! 		    p = &bp->Data[i];
! 		    break;
! 		}
  	    }
! 	    if (i > bp->Used) {	/* did not find terminator */
! 		/* Check for big articles. */
! 		if (LargestArticle > SAVE_AMT && bp->Used > LargestArticle) {
! 		    /* Make some room, saving only the last few bytes. */
! 		    for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 			p[0] = p[bp->Used - SAVE_AMT];
! 		    wp->Size += bp->Used - SAVE_AMT;
! 		    bp->Used = SAVE_AMT;
! 		    cp->State = CSeatarticle;
! 		} else rest = 0;
! 		break;
! 	    }
  
! 	    /* Strip article terminator and post the article. */
! 	    p[-3] = '\0';
! 	    bp->Used -= 2;
  	    SCHANremove(cp);
  	    if (cp->Argument != NULL) {
  		DISPOSE(cp->Argument);
  		cp->Argument = NULL;
  	    }
! 	    NCclean(bp);
! 	    NCpostit(cp);
  	    cp->State = CSgetcmd;
! 	    break;
  
! 	case CSeatarticle:
! 	    /* Eat the article and then complain that it was too large */
! 	    /* Reading an article; look for "\r\n.\r\n" terminator. */
! 	    for (i = 5; i <= bp->Used; i++) {
! 		if ((bp->Data[i - 5] == '\r')
! 		 && (bp->Data[i - 4] == '\n')
! 		 && (bp->Data[i - 3] == '.')
! 		 && (bp->Data[i - 2] == '\r')
! 		 && (bp->Data[i - 1] == '\n')) {
! 		    rest = bp->Used = i;
! 		    p = &bp->Data[i];
! 		    break;
! 		}
  	    }
+ 	    if (i <= bp->Used) {	/* did find terminator */
+ 		/* Reached the end of the article. */
+ 		SCHANremove(cp);
+ 		if (cp->Argument != NULL) {
+ 		    DISPOSE(cp->Argument);
+ 		    cp->Argument = NULL;
+ 		}
+ 		p = wp->MessageID;
+ 		i = wp->Size + bp->Used;
+ 		syslog(L_ERROR, "%s internal rejecting huge article %s (%d > %d)",
+ 		    CHANname(cp), p ? p : "(null)", i, LargestArticle);
+ 		(void)sprintf(buff, "%d Article exceeds local limit of %ld bytes",
+ 			NNTP_REJECTIT_VAL, LargestArticle);
+ 		if (cp->Sendid.Size) NCwritetext(cp, cp->Sendid.Data);
+ 		else NCwritetext(cp, buff);
+ 		cp->State = CSgetcmd;
+ 		cp->Rejected++;
  
! 		/* Write a local cancel entry so nobody else gives it to us. */
! 		if (p) {
! 		    av[0] = p;
! 		    av[1] = NULL;
! 		    if ((q = CCcancel(av)) != NULL)
! 			syslog(L_ERROR, "%s cant cancel %s %s", LogName, av[0], q);
! 		}
  
! 		/* Clear the work-in-progress entry. */
! 		NCclearwip(wp);
! 
! 		/* Reset input buffer to the default size; don't let realloc
! 		 * be lazy. */
! 		DISPOSE(bp->Data);
! 		bp->Size = START_BUFF_SIZE;
! 		bp->Used = 0;
! 		bp->Data = NEW(char, bp->Size);
! 	    }
! 	    else if (bp->Used > 8 * 1024) {
! 		/* Make some room; save the last few bytes of the article */
! 		for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++)
! 		    p[0] = p[bp->Used - SAVE_AMT + 0];
! 		wp->Size += bp->Used - SAVE_AMT;
! 		bp->Used = SAVE_AMT;
! 	    }
! 	    break;
  	}
! #ifdef STR_DEBUG
! 	syslog(L_NOTICE, "%s NCreader rest=%d Used=%d SaveUsed=%d",
! 	    CHANname(cp), rest, bp->Used, SaveUsed);
! #endif STR_DEBUG
! 	if (rest > 0) {
! 	    if (rest < SaveUsed) { /* more commands in buffer */
! 		bp->Used = SaveUsed = SaveUsed - rest;
! 		/* It would be nice to avoid this copy but that
! 		** would require changes to the bp structure and
! 		** the way it is used.
! 		*/
! 		/* strncpy(bp->Data, &bp->Data[rest], bp->Used); */
! 		(void)memcpy((POINTER)bp->Data, (POINTER)&bp->Data[rest], (SIZE_T)bp->Used);
! 		rest = bp->Used;
! 	    } else {
! 		bp->Used = 0;
! 		break;
! 	    }
! 	} else break;
      }
  }
  
***************
*** 1055,1058 ****
--- 1189,1279 ----
      cp->BadCommands = 0;
      NCwritetext(cp, NCgreeting);
      return cp;
+ }
+ 
+ 
+ /* These modules support the streaming option to tranfer articles
+ ** faster.
+ */
+ 
+ /*
+ **  The "check" command.  Check the Message-ID, and see if we want the
+ **  article or not.  Stay in command state.
+ */
+ STATIC FUNCTYPE
+ NCcheck(cp)
+     CHANNEL		*cp;
+ {
+     register char	*p;
+     int			msglen;
+     WIP			*who;
+ 
+     if (AmSlave) {
+ 	NCwritetext(cp, NCbadcommand);
+ 	return;
+     }
+ 
+     /* Snip off the Message-ID. */
+     for (p = cp->In.Data; !ISWHITE(*p); p++)
+ 	continue;
+     for ( ; ISWHITE(*p); p++)
+ 	continue;
+     msglen = strlen(p) + 5; /* 3 digits + space + id + null */
+     if (cp->Sendid.Size < msglen) {
+ 	if (cp->Sendid.Size > 0) DISPOSE(cp->Sendid.Data);
+ 	if (msglen > MAXHEADERSIZE) cp->Sendid.Size = msglen;
+ 	else cp->Sendid.Size = MAXHEADERSIZE;
+ 	cp->Sendid.Data = NEW(char, cp->Sendid.Size);
+     }
+     if (!ARTidok(p)) {
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_GOTID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+ 	syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p));
+ 	return;
+     }
+ 
+     if (HIShavearticle(p)) {
+ 	cp->Refused++;
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_GOTID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+     } else if (NCinprogress(cp, p, &who)) {
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_RESENDID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+     } else {
+ 	(void)sprintf(cp->Sendid.Data, "%d %s", NNTP_OK_SENDID_VAL, p);
+ 	NCwritereply(cp, cp->Sendid.Data);
+     }
+     /* stay in command mode */
+ }
+ 
+ /*
+ **  The "takethis" command.  Article follows.
+ **  Remember <id> for later ack.
+ */
+ STATIC FUNCTYPE
+ NCtakethis(cp)
+     CHANNEL		*cp;
+ {
+     register char	*p;
+     int			msglen;
+ 
+     /* Snip off the Message-ID. */
+     for (p = cp->In.Data; !ISWHITE(*p); p++)
+ 	continue;
+     for ( ; ISWHITE(*p); p++)
+ 	continue;
+     if (!ARTidok(p)) {
+ 	syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p));
+     }
+     msglen = strlen(p) + 5; /* 3 digits + space + id + null */
+     if (cp->Sendid.Size < msglen) {
+ 	if (cp->Sendid.Size > 0) DISPOSE(cp->Sendid.Data);
+ 	if (msglen > MAXHEADERSIZE) cp->Sendid.Size = msglen;
+ 	else cp->Sendid.Size = MAXHEADERSIZE;
+ 	cp->Sendid.Data = NEW(char, cp->Sendid.Size);
+     }
+     /* save ID for later NACK or ACK */
+     (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_FAILID_VAL, p);
+ 
+     cp->State = CSgetarticle;
  }
*** include/nntp.h.orig	Wed Jul 27 21:50:02 1994
--- include/nntp.h	Fri Jan  6 19:07:59 1995
***************
*** 83,88 ****
--- 83,105 ----
  #define NNTP_CANTPOST			"440 Posting not allowed"
  #define NNTP_CANTPOST_VAL		440
  
+ /* new entries for the "streaming" protocol */
+ /* response to "mode stream" else 500 if stream not supported */
+ #define NNTP_OK_STREAM_VAL		203	/* Streaming supported */
+ 
+ /* response to "check <id>".  Must include ID of article.
+ ** Example: "431 <1234@host.domain>"
+ */
+ #define NNTP_OK_SENDID_VAL		238	/* I want article <id> */
+ #define NNTP_RESENDID_VAL		431	/* try <id> again later */
+ #define NNTP_ERR_GOTID_VAL		438	/* Got <id>, don't send */
+ 
+ /* responses to "takethis <id>.  Must include ID of article */
+ #define NNTP_OK_RECID_VAL		239	/* Article <id> received OK */
+ #define NNTP_ERR_FAILID_VAL		439	/* Transfer of <id> failed */
+ 
+ /* End of new entries for the "streaming" protocol */
+ 
  
  /*
  **  The first character of an NNTP reply can be used as a category class.
===END PATCH===


