.packageName <- "nws"
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/batchNodeList.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Return the node list for whichever batch queueing system started this
# session.  The system is detected from the environment, checked in this
# order: PE_HOSTFILE (handled by sgeNodeList), LSB_HOSTS (lsfNodeList),
# PBS_NODEFILE (pbsNodeList).  Stops with an error if none is set.
batchNodeList <- function() {
  isSet <- function(var) nzchar(Sys.getenv(var))

  if (isSet('PE_HOSTFILE'))
    return(sgeNodeList())
  if (isSet('LSB_HOSTS'))
    return(lsfNodeList())
  if (isSet('PBS_NODEFILE'))
    return(pbsNodeList())

  stop('cannot determine the kind of batch queueing system used')
}

# Build the node list from the file named by PE_HOSTFILE.
#
# Each non-blank line is split on whitespace; the first field is taken as
# the host name and the second as the number of times that host is
# repeated in the result.  Blank lines and leading whitespace are ignored.
#
# Returns a character vector of host names (NULL for an empty file).
# Stops if PE_HOSTFILE is not set, or if a line's count field cannot be
# used as a repeat count.
sgeNodeList <- function() {
  hostfile <- Sys.getenv('PE_HOSTFILE')
  if (hostfile == '')
    stop('environment variable PE_HOSTFILE is not defined')

  entries <- readLines(hostfile)
  # Strip leading whitespace so the host name is always the first field,
  # and drop blank lines so they do not invalidate the whole file.
  entries <- sub('^[[:space:]]+', '', entries)
  entries <- entries[nzchar(entries)]
  fields <- strsplit(entries, '[[:space:]]+')

  tryCatch({
      unlist(lapply(fields, function(x) rep(x[1], as.integer(x[2]))))
    }, error=function(e) {
      stop('hostfile has bad format: ', hostfile)
    })
}

# Build the node list from the whitespace-separated host names in the
# LSB_HOSTS environment variable.  Empty tokens (e.g. from leading
# whitespace) are discarded.  Stops if LSB_HOSTS is not set.
lsfNodeList <- function() {
  hostlist <- Sys.getenv('LSB_HOSTS')
  if (!nzchar(hostlist)) {
    stop('environment variable LSB_HOSTS is not defined')
  }

  tokens <- unlist(strsplit(hostlist, '[[:space:]]+'), use.names=FALSE)
  tokens[!grepl('^[[:space:]]*$', tokens)]
}

# Build the node list from the file named by PBS_NODEFILE: every line of
# the file is returned as-is, one element per line.  Stops if
# PBS_NODEFILE is not set.
pbsNodeList <- function() {
  nodefile <- Sys.getenv('PBS_NODEFILE')
  if (!nzchar(nodefile)) {
    stop('environment variable PBS_NODEFILE is not defined')
  }

  readLines(con=nodefile)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/blendOptions.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Copy every entry of 'new' into the environment 'options'.
#
# The environment is modified in place (a side effect); callers that want
# a fresh set of options must pass in a newly created environment.
#
#   options  environment that receives the values
#   new      named list of values to assign, or NULL to change nothing
#
# Returns 'options'.  Stops, before assigning anything, if 'new' has
# entries without a name, since they cannot be stored in the environment.
blendOptions <- function(options, new) {
  if (! is.null(new) && length(new) > 0) {
    optNames <- names(new)
    if (is.null(optNames) || any(is.na(optNames) | !nzchar(optNames)))
      stop('all options must be named', call.=FALSE)
    for (i in seq_along(new))
      assign(optNames[i], new[[i]], envir = options)
  }
  options
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/cmdLaunchOptions.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Build the command (as a character vector) used to reach 'host' via ssh.
#
# The base command is 'ssh -f -x [-l user] host'.  If the wrapper file
# <wrapperDir>/<workerWrapper> is accessible, the command is prefixed with
# it; a wrapper ending in '.py' is additionally run through the Python
# interpreter (options$python, or 'python'), and a BackgroundLaunch
# wrapper gets a '-m <module>' pair per options$extraPythonModules
# followed by '--'.  Warns if nsl() cannot resolve 'host'.
sshcmd <- function(host, options) {
  resolved <- suppressWarnings(nsl(host))
  if (is.null(resolved)) {
    warning(sprintf("ssh may not be able to resolve node name '%s'", host),
            call.=FALSE)
  }

  loginArgs <- if (is.null(options$user)) NULL else c('-l', options$user)
  basicArgs <- c('ssh', '-f', '-x', loginArgs, host)

  wrapper <- file.path(options$wrapperDir, options$workerWrapper)
  if (file.access(wrapper) != 0)
    return(basicArgs)

  # a non-Python wrapper is executed directly
  if (!grepl('\\.py$', wrapper, ignore.case=TRUE))
    return(c(wrapper, basicArgs))

  # hack: stick in extra arguments needed by the BackgroundLaunch.py script
  if (grepl('BackgroundLaunch', wrapper)) {
    moduleArgs <- unlist(lapply(options$extraPythonModules,
                                function(m) c('-m', m)))
    wrapper <- c(wrapper, moduleArgs, '--')
  }

  interpreter <- if (is.null(options$python)) 'python' else options$python
  c(interpreter, options$pythonOpts, wrapper, basicArgs)
}

# Build the ssh command for 'host' with a '-R' remote port forward.
#
# The forward spec is '[nwsHostRemote:]nwsPortRemote:nwsHost:nwsPort'; the
# bind address is left out when options$nwsHostRemote is an empty string.
# options$nwsHostRemote must not be NULL.  Wrapper handling is the same as
# for the plain ssh command: an accessible <wrapperDir>/<workerWrapper> is
# prefixed to the command, via the Python interpreter when it ends in
# '.py'.  Warns if nsl() cannot resolve 'host'.
sshforwardcmd <- function(host, options) {
  resolved <- suppressWarnings(nsl(host))
  if (is.null(resolved)) {
    warning(sprintf("ssh may not be able to resolve node name '%s'", host),
            call.=FALSE)
  }

  if (is.null(options$nwsHostRemote)) {
    stop('must use the nwsHostRemote option with sshforwardcmd')
  }

  forwardSpec <-
    if (nchar(options$nwsHostRemote) > 0) {
      sprintf('%s:%d:%s:%d', options$nwsHostRemote, options$nwsPortRemote,
              options$nwsHost, options$nwsPort)
    } else {
      sprintf('%d:%s:%d', options$nwsPortRemote,
              options$nwsHost, options$nwsPort)
    }

  loginArgs <- if (is.null(options$user)) NULL else c('-l', options$user)
  basicArgs <- c('ssh', '-f', '-x', '-R', forwardSpec, loginArgs, host)

  wrapper <- file.path(options$wrapperDir, options$workerWrapper)
  if (file.access(wrapper) != 0)
    return(basicArgs)

  # a non-Python wrapper is executed directly
  if (!grepl('\\.py$', wrapper, ignore.case=TRUE))
    return(c(wrapper, basicArgs))

  # hack: stick in extra arguments needed by the BackgroundLaunch.py script
  if (grepl('BackgroundLaunch', wrapper)) {
    moduleArgs <- unlist(lapply(options$extraPythonModules,
                                function(m) c('-m', m)))
    wrapper <- c(wrapper, moduleArgs, '--')
  }

  interpreter <- if (is.null(options$python)) 'python' else options$python
  c(interpreter, options$pythonOpts, wrapper, basicArgs)
}

# Build the command (as a character vector) used to reach 'host' via rsh.
#
# The result always runs <wrapperDir>/BackgroundLaunch.py through the
# Python interpreter (options$python, or 'python'), passing a
# '-m <module>' pair per options$extraPythonModules, then '--', then
# 'rsh host [-l user] -n'.  Warns if nsl() cannot resolve 'host'.
rshcmd <- function(host, options) {
  resolved <- suppressWarnings(nsl(host))
  if (is.null(resolved)) {
    warning(sprintf("rsh may not be able to resolve node name '%s'", host),
            call.=FALSE)
  }

  loginArgs <- if (is.null(options$user)) NULL else c('-l', options$user)
  basicArgs <- c('rsh', host, loginArgs, '-n')

  moduleArgs <- unlist(lapply(options$extraPythonModules,
                              function(m) c('-m', m)))
  wrapper <- c(file.path(options$wrapperDir, 'BackgroundLaunch.py'),
               moduleArgs, '--')

  interpreter <- if (is.null(options$python)) 'python' else options$python
  c(interpreter, options$pythonOpts, wrapper, basicArgs)
}

# Launch command that submits through 'bsub'.  'host' and 'options' are
# accepted but not used.
lsfcmd <- function(host, options) {
  c('bsub')
}

# Launch command 'job submit /exclusive:false', returned as a character
# vector.  'host' and 'options' are accepted but not used.
# NOTE(review): presumably targets the Windows Compute Cluster 'job'
# tool -- confirm.
ccscmd <- function(host, options) {
  submitArgs <- c('job', 'submit', '/exclusive:false')
  submitArgs
}

# Build the 'cscript' command that runs <wrapperDir>/rwin.vbs against
# 'host'.
#
# When options$passwd is set, '-l <user> -p <passwd>' is added, with the
# user taken from options$user or, failing that, the current login name.
# Returns list(cmd=<character vector>, quoting='simple').
rwincmd <- function(host, options) {
  # Note: Execution of cscript (done locally) must use simple quoting.
  # However, the remote command (the one executed via rwin.vbs) must
  # be done with MSC quoting, since the remote command is presumed to
  # be the Python interpreter.  Therefore, we set quoting to 'simple',
  # but we don't use the rwin.vbs "-s" option.
  script <- file.path(options$wrapperDir, 'rwin.vbs')
  prefix <- c('cscript', '//nologo', script, host)

  credentials <- NULL
  if (!is.null(options$passwd)) {
    login <- if (is.null(options$user)) Sys.info()[['login']] else options$user
    credentials <- c('-l', login, '-p', options$passwd)
  }

  list(cmd=c(prefix, credentials, '--'), quoting='simple')
}

# Build a worker command of the form
#   env <envVars> <scriptDir>/<scriptName> [-m <module> ...]
# The '-m' pairs (one per options$extraPythonModules) are only added when
# the script is 'RNWSSleighWorker.py'.  'host' is not used.
envcmd <- function(host, envVars, options) {
  script <- file.path(options$scriptDir, options$scriptName)

  moduleArgs <- NULL
  if (options$scriptName == 'RNWSSleighWorker.py') {
    moduleArgs <- unlist(lapply(options$extraPythonModules,
                                function(m) c('-m', m)))
  }

  c('env', envVars, script, moduleArgs)
}

# Build a worker command that runs <scriptDir>/<scriptName> through the
# Python interpreter (options$python, or 'python') with options$pythonOpts,
# followed by 'envVars' as trailing arguments.  For 'RNWSSleighWorker.py'
# a '-m <module>' pair per options$extraPythonModules and a '--' separator
# are placed between the script and 'envVars'.  'host' is not used.
scriptcmd <- function(host, envVars, options) {
  scriptArgs <- NULL
  if (options$scriptName == 'RNWSSleighWorker.py') {
    moduleArgs <- unlist(lapply(options$extraPythonModules,
                                function(m) c('-m', m)))
    scriptArgs <- c(moduleArgs, '--')
  }

  interpreter <- if (is.null(options$python)) 'python' else options$python
  script <- file.path(options$scriptDir, options$scriptName)
  c(interpreter, options$pythonOpts, script, scriptArgs, envVars)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/launch.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Start a worker using settings taken from the RSleigh* environment
# variables, then hand over to launch().
#
#   verbose  passed through to launch(); NA is treated as FALSE
#
# Port, rank, worker count, process count and local id are converted with
# as.integer(); rank falls back to -1 when RSleighRank is unset.
cmdLaunch <- function(verbose=FALSE) {
  if (is.na(verbose)) {
    verbose <- FALSE
  }

  envInt <- function(...) as.integer(Sys.getenv(...))

  wsName <- Sys.getenv('RSleighNwsName')
  userWsName <- Sys.getenv('RSleighUserNwsName')
  host <- Sys.getenv('RSleighNwsHost')
  port <- envInt('RSleighNwsPort')
  workerRank <- envInt('RSleighRank', '-1')
  workerCount <- envInt('RSleighWorkerCount')
  workerName <- Sys.getenv('RSleighName')
  rngKind <- Sys.getenv('RSleighRNGType')
  rngStart <- Sys.getenv('RSleighRNGSeed')
  procCount <- envInt('RSleighNumProcs')
  localIndex <- envInt('RSleighLocalID')

  launch(wsName, host, port, workerRank, workerCount, workerName,
         verbose, userWsName, rngKind, rngStart, procCount, localIndex)
}


# Worker entry point: open the sleigh workspace and the user workspace on
# the given server, announce this worker, run workerLoop(), and store a
# 'bye' value once the loop returns.
#
#   nwsName, nwsHost, nwsPort  workspace name and server location
#   rank                       this worker's rank (also stored as its id)
#   maxWorkerCount, verbose, rngType, rngSeed
#                              passed through to workerLoop()
#   name                       used with 'rank' to build the display name
#   userNwsName                name of the user workspace to attach to
#   numProcs                   divisor used to share MKL threads
#   localID                    NOTE(review): accepted but never used here;
#                              the thread split below uses 'rank' -- confirm
#                              that is intended.
launch <- function(nwsName, nwsHost, nwsPort, rank=-1, maxWorkerCount=-1,
      name=Sys.info()['nodename'], verbose=FALSE, userNwsName='__default',
                   rngType='legacy', rngSeed=0, numProcs=1, localID=-1) {
  server <- serverInfo(host=nwsHost, port=nwsPort)
  nws <- netWorkSpace(nwsName, serverInfo=server, useUse=TRUE, create=FALSE)
  userNws <- nwsUseWs(nws@server, userNwsName, create=FALSE)

  # initialize for monitoring
  displayName <- sprintf('%s@%d', name, rank)

  # post some info about this worker
  logfile <- unname(Sys.getenv('RSleighLogFile'))
  sysInfo <- Sys.info()
  pkgVersion <- paste(nwsPkgInfo(), collapse=' ')
  workerInfo <- list(host=sysInfo[['nodename']],
                     os=sysInfo[['sysname']],
                     pid=Sys.getpid(),
                     R=R.version.string,
                     nws=pkgVersion,
                     rank=rank,
                     logfile=logfile)
  nwsStore(nws, 'worker info', workerInfo)

  # XXX only do this with a sleigh workspace?
  nwsStore(nws, 'worker_ids', as.character(rank))

  # If Revobase can be loaded, divide the MKL threads among the worker
  # processes; the first (numCores %% numProcs) ranks get one extra.
  if (require(Revobase, quietly=TRUE)) {
    tryCatch({
      numCores <- getMKLthreads()
      extraThread <- if ((numCores %% numProcs) > rank) 1 else 0
      numThreads <- floor(numCores / numProcs) + extraThread
      setMKLthreads(max(numThreads, 1))
    }, error=function(e) {
      warning('Revobase appears to exist, but cannot set MKL thread count.',
              call.=FALSE)
      print(e)
    })
  }

  # enter the main worker loop
  workerLoop(nws, displayName, rank, maxWorkerCount, verbose, userNws, rngType,
             rngSeed)

  # indicate exit.
  # XXX not sure if this makes any sense
  nwsStore(nws, 'bye', 1)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/lsfSleigh.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Start a sleigh under LSF.
#
# If the LSB_HOSTS environment variable lists hosts (the R script was
# launched in a parallel fashion), one worker is started on each listed
# host using ssh; 'n', if given, is only compared with the number of hosts
# and a warning is issued on mismatch.  Otherwise 'n' must be given and
# that many workers are started via 'bsub'.  Extra arguments to bsub can
# be provided using the 'lsfOptions' parameter.  Any further arguments are
# passed on to sleigh().
lsfSleigh <- function(n, lsfOptions=c(), ...) {
  # Split on any whitespace and drop empty tokens; this avoids comparing
  # strings with ">", whose result depends on the collation locale.
  hosts <- unlist(strsplit(Sys.getenv("LSB_HOSTS"), "[[:space:]]+"),
                  use.names=FALSE)
  hosts <- hosts[nzchar(hosts)]

  if (length(hosts) > 0) {
    if (! missing(n) && n != length(hosts)) {
      warning("Number of requested nodes (", n,
              ") does not match number of nodes assigned by LSF (",
              length(hosts),
              ").  Using the ", length(hosts), " hosts LSF has assigned.")
    }
    n <- length(hosts)
    cmd <- sshcmd
  }
  else if (missing(n)) {
    stop("No requested and no hosts assigned by LSF")
  }
  else {
    # 1:n would silently produce c(1, 0) for n = 0, so validate first.
    count <- suppressWarnings(as.integer(n))
    if (length(count) != 1 || is.na(count) || count < 1)
      stop("'n' must be a positive number of nodes")
    hosts <- as.character(seq_len(count))
    cmd <- function(...) {
      c('bsub', lsfOptions)
    }
  }

  sleigh(launch=cmd, nodeList=hosts, ...)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/newprotocol.R"
# Encode a named list as a single string (or raw vector) made of a 4-digit
# entry count followed, for each entry, by
#   <4-digit name length><name><4-digit value length><value>
#
#   d    named list of values, or NULL (encoded as '0000')
#   raw  if TRUE return a raw vector, otherwise a character string
#
# Entries without a name are skipped and are not included in the count, so
# the count always matches what is actually emitted.  Lengths are measured
# in bytes because the result is transmitted as bytes.  Stops on duplicate
# names, or when a name or value does not fit the 4-digit length field.
marshal.list <- function(d, raw=TRUE) {
  x <- if (is.null(d)) {
      '0000'
    } else {
      nm <- names(d)
      ind <- which(nzchar(nm))
      if (any(duplicated(nm[ind])))
        stop('duplicate list names')
      nmlen <- nchar(nm, type='bytes')
      dlen <- nchar(d, type='bytes')
      if (any(nmlen[ind] > 9999, na.rm=TRUE) ||
          any(dlen[ind] > 9999, na.rm=TRUE))
        stop('list name or value too long to marshal')
      entries <- lapply(ind, function(i) sprintf('%04d%s%04d%s',
          nmlen[i], nm[i], dlen[i], d[[i]]))
      paste(sprintf('%04d', length(ind)), paste(entries, collapse=''), sep='')
    }

  if (raw) charToRaw(x) else x
}

# Encode a list of values as a single string (or raw vector) made of a
# 4-digit argument count followed, for each value, by
#   <20-digit value length><value>
#
#   d           list of values, or NULL
#   extra.args  number added to the count for arguments the caller sends
#               separately
#   raw         if TRUE return a raw vector, otherwise a character string
#
# Lengths are measured in bytes because the result is transmitted as bytes.
marshal.vec <- function(d, extra.args=0, raw=TRUE) {
  x <- if (is.null(d)) {
      sprintf('%04d', extra.args)
    } else {
      dlen <- nchar(d, type='bytes')
      fields <- lapply(seq_along(d),
                       function(i) sprintf('%020d%s', dlen[i], d[[i]]))
      n <- length(d) + extra.args
      paste(sprintf('%04d', n), paste(fields, collapse=''), sep='')
    }

  if (raw) charToRaw(x) else x
}

# Read one length-prefixed string from connection 's': a 4-character
# decimal length followed by that many bytes, returned as a string.
receive.name <- function(s) {
  size <- as.integer(nwsRecvN(s, 4))
  nwsRecvN(s, size)
}

# Read a list of name/value pairs from connection 's': a 4-character
# decimal count followed by that many (name, value) pairs, each part read
# with receive.name().  Returns a list of the values keyed by name.
receive.list <- function(s) {
  count <- as.integer(nwsRecvN(s, 4))
  received <- list()
  for (k in seq_len(count)) {
    key <- receive.name(s)
    received[[key]] <- receive.name(s)
  }
  received
}

# Run the option negotiation on connection 's', requesting the
# KillServerOnClose ("deadman") option.
#
# Sends the init handshake; if the server answers with a proposal, the
# proposal is read and discarded, the deadman request is sent, and the
# server's final answer is checked.  Problems are reported with cat()
# rather than signalled as errors.  'verbose' is not used.
negotiate.deadman <- function(s, verbose=FALSE) {
  writeBin(NegotiationHandshakeInit, s)
  handshake <- nwsRecvN(s, 4, rawflag=TRUE)

  if (!identical(handshake, NegotiationHandshakePropose)) {
    cat("server doesn't support the new protocol\n")
    return(invisible(NULL))
  }

  # it's the new protocol:
  # get the server's proposal, and ignore it
  proposal.opts <- receive.list(s)

  # ask for a deadman connection
  wanted <- list()
  wanted[[OPT.DEADMAN]] <- '1'
  request <- c(NegotiationHandshakeRequest, marshal.list(wanted))
  if (nzchar(Sys.getenv('NWS_VERYVERBOSE'))) {
    cat(sprintf('sending options to server: %s\n', rawToChar(request)))
  }
  writeBin(request, s)

  # get the final handshake
  answer <- nwsRecvN(s, 4, rawflag=TRUE)
  if (!identical(answer, NegotiationHandshakeAccept)) {
    # probably configured not to shutdown at our whim
    cat("server didn't accept our request\n")
  }
  invisible(NULL)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/nws.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Environments holding package-wide default options.
defaultNwsServerOptions <- new.env()
defaultNetWorkSpaceOptions <- new.env()

# this is a manifest constant; how should it be handled appropriately?
nwsRFP <- 3 * 2^24

# 4-byte handshake tokens exchanged with the server when connecting.
NegotiationHandshakeInit    <- charToRaw('X000')
NegotiationHandshakePropose <- charToRaw('P000')
NegotiationHandshakeRequest <- charToRaw('R000')
NegotiationHandshakeAccept  <- charToRaw('A000')
CookieHandshake             <- charToRaw('2223')
NoCookieHandshake           <- charToRaw('2222')

# Names of the connection options used during negotiation.
OPT.DEADMAN <- 'KillServerOnClose'
OPT.METADATATOSERVER <- 'MetadataToServer'
OPT.METADATAFROMSERVER <- 'MetadataFromServer'

# Utility to read exactly n bytes from the connection 's'.
#
#   s        connection to read from
#   n        number of bytes wanted; n <= 0 yields an empty result
#   rawflag  if TRUE return a raw vector, otherwise a character string
#
# A first read that returns nothing is retried only if it took at least
# 30 seconds; an empty read that returns sooner is reported as a failure.
# Stops if the data runs out before n bytes have been read.
nwsRecvN <- function(s, n, rawflag=FALSE) {
  if (!(n > 0)) {
    return(if (rawflag) raw(0) else '')
  }

  finish <- function(bytes) if (rawflag) bytes else rawToChar(bytes)

  started <- proc.time()[[3]]
  repeat {
    chunk <- tryCatch({
        readBin(s, what='raw', n=n)
      }, error=function(e) {
        # readBin sometimes generates confusing exceptions
        stop("error calling readBin on nws socket: ", e$message, call.=FALSE)
      })
    got <- length(chunk)
    now <- proc.time()[[3]]
    if (got > 0 || now - started < 30) break
    started <- now
  }

  remaining <- n - got
  if (remaining <= 0) return(finish(chunk))
  if (got == 0) stop("failed to read from nws socket", call.=FALSE)

  # we didn't get all of the data, so collect the raw vectors in a list
  # that is concatenated once everything has arrived
  capacity <- 50
  chunks <- vector('list', capacity)
  used <- 1
  chunks[used] <- list(chunk)

  repeat {
    chunk <- readBin(s, what='raw', n=remaining)
    used <- used + 1
    if (used > capacity) {
      # out of space in the list, so double its length
      capacity <- 2 * capacity
      length(chunks) <- capacity
    }
    chunks[used] <- list(chunk)
    got <- length(chunk)
    remaining <- remaining - got
    if (remaining <= 0) break
    if (got == 0)
      stop("failed to read subsequent data from nws socket", call.=FALSE)
  }

  # drop the unused slots and concatenate the pieces
  length(chunks) <- used
  finish(do.call(c, chunks))
}

# Validate an argument that should be logical.
#
#   val      the value supplied by the caller
#   defVal   returned when 'val' is NULL or a single NA
#   argName  argument name used in the error message
#   call     call recorded in the error that is raised
#
# Returns 'val' (or 'defVal').  Stops with "'<argName>' takes a logical
# argument" when 'val' is not logical.  The NA test is applied only to
# length-one values so that vector input cannot break the scalar '||'.
check.logical <- function(val, defVal, argName, call) {
  unset <- is.null(val) || (length(val) == 1 && isTRUE(is.na(val)))
  if (unset) {
    val <- defVal
  } else if (!is.logical(val)) {
    msg <- sprintf("'%s' takes a logical argument", argName)
    e <- simpleError(msg, call)
    stop(e)
  }
  val
}

# Validate an argument that should be a list.
#
#   val      the value supplied by the caller
#   defVal   returned when 'val' is NULL or a single NA
#   argName  argument name used in the error message
#   call     call recorded in the error that is raised
#
# Returns 'val' (or 'defVal').  Stops with "'<argName>' takes a list
# argument" when 'val' is not a list.  The NA test is applied only to
# length-one values: is.na() on a list returns one value per element, so
# testing it directly fails for empty and multi-element lists.
check.list <- function(val, defVal, argName, call) {
  unset <- is.null(val) || (length(val) == 1 && isTRUE(is.na(val)))
  if (unset) {
    val <- defVal
  } else if (!is.list(val)) {
    msg <- sprintf("'%s' takes a list argument", argName)
    e <- simpleError(msg, call)
    stop(e)
  }
  val
}

# Check the optional arguments in 'opts' against the allowed names 'nms'.
#
#   opts  list of arguments collected from '...'
#   nms   character vector of allowed names; include '' to permit
#         unnamed arguments
#   call  call recorded in the error that is raised
#
# Returns NULL invisibly when everything is allowed; otherwise stops.
check.opts <- function(opts, nms, call) {
  given <- names(opts)
  problem <- NULL

  if (is.null(given)) {
    # there are no named arguments in opts, but if any unnamed arguments
    # are present, check if they are allowed
    if (length(opts) > 0 && !('' %in% nms))
      problem <- 'illegal unnamed argument specified'
  } else {
    unknown <- given[!(given %in% nms)]
    if (length(unknown) > 0)
      problem <- paste('unused argument(s): ', paste(unknown, collapse=', '))
  }

  if (!is.null(problem))
    stop(simpleError(problem, call))
  invisible(NULL)
}

# Constructor for 'nwsServer' objects; all arguments are passed on to
# new() and from there to the class's initialize method.
nwsServer <- function(...) {
  new(Class='nwsServer', ...)
}

# Return the default nwsServer options: a list with 'serverInfo' and
# 'connopts', both NULL.  Any arguments are ignored.
computeDefaultNwsServerOptions <- function(...) {
  list(serverInfo=NULL, connopts=NULL)
}

# Class representing a connection to a netWorkSpace server.
# There are extra things here that should probably be removed.
#
# Slots:
#   nwsSocket       the socket connection to the server
#   connopts        connection options requested from the server
#   options         environment holding the server options
#   cookieProtocol  whether the server speaks the cookie protocol
setClass('nwsServer',
         representation(nwsSocket='ANY',
                        connopts='list',
                        options='environment',
                        cookieProtocol='logical'))

# Initialize an nwsServer object: collect the options, open the socket to
# the server and negotiate the protocol.
#
#   serverHost, port  deprecated; use serverInfo=serverInfo(...) instead
#   ...               further options, blended over the defaults;
#                     'serverInfo' and 'connopts' are read here
#
# Stops if the connection cannot be made or the handshake fails.  If a
# failure happens after the socket has been opened, the socket is closed
# before the error propagates so that the connection is not leaked.
setMethod('initialize', 'nwsServer',
          function(.Object, serverHost=NULL, port=NULL, ...) {
            argList <- list(...)

            .Object@options = new.env()
            blendOptions(.Object@options, as.list(defaultNwsServerOptions))
            blendOptions(.Object@options, argList)
            # The next two lines are an explicit traversal that will be removed
            # in 3.0
            if (!is.null(serverHost)) .Object@options$serverHost = serverHost
            if (!is.null(port)) .Object@options$port = port

            # Without an explicit serverInfo, build one from the deprecated
            # serverHost/port options, or fall back to getServer().
            if (is.null(.Object@options$serverInfo)) {
              if (!is.null(.Object@options$serverHost))
                warning('use of the serverHost parameter is deprecated, use serverInfo=serverInfo',
                        call.=FALSE)
              if (!is.null(.Object@options$port))
                warning('use of the port parameter is deprecated, use serverInfo=serverInfo',
                        call.=FALSE)
              if (!is.null(.Object@options$serverHost)
                  && !is.null(.Object@options$port))
                .Object@options$serverInfo <- serverInfo(host=.Object@options$serverHost,
                                                            port=.Object@options$port)
              else if (!is.null(.Object@options$serverHost))
                .Object@options$serverInfo <- serverInfo(host=.Object@options$serverHost)
              else if (!is.null(.Object@options$port))
                .Object@options$serverInfo <- serverInfo(port=.Object@options$port)
              else 
                .Object@options$serverInfo <- getServer()
            }


            # This looks strange, but we want to use blendOptions to read possible
            # input for connopts, but still represent it the standard way within
            # the object.
            .Object@connopts =
              if (is.null(.Object@options$connopts)) {
                x <- list()
                x[[OPT.METADATATOSERVER]] <- '1'
                x[[OPT.METADATAFROMSERVER]] <- '1'
                x
              } else {
                as.list(.Object@options$connopts)
              }

            if (.Platform$OS.type == 'windows') {
              # on windows, socketConnection will wait for the full timeout,
              # even if no one is listening on the specified server port.
              # make.socket doesn't, so we'll use it to throw an exception
              # if no one is listening.
              tmpsock <- tryCatch({
                  suppressWarnings(make.socket(nwsHost(.Object@options$serverInfo),
                                               nwsPort(.Object@options$serverInfo)))
                }, error=function(e) {
                  msg <- sprintf(
                         "Please verify that the NetWorkSpaces server is running on '%s:%d'",
                                 nwsHost(.Object@options$serverInfo),
                                 nwsPort(.Object@options$serverInfo))
                  warning(msg, call.=FALSE)
                  stop(e$message, call.=FALSE)
                })
              close.socket(tmpsock)
            }

            # temporarily change the timeout while creating the socketConnection.
            # we will block for up to a year for data on this socket.
            old.timeout = options(timeout=60 * 60 * 24 * 365)
            .Object@nwsSocket = tryCatch({
                suppressWarnings(
                    socketConnection(nwsHost(.Object@options$serverInfo),
                                     port = nwsPort(.Object@options$serverInfo),
                                     open ='a+b', blocking=TRUE))
              }, error=function(e) {
                msg <- sprintf(
                    "Please verify that the NetWorkSpaces server is running on '%s:%d'",
                               nwsHost(.Object@options$serverInfo),
                               nwsPort(.Object@options$serverInfo))
                warning(msg, call.=FALSE)
                stop(e$message, call.=FALSE)
              }, finally=options(old.timeout))

            # From here on the socket is open: make sure it gets closed
            # again if anything below fails before the object is returned.
            handshakeDone <- FALSE
            on.exit(if (!handshakeDone)
                      try(close(.Object@nwsSocket), silent=TRUE),
                    add=TRUE)

            # tell the server that we're a new client
            writeBin(NegotiationHandshakeInit, .Object@nwsSocket)
            handshake <- nwsRecvN(.Object@nwsSocket, 4, rawflag=TRUE)

            # XXX should we allow for future change here?
            if (identical(handshake, NegotiationHandshakePropose)) {
              # use the new protocol
              # XXX the data from the server is currently ignored
              proposal.opts <- receive.list(.Object@nwsSocket)

              if (nzchar(Sys.getenv('NWS_VERYVERBOSE'))) {
                cat('nws server options proposal:\n')
                print(proposal.opts)
              }

              wp <- proposal.opts$NwsWebPort
              .Object@options$webPort <-
                  if (!is.null(wp)) as.numeric(wp) else NULL

              # request the connection options that we want
              d <- c(NegotiationHandshakeRequest, marshal.list(.Object@connopts))
              writeBin(d, .Object@nwsSocket)
              acc <- nwsRecvN(.Object@nwsSocket, 4, rawflag=TRUE)
              if (!identical(acc, NegotiationHandshakeAccept)) {
                stop("server didn't accept our request", call.=FALSE)
              }
              .Object@cookieProtocol <- TRUE
            } else if (identical(handshake, CookieHandshake)) {
              # old protocol
              warning("connected to old server: ", rawToChar(handshake),
                      call.=FALSE)
              .Object@cookieProtocol <- TRUE

              # make sure we don't send or expect any meta data
              .Object@connopts[[OPT.METADATATOSERVER]] <- NULL
              .Object@connopts[[OPT.METADATAFROMSERVER]] <- NULL
            } else if (identical(handshake, NoCookieHandshake)) {
              # really old protocol
              warning("connected to very old server: ", rawToChar(handshake),
                      call.=FALSE)
              .Object@cookieProtocol <- FALSE

              # make sure we don't send or expect any meta data
              .Object@connopts[[OPT.METADATATOSERVER]] <- NULL
              .Object@connopts[[OPT.METADATAFROMSERVER]] <- NULL
            } else {
              stop('unrecognized handshake: ', handshake, call.=FALSE)
            }

            # success: keep the socket open for the lifetime of the object
            handshakeDone <- TRUE
            .Object
          })

# Generic functions implemented for the 'nwsServer' class.
setGeneric('sendOp',
           function(.Object, ..., metadata=list())
             standardGeneric('sendOp'))
setGeneric('sendOpStreaming',
           function(.Object, ..., metadata=list())
             standardGeneric('sendOpStreaming'))
setGeneric('nwsDeleteWs',
           function(.Object, wsName)
             standardGeneric('nwsDeleteWs'))
setGeneric('nwsListWss',
           function(.Object, showDataFrame=TRUE)
             standardGeneric('nwsListWss'))
setGeneric('nwsMktempWs',
           function(.Object, wsNameTemplate='__Rws__%010d', ...)
             standardGeneric('nwsMktempWs'))
setGeneric('nwsOpenWs',
           function(.Object, wsName, space=NULL, ...)
             standardGeneric('nwsOpenWs'))
setGeneric('nwsUseWs',
           function(.Object, wsName, space=NULL, ...)
             standardGeneric('nwsUseWs'))

# Send one operation to the server.
#
# The arguments in '...' are marshalled with marshal.vec(); when the
# MetadataToServer option is in effect, 'metadata' is marshalled with
# marshal.list() and sent first.  Returns the metadata list read back from
# the server when MetadataFromServer is in effect, otherwise an empty list.
setMethod('sendOp', 'nwsServer',
          function(.Object, ..., metadata=list()) {
            sock <- .Object@nwsSocket
            negotiated <- names(.Object@connopts)
            veryVerbose <- nzchar(Sys.getenv('NWS_VERYVERBOSE'))

            if (OPT.METADATATOSERVER %in% negotiated) {
              if (veryVerbose) {
                cat('sending metadata:\n')
                print(metadata)
              }
              payload <- c(marshal.list(metadata), marshal.vec(list(...)))
            } else {
              if (veryVerbose)
                cat('not sending metadata\n')
              payload <- marshal.vec(list(...))
            }
            writeBin(payload, sock)

            if (OPT.METADATAFROMSERVER %in% negotiated) {
              receive.list(sock)
            } else {
              list()
            }
          })

# Send one operation whose last argument is a separate payload.
#
# Exactly one of 'extra' (the raw payload) and 'extralen' (its length,
# when the caller sends the payload itself) must be supplied.  The
# arguments in '...' are marshalled with the count increased by one, and
# followed by the 20-digit payload length and, if given, the payload.
# Returns NULL when 'extra' is NULL; otherwise the metadata list read back
# from the server when MetadataFromServer is in effect, or an empty list.
setMethod('sendOpStreaming', 'nwsServer',
          function(.Object, ..., extra=NULL, extralen=length(extra),
                   metadata=list()) {
            # sanity check that either "extra" is specified
            # or "extralen" is
            stopifnot(missing(extra) != missing(extralen))

            sock <- .Object@nwsSocket
            negotiated <- names(.Object@connopts)

            lenFormat <- if (is.integer(extralen)) '%020d' else '%020.0f'
            extralen <- charToRaw(sprintf(lenFormat, extralen))
            # this should only happen if extralen was > 2^64
            stopifnot(identical(length(extralen), 20L))

            header <-
              if (OPT.METADATATOSERVER %in% negotiated) {
                c(marshal.list(metadata), marshal.vec(list(...), 1))
              } else {
                marshal.vec(list(...), 1)
              }
            writeBin(c(header, extralen, extra), sock)

            if (is.null(extra)) {
              # we can't return metadata in this case
              NULL
            } else if (OPT.METADATAFROMSERVER %in% negotiated) {
              receive.list(sock)
            } else {
              list()
            }
          })

# Ask the server to delete the workspace 'wsName'.  Invisibly returns the
# 4-character status read from the server, which is not interpreted here.
setMethod('nwsDeleteWs', 'nwsServer',
          function(.Object, wsName) {
            sock <- .Object@nwsSocket
            metadata <- sendOp(.Object, 'delete ws', wsName)

            # status, unused at the moment.
            invisible(nwsRecvN(sock, 4))
          })

# List the workspaces known to the server.
#
#   showDataFrame  if FALSE, return the server's text reply unchanged;
#                  if TRUE, parse it into a data frame
#
# The reply has one line per workspace with tab-separated fields.  The
# data frame has the columns Owned (first character of the line is '>'),
# Name, Owner, Persistent, NumVariables and Variables.
setMethod('nwsListWss', 'nwsServer',
          function(.Object, showDataFrame=TRUE) {
            sock <- .Object@nwsSocket
            metadata <- sendOp(.Object, 'list wss')

            # status, descriptor and cookie are read but not used
            status <- as.integer(nwsRecvN(sock, 4))
            desc <- nwsRecvN(sock, 20)
            if (.Object@cookieProtocol)
              cookie <- nwsRecvN(sock, 40)

            listing <- nwsRecvN(sock, as.integer(nwsRecvN(sock, 20)))
            if (!showDataFrame)
              return(listing)

            ## convert response into an R data frame
            parseLine <- function(text) {
              line <- unlist(strsplit(text, "\t"))
              # the first character of the first field marks ownership;
              # the workspace name is the rest of that field
              owned <- if (substr(line[1], 1, 1) == '>') TRUE else FALSE
              list(owned,
                   substr(line[1], 2, nchar(line[1])),
                   line[2],
                   as.logical(line[3]),
                   as.integer(line[4]),
                   if (is.na(line[5])) "" else line[5])
            }
            rows <- lapply(unlist(strsplit(listing, "\n")), parseLine)

            if (length(rows) > 0) {
              names(rows) <- seq_along(rows)
              rows <- do.call(rbind, rows)
              colnames(rows) <-
                c("Owned", "Name", "Owner", "Persistent", "NumVariables", "Variables")
            }
            data.frame(rows)
          })

# Ask the server to create a workspace with a unique name built from
# 'wsNameTemplate'.  The only option accepted in '...' is 'wsmetadata',
# which is sent as the operation's metadata.  Returns the name read back
# from the server; stops if the server reports a non-zero status.
setMethod('nwsMktempWs', 'nwsServer',
          function(.Object, wsNameTemplate, ...) {
            call <- match.call()
            opts <- list(...)
            check.opts(opts, c('wsmetadata'), call)

            if (!is.character(wsNameTemplate)) {
              stop('workspace name must be a string')
            }

            sock <- .Object@nwsSocket
            metadata <- sendOp(.Object, 'mktemp ws', wsNameTemplate,
                               metadata=opts$wsmetadata)

            # the whole reply is read before the status is acted on
            status <- as.integer(nwsRecvN(sock, 4))
            desc <- nwsRecvN(sock, 20) # unused at the moment.
            if (.Object@cookieProtocol)
              cookie <- nwsRecvN(sock, 40) # unused at the moment.
            nameLen <- as.integer(nwsRecvN(sock, 20))
            wsName <- nwsRecvN(sock, nameLen)

            if (status != 0) stop('mktempWs failed')
            wsName
          })

# Send an 'open ws' request for workspace 'wsName', passing the current
# process id as the owner field.
#
# Arguments:
#   .Object  the nwsServer object to talk to
#   wsName   name of the workspace
#   space    object to return on success; when NULL a new netWorkSpace
#            that reuses this server is created
#   ...      'create', 'persistent' and 'wsmetadata' are accepted
#
# Returns 'space'.  Stops when the server replies with a non-zero status.
setMethod('nwsOpenWs', 'nwsServer',
          function(.Object, wsName, space=NULL, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('create', 'persistent', 'wsmetadata'),
                       thisCall)

            # A direct user call supplies no space, so build one around
            # this server.  The netWorkSpace constructor passes its own.
            if (is.null(space)) {
              wrapper <- new.env()
              wrapper$server <- .Object
              space <- new('netWorkSpace', wsName=wsName, serverWrap=wrapper)
            }

            ownerId <- sprintf('%d', Sys.getpid())
            persistFlag <- 'no'
            if (!is.null(extras$persistent) && extras$persistent) {
              persistFlag <- 'yes'
            }

            sock <- .Object@nwsSocket
            wsMeta <- extras$wsmetadata

            # The extra 'no' argument is only sent when creation has been
            # explicitly declined.
            createAllowed <- is.null(extras$create) || extras$create
            if (createAllowed) {
              sendOp(.Object, 'open ws', wsName, ownerId, persistFlag,
                     metadata=wsMeta)
            } else {
              sendOp(.Object, 'open ws', wsName, ownerId, persistFlag, 'no',
                     metadata=wsMeta)
            }

            status <- as.integer(nwsRecvN(sock, 4))
            if (status != 0) {
              stop(paste("workspace", wsName, "doesn't exist"), call.=FALSE)
            }
            space
          })

# Send a 'use ws' request for workspace 'wsName'.  Unlike nwsOpenWs the
# owner field is sent empty and the persistent field is always 'no'.
#
# Arguments:
#   .Object  the nwsServer object to talk to
#   wsName   name of the workspace
#   space    object to return on success; when NULL a new netWorkSpace
#            that reuses this server is created (same as nwsOpenWs)
#   ...      'create', 'persistent' and 'wsmetadata' pass the option
#            check; 'persistent' is not used here
#
# Returns 'space'.  Stops when the server replies with a non-zero status.
setMethod('nwsUseWs', 'nwsServer',
          function(.Object, wsName, space=NULL, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('create', 'persistent', 'wsmetadata'),
                       thisCall)

            if (is.null(space)) {
              wrapper <- new.env()
              wrapper$server <- .Object
              space <- new('netWorkSpace', wsName=wsName, serverWrap=wrapper)
            }

            sock <- .Object@nwsSocket
            wsMeta <- extras$wsmetadata

            # The extra 'no' argument is only sent when creation has been
            # explicitly declined.
            createAllowed <- is.null(extras$create) || extras$create
            if (createAllowed) {
              sendOp(.Object, 'use ws', wsName, '', 'no', metadata=wsMeta)
            } else {
              sendOp(.Object, 'use ws', wsName, '', 'no', 'no',
                     metadata=wsMeta)
            }

            status <- as.integer(nwsRecvN(sock, 4))
            if (status != 0) {
              stop(paste("workspace", wsName, "doesn't exist"), call.=FALSE)
            }
            space
          })

# Accessor for the 'webPort' entry of a server object's options.
# The generic is only created when it does not exist yet.
if (!isGeneric('nwsWebPort')) {
  setGeneric('nwsWebPort',
             function(.Object, ...) standardGeneric('nwsWebPort'))
}
setMethod('nwsWebPort', 'nwsServer',
          function(.Object, ...) {
            serverOptions <- .Object@options
            serverOptions$webPort
          })

# S3 'close' method for nwsServer objects: closes the connection stored in
# the nwsSocket slot.  Extra arguments are accepted but ignored.
close.nwsServer <- function(con, ...) {
  close(con@nwsSocket)
}

# Open the server's workspace listing page in a web browser.
# Stops when the server object has no web port.  Returns the URL invisibly.
if (!isGeneric('view')) {
  setGeneric('view', function(.Object, ...) standardGeneric('view'))
}
setMethod('view', 'nwsServer', function(.Object, ...) {
  serverHost <- nwsHost(.Object@options$serverInfo)
  webPort <- nwsWebPort(.Object)
  if (is.null(webPort)) {
    stop('the nws server does not have a web interface')
  }

  listingUrl <- sprintf('http://%s:%d/doit?op=listWss', serverHost, webPort)
  browseURL(listingUrl)

  notice <- sprintf("Viewing NetWorkSpaces server '%s:%d' in your browser\n",
                    serverHost, webPort)
  cat(notice)
  invisible(listingUrl)
})

# Convenience constructor: forwards every argument to
# new("netWorkSpace", ...).
netWorkSpace <- function(...) new("netWorkSpace", ...)

# Build the list of default option values for netWorkSpace objects.
# Any arguments are accepted and ignored.
computeDefaultNetWorkSpaceOptions <- function(...) {
  list(
    useUse=FALSE,
    serverInfo=NULL,
    serverWrap=NULL,
    connopts=NULL,
    wsmetadata=list()
  )
}

# S4 class representing a netWorkSpace.
# Slots:
#   server          nwsServer object the workspace talks through
#   wsName          name of the workspace
#   cookieProtocol  logical flag
#   serverInfo      object of any class describing the server
setClass('netWorkSpace',
         representation(
           server='nwsServer',
           wsName='character',
           cookieProtocol='logical',
           serverInfo='ANY'),
         prototype(server=NULL))

# Initializer for netWorkSpace objects.
#
# Arguments:
#   .Object  the object being initialized
#   wsName   name of the workspace (default '__default')
#   ...      'create' and 'persistent' are forwarded to nwsOpenWs/nwsUseWs.
#            The remaining arguments may be named serverInfo, serverHost,
#            port, useUse, serverWrap, connopts or wsmetadata.  Unnamed
#            arguments are still accepted positionally, in the order
#            serverHost, port, useUse, serverWrap, connopts, wsmetadata,
#            but trigger a deprecation notice.
#
# Returns the object with its wsName, serverInfo, server and
# cookieProtocol slots set.  When no serverWrap is supplied a new
# nwsServer is created and the workspace is opened (or merely used, when
# useUse is TRUE); if that fails the server socket is closed and the error
# is re-signalled.
setMethod('initialize', 'netWorkSpace',
          function(.Object, wsName='__default', ...) {
            call <- match.call()
            allOpts <- list(...)
            .Object@wsName <- wsName

            # We fill the opts list with options we will pass along
            opts <- list()
            if (!is.null(allOpts[['create']])) {
              opts$create <- allOpts[['create']]
              allOpts$create <- NULL
            }
            if (!is.null(allOpts[['persistent']])) {
              # stored under its own name; it used to overwrite 'create'
              opts$persistent <- allOpts[['persistent']]
              allOpts$persistent <- NULL
            }
            check.opts(opts, c('create', 'persistent'), call)

            options <- new.env()
            blendOptions(options, as.list(defaultNetWorkSpaceOptions))

            if (length(allOpts) > 0) {
              # Make sure we don't miss any unnamed arguments
              args <- c('serverHost', 'port', 'useUse', 'serverWrap',
                        'connopts', 'wsmetadata')
              # names() is NULL when every remaining argument is unnamed;
              # treat that the same as all-empty names
              optNames <- names(allOpts)
              if (is.null(optNames))
                optNames <- rep('', length(allOpts))

              warn <- FALSE
              warningMess <- 'Use of unnamed arguments to netWorkSpace is deprecated'
              for (i in seq_along(allOpts)) {
                if (optNames[i] == '') {
                  # positional argument: map it by position
                  warn <- TRUE
                  assign(args[i], allOpts[[i]], options)
                }
                else {
                  check.opts(allOpts[i], c('serverInfo', args), call)
                  assign(optNames[i], allOpts[[i]], options)
                }
              }
              # Warn that unnamed arguments are deprecated
              if (warn) cat(warningMess, '\n')
            }

            if (is.null(options$serverInfo)) {
              if (!is.null(options$serverHost))
                warning('use of the serverHost parameter is deprecated, use serverInfo=serverInfo',
                        call.=FALSE)
              if (!is.null(options$port))
                warning('use of the port parameter is deprecated, use serverInfo=serverInfo',
                        call.=FALSE)
              if (!is.null(options$serverHost)
                  && !is.null(options$port))
                options$serverInfo <- serverInfo(host=options$serverHost,
                                                 port=options$port)
              else if (!is.null(options$serverWrap))
                options$serverInfo <- options$serverWrap$server@options$serverInfo
              else if (!is.null(options$serverHost))
                options$serverInfo <- serverInfo(host=options$serverHost)
              else if (!is.null(options$port))
                options$serverInfo <- serverInfo(port=options$port)
              else
                options$serverInfo <- getServer()
            }

            # In order to hold the deadman we need to keep this around
            .Object@serverInfo <- options$serverInfo

            # assign the options back to local variables
            # This will be switched during the 3.0 code overhaul
            useUse <- options$useUse
            serverWrap <- options$serverWrap
            connopts <- options$connopts
            wsmetadata <- options$wsmetadata

            # if invoked (indirectly) via a server openWs or useWs
            # method, the server will be passed in and used. if
            # invoked directly, need to create a new server instance.
            if (!is.null(serverWrap)) {
              # recycle existing server instance.
              .Object@server <- serverWrap$server
            }
            else {
              # create new server instance.
              .Object@server <- new('nwsServer', serverInfo=.Object@serverInfo,
                                    connopts=connopts)
              # now give the server a chance to do its thing.
              spaceWrap <- new.env()
              spaceWrap$space <- .Object
              handler <- function(e) { close(.Object@server@nwsSocket); stop(e) }

              # useUse means "don't claim this space"; otherwise attempt
              # to claim ownership.  Both operations take the same
              # arguments, so only the function differs.
              wsOp <- if (useUse) nwsUseWs else nwsOpenWs
              tryCatch({
                callList <- list(.Object=.Object@server, wsName=wsName,
                                 space=spaceWrap, wsmetadata=wsmetadata)
                for (optName in names(opts))
                  callList[optName] <- opts[optName]
                do.call(wsOp, callList)
              }, error=handler)
            }
            .Object@cookieProtocol <- .Object@server@cookieProtocol

            .Object
          })

# Print a short description of a netWorkSpace: the server's host and port
# (taken from the server object's options) and the workspace name.
# Registered below as the S4 'show' method for the class.
showNetWorkSpace <- function(object) {
  cat('\n')
  info <- object@server@options$serverInfo
  cat('NWS Host:\t', nwsHost(info), ':', nwsPort(info), '\n', sep='')
  cat('Workspace Name:\t', object@wsName, '\n', sep='')
  cat('\n')
}

setMethod('show', 'netWorkSpace', showNetWorkSpace)

# Generic functions implemented for the netWorkSpace class.  Every generic
# dispatches on its first argument, .Object.

# lifetime and variable management
setGeneric('nwsClose', function(.Object) standardGeneric('nwsClose'))
setGeneric('nwsDeclare', function(.Object, xName, mode) standardGeneric('nwsDeclare'))
setGeneric('nwsDeleteVar', function(.Object, xName) standardGeneric('nwsDeleteVar'))
# value retrieval; the *Try variants take a default value
setGeneric('nwsFetch', function(.Object, xName, ...) standardGeneric('nwsFetch'))
setGeneric('nwsFetchTry', function(.Object, xName, defaultVal=NULL, ...) standardGeneric('nwsFetchTry'))
setGeneric('nwsFind', function(.Object, xName, ...) standardGeneric('nwsFind'))
setGeneric('nwsFindTry', function(.Object, xName, defaultVal=NULL, ...) standardGeneric('nwsFindTry'))
# retrieval into a file (fObj is a file name or file connection)
setGeneric('nwsFetchFile', function(.Object, xName, fObj, ...) standardGeneric('nwsFetchFile'))
setGeneric('nwsFetchTryFile', function(.Object, xName, fObj, ...) standardGeneric('nwsFetchTryFile'))
setGeneric('nwsFindFile', function(.Object, xName, fObj, ...) standardGeneric('nwsFindFile'))
setGeneric('nwsFindTryFile', function(.Object, xName, fObj, ...) standardGeneric('nwsFindTryFile'))
# iterated retrieval
setGeneric('nwsIFetch', function(.Object, xName, ...) standardGeneric('nwsIFetch'))
setGeneric('nwsIFetchTry', function(.Object, xName, defaultVal=NULL, ...) standardGeneric('nwsIFetchTry'))
setGeneric('nwsIFind', function(.Object, xName, ...) standardGeneric('nwsIFind'))
setGeneric('nwsIFindTry', function(.Object, xName, defaultVal=NULL, ...) standardGeneric('nwsIFindTry'))
# listing and storing
setGeneric('nwsListVars', function(.Object, wsName='', showDataFrame=TRUE) standardGeneric('nwsListVars'))
setGeneric('nwsStore', function(.Object, xName, xVal, ...) standardGeneric('nwsStore'))
setGeneric('nwsStoreFile', function(.Object, xName, fObj, n=0, ...) standardGeneric('nwsStoreFile'))
# accessors and active-binding support
setGeneric('nwsWsName', function(.Object) standardGeneric('nwsWsName'))
setGeneric('nwsVariable', function(.Object, xName, mode=c('fifo','lifo','multi','single'),
           env=parent.frame(), force=FALSE, quietly=FALSE) standardGeneric('nwsVariable'))
setGeneric('nwsServerObject', function(.Object) standardGeneric('nwsServerObject'))

# Close the server object held by this workspace.
# XXX: the original author flagged this behaviour as seeming wrong.
setMethod('nwsClose', 'netWorkSpace',
          function(.Object) {
            srv <- .Object@server
            close(srv)
          })

# Helper shared by the nwsDeclare and nwsVariable methods: sends a
# 'declare var' request for variable 'xName' in workspace 'ws' with the
# given mode and returns the 4-byte status from the reply as an integer.
nwsDeclareInternal <- function(server, ws, xName, mode) {
  sendOp(server, 'declare var', ws, xName, mode)
  reply <- nwsRecvN(server@nwsSocket, 4)
  as.integer(reply)
}

# Declare variable 'xName' with the given mode in this workspace.
# Stops with 'variable declaration failed' on a non-zero status.
setMethod('nwsDeclare', 'netWorkSpace',
          function(.Object, xName, mode) {
            declared <- nwsDeclareInternal(.Object@server, .Object@wsName,
                                           xName, mode) == 0
            if (!declared)
              stop('variable declaration failed', call.=FALSE)
          })

# Delete variable 'xName' from this workspace.
# Stops with 'deleteVar failed' on a non-zero status.
setMethod('nwsDeleteVar', 'netWorkSpace',
          function(.Object, xName) {
            srv <- .Object@server
            sock <- srv@nwsSocket
            sendOp(srv, 'delete var', .Object@wsName, xName)
            deleted <- as.integer(nwsRecvN(sock, 4)) == 0
            if (!deleted)
              stop('deleteVar failed', call.=FALSE)
          })

# Helper for the fetch/find methods: sends operation 'op' for variable
# 'xName' in workspace 'ws' and decodes the reply.
#
# Arguments:
#   cprot       TRUE when the reply carries a 40-byte cookie field
#   server      server object (provides the nwsSocket slot)
#   defaultVal  value returned when the reply has an empty payload
#   pkgResult   when TRUE, return list(data=, status=, metadata=) instead
#               of the bare value
#   metadata    metadata handed to sendOp; replaced by sendOp's result
#
# Stops with 'retrieval failed' on a non-zero status (after draining the
# payload) and with a descriptive message when unserializing fails.
nwsRetrieve <- function(cprot, server, ws, xName, op, defaultVal=NULL,
                        pkgResult=FALSE, metadata=list()) {
  sock <- server@nwsSocket
  metadata <- sendOp(server, op, ws, xName, metadata=metadata)

  # reply header: status, descriptor, optional cookie, payload length
  status <- as.integer(nwsRecvN(sock, 4))
  desc <- as.integer(nwsRecvN(sock, 20))
  if (cprot) nwsRecvN(sock, 40)
  payloadLen <- as.numeric(nwsRecvN(sock, 20))

  if (status != 0) {
    # make sure we read all data to avoid corrupting the connection
    nwsRecvN(sock, payloadLen, rawflag=TRUE)
    stop('retrieval failed', call.=FALSE)
  }

  # wrap a value the way the caller asked for it
  deliver <- function(value, found) {
    if (pkgResult)
      list(data=value, status=found, metadata=metadata)
    else
      value
  }

  # if bit zero of desc is set, then the object is not serialized
  notSerialized <- desc %% 2
  if (notSerialized) {
    payload <- nwsRecvN(sock, payloadLen, rawflag=TRUE)
    # if bit zero and one of desc are set, it's binary data (raw vector),
    # otherwise a character string
    isBinary <- desc %% 4 - 1
    if (isBinary)
      deliver(payload, TRUE)
    else
      deliver(rawToChar(payload), TRUE)
  }
  else if (payloadLen > 0) {
    # a serialized R object is read straight from the socket
    tryCatch({
        deliver(unserialize(sock), TRUE)
      }, error=function(e) {
        stop(sprintf("unable to unserialize value from ws variable '%s': %s",
                     xName, e$message), call.=FALSE)
      })
  }
  else {
    # empty payload: hand back the default value
    deliver(defaultVal, FALSE)
  }
}

# Retrieve a value of variable 'xName' using the 'fetch' operation.
# Accepted options in '...': pkgResult (logical, default FALSE) and
# metadata (list, default empty).
setMethod('nwsFetch', 'netWorkSpace',
          function(.Object, xName, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('pkgResult', 'metadata'), thisCall)
            wantPkg <- check.logical(extras$pkgResult, FALSE, 'pkgResult',
                                     thisCall)
            meta <- check.list(extras$metadata, list(), 'metadata', thisCall)

            nwsRetrieve(.Object@cookieProtocol, .Object@server,
                        .Object@wsName, xName, 'fetch',
                        pkgResult=wantPkg, metadata=meta)
          })

# Retrieve a value of variable 'xName' using the 'fetchTry' operation.
# Any error raised during retrieval is replaced by 'defaultVal' (wrapped
# in a result list with status FALSE when pkgResult is TRUE).
# Accepted options in '...': pkgResult and metadata.
setMethod('nwsFetchTry', 'netWorkSpace',
          function(.Object, xName, defaultVal=NULL, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('pkgResult', 'metadata'), thisCall)
            wantPkg <- check.logical(extras$pkgResult, FALSE, 'pkgResult',
                                     thisCall)
            meta <- check.list(extras$metadata, list(), 'metadata', thisCall)

            onFailure <- function(e) {
              if (wantPkg)
                list(data=defaultVal, status=FALSE, metadata=list())
              else
                defaultVal
            }

            tryCatch(
              nwsRetrieve(.Object@cookieProtocol, .Object@server,
                          .Object@wsName, xName, 'fetchTry', defaultVal,
                          pkgResult=wantPkg, metadata=meta),
              error=onFailure)
          })

# Retrieve a value of variable 'xName' using the 'find' operation.
# Accepted options in '...': pkgResult (logical, default FALSE) and
# metadata (list, default empty).
setMethod('nwsFind', 'netWorkSpace',
          function(.Object, xName, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('pkgResult', 'metadata'), thisCall)
            wantPkg <- check.logical(extras$pkgResult, FALSE, 'pkgResult',
                                     thisCall)
            meta <- check.list(extras$metadata, list(), 'metadata', thisCall)

            nwsRetrieve(.Object@cookieProtocol, .Object@server,
                        .Object@wsName, xName, 'find',
                        pkgResult=wantPkg, metadata=meta)
          })

# Retrieve a value of variable 'xName' using the 'findTry' operation.
# Any error raised during retrieval is replaced by 'defaultVal' (wrapped
# in a result list with status FALSE when pkgResult is TRUE).
# Accepted options in '...': pkgResult and metadata.
setMethod('nwsFindTry', 'netWorkSpace',
          function(.Object, xName, defaultVal=NULL, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('pkgResult', 'metadata'), thisCall)
            wantPkg <- check.logical(extras$pkgResult, FALSE, 'pkgResult',
                                     thisCall)
            meta <- check.list(extras$metadata, list(), 'metadata', thisCall)

            onFailure <- function(e) {
              if (wantPkg)
                list(data=defaultVal, status=FALSE, metadata=list())
              else
                defaultVal
            }

            tryCatch(
              nwsRetrieve(.Object@cookieProtocol, .Object@server,
                          .Object@wsName, xName, 'findTry', defaultVal,
                          pkgResult=wantPkg, metadata=meta),
              error=onFailure)
          })

# Helper for the fetchFile/findFile methods: sends operation 'op' for
# variable 'xName' in workspace 'ws' and writes the reply payload to a
# file.
#
# Arguments:
#   cprot  TRUE when the reply carries a 40-byte cookie field
#   fObj   either a file name (opened in 'wb' mode and closed on exit) or
#          a binary-mode file connection already opened for writing
#
# Returns TRUE on success.  The payload is always read in full, and only
# then is a non-zero status turned into the error 'retrieval failed'.
nwsRetrieveFile <- function(cprot, server, ws, xName, op, fObj) {
  sock <- server@nwsSocket

  if (missing(fObj))
    stop('no value specified for fObj argument', call.=FALSE)

  if (!is.character(fObj)) {
    writable <- is(fObj, "file") && isOpen(fObj, "w") &&
      summary(fObj)$text == "binary"
    if (!writable)
      stop('fobj must be a binary mode file object opened for writing', call.=FALSE)
    out <- fObj
  } else {
    out <- file(fObj, 'wb')
    on.exit(close(out))
  }

  sendOp(server, op, ws, xName)

  status <- as.integer(nwsRecvN(sock, 4))

  # even if failure status, read the rest of the bytes
  desc <- as.integer(nwsRecvN(sock, 20))
  if (cprot) nwsRecvN(sock, 40)
  remaining <- as.numeric(nwsRecvN(sock, 20))

  # copy the payload to the file in chunks of at most 16 KB
  chunkSize <- 16 * 1024
  while (remaining > 0) {
    chunk <- nwsRecvN(sock, min(remaining, chunkSize), rawflag=TRUE)
    if (length(chunk) == 0)
      stop('NWS server connection dropped', call.=FALSE)
    writeBin(chunk, out)
    remaining <- remaining - length(chunk)
  }

  if (status != 0)
    stop('retrieval failed', call.=FALSE)
  TRUE
}

# Write a value of variable 'xName' to 'fObj' using the 'fetch' operation.
# 'fObj' is handed on untouched so that the helper can detect when it is
# missing.
setMethod('nwsFetchFile', 'netWorkSpace',
          function(.Object, xName, fObj) {
            nwsRetrieveFile(cprot=.Object@cookieProtocol,
                            server=.Object@server,
                            ws=.Object@wsName,
                            xName=xName, op='fetch', fObj=fObj)
          })

# Write a value of variable 'xName' to 'fObj' using the 'fetchTry'
# operation.  Returns FALSE instead of failing when the helper reports
# 'retrieval failed'; any other error is re-raised with its message.
setMethod('nwsFetchTryFile', 'netWorkSpace',
          function(.Object, xName, fObj) {
            onFailure <- function(e) {
              if (e$message != 'retrieval failed')
                stop(e$message, call.=FALSE)
              FALSE
            }
            tryCatch(
              nwsRetrieveFile(.Object@cookieProtocol, .Object@server,
                              .Object@wsName, xName, 'fetchTry', fObj),
              error=onFailure)
          })

# Write a value of variable 'xName' to 'fObj' using the 'find' operation.
# 'fObj' is handed on untouched so that the helper can detect when it is
# missing.
setMethod('nwsFindFile', 'netWorkSpace',
          function(.Object, xName, fObj) {
            nwsRetrieveFile(cprot=.Object@cookieProtocol,
                            server=.Object@server,
                            ws=.Object@wsName,
                            xName=xName, op='find', fObj=fObj)
          })

# Write a value of variable 'xName' to 'fObj' using the 'findTry'
# operation.  Returns FALSE instead of failing when the helper reports
# 'retrieval failed'; any other error is re-raised with its message.
setMethod('nwsFindTryFile', 'netWorkSpace',
          function(.Object, xName, fObj) {
            onFailure <- function(e) {
              if (e$message != 'retrieval failed')
                stop(e$message, call.=FALSE)
              FALSE
            }
            tryCatch(
              nwsRetrieveFile(.Object@cookieProtocol, .Object@server,
                              .Object@wsName, xName, 'findTry', fObj),
              error=onFailure)
          })

# Return an iterator function over variable 'xName' that uses the 'ifetch'
# operation and has no default value.
setMethod('nwsIFetch', 'netWorkSpace',
          function(.Object, xName) {
            nwsValueIterator(.Object, xName, op='ifetch', defaultVal=NULL)
          })

# Return an iterator function over variable 'xName' that uses the
# 'ifetchTry' operation and falls back to 'defaultVal'.
setMethod('nwsIFetchTry', 'netWorkSpace',
          function(.Object, xName, defaultVal=NULL) {
            nwsValueIterator(.Object, xName, op='ifetchTry',
                             defaultVal=defaultVal)
          })

# Return an iterator function over variable 'xName' that uses the 'ifind'
# operation and has no default value.
setMethod('nwsIFind', 'netWorkSpace',
          function(.Object, xName) {
            nwsValueIterator(.Object, xName, op='ifind', defaultVal=NULL)
          })

# Return an iterator function over variable 'xName' that uses the
# 'ifindTry' operation and falls back to 'defaultVal'.
setMethod('nwsIFindTry', 'netWorkSpace',
          function(.Object, xName, defaultVal=NULL) {
            nwsValueIterator(.Object, xName, op='ifindTry',
                             defaultVal=defaultVal)
          })

# List the variables of a workspace.
#
# Arguments:
#   wsName         workspace to list; '' means this object's own workspace
#   showDataFrame  when FALSE the server's text reply is returned as is
#                  (to see it clearly use: write(nwsList...(), stdout()));
#                  when TRUE it is converted into a data frame with the
#                  columns Variable, NumValues, NumFetchers, NumFinders
#                  and Mode
setMethod('nwsListVars', 'netWorkSpace',
          function(.Object, wsName='', showDataFrame=TRUE) {
            sock <- .Object@server@nwsSocket
            target <- if (wsName == '') .Object@wsName else wsName

            sendOp(.Object@server, 'list vars', target)

            # status and descriptor are unused at the moment
            status <- as.integer(nwsRecvN(sock, 4))
            nwsRecvN(sock, 20)
            if (.Object@cookieProtocol)
              nwsRecvN(sock, 40)

            listingLen <- as.integer(nwsRecvN(sock, 20))
            listing <- nwsRecvN(sock, listingLen)
            if (!showDataFrame)
              return(listing)

            ## convert response into an R data frame:
            ## one line per variable, fields separated by tabs
            entries <- unlist(strsplit(listing, "\n"))
            rows <- vector('list', length(entries))
            for (k in seq_along(entries)) {
              parts <- unlist(strsplit(entries[k], "\t"))
              # convert each field to correct type
              rows[[k]] <- list(parts[1],
                                as.integer(parts[2]),
                                as.integer(parts[3]),
                                as.integer(parts[4]),
                                parts[5])
            }

            if (length(rows) > 0) {
              names(rows) <- seq(along=rows)
              rows <- do.call(rbind, rows)
              colnames(rows) <-
                c("Variable", "NumValues", "NumFetchers", "NumFinders", "Mode")
            }

            data.frame(rows)
          })

# Helper for the store method and for active bindings: stores 'xVal' in
# variable 'xName' of workspace 'ws'.
#
# The value is sent as raw bytes together with a descriptor built from
# nwsRFP (the R fingerprint):
#   raw vector                -> sent unchanged, descriptor + 3
#   single character string   -> charToRaw(), descriptor + 1
#   anything else             -> serialize(), descriptor unchanged
#
# Stops with 'store failed' when the server replies with a non-zero
# status.
nwsStoreInternal <- function(server, ws, xName, xVal, metadata=list()) {
  sock <- server@nwsSocket
  descriptor <- nwsRFP # R Fingerprint

  if (missing(xVal))
    stop('no value specified for xVal argument', call.=FALSE)

  if (is.raw(xVal)) {
    payload <- xVal
    descriptor <- descriptor + 3
  }
  else if (is.character(xVal) && length(xVal) == 1) {
    payload <- charToRaw(xVal)
    # in other systems, we use a manifest constant and a bit or here
    descriptor <- descriptor + 1
  }
  else {
    payload <- serialize(xVal, ascii=FALSE, connection=NULL)
    # serialize returns a raw vector as of R 2.4
    if (is.character(payload)) payload <- charToRaw(payload)
  }
  descTxt <- sprintf('%020i', descriptor) # would prefer to use unsigned here.

  sendOpStreaming(server, 'store', ws, xName, descTxt, extra=payload,
                  metadata=metadata)

  # status, barely used at the moment.
  status <- as.integer(nwsRecvN(sock, 4))
  if (status != 0)
    stop('store failed', call.=FALSE)
}

# Store 'xVal' in variable 'xName' of this workspace.
# The only option accepted in '...' is 'metadata' (a list, default empty).
# 'xVal' is handed on untouched so that the helper can detect when it is
# missing.
setMethod('nwsStore', 'netWorkSpace',
          function(.Object, xName, xVal, ...) {
            thisCall <- match.call()
            extras <- list(...)
            check.opts(extras, c('metadata'), thisCall)
            meta <- check.list(extras$metadata, list(), 'metadata', thisCall)

            nwsStoreInternal(.Object@server, .Object@wsName, xName, xVal,
                             metadata=meta)
          })

# Store the contents of a file in variable 'xName' of this workspace.
#
# Arguments:
#   .Object  the netWorkSpace
#   xName    name of the workspace variable
#   fObj     either a file name (opened in 'rb' mode and closed on exit)
#            or a binary-mode file connection already opened for reading
#   n        maximum number of bytes to send, counted from the current
#            file position; n <= 0 means everything up to the end of file
#   ...      'metadata' (list, default empty) and 'serialized' (logical,
#            default FALSE; when TRUE the descriptor is nwsRFP, otherwise
#            nwsRFP + 3)
#
# Returns FALSE without contacting the server when there is nothing to
# send, TRUE after a successful store.  Stops with 'store file failed'
# when the server replies with a non-zero status.
setMethod('nwsStoreFile', 'netWorkSpace',
          function(.Object, xName, fObj, n=0, ...) {
            call <- match.call()
            opts <- list(...)
            check.opts(opts, c('metadata', 'serialized'), call)
            metadata <- check.list(opts$metadata, list(), 'metadata', call)
            serialized <- check.logical(opts$serialized, FALSE,
                                        'serialized', call)

            ws <- .Object@wsName
            s <- .Object@server@nwsSocket

            desc <- if (serialized) nwsRFP else nwsRFP + 3

            if (missing(fObj)) {
              stop('no value specified for fObj argument', call.=FALSE)
            }

            # if fObj is a character string, handle it specially
            if (is.character(fObj)) {
              f <- file(fObj, 'rb')
              on.exit(close(f))
            } else {
              if (!is(fObj, "file") || !isOpen(fObj, "r") || summary(fObj)$text != "binary")
                stop('fobj must be a binary mode file object opened for reading', call.=FALSE)
              f <- fObj
            }

            # number of bytes between the current position and the end of
            # the file, capped by n when n is positive
            fsize <- file.info(summary(f)$description)$size
            fpos <- seek(f)
            fbytes <- fsize - fpos
            n <- if (n <= 0) fbytes else min(n, fbytes)
            if (n <= 0) return(FALSE)

            descTxt <- sprintf('%020i', desc) # would prefer to use unsigned here.

            # the request announces n payload bytes, so exactly n bytes
            # must follow on the socket
            sendOpStreaming(.Object@server, 'store', ws, xName,
                            descTxt, extralen=n, metadata=metadata)

            blen <- 16 * 1024
            while (n > 0) {
              d <- readBin(f, what='raw', n=min(blen, n))
              dlen <- length(d)
              if (dlen <= 0)
                break
              writeBin(d, s)
              n <- n - dlen
            }

            if (n > 0) {
              # I don't think this should ever happen unless the file
              # size computation is incorrect, but I really don't want
              # to corrupt the server connection
              warning('unable to read all the data in file ',
                      summary(f)$description, ' [size: ', fsize,
                      'bytes]: padding value in workspace variable')
              # Pad with zero bytes.  The count is reduced by what was
              # actually written; subtracting the length of the last
              # (empty) read would never make progress.
              blen <- 1024
              while (n > 0) {
                padlen <- min(blen, n)
                writeBin(raw(padlen), s)
                n <- n - padlen
              }
            }

            # now we're ready to read the metadata
            # XXX need to return this to the user
            connopts <- .Object@server@connopts
            metadata <- if (OPT.METADATAFROMSERVER %in% names(connopts))
                receive.list(s)
              else
                list()

            # status, barely used at the moment.
            status <- as.integer(nwsRecvN(s, 4))

            if (status != 0) {
              stop('store file failed', call.=FALSE)
            }
            TRUE
          })

# Accessor for the workspace name.
setMethod('nwsWsName', 'netWorkSpace',
          function(.Object) {
            .Object@wsName
          })

# Create an active binding named 'xName' in 'env' that is backed by the
# workspace variable of the same name: reading the binding retrieves a
# value, assigning to it stores one.
#
# Arguments:
#   mode     variable mode to declare.  When it is not supplied, the
#            default ('fifo') and then each of the four modes are tried
#            until one declaration succeeds.
#   env      environment that receives the binding
#   force    when TRUE, create the binding even if 'xName' already exists
#   quietly  when TRUE, suppress the warning issued when an existing
#            binding is left alone
#
# Variables in mode 'single' are read with 'find', all others with
# 'fetch'.
setMethod('nwsVariable', 'netWorkSpace',
          function(.Object, xName, mode=c('fifo','lifo','multi','single'),
                   env=parent.frame(), force=FALSE, quietly=FALSE) {
            modeNotGiven <- missing(mode)
            mode <- match.arg(mode)

            # be careful, because 'exists' will cause an active binding function
            # to be called, which is a side effect that we don't want
            if (force ||
                !(tryCatch(bindingIsActive(xName, env), error=function(...) FALSE) ||
                  exists(xName, envir=env, inherits=FALSE))) {
              ws <- .Object@wsName

              if (!modeNotGiven) {
                if (nwsDeclareInternal(.Object@server, ws, xName, mode) != 0)
                  stop('variable declaration failed', call.=FALSE)
              } else {
                # keep the first mode the server accepts
                candidates <- c(mode, 'fifo', 'lifo', 'multi', 'single')
                mode <- NA
                for (candidate in candidates) {
                  accepted <- nwsDeclareInternal(.Object@server, ws, xName,
                                                 candidate) == 0
                  if (accepted) {
                    mode <- candidate
                    break
                  }
                }
                if (is.na(mode))
                  stop('unable to declare variable', call.=FALSE)
              }

              readOp <- if (identical(mode, 'single')) 'find' else 'fetch'
              accessor <- function(val) {
                if (missing(val))
                  nwsRetrieve(.Object@cookieProtocol, .Object@server, ws,
                              xName, readOp)
                else
                  nwsStoreInternal(.Object@server, ws, xName, val)
              }

              bound <- makeActiveBinding(xName, accessor, env)
            } else {
              if (! quietly)
                warning('not overwriting previous binding for ', xName)
            }
          })

# Accessor for the server object stored in the workspace.
setMethod('nwsServerObject', 'netWorkSpace',
          function(.Object) {
            .Object@server
          })

# Open the variable listing page of this workspace in a web browser.
# Stops when the server has no web port.  Returns the URL invisibly.
if (!isGeneric('view')) {
  setGeneric('view', function(.Object, ...) standardGeneric('view'))
}
setMethod('view', 'netWorkSpace', function(.Object, ...) {
  serverHost <- nwsHost(.Object@serverInfo)
  webPort <- nwsWebPort(.Object@server)
  if (is.null(webPort)) {
    stop('the nws server does not have a web interface')
  }

  workspace <- .Object@wsName
  # the workspace name is URL-encoded before it goes into the query
  listingUrl <- sprintf('http://%s:%d/doit?op=listVars&wsName=%s',
                        serverHost, webPort, URLencode(workspace))
  browseURL(listingUrl)

  notice <- sprintf("Viewing workspace '%s' on server '%s:%d' in your browser\n",
                    workspace, serverHost, webPort)
  cat(notice)
  invisible(listingUrl)
})

# Helper for the ifetch/ifind iterators: sends operation 'op' together
# with the iterator position (varId, valIndex) and decodes the reply.
#
# 'varId' and 'valIndex' are padded with blanks and cut to 20 characters
# before being sent.  The reply is read with the cookie protocol assumed:
# status, descriptor, new varId, new valIndex, payload length, payload.
#
# Returns list(status=, sVal=, varId=, valIndex=).  Signals the error
# 'StopIteration' when the value is a serialized one with an empty
# payload.
nwsIRetrieve <- function(server, ws, xName, op, varId, valIndex) {
  sock <- server@nwsSocket
  blanks <- '                    '
  pad20 <- function(field) strtrim(paste(field, blanks, sep=''), 20)
  sendOp(server, op, ws, xName, pad20(varId), pad20(valIndex))

  status <- as.integer(nwsRecvN(sock, 4))
  desc <- as.integer(nwsRecvN(sock, 20))

  # cookie protocol is assumed at this point
  nextVarId <- nwsRecvN(sock, 20)
  nextValIndex <- as.integer(nwsRecvN(sock, 20))

  payloadLen <- as.integer(nwsRecvN(sock, 20))

  reply <- function(value) {
    list(status=status, sVal=value, varId=nextVarId, valIndex=nextValIndex)
  }

  # if bit zero of desc is set, then the object is not serialized
  notSerialized <- desc %% 2
  if (notSerialized) {
    payload <- nwsRecvN(sock, payloadLen, rawflag=TRUE)
    # if bit zero and one of desc are set, it's binary data (raw vector),
    # otherwise a character string
    isBinary <- desc %% 4 - 1
    if (isBinary)
      reply(payload)
    else
      reply(rawToChar(payload))
  }
  else if (payloadLen > 0) {
    reply(unserialize(sock))
  }
  else {
    stop('StopIteration')
  }
}

# Build an iterator over the values of workspace variable 'xName'.
#
# The result is a function without arguments.  Each call asks
# nwsIRetrieve for a value at the remembered position (varId, valIndex),
# stores the position reported back, and returns the value.  Any error
# raised by nwsIRetrieve yields 'defaultVal' and leaves the position
# unchanged.  A non-zero status in the reply stops with
# 'retrieval failed'.
#
# Stops immediately when the workspace does not use the cookie protocol
# or when 'xName' is not a character value.
nwsValueIterator <- function(.Object, xName, op, defaultVal) {
  if (!.Object@cookieProtocol)
    stop('NWS server does not support iterated operations', call.=FALSE)
  if (!is.character(xName))
    stop('variable name must be a string', call.=FALSE)

  # iterator position, updated by every call of the closure
  cursorId <- ''
  cursorIndex <- 0

  nextValue <- function() {
    fallback <- list(status=0, varId=cursorId, valIndex=cursorIndex,
                     sVal=defaultVal)

    answer <- tryCatch(
      nwsIRetrieve(.Object@server, .Object@wsName,
                   xName, op, cursorId, cursorIndex),
      error=function(e) fallback)

    cursorId <<- answer$varId
    cursorIndex <<- answer$valIndex

    if (answer$status != 0)
      stop('retrieval failed', call.=FALSE)
    answer$sVal
  }
  nextValue
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/server.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Option names accepted by the 'serverInfo' initializer, which validates
# its arguments against this vector through check.opts.
SERVER.INFO.ARGS <- c('host','port')
# Option names accepted by the 'managedServerInfo' initializer.
MANAGED.SERVER.INFO.ARGS <- c('host','port','webPort','quiet',
                              'pluginPath', 'logFile')

# Environments holding default option values.  The first two are blended
# into the options of the matching initializer.
# NOTE(review): defaultServerOptions is not referenced in this part of the
# file -- confirm where it is populated and used.
defaultServerInfoOptions <- new.env()
defaultManagedServerInfoOptions <- new.env()
defaultServerOptions <- new.env()

# Package-level state (pkgpath, python, pythonPath, pluginPath) that is
# filled in by initServerEnv.
.serverGlobals <- new.env()

# Default options for an unmanaged server: the node name of this machine
# and port 8765.  Any arguments are accepted and ignored.
computeDefaultServerInfoOptions <- function(...) {
  localNode <- Sys.info()[['nodename']]
  list(host=localNode, port=8765)
}

# Default options for a managed server.  Any arguments are accepted and
# ignored.
#
# The host option currently names the interface to bind to, where ''
# means to bind to all interfaces.  The plugin path is whatever
# initServerEnv stored in .serverGlobals.
computeDefaultManagedServerInfoOptions <- function(...) {
  defaults <- list(host='', port=0, webPort=0, quiet=FALSE,
                   pluginPath=.serverGlobals$pluginPath, logFile='')
  defaults
}

# 'serverInfo' describes a server by the host and port it can be reached on.
setClass('serverInfo',
         representation(host='ANY', port='ANY'))

# Initializer: accepts only the options named in SERVER.INFO.ARGS and fills
# the slots from the package defaults overlaid with the supplied options.
setMethod('initialize', 'serverInfo',
          function(.Object, ...) {
            call <- match.call()
            supplied <- list(...)
            check.opts(supplied, SERVER.INFO.ARGS, call)

            # defaults first, then the caller's values on top
            merged <- new.env()
            blendOptions(merged, as.list(defaultServerInfoOptions))
            blendOptions(merged, supplied)

            .Object@host <- merged$host
            .Object@port <- merged$port
            .Object
          })

# Prints the host and port of a serverInfo object, framed by blank lines.
# Registered below as the 'show' method of the class.
showServerInfo <- function(object) {
  host <- object@host
  port <- object@port
  cat('\n')
  cat('Server host:\t', host, '\n')
  cat('Server port:\t', port, '\n')
  cat('\n')
}
setMethod('show', 'serverInfo', showServerInfo)

# 'managedServerInfo' is a serverInfo whose server process is started by
# this R session.  Besides host and port it records the web port, the
# options the server was started with, the list returned by startNwsServer
# (locServ), and a 'state' environment whose 'running' flag is shared by
# all copies of the object.
setClass('managedServerInfo',
         contains='serverInfo',
         representation(webPort='numeric', quiet='logical', locServ='list',
                        pluginPath='character', logFile='character',
                        state='environment'))

# Initializer: validates the options against MANAGED.SERVER.INFO.ARGS,
# launches the server, records the ports and host name it reports and then
# checks that the server can be reached.
#
# Outcome of the connection check (see checkServerConnection):
#   0 - success; a start-up message is printed unless 'quiet' is set
#   1 - host name problem; an error if no host option was given,
#       otherwise a warning
#   2 - connection failed; a warning
#   anything else - a warning about the unexpected status
setMethod('initialize', 'managedServerInfo',
          function(.Object, ...) {
            call <- match.call()
            check.opts(list(...), MANAGED.SERVER.INFO.ARGS, call)
            options <- new.env()
            blendOptions(options,
                         as.list(defaultManagedServerInfoOptions))
            blendOptions(options, list(...))

            # start initializing .Object
            .Object@quiet <- options$quiet
            .Object@pluginPath <- options$pluginPath
            .Object@logFile <- options$logFile

            verbose <- nzchar(Sys.getenv('NWS_VERBOSE'))

            # launch server
            .Object@locServ <- startNwsServer(logfile=.Object@logFile,
                                              verbose=verbose,
                                              host=options$host,
                                              port=options$port,
                                              webPort=options$webPort,
                                              pluginpath=.Object@pluginPath)

            # Write back the values reported by the running server
            .Object@port <- .Object@locServ$serverPort
            .Object@webPort <- .Object@locServ$webPort
            .Object@host <-
              if (identical(.Object@locServ$hostName, '0.0.0.0')) {
                # wild card must be handled specially: clients cannot
                # connect to '0.0.0.0', so advertise this node's name
                if (!identical(options$host, '') && verbose)
                  # newline terminated like every other verbose message
                  cat(sprintf(
                      'nws server bound to 0.0.0.0 when interface was %s\n',
                      options$host))
                Sys.info()[['nodename']]
              } else {
                .Object@locServ$hostName
              }

            # Set state, true = running.  An environment is used so the
            # flag can be changed later without replacing the object.
            .Object@state <- new.env()
            .Object@state$running <- TRUE

            connCode <- checkServerConnection(.Object)
            if (connCode == 0) {
              if (! options$quiet) {
                msg <- sprintf('Started nws server on port %d with web interface on port %d\n',
                               .Object@port, .Object@webPort)
                cat(msg)
              }
            }
            else if (connCode == 1) {
              mess <- sprintf('Cannot connect to host %s, please define host externally',
                           .Object@host)
              # with no explicit host there is nothing else to try
              if (options$host=='') {
                stop(mess, call.=FALSE)
              }
              else {
                warning(mess, call.=FALSE)
              }
            }
            else if (connCode == 2) {
              warning(sprintf("Cannot connect to server on host: %s and port: %d, \n%s",
                              .Object@host, .Object@port,
                              "Please check network and firewall settings"),
                      call.=FALSE)
            }
            else {
              warning("Unexpected status returned from checkServerConnection",
                      call.=FALSE)
            }

            .Object
          })

# Prints host, port and web port of a managed server, plus a notice when
# the server is no longer running.  Registered below as the 'show' method.
showManagedServerInfo <- function(object) {
  cat('\n')
  cat('This is a managed server.\n')
  cat('Server host:\t\t', object@host, '\n')
  cat('Server port:\t\t', object@port, '\n')
  cat('Server web port:\t', object@webPort, '\n')
  stopped <- !nwsIsRunning(object)
  if (stopped) {
    cat('\nThis server has been stopped\n')
  }
  cat('\n')
}
setMethod('show', 'managedServerInfo', showManagedServerInfo)

# 'view' for a managed server: opens the server's web interface in the
# browser and invisibly returns the URL that was opened.
if (!isGeneric('view'))
  setGeneric('view', function(.Object, ...) standardGeneric('view'))
setMethod('view', 'managedServerInfo', function(.Object, ...) {
  serverHost <- nwsHost(.Object)
  webPort <- nwsWebPort(.Object)
  url <- sprintf('http://%s:%d/', serverHost, webPort)
  browseURL(url)
  notice <- sprintf("Viewing server '%s:%d' in your browser\n",
                    serverHost, webPort)
  cat(notice)
  invisible(url)
})

# Stops a managed server.  Warns if the server was already stopped;
# otherwise closes the deadman connection and clears the shared 'running'
# flag.  Always returns NULL invisibly.
setGeneric('nwsServerInfoStop', function(.Object, ...) standardGeneric('nwsServerInfoStop'))
setMethod('nwsServerInfoStop','managedServerInfo',
          function(.Object, ...) {
            if (!nwsIsRunning(.Object)) {
              warning('this server has already been stopped')
              return(invisible(NULL))
            }

            # just closing the deadman connection should shutdown the
            # server, although if the user has started another long
            # running process this probably won't work...
            # We should eventually do more thorough sanity checking here
            try(close(.Object@locServ$deadman))
            .Object@state$running <- FALSE

            invisible(NULL)
          })


# Accessor: port of a serverInfo object.
setGeneric('nwsPort',
           function(.Object,...) standardGeneric('nwsPort'))
setMethod('nwsPort', 'serverInfo',
          function(.Object, ...) {
            .Object@port
          })

# Accessor: host of a serverInfo object.
setGeneric('nwsHost',
           function(.Object,...) standardGeneric('nwsHost'))
setMethod('nwsHost', 'serverInfo',
          function(.Object, ...) {
            .Object@host
          })

# Accessor: the 'running' flag kept in the state environment of a
# managed server.
setGeneric('nwsIsRunning',
           function(.Object,...) standardGeneric('nwsIsRunning'))
setMethod('nwsIsRunning', 'managedServerInfo',
          function(.Object, ...) {
            .Object@state$running
          })

# Accessor: web port of a managed server.  The generic is only created
# when it does not exist yet.
if (!isGeneric('nwsWebPort'))
  setGeneric('nwsWebPort',
             function(.Object,...) standardGeneric('nwsWebPort'))
setMethod('nwsWebPort', 'managedServerInfo',
          function(.Object, ...) {
            .Object@webPort
          })


# Constructor: creates a 'serverInfo' object from the given options.
serverInfo <- function(...) {
  new("serverInfo", ...)
}

# Constructor: creates a 'managedServerInfo' object from the given options;
# its initializer launches the server.
managedServerInfo <- function(...) {
  new("managedServerInfo", ...)
}

# Launches bin/localserver.py from the package directory through a pipe
# and parses the lines the script prints.
#
# Arguments:
#   pkgpath    - package installation directory
#   logfile    - log file name; NULL or '' means no '-l' option
#   verbose    - if TRUE, print the command before running it
#   python     - python interpreter to run
#   pythonOpts - extra interpreter options
#   pythonpath - value passed with '-m'
#   pluginPath - plugin directories; empty entries are dropped and the
#                rest joined with the platform path separator for '-x'
#   host, port, webPort - values passed with '-i', '-p' and '-w'
#
# Returns a list with serverPort, webPort, serverPid, serverPipe and
# hostName (taken from the fourth output line).  Stops, after closing the
# pipe, when fewer than three lines can be read or the first three lines
# are not numbers.
startServer <- function(pkgpath, logfile, verbose, python, pythonOpts,
                        pythonpath, pluginPath, host, port, webPort) {
  script <- file.path(pkgpath, 'bin', 'localserver.py')

  wantLog <- !is.null(logfile) && nzchar(logfile[[1]])
  logArgs <- if (wantLog) c('-l', logfile[[1]]) else character(0)

  wantPlugins <- !is.null(pluginPath) && any(nzchar(pluginPath))
  pluginArgs <- if (wantPlugins)
      c('-x', paste(pluginPath[nzchar(pluginPath)],
                    collapse=.Platform$path.sep))
    else
      character(0)

  argv <- c(python, pythonOpts, script,
            logArgs,
            '-m', pythonpath,
            '-p', as.character(port),
            '-i', as.character(host),
            '-w', as.character(webPort),
            '-g', 'False',
            pluginArgs)
  # NULL selects the default quoting for this platform
  cmd <- argv2str(argv, NULL)

  if (verbose)
    cat(sprintf('executing command to start nws server: %s\n', cmd))

  serverPipe <- pipe(cmd, 'r')
  out <- readLines(serverPipe, n=4)

  if (length(out) == 0) {
    try(close(serverPipe))
    stop('unable to read any output from local nws server')
  }
  if (length(out) < 3) {
    try(close(serverPipe))
    stop('unable to read all of the nws server info')
  }

  # The ports are doubles for consistency only
  serverPort <- as.double(out[1])
  reportedWebPort <- as.double(out[2])
  serverPid <- as.integer(out[3])
  hostName <- as.character(out[4])

  if (is.na(serverPort) || is.na(reportedWebPort) || is.na(serverPid)) {
    try(close(serverPipe))
    stop('bad output format from starting nws server')
  }

  list(serverPort=serverPort, webPort=reportedWebPort,
       serverPid=serverPid, serverPipe=serverPipe, hostName=hostName)
}

# Creates the "deadman" connection: a socket to the server that performs
# the handshake and is never read from or written to again.
#
# Arguments:
#   host, port - address of the server
#   verbose    - handed to negotiate.deadman
#
# Returns the open socket connection.
makeDeadman <- function(host, port, verbose=FALSE) {
  onWindows <- .Platform$OS.type == 'windows'
  if (onWindows) {
    # on windows, socketConnection will wait for the full timeout,
    # even if no one is listening on the specified server port.
    # make.socket doesn't, so it is used as a probe that throws an
    # exception if no one is listening.
    probe <- tryCatch(
      suppressWarnings(make.socket(host, port)),
      error=function(e) {
        warning(sprintf(
                    "unable to connect to NetWorkSpaces server on '%s:%d'",
                    host, port),
                call.=FALSE)
        stop(e$message, call.=FALSE)
      })
    close.socket(probe)
  }

  # NB: the timeout is set to 30 days for this connection only; the
  # previous value is restored on exit so that other socket connections
  # in this R session keep their timeout
  thirtyDays <- 30 * 24 * 60 * 60
  previous <- options(timeout=thirtyDays)
  on.exit(options(previous))

  deadman <- socketConnection(host, port=port, open='a+b', blocking=TRUE)
  negotiate.deadman(deadman, verbose)
  deadman
}

# Starts bin/babelfish.R from the package directory in a separate R
# process, without waiting for it.
#
# Arguments:
#   pkgpath    - package installation directory
#   host, port - passed to the script as '-h' and '-p'
#   verbose    - if TRUE, print the command and keep the child's stderr
startBabelfish <- function(pkgpath, host, port, verbose) {
  exeName <- if (.Platform$OS.type == 'windows') 'Rterm' else 'R'
  argv <- c(file.path(R.home(), 'bin', exeName),
            '--vanilla', '--slave',
            '-f', file.path(pkgpath, 'bin', 'babelfish.R'),
            '--args', '-h', host, '-p', port)
  # NULL selects the default quoting for this platform
  cmd <- argv2str(argv, NULL)

  if (verbose)
    cat(sprintf('starting the R babelFish: %s\n', cmd))

  system(cmd, intern=FALSE, ignore.stderr=!verbose, wait=FALSE)
}

# This should be called when nws is loaded, with the path of the nws
# installation.
#
# Records in .serverGlobals:
#   pkgpath    - the installation path passed in
#   python     - the interpreter found, or 'python' if none was found
#   pythonPath - first 'nspath' entry reported by pythonpath(), or NULL
#   pluginPath - the 'plugins' directory next to pythonPath, or ''
#
# Warns when the plugins directory is missing and, unless NWS_QUIET is
# set, when no python interpreter can be located.  On Windows the
# interpreter directory and <pythonPath>/pywin32_system32 are appended to
# PATH when they exist.  Setting NWS_VERBOSE prints progress messages.
initServerEnv <- function(pkgpath) {
  .serverGlobals$pkgpath <- pkgpath
  verbose <- nzchar(Sys.getenv('NWS_VERBOSE'))

  p <- pythonpath()  # lots of magic here
  if (!is.null(p)) {
    python <- p$pypath
    if (!is.null(p$nspath) && !is.na(p$nspath[1]) && nzchar(p$nspath[1])) {
      pythonPath <- p$nspath[1]
      pluginPath <- file.path(dirname(pythonPath), 'plugins')
      if (verbose) {
        cat(sprintf('pythonpath function returned nspath: %s\n', pythonPath))
        cat(sprintf('pythonpath function returned pluginPath: %s\n', pluginPath))
      }
      if (!file.exists(pluginPath)) {
        # warning(), not warnings(): the latter only displays warnings that
        # were stored earlier and never signals a new one
        warning('no plugins directory in your nwsserver installation')
        pluginPath <- ''
      }
    }
    else {
      if (verbose)
        cat(sprintf('pythonpath function returned bad nspath: pluginPath will be empty string\n'))
      pythonPath <- NULL
      pluginPath <- ''
    }
  }
  else {
    if (verbose)
      cat(sprintf('pythonpath function returned NULL: pluginPath will be empty string\n'))
    # fall back to searching for an interpreter
    python <- which.cmd('python')
    if (is.null(python)) {
      python <- which.python()
    }
    pythonPath <- NULL
    pluginPath <- ''
  }

  .serverGlobals$python <- if (is.null(python)) 'python' else python
  .serverGlobals$pythonPath <- pythonPath
  .serverGlobals$pluginPath <- pluginPath

  if (is.null(python)) {
    # nothing more to do if we can't find a Python interpreter
    if (!nzchar(Sys.getenv('NWS_QUIET'))) {
      warning('python command is not in your PATH ',
              'which is needed to start the nws server')
      if (.Platform$OS.type == 'windows')
        warning('Python must be installed on your machine to create a sleigh')
    }
  }
  else {
    if (.Platform$OS.type == 'windows') {
      # add directory containing python executable if it exists,
      # which makes this directory a good place to put DLLs
      pydir <- dirname(python)  # returns '.' if unqualified
      addPath <- if (pydir != '.' && file.exists(pydir)) {
        if (verbose)
          cat(sprintf('Adding %s to PATH\n', pydir))
        pydir
      } else {
        character(0)
      }

      if (!is.null(pythonPath)) {
        # add <pythonPath>/pywin32_system32 to PATH on Windows.
        # this is done to try to improve the chances that pywin32
        # will be able to find pywintypesXX.dll.
        pywintypes <- file.path(pythonPath, 'pywin32_system32')
        if (!file.exists(pywintypes)) {
          if (verbose)
            cat(sprintf('no pywin32_system32 directory in nwsserver: %s\n',
                        pywintypes))
        } else {
          if (verbose)
            cat(sprintf('Adding %s to PATH\n', pywintypes))
          addPath <- c(addPath, pywintypes)
        }
      }

      # check if there's anything to add to PATH
      if (length(addPath) > 0) {
        addPath <- paste(addPath, collapse=.Platform$path.sep)
        path <- Sys.getenv('PATH')
        if (nzchar(path)) {
          path <- paste(path, addPath, sep=.Platform$path.sep)
          if (verbose)
            cat(sprintf('Setting new PATH to %s\n', path))
          Sys.setenv(PATH=path)
        } else {
          warning('PATH is empty: not updating', call.=FALSE)
        }
      }
    }
  }
}

# Starts a local nws server together with its babelfish and deadman
# connection.
#
# Arguments: logfile, verbose, python, pythonOpts, pythonpath, host, port,
# webPort and pluginpath are forwarded to startServer (a NULL pythonpath
# is replaced by "").
#
# Returns the list produced by startServer with the deadman connection
# added as element 'deadman'.
startNwsServer <- function(logfile=NULL, verbose=FALSE,
        python=defaultSleighOptions$python, # Should this dependency exist?
        pythonOpts=defaultSleighOptions$pythonOpts,
        pythonpath=defaultSleighOptions$extraPythonModules,
        host='', port=0, webPort=0, pluginpath='') {

  pkgDir <- .serverGlobals$pkgpath
  if (is.null(pythonpath))
    pythonpath <- ""
  server <- startServer(pkgDir, logfile, verbose, python, pythonOpts,
                        pythonpath, pluginpath, host, port, webPort)

  # we think that '127.0.0.1' is the safest value to use for the
  # babelfish and the dead man connection if the server is bound
  # to all network interfaces
  boundToAll <- identical(host, '')
  if (boundToAll && verbose && !identical(server$hostName, '0.0.0.0')) {
    # this is unexpected, but we're not sure it's an error
    cat(sprintf('nws server has bound to address %s\n', server$hostName))
  }
  localHost <- if (boundToAll) '127.0.0.1' else server$hostName

  # must start the babelfish before creating the deadman connection
  startBabelfish(pkgDir, localHost, server$serverPort, verbose)
  deadman <- makeDeadman(localHost, server$serverPort, verbose)
  if (is.null(deadman))
    warning('unable to create deadman connection')

  server$deadman <- deadman
  server
}

# the two functions below are modeled after those in the sleighMan package
# and should mirror them closely. Update when updating sleighMan!

# Makes 'serverInfo' the server stored in .nwsGlobals.  A managed server
# that is currently stored and still running is stopped first.  Stops if
# the argument is not a serverInfo object.
setServer <- function(serverInfo, ...) {
  if (!is(serverInfo, 'serverInfo'))
    stop('you must pass a serverInfo object to setServer')

  managed <- is(.nwsGlobals$nwsserver, 'managedServerInfo')
  if (managed && nwsIsRunning(.nwsGlobals$nwsserver)) {
    nwsServerInfoStop(.nwsGlobals$nwsserver)
  }

  .nwsGlobals$nwsserver <- serverInfo
}

# Returns the server stored in .nwsGlobals, creating a managed server when
# none is stored or when create is TRUE.  If the stored server is a managed
# server that has been stopped, a warning is issued and a new one is
# created.  Errors raised while creating the server are re-signalled with
# the prefix 'error creating managedServerInfo object: '.
getServer <- function(..., create=FALSE) {
  call <- match.call()

  if (is.null(.nwsGlobals$nwsserver) || create) {
    tryCatch({
      .nwsGlobals$nwsserver <- managedServerInfo()
    },
    error = function(e) {
      stop(simpleError(
             sprintf('error creating managedServerInfo object: %s',
                     conditionMessage(e)),
             call))
    })
  }

  # We should eventually do more thorough sanity checking here
  current <- .nwsGlobals$nwsserver
  stopped <- is(current, 'managedServerInfo') && !nwsIsRunning(current)
  if (!stopped)
    return(current)

  warning(paste('The stored server has been stopped, creating another.',
                ' Previously created netWorkSpace, sleigh, and nwsServer objects will no longer work.'))
  .nwsGlobals$nwsserver <- NULL
  getServer()
}

# Tries to open (and immediately close) a connection to the server
# described by 'serv'.
#
# function has return codes:
#   0 - nwsServer connection established
#   1 - hostname not defined
#   2 - Connection failed, reason unknown
#
# Code 1 can only be produced when a function named 'nsl' exists; the
# host name lookup is attempted through utils::nsl.
checkServerConnection <- function(serv) {
  verbose <- nzchar(Sys.getenv('NWS_VERBOSE'))
  tryCatch({
    suppressWarnings(tempServ <- nwsServer(serverInfo=serv))
    close(tempServ)
    0
  }, error=function(e) {
    if (exists("nsl")) {
      tryCatch({
        # return() leaves this error handler, making 1 the overall result
        if (is.null(utils::nsl(nwsHost(serv)))) {
          return(1)
        }
      }, error=function(lookupError) {
        if (verbose) {
          # report only the message text: pasting the condition object
          # itself vectorises over its message and call and prints twice
          cat(paste("Is nsl defined outside utils?",
                    "\nError message: ", conditionMessage(lookupError), '\n'))
        }
      })
    }
    return(2)
  })
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/sleigh-internal.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# internal functions for Sleigh class

# enquote and docall were copied from snow; docall now spells out the
# 'envir' argument of get() instead of relying on partial matching.

# Wraps x in a call to quote() so that it is not evaluated again.
enquote <- function(x) as.call(list(as.name('quote'), x))

# Calls 'fun' with the elements of 'args' as its arguments, each one
# protected from re-evaluation by enquote.  'fun' may be a function, or a
# single string or a name, in which case the function of that name is
# looked up starting from the global environment.
docall <- function(fun, args) {
  if ((is.character(fun) && length(fun) == 1) || is.name(fun))
    fun <- get(as.character(fun), envir = .GlobalEnv, mode = 'function')
  # 'fun' is passed by name so that do.call finds the local variable
  do.call('fun', lapply(args, enquote))
}

# could this be a method --- it is invoked in the constructor?
#
# Starts one sleigh worker on 'machine' by running a launch command
# followed by the worker start command built by options$scriptExec.
#
# basic idea is (or should be): if we can get the appropriate
# worker script running on the remote node, we just need to
# give it enough env info to take care of the rest
#
# Arguments:
#   machine     - name of the node to start the worker on
#   wsName      - value of RSleighNwsName
#   userWsName  - value of RSleighUserNwsName
#   id          - value of RSleighID
#   workerCount - value of RSleighWorkerCount
#   numProcs    - value of RSleighNumProcs
#   options     - sleigh options; 'launch' must be the string 'local' or a
#                 function(machine, options) returning the launch command,
#                 either as a character vector or as a list with the
#                 elements 'cmd' and, optionally, 'quoting'
#
# Returns the value of system() for the assembled command.  Stops when
# options$launch is neither 'local' nor a function.
addWorker <- function(machine, wsName, userWsName, id, workerCount,
                      numProcs, options) {
  nwsHost <- if (is.null(options$nwsHostRemote))
               options$nwsHost
             else if (options$nwsHostRemote == '')
               'localhost'
             else
               options$nwsHostRemote
  nwsPort <- if (is.null(options$nwsPortRemote))
               options$nwsPort
             else
               options$nwsPortRemote

  # environment settings handed to the worker script
  envVars <- list(
    paste('RSleighName=', machine, sep=''),
    paste('RSleighNwsName=', wsName, sep=''),
    paste('RSleighUserNwsName=', userWsName, sep=''),
    paste('RSleighID=', id, sep=''),
    paste('RSleighWorkerCount=', workerCount, sep=''),
    paste('RSleighScriptDir=', options$scriptDir, sep=''),
    paste('RSleighNwsHost=', nwsHost, sep=''),
    paste('RSleighNwsPort=', nwsPort, sep=''),
    paste('RSleighWorkingDir=', options$workingDir, sep=''),
    paste('RSleighNumProcs=', as.character(numProcs), sep=''),
    paste('RProg=', options$rprog, sep=''),
    paste('RSleighRNGType=', options$rngType, sep=''),
    paste('RSleighRNGSeed=', options$rngSeed, sep='')
  )

  # optional settings are only passed on when they are defined
  if (!is.null(options$outfile)) {
    envVars[[length(envVars) + 1]] <-
      paste('RSleighWorkerOut=', options$outfile, sep='')
  }

  if (!is.null(options$logDir)) {
    envVars[[length(envVars) + 1]] <-
      paste('RSleighLogDir=', options$logDir, sep='')
  }

  quoting <- NULL  # use the default quoting-style

  if (is.character(options$launch) && options$launch == 'local') {
    background <- file.path(options$wrapperDir, 'BackgroundLaunch.py')
    python <- if (!is.null(options$python)) options$python else 'python'
    launchcmd <- c(python, options$pythonOpts, background)

    # every extra python module is passed as its own '-m <module>' pair
    d <- options$extraPythonModules
    args <- unlist(lapply(d, function(opt, arg) c(arg, opt), '-m'))
    launchcmd <- c(launchcmd, args, '--')
  }
  else if (is.function(options$launch)) {
    launch <- options$launch(machine, options)
    if (is.list(launch) && !is.null(launch$cmd)) {
      # a list allows the quoting-style to be specified
      launchcmd <- launch$cmd
      quoting <- launch$quoting
    }
    else {
      launchcmd <- launch
    }
  }
  else {
    # fail with a clear message instead of an undefined variable error
    stop('unsupported launch option: must be \'local\' or a function')
  }

  # XXX do we need to add an extra level of quoting to workerstart?
  # XXX for ssh? or rsh?
  workerstart <- options$scriptExec(machine, envVars, options)

  argv <- c(launchcmd, workerstart)
  cmd <- argv2str(argv, quoting)
  if (options$verbose) cat("Executing command: ", cmd, "\n")
  system(cmd)
}

# Stores one 'EXEC' task record in workspace 'nws' under variable varName.
#
# The record carries the function, its arguments, the 'return' flag, the
# tag, the barrier flag and the job number.  When job is not negative it
# is also attached as metadata 'batchId' (as a string).
storeTask <- function(nws, fun, args,
                      tag = 'anon', barrier = FALSE, return = TRUE, job = -1,
                      varName = 'task') {
  if (job >= 0) {
    metadata <- list(batchId=as.character(job))
  } else {
    metadata <- list()
  }

  nwsStore(nws, varName,
           list(type='EXEC',
                barrier=barrier,
                data=list(fun=fun, args=args, return=return),
                tag=tag,
                job=job),
           metadata=metadata)
}

## Selects cells of a data frame by their position counted down the
## columns (column-major order).  This is necessary since x[i] gets the
## i'th *column* of a data frame object rather than the i'th cell.
dfGetElement <- function(x, obs) {
  nr <- nrow(x)
  offset <- obs - 1
  rowIdx <- offset %% nr + 1
  colIdx <- floor(offset / nr) + 1
  # a two column matrix index selects one (row, column) pair per line
  cells <- cbind(row=rowIdx, col=colIdx)
  x[cells]
}

# Extracts the chunk of x selected by the index vector iv.  For matrices
# and data frames 'by' chooses rows, columns or cells and the result keeps
# its dimensions for rows and columns; anything else is subset with x[iv].
getChunk <- function(x, iv, by) {
  # this case works for vectors and lists
  if (!is.matrix(x) && !is.data.frame(x))
    return(x[iv])

  switch(by,
         "row"=x[iv,,drop=FALSE],
         "column"=x[,iv,drop=FALSE],
         "cell"=if (is.matrix(x)) x[iv] else dfGetElement(x,iv))
}

# Extracts the single element of x selected by i.  For matrices and data
# frames 'by' chooses a row, a column or a cell (dimensions are dropped);
# lists yield x[[i]] and everything else x[i].
getElement <- function(x, i, by) {
  tabular <- is.matrix(x) || is.data.frame(x)
  if (tabular) {
    switch(by,
           "row"=x[i,,drop=TRUE],
           "column"=x[,i,drop=TRUE],
           "cell"=if (is.matrix(x)) x[i] else dfGetElement(x,i))
  }
  else if (is.list(x)) {
    x[[i]]
  }
  else {
    # this case only works for vectors
    x[i]
  }
}

# Number of elements of x.  For matrices and data frames 'by' selects
# whether rows, columns or cells are counted; otherwise it is length(x).
countElement <- function(x, by) {
  if (!(is.matrix(x) || is.data.frame(x)))
    return(length(x))

  switch(by,
         "row"=nrow(x),
         "column"=ncol(x),
         "cell"=if (is.matrix(x)) length(x) else nrow(x) * ncol(x))
}

# Quotes one command line argument with double quotes and backslash
# escapes.  An argument needs quoting if it is empty or contains
# whitespace or a double-quote; anything else is returned unchanged.
#
# Inside the quotes every double-quote is preceded by a backslash, and a
# run of backslashes is doubled when it is followed by a double-quote or
# by the closing quote.
msc.quote <- function(arg) {
  if (length(arg) != 1)
    stop('length of arg must be 1')

  # empty strings must be quoted
  if (!nzchar(arg))
    return('""')

  if (length(grep('[[:space:]"]', arg)) == 0)
    return(arg)

  backslashes <- function(n) paste(rep('\\', n), collapse='')

  chars <- strsplit(arg, split='')[[1]]
  pieces <- character(length(chars))
  pending <- 0  # length of the backslash run ending at this position
  for (i in seq_along(chars)) {
    ch <- chars[i]
    if (ch == '\\') {
      pieces[i] <- ch
      pending <- pending + 1
    }
    else {
      # a quote needs the preceding run doubled plus one escape for itself
      pieces[i] <- if (ch == '"')
          paste(backslashes(pending + 1), ch, sep='')
        else
          ch
      pending <- 0
    }
  }

  # a trailing run is doubled so it does not escape the closing quote
  paste('"', paste(pieces, collapse=''), backslashes(pending), '"', sep='')
}

# Quotes one command line argument by wrapping it in double quotes when
# it is empty or contains whitespace.  No escaping is done, so arguments
# containing a double-quote are rejected with an error.
simple.quote <- function(arg) {
  if (length(arg) != 1)
    stop('length of arg must be 1')

  # empty strings must be quoted
  if (!nzchar(arg))
    return('""')

  if (length(grep('"', arg)) > 0)
    stop('arguments cannot contain double quotes with simple quoting')

  hasSpace <- length(grep('[[:space:]]', arg)) > 0
  if (hasSpace) {
    paste('"', arg, '"', sep='')
  } else {
    # argument without whitespace and double-quotes doesn't need quoting
    arg
  }
}

# Quotes one command line argument with single quotes.  The argument is
# always quoted; each embedded single quote is written as '\'' (close the
# quote, an escaped quote, reopen the quote).
posix.quote <- function(arg) {
  if (length(arg) != 1)
    stop('length of arg must be 1')

  chars <- strsplit(arg, split='')[[1]]
  escaped <- vapply(chars,
                    function(ch) if (ch == "'") "'\\''" else ch,
                    character(1), USE.NAMES=FALSE)

  paste("'", paste(escaped, collapse=''), "'", sep='')
}

# Turns an argument vector into one command line string: every argument
# is quoted and the results are joined with single blanks.
#
# 'quoting' selects the quoting function: 'posix', 'msc' or 'simple'.
# NULL selects the platform default, which is msc quoting on Windows and
# posix quoting everywhere else.  Any other value is an error.
argv2str <- function(argv, quoting) {
  style <- if (!is.null(quoting))
      quoting
    else if (.Platform$OS.type == 'windows')
      'msc'
    else
      'posix'

  quoteOne <- if (style == 'posix') {
      posix.quote
    } else if (style == 'msc') {
      msc.quote
    } else if (style == 'simple') {
      simple.quote
    } else {
      stop('unrecognized quoting method: ', quoting)
    }

  quoted <- lapply(argv, quoteOne)
  paste(quoted, collapse=' ')
}

# This class is used for the fixedarg opt.  It holds a single string in
# its 'name' slot.
setClass('fixedargHolder', representation(name='character'))

# Initializer: stores 'key' in the name slot.
setMethod('initialize', 'fixedargHolder',
          function(.Object, key) {
            .Object@name <- key
            .Object
          })

# Constructor: fixedargHolder(key) creates a holder for 'key'.
fixedargHolder <- function(...) {
  holder <- new("fixedargHolder", ...)
  holder
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/sleigh.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# We use alternating barriers to synchronize eachWorker
# invocations. Their names are common to workers and sleighs.
# The list is indexed by the sleigh's state$bx value, which eachWorker
# flips between 1 and 2 on every call.
barrierNames <- list('barrier0', 'barrier1')
# Values accepted for the rngType sleigh option; the sleigh initializer
# stops with an error for any rngType that is not listed here.
RNG.TYPES.AVAILIBLE <- c('legacy','sprngLFG','sprngLCG','sprngLCG64',
                         'sprngCMRG', 'sprngMLFG')
# Heuristic test for a closure.
#
# fun is reported as a closure when it is a function whose enclosing
# environment has an empty environmentName() and contains at least one
# (non-hidden) variable.  Anything that is not a function, or that has
# no environment, yields FALSE.
isClosure <- function(fun) {
  if (!is.function(fun))
    return(FALSE)

  env <- environment(fun)
  if (is.null(env))
    return(FALSE)

  # environmentName may not exist in every R version
  if (!exists("environmentName", mode="function"))
    return(FALSE)

  if (!identical(environmentName(env), ""))
    return(FALSE)

  length(ls(env, all.names=FALSE)) > 0
}

############################################################################
#  Sleigh code
#

# Convenience constructor for sleigh objects.  All arguments are passed
# unchanged to new(), and from there to the 'initialize' method of the
# sleigh class, which rejects any argument name that is not one of the
# names in defaultSleighOptions.
sleigh <- function(...) {
  new("sleigh",...)
}

# Environment holding the default value of every sleigh option.  The
# sleigh initializer uses its variable names as the set of recognized
# options and blends its contents into each new sleigh's options.
defaultSleighOptions <- new.env()
# Environment of per-node option overrides.  The sleigh initializer looks
# up each node name here and, when a list is found under that name,
# blends it over the options used to start workers on that node.
nodeSleighOptions <- new.env()

# Build the list of default sleigh options.
#
# pkgpath: path of the installed package; its 'bin' subdirectory is used
#          as the default for both wrapperDir and scriptDir.
# Returns a named list with one entry per sleigh option.
computeDefaultSleighOptions <- function(pkgpath) {
  # the helper scripts live in the package's bin directory
  binDir <- file.path(pkgpath, 'bin')

  # R.home() can contain backslashes on Windows even though .Platform
  # reports a forward slash as the separator, so normalize it first
  rRoot <- gsub('\\\\', .Platform$file.sep, R.home())

  # name of the R executable depends on the platform
  rExe <- if (.Platform$OS.type == 'windows') 'Rterm.exe' else 'R'
  rPath <- file.path(rRoot, 'bin', rExe)

  list(
      nwsHost = NULL,
      nwsHostRemote = NULL,
      nwsPort = NULL,
      nwsPortRemote = NULL,
      outfile = NULL,
      launch = 'local',
      workerCount = NULL,
      nodeList = NULL,
      scriptExec = scriptcmd,
      wrapperDir = binDir,
      scriptDir = binDir,
      scriptName = 'RNWSSleighWorker.py',
      workerWrapper = 'BackgroundLaunch.py',
      workingDir = getwd(),
      logDir = NULL,
      user = NULL,
      passwd = NULL,
      wsNameTemplate = 'sleigh_ride_%04d',
      userWsNameTemplate = 'sleigh_user_%04d',
      verbose = FALSE,
      rprog = rPath,
      python = .serverGlobals$python,
      pythonOpts = c('-E', '-W', 'ignore::DeprecationWarning'),
      extraPythonModules = .serverGlobals$pythonPath,
      serverInfo = NULL,
      rngType = 'legacy',
      rngSeed = NULL,
      fixedargSize = 10240,
      workspaceVersion = 0
      )
}

####
# sleigh class
#
# Represents a collection of R worker processes, running on a simple
# network of workstations, that pull in tasks and generate results.
#
# Slots:
#   nodeList     names of the nodes the workers were requested on
#   nws          the sleigh's own workspace
#   userNws      the companion user workspace
#   nwsName      name of the sleigh workspace
#   userNwsName  name of the user workspace
#   nwss         the server connection both workspaces were opened on
#   options      environment of the blended sleigh options
#   state        environment of mutable bookkeeping (workerCount,
#                rankCount, job, occupied, stopped, ...)
#   newSleigh    TRUE when the workspace reports a 'version' variable


setClass('sleigh',
         representation(nodeList='character', nws='netWorkSpace',
                        userNws='netWorkSpace', nwsName='character',
                        userNwsName='character', nwss='nwsServer',
                        options='environment', state='environment',
                        newSleigh='logical'),
         prototype(nws=NULL, userNws=NULL, nwss=NULL))

# Initializer for sleigh objects.  In order, it:
#   1. normalizes and validates the caller's arguments,
#   2. blends them over defaultSleighOptions into a fresh options
#      environment,
#   3. resolves the server to use (serverInfo, or the deprecated
#      nwsHost/nwsPort),
#   4. creates and opens the sleigh and user workspaces,
#   5. starts the workers according to the 'launch' option.
# Arguments are the sleigh options; names not present in
# defaultSleighOptions are rejected with an error.
setMethod('initialize', 'sleigh',
function(.Object, ...) {

  argList = list(...)

  # backward compatibility
  # check if nodeList is specified in the old way
  # (i.e. as an unnamed first argument)
  if (length(argList) > 0) {
    argName = names(argList[1])
    if (is.null(argName) || nchar(argName) == 0) {
      if (!is.vector(argList[[1]]))
        stop('argument 1 has no name and is not a vector')
      names(argList)[1] = 'nodeList'
      warning('nodeList should be passed using named variable, nodeList')
    }
  }

  # sanity check the optional arguments
  unrecog = names(argList)[!names(argList) %in% ls(defaultSleighOptions)]
  if (length(unrecog) > 0)
    stop('unused argument(s) ', paste(unrecog, collapse=', '))

  # defaults first, then the caller's values on top of them
  .Object@options = new.env()
  blendOptions(.Object@options, as.list(defaultSleighOptions))
  blendOptions(.Object@options, argList)

  # when no serverInfo was given, build one from the deprecated
  # nwsHost/nwsPort options, or fall back to getServer()
  if (is.null(.Object@options$serverInfo)) {
    if (!is.null(.Object@options$nwsHost))
      warning('use of the nwsHost parameter is deprecated, use serverInfo=serverInfo',
              call.=FALSE)
    if (!is.null(.Object@options$nwsPort))
      warning('use of the nwsPort parameter is deprecated, use serverInfo=serverInfo',
              call.=FALSE)

    if (!is.null(.Object@options$nwsHost)
        && !is.null(.Object@options$nwsPort))
      .Object@options$serverInfo <- serverInfo(host=.Object@options$nwsHost,
                                               port=.Object@options$nwsPort)
    else if (!is.null(.Object@options$nwsHost))
      .Object@options$serverInfo <- serverInfo(host=.Object@options$nwsHost)
    else if (!is.null(.Object@options$nwsPort))
      .Object@options$serverInfo <- serverInfo(port=.Object@options$nwsPort)
    else
    .Object@options$serverInfo <- getServer()
  }

  # write the effective host and port back into the options
  .Object@options$nwsHost <- nwsHost(.Object@options$serverInfo)
  .Object@options$nwsPort <- nwsPort(.Object@options$serverInfo)

  # Even though we have just written back the host and port in the above
  # statements, we are going to use the serverInfo again when creating
  # the nwsServer object (we have to do this in order to hold on to the
  # deadman.  Thus it's really important to not change the host and port
  # in the next fifteen lines.  There's not really a reason to anyway.

  # opts refers to the same environment as .Object@options, so
  # assignments through opts below also change the sleigh's options
  opts = .Object@options

  # mutable bookkeeping shared by the sleigh's methods
  .Object@state = new.env()
  .Object@state$bx = 1
  .Object@state$occupied = FALSE
  .Object@state$stopped = FALSE
  .Object@state$launch = opts$launch
  .Object@state$totalTasks = 0
  .Object@state$rankCount = 0
  .Object@state$job = 0

  # launch must be a function, or one of the strings 'local' / 'service'
  if (!is.function(opts$launch) &&  !is.character(opts$launch)) {
    stop('unknown launch protocol')
  }
  else if (is.character(opts$launch) &&
      !opts$launch %in% c('local', 'service')) {
    stop('unsupported launch protocol')
  }

  if(length(which(RNG.TYPES.AVAILIBLE == opts$rngType)) == 0)
    stop('rng type not supported')
  
  # set up the sleigh's netWorkSpace.
  .Object@nwss <- nwsServer(serverInfo=.Object@options$serverInfo)
  meta = list(wstype='sleigh')
  .Object@nwsName = nwsMktempWs(.Object@nwss, opts$wsNameTemplate, wsmetadata=meta)
  .Object@userNwsName = nwsMktempWs(.Object@nwss, opts$userWsNameTemplate)
  # NEED TO ADD ERROR HANDLING CODE HERE
  .Object@nws = nwsOpenWs(.Object@nwss, .Object@nwsName)
  .Object@userNws = nwsOpenWs(.Object@nwss, .Object@userNwsName)
  .Object@state$workspaceVersion <- nwsFindTry(.Object@nws,'version',defaultVal=0)
 
  # check if this uses a new sleigh workspace, or a generic workspace
  # (a workspace counts as "new" when it has a 'version' variable)
  version <- nwsFindTry(.Object@nws, 'version')
  .Object@newSleigh <- !is.null(version)
  if (opts$verbose) {
    if (.Object@newSleigh)
      cat(sprintf('using version %s sleigh workspace\n', version))
    else
      cat('using generic (old-style) workspace\n')
  }

  nwsDeclare(.Object@nws, 'exported', 'fifo')

  if (is.function(opts$launch) || opts$launch == 'local') {
    # work out the node list: three workers on localhost by default,
    # otherwise nodeList recycled to workerCount entries
    if (is.null(opts$workerCount)) {
      .Object@nodeList = if (is.null(opts$nodeList))
                           rep('localhost', 3) else opts$nodeList
    }
    else {
      .Object@nodeList = if (is.null(opts$nodeList))
                           rep('localhost', opts$workerCount)
                         else
                           rep(opts$nodeList, length=opts$workerCount)
    }
    .Object@state$workerCount = length(.Object@nodeList)

    if (.Object@state$workerCount < 1) {
      close(.Object@nwss)
      stop('must have at least one worker in a sleigh')
    }

    if (.Object@newSleigh) {
      nwsStore(.Object@nws, 'workerCount',
               as.character(.Object@state$workerCount))
    }

    # run-length encode the sorted node names so that addWorker is
    # called once per distinct node, with the number of workers wanted
    # on that node
    nodes <- sort(.Object@nodeList)
    rnodes <- rle(nodes)
    id <- 0

    # Check to see if the seed is still null.  We want to make sure
    # that each sleigh has a unique seed.
    if (is.null(opts$rngSeed)) opts$rngSeed <- as.numeric(Sys.time())
    
    for (i in seq(along=rnodes$values)) {
      nodeName <- rnodes$values[i]
      # start from the sleigh-wide options, then apply any overrides
      # registered for this node in nodeSleighOptions
      nodeOpts <- new.env()
      blendOptions(nodeOpts, as.list(opts))
      if (exists(nodeName, envir=nodeSleighOptions, mode='list',inherits=FALSE))
        blendOptions(nodeOpts,
            get(nodeName, envir=nodeSleighOptions, mode='list', inherits=FALSE))
      # a 'nodeName' entry in the node options replaces the name used
      # to start the worker
      if (exists('nodeName', envir=nodeOpts, inherits=FALSE))
        nodeName <- get('nodeName', envir=nodeOpts, inherits=FALSE)
      if (nodeOpts$verbose)
        nodeOpts$outfile = sprintf('%s_%04d.txt', .Object@nwsName, i)

      # id is the number of workers already assigned to earlier nodes
      addWorker(nodeName, .Object@nwsName, .Object@userNwsName,
                id, .Object@state$workerCount, rnodes$lengths[i], nodeOpts)
      id <- id + rnodes$lengths[i]
    }
  }
  else if (opts$launch == 'service') {
    # remote launch using the "R Sleigh Service"
    service = tryCatch({
        nwsUseWs(.Object@nwss, 'RSleighService', create=FALSE)
      }, error=function(e) {
        close(.Object@nwss)
        stop('no sleigh services are running', call.=FALSE)
      })

    wsvars = nwsListVars(service, showDataFrame=TRUE)
    regworkers = unlist(wsvars$Variable, use.names=FALSE)
    user = if (is.null(opts$user)) Sys.info()[['login']] else opts$user

    # Note: we are only allowing execution on non-administrator sleigh services
    # NOTE(review): 'user' is pasted into the pattern unescaped, so regex
    # metacharacters in a user name would be interpreted -- confirm
    # whether that matters here.
    myworkers = regworkers[grep(paste('^', user, '@.', sep=''), regworkers)]

    if (!is.null(opts$nodeList)) {
      warning('ignoring user specified nodeList')
    }

    if (length(myworkers) < 1 || (!is.null(opts$workerCount) && opts$workerCount < 1)) {
      close(.Object@nwss)
      stop('must have at least one worker in a sleigh')
    }

    .Object@nodeList = if (!is.null(opts$workerCount))
                         rep(myworkers, length.out=opts$workerCount) 
                       else myworkers
    .Object@state$workerCount = length(.Object@nodeList)

    # b() maps NULL to '' so that optional values can be formatted
    b = function(x) if (is.null(x)) '' else x
    nodes <- sort(.Object@nodeList)
    id <- 0
    rnodes <- rle(nodes)
    # store one '@'-delimited request per distinct service variable
    for (i in seq(along=rnodes$values)) {
      if (opts$verbose)
        opts$outfile = sprintf('%s_%04d.txt', .Object@nwsName, i)
      # XXX is '@' the best delimiter?
      request = sprintf('@%s@%s@%d@%d@%s@%s@%s@%s@%d@',
                        .Object@nwsName,
                        .Object@userNwsName,
                        .Object@state$workerCount,
                        id,
                        b(opts$workingDir),
                        b(opts$outfile),
                        b(opts$logDir),
                        user,
                        rnodes$lengths[i])
      id <- id + rnodes$lengths[i]
      if (opts$verbose)
        cat('command:', request, '\n')

      nwsStore(service, rnodes$values[i], request)
    }
  }
  else if (opts$launch == 'web') {
    # NOTE(review): the launch validation above stops for any character
    # value other than 'local' or 'service', so this branch looks
    # unreachable -- confirm before relying on it.
    cat(sprintf("Your ride is %s, don't forget 'DeleteMe...'.\n", .Object@nwsName))
    nwsStore(.Object@nws, 'runMe', sprintf("library(nws); launch('%s', '%s', %d, userNwsName='%s')",
                                           .Object@nwsName, opts$nwsHost, opts$nwsPort,
                                           .Object@userNwsName))

    tryCatch(nwsFetch(.Object@nws, 'deleteMeWhenAllWorkersStarted'), error=function(...) 0)
    nwsDeleteVar(.Object@nws, 'runMe')

    # XXX this is broken
    .Object@state$workerCount = nwsFetch(.Object@nws, 'rankCount')
    nwsStore(.Object@nws, 'workerCount', .Object@state$workerCount)
    nwsStore(.Object@nws, 'rankCount', -1)
    .Object@state$rankCount = -1
  }

  .Object
})

# Print a short description of a sleigh: a heading, the sleigh
# workspace, and the worker count followed by the node names.
setMethod('show', 'sleigh', function(object) {
  cat('\nNWS Sleigh Object\n')
  show(object@nws)

  nodeNames <- paste(object@nodeList, collapse=', ')
  cat(object@state$workerCount, ' Worker Nodes:\t', nodeNames, '\n\n',
      sep='')
})

# Report how many workers have joined the sleigh and whether the worker
# group is closed.
#
# closeGroup: logical; when TRUE the group is closed after waiting.
# timeout:    numeric; how long to wait for workers, passed to the
#             server as a string.
# Returns a list with two elements: numWorkers and closed.
setGeneric('status', function(.Object, closeGroup=FALSE, timeout=0) standardGeneric('status'))
setMethod('status', 'sleigh',
function(.Object, closeGroup=FALSE, timeout=0) {
  if (!is.logical(closeGroup))
    stop('the type of the closeGroup argument must be logical')
  if (!is.numeric(timeout))
    stop('the type of the timeout argument must be numeric')

  # old-style workspaces cannot report a join status
  if (!.Object@newSleigh) {
    warning('status method requires nws server 2.0 with sleigh workspace plugin to be fully functional')
    return(list(numWorkers=.Object@state$workerCount, closed=TRUE))
  }

  # a negative rankCount means the join phase completed earlier
  if (.Object@state$rankCount < 0)
    return(list(numWorkers=workerCount(.Object), closed=TRUE))

  sleighWs <- .Object@nws
  if (closeGroup) {
    # set the timeout, wait for workers, close the group, and get the count
    nwsStore(sleighWs, 'status', as.character(timeout))
    nwsFind(sleighWs, 'status')
    numWorkers <- as.integer(nwsFind(sleighWs, 'workerCount'))
    closed <- TRUE
  } else {
    # wait for workers using timeout, and get the join status
    waitMeta <- list(delay=as.character(timeout))
    numWorkers <- as.integer(nwsFetch(sleighWs, 'waitForWorkers',
                                      metadata=waitMeta))
    joinStatus <- nwsFind(sleighWs, 'join_status')
    closed <- switch(joinStatus,
                     open=, joining=FALSE,
                     closed=TRUE,
                     stop('illegal join status'))
  }

  # once the group is closed, remember the final count in the sleigh
  if (closed) {
    .Object@state$workerCount <- numWorkers
    .Object@state$rankCount <- -1
  }

  list(numWorkers=numWorkers, closed=closed)
})

# S3 close() method for sleigh objects: simply calls stopSleigh() on the
# sleigh.  Additional arguments are accepted but ignored.
close.sleigh <- function(con, ...) stopSleigh(con)

# Shut down a sleigh: signal the workers to stop, count the 'bye'
# values they leave behind, delete the sleigh workspace and close the
# server connection.  Calling it on an already stopped sleigh does
# nothing and returns NULL invisibly.
setGeneric('stopSleigh', function(.Object) standardGeneric('stopSleigh'))
setMethod('stopSleigh', 'sleigh', function(.Object) {
  if (.Object@state$stopped)
    return(invisible(NULL))

  # web launched sleighs are stopped by deleting the 'task' variable,
  # all others by storing the 'Sleigh ride over' variable
  if (identical(.Object@state$launch, 'web'))
    nwsDeleteVar(.Object@nws, 'task')
  else
    nwsStore(.Object@nws, 'Sleigh ride over', 1)

  # give the workers a moment before counting their 'bye' values
  Sys.sleep(3)
  exitCount <- 0
  repeat {
    if (is.null(nwsFetchTry(.Object@nws, 'bye')))
      break
    exitCount <- exitCount + 1
  }

  expected <- .Object@state$workerCount
  if (exitCount != expected)
    cat(sprintf('Only %d of %d have exited.\n', exitCount, expected))

  nwsDeleteWs(.Object@nwss, .Object@nwsName)
  close(.Object@nwss)
  .Object@state$stopped <- TRUE
})


# run fun once on each worker of the sleigh. pass in a val from the
# range 1:#Workers
#
# Arguments:
#   fun    the function to run on every worker
#   ...    arguments passed to fun
#   eo     optional list or environment of options: 'blocking',
#          'accumulator' and 'closure'; unknown entries of a list are
#          reported with a warning
#   DEBUG  when TRUE, enter browser() on entry
#
# Returns, when blocking, a list with one result per worker (or NULL
# when an accumulator is supplied); when non-blocking, a sleighPending
# object.
setGeneric('eachWorker',
           function(.Object, fun, ..., eo=NULL, DEBUG=FALSE) standardGeneric('eachWorker'))
setMethod('eachWorker', 'sleigh',
function(.Object, fun, ..., eo=NULL, DEBUG=FALSE) {
  if (DEBUG) browser()

  # refuse to run on a closed group without workers, or on a sleigh that
  # is busy or stopped
  if (.Object@state$rankCount == -1 && .Object@state$workerCount < 1) {
    stop(paste('worker group has been closed, and we have', .Object@state$workerCount, 'workers'))
  }

  if (.Object@state$occupied) {
    stop('sleigh is occupied')
  }

  if (.Object@state$stopped) {
    stop('sleigh is stopped')
  }

  fun <- fun # need to force the argument (NJC: why?)
  force(list(...))  # catch errors before we attempt to submit tasks

  nws = .Object@nws
  wc = .Object@state$workerCount

  # defaults for the execution options, overridden from eo below
  blocking = TRUE
  accumulator = NULL
  closure = NULL
  if (!is.null(eo)) {
    if (is.environment(eo) || is.list(eo)) {
      if (!is.null(eo$blocking)) blocking = as.logical(eo$blocking)
      accumulator = eo$accumulator
      if (!is.null(eo$closure)) closure = as.logical(eo$closure)

      # check for unknown options
      # (eo is a local copy here, so clearing entries does not affect
      # the caller's list)
      if (is.list(eo)) {
        eo$blocking <- eo$accumulator <- eo$closure <- NULL
        if (length(eo) > 0)
          warning('ignoring unknown option(s): ',
            paste('"', names(eo), '"', sep='', collapse=', '))
      }
    }
    else {
      stop('options arg must be a list or environment')
    }
  }

  # issue a warning if fun seems like a closure, and they aren't
  # explicitly enabled via the closure option.
  if (is.null(closure)) {
    closure <- TRUE
    if (isClosure(fun))
      warning('"fun" argument looks like a closure without enabling ',
        'via closure option', immediate.=TRUE)
  }

  # remove the enclosing environment of the function if closures are not
  # allowed.
  if (!closure)
    environment(fun) <- globalenv()

  # use alternating barrier to sync eachWorker invocations with the workers.
  bx = .Object@state$bx
  bn = barrierNames[[bx]]
  .Object@state$bx = bx%%2 + 1

  if (!.Object@newSleigh)
    nwsFetchTry(.Object@nws, bn)

  # allocate a new job id
  job = .Object@state$job
  .Object@state$job = job + 1

  # submit the tasks
  # new-style sleighs get a single 'broadcast' task; old-style sleighs
  # get one barrier task per worker
  # NOTE(review): on a new-style sleigh the error handler below does
  # nothing, so a failure in storeTask is silently swallowed and the
  # code goes on to wait for results -- confirm whether that is intended.
  tryCatch({
      if (.Object@newSleigh)
        storeTask(tag=99999, nws=nws, fun=fun, args=list(list(...)), barrier=FALSE,
                  job=job, varName='broadcast')
      else
        lapply(1:wc, storeTask, nws=nws, fun=fun, args=list(list(...)), barrier=TRUE,
               job=job)
    },
    error=function(e) {
      if (!.Object@newSleigh) {
        # XXX we can recover from sending no tasks, but some tasks is bad.
        # XXX it would be nice to detect that situation.
        nwsStore(.Object@nws, bn, 1)
        stop('error sending tasks may have corrupted this sleigh', call.=FALSE)
      }
    })

  # update the total number of submitted tasks
  .Object@state$totalTasks <- .Object@state$totalTasks + wc

  # non-blocking: hand the bookkeeping to a sleighPending object and
  # return immediately
  if (!blocking) {
    if (!.Object@newSleigh)
      .Object@state$occupied = TRUE
    func = if (is.null(accumulator)) as.function(list(NULL)) else accumulator
    return (new('sleighPending', nws, wc, wc, func, bn, .Object@state,
                .Object@newSleigh, job))
  }

  # without an accumulator the results are collected in a list that is
  # indexed by worker rank
  val <- if (is.null(accumulator)) vector('list', wc) else NULL
  # number of formal arguments of the accumulator decides how it is called
  accumargs = try(length(formals(accumulator)))

  # collect exactly one result per worker
  for (i in 1:wc) {
    repeat {
      p = nwsFetch(nws, 'result', metadata=list(batchId=as.character(job)),
                   pkgResult=TRUE)
      r = p$data
      metadata = p$metadata

      # check for a real worker result
      if (is.list(r) && r$type == 'VALUE') {
        break
      }

      # metadata flagged with nwsNull == '1' is treated as a dead
      # worker: substitute a NULL result for that worker's rank
      if (!is.null(metadata) && metadata$nwsNull == '1') {
        r = list(value=list(NULL), rank=as.integer(metadata$nwsWorkerRank))
        warning(sprintf('returning a NULL result since worker %d is dead', r$rank[[1]]))
        break
      }
    }
    if (is.null(accumulator)) {
      # ranks are zero based
      val[r$rank + 1] = r$value
    }
    else {
      if (accumargs == 0)
        accumulator()
      else if (accumargs == 1)
        accumulator(r$value)
      else
        accumulator(r$value, r$rank + 1)
    }
  }

  # old-style sleighs: store the barrier variable again now that all
  # results are in
  if (!.Object@newSleigh)
    nwsStore(.Object@nws, bn, 1)

  val
})


# run fun once for each element of a vector.
#
# Arguments:
#   fun          the function to run for every task
#   elementArgs  list of varying arguments; task i receives element i of
#                each entry (entries are recycled when shorter)
#   fixedArgs    list of arguments passed unchanged to every task
#   eo           optional list or environment of options: 'argPermute',
#                'blocking', 'loadFactor', 'by' ('row', 'column' or
#                'cell'), 'chunkSize', 'accumulator', 'elementFunc' and
#                'closure'; unknown entries of a list are reported with
#                a warning
#   DEBUG        when TRUE, enter browser() on entry
#
# Returns, when blocking, a list with one result per task (or NULL when
# an accumulator is supplied); when non-blocking, a sleighPending object.
setGeneric('eachElem',
           function(.Object, fun, elementArgs=list(), fixedArgs=list(),
                    eo=NULL, DEBUG=FALSE) standardGeneric('eachElem'))
setMethod('eachElem', 'sleigh',
function(.Object, fun, elementArgs=list(), fixedArgs=list(), eo=NULL, DEBUG=FALSE) {
  if (DEBUG) browser()

  # refuse to run on a closed group without workers, or on a sleigh that
  # is busy or stopped
  if (.Object@state$rankCount == -1 && .Object@state$workerCount < 1) {
    stop(paste('worker group has been closed, and we have', .Object@state$workerCount, 'workers'))
  }

  if (.Object@state$occupied) {
    stop('sleigh is occupied')
  }

  if (.Object@state$stopped) {
    stop('sleigh is stopped')
  }

  fun <- fun # need to force the argument (NJC: why?)

  nws = .Object@nws
  wc = .Object@state$workerCount

  # defaults for the execution options, overridden from eo below
  argPermute = NULL
  blocking = TRUE
  lf = 0
  by = "row"
  chunkSize = 1
  accumulator = NULL
  elementFunc = NULL
  closure = NULL
  if (!is.null(eo)) {
    if (is.environment(eo) || is.list(eo)) {
      argPermute = eo$argPermute
      if (!is.null(eo$blocking)) blocking = as.logical(eo$blocking)
      if (!is.null(eo$loadFactor)) lf = as.numeric(eo$loadFactor)
      if (!is.null(eo$by)) by = match.arg(eo$by, c('row', 'column', 'cell'))
      if (!is.null(eo$chunkSize)) chunkSize = max(eo$chunkSize, 1)
      accumulator = eo$accumulator
      elementFunc = eo$elementFunc
      if (!is.null(eo$closure)) closure = as.logical(eo$closure)

      # check for unknown options
      # (eo is a local copy here, so clearing entries does not affect
      # the caller's list)
      if (is.list(eo)) {
        eo$argPermute <- eo$blocking <- eo$loadFactor <- eo$by <-
          eo$chunkSize <- eo$accumulator <- eo$elementFunc <- eo$closure <- NULL
        if (length(eo) > 0)
          warning('ignoring unknown option(s): ',
            paste('"', names(eo), '"', sep='', collapse=', '))
      }
    }
    else {
      stop('options arg must be a list or environment')
    }
  }

  # issue a warning if fun seems like a closure, and they aren't
  # explicitly enabled via the closure option.
  if (is.null(closure)) {
    closure <- TRUE
    if (isClosure(fun))
      warning('"fun" argument looks like a closure without enabling ',
        'via closure option', immediate.=TRUE)
  }

  # remove the enclosing environment of the function if closures are not
  # allowed.
  if (!closure)
    environment(fun) <- globalenv()

  if (!is.list(elementArgs)) elementArgs = list(elementArgs)
  if (!is.list(fixedArgs)) fixedArgs = list(fixedArgs)
  # FixedArgs Optimization - open context
  # When the workspace supports it and the fixed arguments are larger
  # than the fixedargSize option, store each one in the workspace once
  # and pass a fixedargHolder naming it instead of the value itself.
  # All three conditions are scalars, so the short-circuit && is used.
  if ((.Object@state$workspaceVersion >= 1) && (length(fixedArgs) > 0)
      && (object.size(fixedArgs) > .Object@options$fixedargSize)) {
    for(i in seq(length(fixedArgs))) {
      fid <- nwsFetch(nws,'fixedargID')
      nwsStore(nws,fid,fixedArgs[[i]])
      fixedArgs[i] <- list(fixedargHolder(fid))
    }
  }

  if (length(elementArgs) > 0) {
    if (!is.null(elementFunc)) stop('elementFunc cannot be used with elementArgs')

    allTasks = unlist(lapply(elementArgs, countElement, by=by))
    # this allows for functions to be included, even though they aren't now
    numTasks = max(-1, allTasks, na.rm=TRUE)
    if (numTasks < 0) {
      numTasks = NA
    }
    else {
      # cat('got', numTasks, 'tasks\n')

      # check the length of the arguments
      for (taskLen in allTasks) {
        if (!is.na(taskLen) && numTasks %% taskLen != 0) {
          warning('elementArgs contains arguments of inconsistent length')
          break
        }
      }

      # update the total number of submitted tasks
      .Object@state$totalTasks <- .Object@state$totalTasks + numTasks
    }
  }
  else if (!is.null(elementFunc)) {
    # tasks are generated on demand, so their number is not known yet
    numTasks = NA
    nargs = length(formals(elementFunc))
    if (nargs > 2) stop('specified elementFunc function takes too many arguments')
    startingTasks = .Object@state$totalTasks
  }
  else {
    stop('either elementArgs or elementFunc must be specified')
  }

  # with a load factor, keep at most lf * wc (but at least wc) chunks
  # outstanding; otherwise submit everything up front
  if (blocking && lf > 0) {
    submitLimit = lf * wc
    if (submitLimit < wc) {
      submitLimit = wc
    }
  }
  else {
    submitLimit = Inf
  }

  allSubmitted = FALSE
  numSubmitted = 0   # number of chunks stored so far
  numReturned = 0    # number of chunk results fetched so far

  tag = 1            # index of the first task of the next chunk
  currentTasks = 0   # number of individual tasks generated so far

  val <- if (is.null(accumulator)) list() else NULL
  # number of formal arguments of the accumulator decides how it is called
  accumargs = try(length(formals(accumulator)))

  # allocate a new job id
  job = .Object@state$job
  .Object@state$job = job + 1

  # alternate between submitting chunks (up to submitLimit) and fetching
  # one result, until everything is submitted and returned
  while (!allSubmitted || numReturned < numSubmitted) {
    if (!allSubmitted) {
      while (!allSubmitted && numSubmitted < submitLimit) {
        if (!is.null(elementFunc)) {
          # build a chunk by calling elementFunc; an error raised by
          # elementFunc marks the end of the task stream
          argchunk = list()
          for (j in 1:chunkSize) {
            varArgs <- tryCatch({
                if (nargs == 0)
                  elementFunc()
                else if (nargs == 1)
                  elementFunc(currentTasks + 1)
                else
                  elementFunc(currentTasks + 1, by)
              }, error=function(e) {
                allSubmitted <<- TRUE
                if (any(nzchar(e$message)))
                  warning(e$message)
                NULL
              })
            if (allSubmitted) break
            if (!is.list(varArgs)) varArgs = list(varArgs)
            args = c(varArgs, fixedArgs)
            if (!is.null(argPermute)) args = args[argPermute]
            # cat('[function case] args:', paste(args, collapse=' '), '\n')
            argchunk[[j]] = args
            currentTasks = currentTasks + 1
          }
        }
        else {
          nTasks = min(numTasks - currentTasks, chunkSize)
          if (nTasks <= 0) {
            argchunk = list()
            allSubmitted = TRUE
          }
          else {
            if (nTasks > 1) {
              # zero-based indices of the tasks in this chunk; shorter
              # arguments are recycled via the modulo
              v = currentTasks:(currentTasks + nTasks - 1)
              varArgsChunk = lapply(1:length(elementArgs), function(j) {
                iv <- if (is.na(allTasks[j])) v + 1 else v %% allTasks[j] + 1
                getChunk(elementArgs[[j]], iv=iv, by=by)
              })
              argchunk = lapply(1:nTasks, function(i) {
                varArgs = lapply(varArgsChunk, getElement, i=i, by=by)
                args = c(varArgs, fixedArgs)
                if (!is.null(argPermute)) args = args[argPermute]
                # cat('[chunk case] args:', paste(args, collapse=' '), '\n')
                args
              })
            }
            else {
              varArgs = lapply(1:length(elementArgs), function(j) {
                i <- if(is.na(allTasks[j])) currentTasks + 1 else currentTasks %% allTasks[j] + 1
                getElement(elementArgs[[j]], i=i, by=by)
              })
              args = c(varArgs, fixedArgs)
              if (!is.null(argPermute)) args = args[argPermute]
              # cat('[standard case] args:', paste(args, collapse=' '), '\n')
              argchunk = list(args)
            }
            currentTasks = currentTasks + nTasks
          }
        }

        if (length(argchunk) > 0) {
          numSubmitted = numSubmitted + 1
          storeTask(nws, fun, argchunk, tag=tag, barrier=FALSE, job=job)
          tag = tag + length(argchunk)
        }
      }

      if (!is.null(elementFunc)) {
        # update the total number of submitted tasks
        .Object@state$totalTasks <- startingTasks + currentTasks
      }

      if (!blocking) {
        if (!.Object@newSleigh)
          .Object@state$occupied = TRUE
        # cat(sprintf('returning sleighPending object for %d tasks\n', nt))

        # This is giving some contextual information that the sleighPending
        # constructor is expecting.  It is possible that at some point we
        # will want to move this kind of computation into the sleigh-internals.
        # PS 7/09
        fixedArgRep <- if(length(fixedArgs) > 0) {
          if (is(fixedArgs[[1]], 'fixedargHolder')) {fixedArgs}
          else {list()}
        }
        else {list()}

        func = if (is.null(accumulator)) as.function(list(NULL)) else accumulator
        return (new('sleighPending', nws, currentTasks, numSubmitted, func, '',
                    .Object@state, .Object@newSleigh, job, fixedArgRep))
      }
    }

    if (numReturned < numSubmitted) {
      repeat {
        p = nwsFetch(nws, 'result', metadata=list(batchId=as.character(job)),
                     pkgResult=TRUE)
        r = p$data
        metadata = p$metadata

        # ignore everything but 'VALUE' messages
        if (is.list(r) && r$type == 'VALUE') break

        if (!is.null(metadata)) {
          warning(sprintf('ignoring unexpected result with metadata: %s',
                          paste('"', names(metadata), '"', sep='', collapse=', ')))
        }
      }
      if (is.null(accumulator)) {
        # r$tag is the index of the chunk's first task
        val[r$tag:(r$tag + length(r$value) - 1)] = r$value
      }
      else {
        if (accumargs == 0)
          accumulator()
        else if (accumargs == 1)
          accumulator(r$value)
        else
          accumulator(r$value, r$tag:(r$tag + length(r$value) - 1))
      }
      numReturned = numReturned + 1
      submitLimit = submitLimit + 1  # this can become > the number of tasks
    }
  }

  # FixedArgs Optimization - close context
  # Use is() rather than comparing class(i) with ==: class() has more
  # than one element for many ordinary values (matrices, POSIXct, glm
  # fits, ...), which would make the if() condition longer than one.
  for (i in fixedArgs) {
    if (is(i, 'fixedargHolder')) {
      nwsDeleteVar(nws,i@name)
    }
  }
  
  if (is.null(accumulator)) length(val) = currentTasks

  val
})


# Accessor: the rankCount value kept in the sleigh's state environment
# (set to -1 once the worker group has been closed).
setGeneric('rankCount', function(.Object) standardGeneric('rankCount'))
setMethod('rankCount', 'sleigh', function(.Object) .Object@state$rankCount)

# Accessor: the workerCount value kept in the sleigh's state environment.
setGeneric('workerCount', function(.Object) standardGeneric('workerCount'))
setMethod('workerCount', 'sleigh', function(.Object) .Object@state$workerCount)

# Accessor: the sleigh's own workspace object (the nws slot).
setGeneric('netWorkSpaceObject', function(.Object) standardGeneric('netWorkSpaceObject'))
setMethod('netWorkSpaceObject', 'sleigh', function(.Object) .Object@nws)

# Build the name of the workspace variable used to export 'name'.
#
# name:   the exported variable's name.
# worker: NULL for an export to all workers, otherwise the integer
#         valued rank of the worker the export is meant for.
# Returns 'env_<name>' or 'env_<worker>_<name>'.
wsVarName <- function(name, worker) {
  if (!is.null(worker))
    return(sprintf('env_%d_%s', worker, name))

  sprintf('env_%s', name)
}

# Export a variable to the workers of a sleigh.
#
# xName:  name of the variable, a character value.
# xVal:   the value to export.
# worker: NULL to export to every worker, or the zero-based rank of a
#         single worker.
# The value is stored in a workspace variable named by wsVarName() and
# an entry describing it is added to the 'exported' variable.
# Returns NULL invisibly.
setGeneric('export',
           function(.Object, xName, xVal, worker=NULL) standardGeneric('export'))
setMethod('export', 'sleigh',
function(.Object, xName, xVal, worker=NULL) {
  sleighState <- .Object@state

  # the sleigh must be idle and running
  if (sleighState$occupied)
    stop('sleigh is occupied')
  if (sleighState$stopped)
    stop('sleigh is stopped')

  # validate the arguments
  if (missing(xName))
    stop('no value specified for xName argument')
  if (missing(xVal))
    stop('no value specified for xVal argument')
  if (!is.character(xName))
    stop('xName must be a character variable')
  if (!is.null(worker)) {
    if (!is.numeric(worker)) {
      stop('worker value must be numeric')
    } else if (length(worker) > 1) {
      stop('only one worker can be specified')
    } else if (worker < 0) {
      stop('worker value must be positive')
    } else if (worker >= sleighState$workerCount) {
      stop('worker value is too large for this sleigh')
    }
  }

  sleighWs <- .Object@nws
  wsVar <- wsVarName(xName, worker)
  nwsDeclare(sleighWs, wsVar, 'single')
  nwsStore(sleighWs, wsVar, xVal)

  # announce the export so that workers can pick it up
  announcement <- list(worker=worker, name=xName, wsVar=wsVar)
  nwsStore(sleighWs, 'exported', announcement)
  invisible(NULL)
})

# Withdraw a variable that was previously exported to the workers.
#
# xName:  name of the variable, a character value.
# worker: NULL, or the zero-based rank of the single worker the
#         variable was exported to.
# The workspace variable named by wsVarName() is deleted and an entry
# with a NULL wsVar is added to the 'exported' variable.  An error is
# signalled if the workspace variable cannot be deleted.
# Returns NULL invisibly.
setGeneric('unexport',
           function(.Object, xName, worker=NULL) standardGeneric('unexport'))
setMethod('unexport', 'sleigh',
function(.Object, xName, worker=NULL) {
  sleighState <- .Object@state

  # the sleigh must be idle and running
  if (sleighState$occupied)
    stop('sleigh is occupied')
  if (sleighState$stopped)
    stop('sleigh is stopped')

  # validate the arguments
  if (missing(xName))
    stop('no value specified for xName argument')
  if (!is.character(xName))
    stop('xName must be a character variable')
  if (!is.null(worker)) {
    if (!is.numeric(worker)) {
      stop('worker value must be numeric')
    } else if (length(worker) > 1) {
      stop('only one worker can be specified')
    } else if (worker < 0) {
      stop('worker value must be positive')
    } else if (worker >= sleighState$workerCount) {
      stop('worker value is too large')
    }
  }

  sleighWs <- .Object@nws
  wsVar <- wsVarName(xName, worker)
  tryCatch(
    nwsDeleteVar(sleighWs, wsVar),
    error=function(e) {
      stop('cannot unexport a variable that was not exported: ', wsVar, call.=FALSE)
    })

  # a NULL wsVar in the announcement marks the variable as withdrawn
  announcement <- list(worker=worker, name=xName, wsVar=NULL)
  nwsStore(sleighWs, 'exported', announcement)
  invisible(NULL)
})

# Collect the information that the workers have published about
# themselves in the 'worker info' variable.
#
# Returns a data frame with one row per worker (indexed by rank + 1)
# and the columns host, os, pid, R, nws, rank and logfile.  Rows of
# workers that have not reported anything are filled with NA.
setGeneric('workerInfo',
           function(.Object) standardGeneric('workerInfo'))
setMethod('workerInfo', 'sleigh',
function(.Object) {
  n <- .Object@state$workerCount

  # start with all-NA columns of the right type
  host <- rep(NA_character_, n)
  os <- rep(NA_character_, n)
  pid <- rep(NA_integer_, n)
  R <- rep(NA_character_, n)
  nws <- rep(NA_character_, n)
  rank <- rep(NA_integer_, n)
  logfile <- rep(NA_character_, n)

  # walk over the published values until the iterator is exhausted
  nextInfo <- nwsIFindTry(.Object@nws, 'worker info')
  repeat {
    info <- nextInfo()
    if (is.null(info))
      break

    row <- as.integer(info$rank) + 1
    host[row] <- info$host
    os[row] <- info$os
    pid[row] <- as.integer(info$pid)
    R[row] <- info$R
    nws[row] <- info$nws
    rank[row] <- as.integer(info$rank)
    logfile[row] <- info$logfile
  }

  data.frame(host=host, os=os, pid=pid, R=R, nws=nws,
             rank=rank, logfile=logfile, stringsAsFactors=FALSE)
})

# Open one of the sleigh's workspaces in the web interface of the
# server.
#
# ws: 'system' (the default) for the sleigh workspace, or 'user' for
#     the user workspace.
# Signals an error when the server reports no web port.
# Returns the URL invisibly.
if (!isGeneric('view'))
  setGeneric('view',
             function(.Object, ...) standardGeneric('view'))
setMethod('view', 'sleigh',
function(.Object, ws=c('system', 'user'), ...) {
  ws <- match.arg(ws)
  wsName <- if (ws == 'user') .Object@userNwsName else .Object@nwsName

  host <- nwsHost(.Object@options$serverInfo)
  port <- nwsWebPort(.Object@nwss)
  if (is.null(port))
    stop('the nws server does not have a web interface')

  vstring <- sprintf('http://%s:%d/doit?op=listVars&wsName=%s',
                     host, port, URLencode(wsName))
  browseURL(vstring)

  notice <- sprintf("Viewing sleigh %s workspace '%s' on server '%s:%s' in your browser\n",
                    ws, wsName, host, port)
  cat(notice)
  invisible(vstring)
})

# Accessor: the sleigh's user workspace object (the userNws slot).
setGeneric('userNws', function(.Object) standardGeneric('userNws'))
setMethod('userNws', 'sleigh', function(.Object) .Object@userNws)

# Set up a direct socket connection between the master and every worker.
#
# master: host name the workers should connect to; defaults to this
#         machine's node name.
# port:   port the master listens on.
# Workers run connectionWorker() without blocking the master.  The
# master then releases them one at a time through a 'connect_<rank>'
# variable and accepts one server socket per worker.  Any non-NULL
# worker result is treated as an error.  The list of connections is
# stored in the sleigh's state as 'cons' and returned invisibly.
setGeneric('connect',
function(.Object, master=Sys.info()[['nodename']], port=8787)
  standardGeneric('connect'))
setMethod('connect', 'sleigh',
function(.Object, master=Sys.info()[['nodename']], port=8787) {
  # XXX check port somehow?
  pending <- eachWorker(.Object, connectionWorker, master, port,
                        eo=list(blocking=FALSE))

  # set the timeout for socket connections to be 30 days
  thirtyDays <- 30 * 24 * 60 * 60
  savedTimeout <- options(timeout=thirtyDays)
  on.exit(options(savedTimeout))

  cons <- vector('list', workerCount(.Object))
  for (i in seq_along(cons)) {
    # release worker i - 1, then wait for it to connect
    nwsStore(.Object@nws, sprintf('connect_%d', i - 1), 't')
    cons[i] <- list(socketConnection(port=port, server=TRUE,
                                     blocking=TRUE, open='a+b'))
  }

  # check the results for any errors
  for (res in waitSleigh(pending)) {
    if (is.null(res))
      next
    if ('message' %in% names(res))
      stop('worker error: ', res$message)
    else
      stop('worker error: ', as.character(res))
  }

  # remember the socket connection objects in the sleigh's state
  .Object@state$cons <- cons
})

# Worker side of connect(): wait until the master releases this worker,
# then open a client socket to the master.
#
# master: host name to connect to.
# port:   port to connect to.
# Reads 'SleighNws' and 'SleighRank' from the global environment, and
# assigns the resulting connection to 'SleighSock' there.
# Returns NULL invisibly.
connectionWorker <- function(master, port) {
  nws <- get('SleighNws', envir=globalenv())
  rank <- get('SleighRank', envir=globalenv())
  # block until the master stores this worker's 'connect_<rank>' variable
  ackVar <- sprintf('connect_%d', rank)
  cat(sprintf('rank: %d, ackVar: %s\n', rank, ackVar), file=stderr())
  x <- nwsFetch(nws, ackVar)
  cat(sprintf('fetch value: %s\n', x), file=stderr())
  nwsDeleteVar(nws, ackVar)

  # set the timeout for socket connections to be 30 days
  one.month <- 30 * 24 * 60 * 60
  orig.timeout <- options(timeout=one.month)
  on.exit(options(orig.timeout))

  # try to create the socket connection up to four times
  # (a failed attempt is logged and followed by a two second pause)
  # NOTE(review): if all four attempts fail, con stays NULL and NULL is
  # what gets assigned to SleighSock below, without signalling an error.
  con <- NULL
  for (i in 1:4) {
    tryCatch({
        con <- socketConnection(master, port=port, blocking=TRUE, open='a+b')
        cat("created socketConnection\n", file=stderr())
        break
      }, error=function(e) {
        cat('caught error calling socketConnection: ',
            as.character(e), file=stderr())
        Sys.sleep(2)
      })
  }

  assign('SleighSock', con, envir=globalenv())
  cat("SleighSock assigned\n", file=stderr())
  invisible(NULL)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/sleighMan.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# registry used by getSleigh/setSleigh: for each name it holds a
# '<name>Sleigh' entry (the sleigh object) and a '<name>Own' flag
.sleighEnv <- new.env()

# Return the sleigh registered under 'name'.  When nothing is registered,
# or the registered sleigh has been stopped, a new one is created with
# sleigh(), registered, and flagged as owned by the registry.  If creation
# fails a warning is issued and whatever is currently registered (possibly
# NULL, or the stopped sleigh) is returned.
getSleigh <- function(name='default') {
  sleighKey <- paste(name, 'Sleigh', sep='')
  ownKey <- paste(name, 'Own', sep='')

  current <- .sleighEnv[[sleighKey]]
  needNew <- is.null(current) || current@state$stopped
  if (needNew) {
    tryCatch({
      .sleighEnv[[sleighKey]] <- sleigh()
      .sleighEnv[[ownKey]] <- TRUE
    },
    error=function(e) {
      warning('unable to create sleigh object, ',
              'possibly because NWS server not available',
              call.=FALSE)
    })
  }

  .sleighEnv[[sleighKey]]
}

# Register sleigh 's' under 'name'.  If the sleigh currently registered
# under that name was created by getSleigh (its '<name>Own' flag is TRUE)
# it is stopped first.  The new entry is flagged as not owned.
# Returns invisible(NULL).
setSleigh <- function(s, name='default') {
  k <- paste(name, 'Sleigh', sep='')
  o <- paste(name, 'Own', sep='')

  ownsCurrent <- !is.null(.sleighEnv[[o]]) && .sleighEnv[[o]]
  if (ownsCurrent) {
    stopifnot(!is.null(.sleighEnv[[k]]))
    stopSleigh(.sleighEnv[[k]])
  }

  assign(k, s, envir=.sleighEnv)
  assign(o, FALSE, envir=.sleighEnv)
  invisible(NULL)
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/sleighPending.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

####
# sleighPending class
#
# represents a sleigh eachWorker/eachElem invocation in progress.
#
# slots (as used by the methods in this file):
#   nws          workspace that results are fetched from
#   numTasks     length of the result list allocated by waitSleigh
#   numSubmitted number of 'result' values waitSleigh fetches
#   accumulator  function applied to each result; a function with a NULL
#                body is treated by waitSleigh as "no accumulator"
#   barrierName  when non-empty, results are ordered by worker rank and,
#                for an old-style sleigh, this variable is stored to once
#                all results are in
#   sleighState  environment whose 'occupied' flag waitSleigh clears for
#                an old-style sleigh
#   state        environment holding the 'done' flag
#   newSleigh    selects between the two code paths of checkSleigh and
#                waitSleigh
#   job          sent as the 'batchId' metadata value
#   fixedArgs    list; entries of class 'fixedargHolder' name workspace
#                variables that waitSleigh deletes
setClass('sleighPending',
         representation(nws='netWorkSpace', numTasks='numeric',
                        numSubmitted='numeric', accumulator='function',
                        barrierName='character', sleighState='environment',
                        state='environment', newSleigh='logical',
                        job='character', fixedArgs='list'),
         prototype(nws=NULL))
# Fill the slots of a new sleighPending object from the constructor
# arguments.  'bn' becomes the barrierName slot and 'ss' the sleighState
# slot; 'job' is coerced to character.  A fresh 'state' environment is
# created with its 'done' flag set to FALSE.
setMethod('initialize', 'sleighPending',
function(.Object, nws, numTasks, numSubmitted, accumulator, bn, ss,
         newSleigh, job, fixedArgs=list()) {
  # per-object mutable state; 'done' is set once results are gathered
  objState <- new.env()
  assign('done', FALSE, envir=objState)
  .Object@state <- objState

  .Object@nws <- nws
  .Object@job <- as.character(job)
  .Object@newSleigh <- newSleigh
  .Object@numTasks <- numTasks
  .Object@numSubmitted <- numSubmitted
  .Object@accumulator <- accumulator
  .Object@fixedArgs <- fixedArgs
  .Object@barrierName <- bn
  .Object@sleighState <- ss
  .Object
})

# Print a short description of a sleighPending object: its workspace, the
# number of submitted tasks, and the count reported by checkSleigh.
setMethod('show', 'sleighPending', function(object) {
  cat('\nNWS Sleigh Pending Object\n')
  show(object@nws)

  cat('Tasks submitted:', object@numSubmitted, '\n', sep='')

  outstanding <- checkSleigh(object)
  statusText <- if (outstanding == 0) {
    'Work completed.'
  } else {
    paste(outstanding, 'jobs still pending.')
  }

  cat('Status:\t', statusText, '\n\n', sep='')
})

# return the number of results still outstanding.
#
# Once the results have been gathered the answer is always 0.  For a
# new-style sleigh the count comes from the 'jobStatus' variable of the
# workspace (queried with this job's batchId); otherwise it is the number
# of submitted tasks minus the number of values currently held by the
# 'result' variable.
setGeneric('checkSleigh', function(.Object) standardGeneric('checkSleigh'))
setMethod('checkSleigh', 'sleighPending',
function(.Object) {
  if (.Object@state$done)
    return (0) # could argue either way here... .

  if (!.Object@newSleigh) {
    vars <- nwsListVars(.Object@nws, showDataFrame=TRUE)
    # no 'result' row (or any other lookup failure) counts as zero results
    received <- tryCatch(vars[vars$Variable == 'result', 'NumValues'][[1]],
                         error=function(e) 0)
    return(.Object@numSubmitted - received)
  }

  pending <- nwsFetchTry(.Object@nws, 'jobStatus',
                         metadata=list(batchId=as.character(.Object@job)))
  as.numeric(pending)  # XXX should this be numeric or integer?
})

# collect all results.
#
# Fetches one 'VALUE' result per submitted task.  Without an accumulator
# the results are placed into a list (ordered by worker rank when a barrier
# name is set, by task tag otherwise) and that list is returned.  With an
# accumulator each result is handed to it and NULL is returned.  Calling
# this a second time on the same object is an error.
#
# note: a lot of code is duplicated here and in the non-blocking sections of
# eachWorker and eachElem. refactor?
setGeneric('waitSleigh', function(.Object) standardGeneric('waitSleigh'))
setMethod('waitSleigh', 'sleighPending', function(.Object) {
  if (.Object@state$done) {
    stop('results already gathered.')
  }

  # an accumulator with a NULL body means "no accumulator"
  accum <- if (is.null(body(.Object@accumulator))) NULL else .Object@accumulator
  val <- if (is.null(accum)) vector('list', .Object@numTasks) else NULL
  accumargs <- try(length(formals(accum)))  # results in 0 if accum is NULL

  if (.Object@numSubmitted > 0) {
    for (i in seq_len(.Object@numSubmitted)) {
      repeat {
        r <- nwsFetch(.Object@nws, 'result',
                      metadata=list(batchId=as.character(.Object@job)))
        # ignore everything but 'VALUE' messages
        if (is.list(r) && r$type == 'VALUE') break
      }

      # order results by rank for eachWorker, by tag for eachElem
      ind <- if (.Object@barrierName != '') r$rank + 1 else r$tag

      if (is.null(accum)) {
        val[ind:(ind + length(r$value) - 1)] <- r$value
      }
      else {
        # accumulator errors are reported by try() but do not stop collection
        if (accumargs == 0)
          try(accum())  # XXX should this be an error?
        else if (accumargs == 1)
          try(accum(r$value))
        else
          try(accum(r$value, ind:(ind + length(r$value) - 1)))
      }
    }
  }

  # store to the barrier if this is an old-style eachWorker
  if (!.Object@newSleigh) {
    if (.Object@barrierName != '')
      nwsStore(.Object@nws, .Object@barrierName, 1)
    .Object@sleighState$occupied <- FALSE
  }
  .Object@state$done <- TRUE

  # FixedArgs Optimization: remove the workspace variables that held fixed
  # arguments.  is() is used rather than comparing class() because class()
  # can have several elements (e.g. c("matrix", "array")), which is not a
  # valid 'if' condition.
  for (i in .Object@fixedArgs) {
    if (is(i, 'fixedargHolder')) {
      nwsDeleteVar(.Object@nws, i@name)
    }
  }

  # (the former reset of the fixedArgs slot is dropped: it only modified
  # this method's local copy of the object)
  val
})
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/sleighPro.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Compatibility alias kept from the previous version: constructs a
# "sleigh" object, forwarding every argument to new().  The status method
# that once distinguished it now lives in the sleigh class itself.
sleighPro <- function(...) new("sleigh", ...)
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/whichcmd.R"
# Split a PATH-style string on .Platform$path.sep.  Unlike plain strsplit,
# a trailing separator yields a trailing empty element and an empty string
# yields a single empty element.
path.split <- function(path) {
  sep <- .Platform$path.sep
  pieces <- strsplit(path, sep, fixed=TRUE)[[1]]

  # look at the individual characters to detect what strsplit drops
  chars <- strsplit(path, '')[[1]]
  nchars <- length(chars)

  if (nchars == 0) {
    pieces[1] <- ''
    return(pieces)
  }

  if (chars[nchars] == sep)
    pieces <- c(pieces, '')
  pieces
}

# Locate an executable named 'cmd'.
#
# cmd:        command name (without extension)
# path:       directories to search, either one PATH-style string or a
#             character vector; defaults to the PATH environment variable
# verbose:    defaults to whether NWS_VERBOSE is set (not otherwise used here)
# pathext:    file name suffixes to try; defaults to '' plus the entries
#             of the PATHEXT environment variable
# allResults: if TRUE return every match, otherwise only the first
#
# Returns a character vector of matches when allResults is TRUE; otherwise
# the first match, or NULL when there is none.
which.cmd <- function(cmd, path, verbose, pathext, allResults=FALSE) {
  # fill in defaults, dealing with undefined environment variables
  if (missing(path)) {
    envPath <- Sys.getenv('PATH')
    path <- if (nzchar(envPath)) envPath else character(0)
  }

  if (missing(verbose))
    verbose <- nzchar(Sys.getenv('NWS_VERBOSE'))

  if (missing(pathext)) {
    psep <- .Platform$path.sep
    extSpec <- paste(psep, Sys.getenv('PATHEXT'), sep='')
    pathext <- strsplit(extSpec, psep, fixed=TRUE)[[1]]
  }

  # one string is a PATH to be split; anything else is already a vector
  dirs <- if (length(path) == 1) path.split(path) else path

  # an empty entry means the current directory
  dirs <- ifelse(dirs == '', '.', dirs)

  # on Windows, be consistent with the file.sep character
  if (.Platform$OS.type == 'windows') {
    dirs <- gsub('\\', .Platform$file.sep, dirs, fixed=TRUE)
  }

  # every directory combined with every suffix, kept only if executable
  fileNames <- paste(cmd, pathext, sep='')
  candidates <- file.path(dirs, rep(fileNames, each=length(dirs)))
  found <- candidates[file.access(candidates, 1) == 0]

  if (allResults)
    return(found)

  # several matches are deliberately not reported: a warning here was
  # spurious on Windows when Python is in both the registry and the PATH
  if (length(found) > 0) found[1] else NULL
}

# Find Python executables and describe each one.
#
# path:    directories to search, either one PATH-style string or a
#          character vector; defaults to the PATH environment variable
# verbose: if TRUE, print what was found; defaults to whether NWS_VERBOSE
#          is set
#
# Returns a data frame with one row per executable named 'python' found,
# ordered by 'pyversion', with columns 'pyversion' (from pythonVersion),
# 'pypath' (the executable) and 'nwssversion' (from nwssVersion).
pythonInstallations <- function(path, verbose) {
  # set default values of path and verbose, dealing with undefined
  # environment variables
  if (missing(path)) {
    path <- Sys.getenv('PATH')
    if (!nzchar(path)) path <- character(0)
  }

  if (missing(verbose))
    verbose <- nzchar(Sys.getenv('NWS_VERBOSE'))

  # split PATH into a vector (if necessary)
  pathv <- if (length(path) == 1) path.split(path) else path

  # prefer Python install directories to PATH directories
  if (.Platform$OS.type == 'windows') {
    # try to look up install paths of Python in the registry
    tmp <- pythonInstallPaths()

    if (verbose) {
      if (is.null(tmp)) {
        cat('Unable to read from registry in this version of R\n')
      }
      else if (length(tmp) > 0) {
        cat('Found the following Python installations in the registry:\n')
        for (p in tmp)
          cat(sprintf('  %s\n', p))
      }
      else {
        cat('Found no Python installation in the registry\n')
      }
    }
    pathv <- c(tmp, pathv)
  }

  # directories named python* next to R_HOME are searched first of all
  tmp <- Sys.glob(file.path(dirname(R.home()), 'python*'))
  if (length(tmp) > 0) {
    if (verbose) {
      cat('Found the following Python installations next to R_HOME:\n')
      for (p in tmp)
        cat(sprintf('  %s\n', p))
    }
    pathv <- c(tmp, pathv)
  }

  # find all python executables in the path and the corresponding version
  pypath <- which.cmd('python', path=pathv, verbose=verbose, allResults=TRUE)
  nwssversion <- nwssVersion(pypath, verbose)
  pyversion <- pythonVersion(pypath, verbose)
  ord <- tryCatch(order(pyversion), error=function(e) integer(0))

  if (verbose) {
    if (length(pypath) > 0) {
      cat('Found the following Python executables:\n')
      for (i in ord)
        cat(sprintf('  %s [%s]\n', pypath[i], pyversion[i]))
    }
    else {
      cat('Did not find any Python executables\n')
    }
  }

  # all three columns must be reordered together, otherwise an nwss version
  # ends up on the row of a different interpreter
  data.frame(pyversion=pyversion[ord], pypath=pypath[ord],
             nwssversion=nwssversion[ord], stringsAsFactors=FALSE)
}

# Return the path of the Python executable with the highest version among
# those reported by pythonInstallations, or NULL when none was found.
# 'path' and 'verbose' are forwarded to pythonInstallations unchanged.
which.python <- function(path, verbose) {
  installs <- pythonInstallations(path, verbose)
  if (nrow(installs) == 0)
    return(NULL)

  # 'pyversion' looks like 'python<major>.<minor>'; compare the part after
  # the 6-character prefix and take the first of the latest entries
  newest <- latestVersion(substring(installs$pyversion, 7))
  installs$pypath[newest[1]]
}

# Determine the version of each Python executable in 'pypath' by running a
# small script with it.
#
# pypath:  character vector of Python executables
# verbose: if TRUE, print each command before it is executed
#
# Returns a character vector with one 'python<major>.<minor>' entry per
# executable, or 'UNKNOWN' (with a warning) where the version could not be
# determined; NULL when 'pypath' is empty.
pythonVersion <- function(pypath, verbose) {
  tfname <- tempfile()
  # delete the little Python script however we leave
  on.exit(unlink(tfname), add=TRUE)

  # write a little Python script that prints its version number.  print is
  # called with parentheses so that the script is valid for both Python 2
  # and Python 3 (the print statement is a syntax error in Python 3).
  script <- c(
    'try:',
    '    import platform',
    '    major, minor, patchlevel = platform.python_version_tuple()',
    '    print("python%s.%s" % (major, minor))',
    'except:',
    '    pass'
  )
  writeLines(script, tfname)

  getVersion <- function(pythonexec) {
    # XXX double quoting the arguments seems to cause errors on Windows
    # cmd <- sprintf('%s %s', pythonexec, tfname)
    # XXX I'm giving it another try using argv2str
    cmd <- argv2str(c(pythonexec, '-E', tfname), NULL)
    if (verbose)
      cat(sprintf('executing command: %s\n', cmd))
    p <- pipe(cmd, 'r')
    # close the pipe even if reading from it fails
    on.exit(try(close(p)), add=TRUE)
    d <- readLines(p)

    i <- grep('^python', d)
    if (length(i) > 0) {
      d[i[1]]
    } else {
      warning(sprintf('unable to determine python version of %s', pythonexec),
              call.=FALSE)
      'UNKNOWN'
    }
  }
  unlist(lapply(pypath, getVersion))
}

# Determine, for each Python executable in 'pypath', the version of the
# 'nwss' module it can import, by running a small script with it.
#
# pypath:  character vector of Python executables
# verbose: if TRUE, print each command before it is executed
#
# Returns a character vector with one entry per executable: the value of
# nwss.__version__, '0' when 'twisted' or 'nwss' cannot be imported, or ''
# when the script produced no usable output; NULL when 'pypath' is empty.
nwssVersion <- function(pypath, verbose) {
  tfname <- tempfile()
  # delete the little Python script however we leave
  on.exit(unlink(tfname), add=TRUE)

  # write a little Python script that prints the nwss version number.
  # print is called with parentheses so that the script is valid for both
  # Python 2 and Python 3 (the print statement is a syntax error in
  # Python 3).
  script <- c(
    'try:',
    '    import twisted',
    '    import nwss',
    '    print("nwss%s" % nwss.__version__)',
    'except:',
    '    print("nwss0")'
  )
  writeLines(script, tfname)

  getVersion <- function(pythonexec) {
    # XXX double quoting the arguments seems to cause errors on Windows
    # cmd <- sprintf('%s %s', pythonexec, tfname)
    # XXX I'm giving it another try using argv2str
    cmd <- argv2str(c(pythonexec, '-E', tfname), NULL)
    if (verbose)
      cat(sprintf('executing command: %s\n', cmd))
    p <- pipe(cmd, 'r')
    # close the pipe even if reading from it fails
    on.exit(try(close(p)), add=TRUE)
    d <- readLines(p)

    # strip the 4-character 'nwss' marker from the first matching line
    i <- grep('^nwss', d)
    if (length(i) > 0) substring(d[i[1]], 5) else ''
  }
  unlist(lapply(pypath, getVersion))
}

# pythonInstallPaths() returns the Python install directories recorded in
# the Windows registry.  The real implementation is only defined on Windows
# when a function named readRegistry exists; everywhere else the function
# returns NULL.
if (.Platform$OS.type == 'windows' && exists('readRegistry', mode='function')) {
  pythonInstallPaths <- function() {
    # first value found under 'key' in 'hive'; NULL when the key cannot be
    # read, has no values, or that value is an empty string
    firstValue <- function(key, hive) {
      vals <- tryCatch(readRegistry(key, hive),
                       error=function(e) character(0))
      if (length(vals) > 0 && nzchar(vals[[1]])) vals[[1]] else NULL
    }

    # install paths found in one hive: the REvolution entry first, then one
    # per version listed under Software\Python\PythonCore
    pathsInHive <- function(hive) {
      revoPath <- firstValue(
        'Software\\REvolution\\ParallelR\\Python\\InstallPath', hive)

      cores <- tryCatch(readRegistry('Software\\Python\\PythonCore', hive),
                        error=function(e) list())
      corePath <- function(v) {
        firstValue(sprintf('Software\\Python\\PythonCore\\%s\\InstallPath', v),
                   hive)
      }
      corePaths <- unlist(lapply(names(cores), corePath))

      c(revoPath, corePaths)
    }

    unlist(lapply(c('HCU', 'HLM'), pathsInHive))
  }
} else {
  pythonInstallPaths <- function() NULL
}

# Turn each version string in 'v' into an integer vector by splitting it
# at '.' and '-'.  Returns a list with one integer vector per element.
versionSplit <- function(v) {
  fields <- strsplit(v, '[.-]')
  lapply(fields, as.integer)
}

# compares two version vectors, represented as character vectors
#
# a, b: character vectors of version strings, compared element by element
#       (mapply recycles the shorter one)
#
# Returns an integer vector with 1 where a > b, -1 where a < b and 0 where
# they are equal.  Components are compared numerically from the left; when
# one version is a prefix of the other, the longer one is the greater
# (so '2.0' > '2').  Returns integer(0) when either argument is NULL or
# has length zero; stops when an argument is not character.
versionCmp <- function(a, b) {
  if (is.null(a) || is.null(b))
    return(integer(0))

  if (!is.character(a) || !is.character(b))
    stop('versionCmp takes character arguments')

  # nothing to compare; answering here keeps the result independent of how
  # mapply treats zero-length arguments mixed with non-empty ones
  if (length(a) == 0 || length(b) == 0)
    return(integer(0))

  cmpfun <- function(v1, v2) {
    cmp <- 0L
    n <- max(length(v1), length(v2))
    for (i in seq_len(n)) {
      if (i > length(v1) || i > length(v2)) {
        # ran off the end of one version: the longer one wins
        cmp <- if (i > length(v1)) -1L else 1L
        break
      } else if (v1[i] > v2[i]) {
        cmp <- 1L
        break
      } else if (v1[i] < v2[i]) {
        cmp <- -1L
        break
      }
    }
    cmp
  }
  # mapply goes back to at least R 2.4.0
  r <- mapply(cmpfun, versionSplit(a), versionSplit(b), SIMPLIFY=TRUE)
  if (length(r) > 0) r else integer(0)
}

# Return the indices of the latest version in character vector 'v'.
# Several indices are returned when the latest version occurs more than
# once.  NULL or an empty vector gives integer(0); a non-character
# argument is an error.
latestVersion <- function(v) {
  if (is.null(v))
    return(integer(0))

  if (!is.character(v))
    stop('latestVersion takes a character argument')

  if (length(v) == 0)
    return(integer(0))

  # 'best' holds the indices of all entries equal to the best seen so far
  best <- 1L
  for (idx in seq_along(v)[-1]) {
    outcome <- versionCmp(v[idx], v[best[1]])
    if (outcome > 0) {
      best <- idx
    } else if (outcome == 0) {
      best <- c(best, idx)
    }
  }
  best
}

# Describe the installed copies of the 'nwsserver' package.
#
# verbose: if TRUE, print what was found
#
# Returns a data frame with columns 'nsversion' (package version),
# 'pyversion' (name of a python* subdirectory of the package) and 'nspath'
# (the 'Lib' directory below it).  When no nwsserver* directory is found
# in the relevant library paths the data frame has zero rows.
nwsserverPkgs <- function(verbose) {
  pkgs <- installed.packages()
  libDirs <- unique(pkgs[pkgs[,1] == 'nwsserver', 'LibPath'])
  pkgDirs <- Sys.glob(file.path(libDirs, 'nwsserver*'))

  if (length(pkgDirs) == 0) {
    if (verbose) {
      cat('Found no nwsserver installations\n')
    }
    return(data.frame(nsversion=character(0), pyversion=character(0),
                      nspath=character(0), stringsAsFactors=FALSE))
  }

  # list of one-row data frames, one per python* directory of 'pkgDir';
  # empty when its DESCRIPTION is not that of a versioned nwsserver package
  recordsFor <- function(pkgDir) {
    info <- read.dcf(file.path(pkgDir, 'DESCRIPTION'),
                     fields=c('Package', 'Version'))
    nsversion <- info[1, 'Version']
    if (!(info[1, 'Package'] == 'nwsserver' && !is.na(nsversion)))
      return(list())

    oneRow <- function(nsDir) {
      data.frame(nsversion=nsversion, pyversion=basename(nsDir),
                 nspath=file.path(nsDir, 'Lib'), stringsAsFactors=FALSE)
    }
    lapply(Sys.glob(file.path(pkgDir, 'python*')), oneRow)
  }

  allRows <- unlist(lapply(pkgDirs, recordsFor), recursive=FALSE)
  found <- do.call('rbind', allRows)
  if (verbose) {
    cat('Found the following nwsserver installations:\n')
    print(found)
  }
  found
}

# returns the nwsserver subdirectory and python interpreter path
# needed to start the nws server
#
# The result is a list with elements 'nspath' and 'pypath', or NULL when
# no suitable combination was found or an error occurred along the way.
# Setting the NWS_VERBOSE environment variable prints what was found;
# setting NWS_QUIET suppresses the diagnostic messages.
pythonpath <- function() {
  verbose <- nzchar(Sys.getenv('NWS_VERBOSE'))
  quiet <- nzchar(Sys.getenv('NWS_QUIET'))
  result <- NULL

  # 'result' is assigned from inside the tryCatch expression, so whatever
  # was chosen before an error is still returned
  tryCatch({
    # create data frames with information about the Python and
    # nwsserver package installations
    pytable <- pythonInstallations(verbose=verbose)
    nstable <- nwsserverPkgs(verbose=verbose)

    # join the two data frames on the "pyversion" column
    table <- merge(pytable, nstable)

    if (verbose && nrow(table) > 0) {
        cat('Result of merging Python and nwsserver installations:\n')
        print(table)
    }

    # reduce the table to only include the most up to date version of
    # the nwsserver package
    table <- table[latestVersion(table$nsversion),]

    # reduce the table to only include the most up to date version of Python
    # ('pyversion' is compared without its 6-character 'python' prefix)
    table <- table[latestVersion(substring(table$pyversion, 7)),]

    # first choice: an nwsserver R package matching an installed Python.
    # versionCmp ranks a longer version such as '2.0' above '2'.
    if (nrow(table) > 0 && any(versionCmp(table$nsversion, '2') > 0)) {
      # extract the information we need into a 2-element list
      result <- table[1, c('nspath', 'pypath'), drop=TRUE]

      # mention that there are multiple choices
      if (nrow(table) > 1 && verbose)
        cat(sprintf('Found %d choices of (python, nwsserver) for Python %s\n',
                    nrow(table), substring(table[1, 'pyversion'], 7)))

      if (verbose)
        cat(sprintf('PYTHONPATH="%s" "%s"\n', result$nspath, result$pypath))
    # second choice: a Python that can import a recent enough nwss module
    # itself, in which case no extra path is needed ('nspath' is '')
    } else if (any(versionCmp(pytable$nwssversion, '2') > 0)) {
      i <- latestVersion(pytable$nwssversion)
      pypath <- pytable$pypath[i[1]]
      result <- list(pypath=pypath, nspath='')

      # warn the user that there are multiple versions of the same package?
      if (length(i) > 1 && !quiet)
        cat(sprintf('Found %d Python installations with nwsserver package version %s\n',
                    length(i), pytable$nwssversion[i[1]]))

      if (verbose)
        cat(sprintf('PYTHONPATH="%s" "%s"\n', result$nspath, result$pypath))
    } else {
      # nothing usable: say which ingredient is missing
      if (!quiet) {
        if (nrow(pytable) == 0)
          cat('Unable to find any Python installations\n')
        else if (nrow(nstable) == 0)
          cat('Unable to find any nwsserver package installations\n')
        else
          cat('Unable to find any nwsserver installations with support',
              'for your Python interpreter\n')
      }
    }
  },
  error=function(e) {
    # errors are reported (unless quiet) rather than propagated
    if (!quiet)
      cat(sprintf('caught exception: %s\n', conditionMessage(e)))
  })

  result
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/workerLoop.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# Return the printed representation of 'obj' as a single string, with the
# output lines joined by newlines.
#
# The previous implementation opened textConnection('rval', open='w')
# without local=TRUE, so the captured lines were written to a variable
# 'rval' in the global environment while the function's own 'rval' stayed
# NULL, and the result was always "".  capture.output handles the sink
# and its cleanup itself.
tostr <- function(obj) {
  # invisible() keeps capture.output from printing the value a second time
  # should a print method return its argument visibly
  outputLines <- utils::capture.output(invisible(print(obj)))
  paste(outputLines, collapse='\n')
}

# Print a message and store a timestamped copy of it in the sleigh
# workspace.
#
# ...: pieces of the message, joined with newlines
# var: name of the workspace variable the log entry is stored under
#
# The entry has the form '[<date>] <SleighName> -- <message>'.
# Returns invisible(NULL).
logMsg <- function(..., var) {
  text <- paste(..., sep='\n')
  text <- sub('[[:space:]]+$', '', text)  # drop trailing white space
  cat(text, '\n')
  flush.console()

  entry <- try(sprintf('[%s] %s -- %s', date(),
                       get('SleighName', globalenv()), text))
  workspace <- get('SleighNws', globalenv())
  nwsStore(workspace, var, entry)
  invisible(NULL)
}

# log the given message pieces under the 'logError' workspace variable
logError <- function(...) logMsg(..., var='logError')

# log the given message pieces under the 'logDebug' workspace variable
logDebug <- function(...) logMsg(..., var='logDebug')

# Apply the pending export records to 'envir'.
#
# iter:  function returning the next export record, or NULL when there
#        are no more.  A record is a list with 'name', optionally 'wsVar'
#        and optionally 'worker' (ranks the record applies to; absent
#        means every worker).
# envir: environment the variables are assigned into / removed from
#
# A record without 'wsVar' removes the variable 'name' from 'envir'; a
# record with 'wsVar' causes 'name' to be assigned the value found in the
# sleigh workspace under that variable.  Only the last record seen for a
# name takes effect for the assignments, which all happen at the end.
importVars <- function(iter, envir=globalenv()) {
  pending <- list()

  repeat {
    item <- iter()
    if (is.null(item))
      break

    forMe <- is.null(item$worker) ||
        any(get('SleighRank', globalenv()) %in% item$worker)
    if (!forMe)
      next

    if (!is.null(item$wsVar)) {
      pending[[item$name]] <- item$wsVar
      next
    }

    # an un-export: drop the variable, ignoring "not found" warnings
    tryCatch({
      remove(list=item$name, envir=envir)
    },
    warning=function(w) {
    },
    error=function(e) {
      logError(tostr(e))
    })
    pending[[item$name]] <- NULL
  }

  for (idx in seq_along(pending)) {
    assign(names(pending[idx]),
           nwsFind(get('SleighNws', globalenv()), pending[[idx]]), envir)
  }
}

# Main loop of a sleigh worker: publish the worker's identity as global
# variables, initialize random number generation, then repeatedly fetch a
# 'task' from the sleigh workspace, run it and store the outcome as a
# 'result'.
#
# nws:         sleigh workspace tasks are fetched from and results stored to
# displayName: assigned to the global 'SleighName' (used in log messages)
# rank:        this worker's rank; assigned to the global 'SleighRank'
# workerCount: number of streams passed to init.nwssprng
# verbose:     if TRUE, log a debug message when a task starts and ends
# userNws:     assigned to the global 'SleighUserNws'
# rngType:     'legacy', or one of 'sprngLFG', 'sprngLCG', 'sprngLCG64',
#              'sprngCMRG', 'sprngMLFG'
# rngSeed:     seed, coerced with as.numeric
#
# The function returns when fetching a task yields NULL (including after a
# fetch error), or right away when the random number generator could not
# be initialized.
workerLoop <- function(nws, displayName, rank, workerCount, verbose, userNws, rngType,
                       rngSeed) {
  bx <- 1
  lastJob <- -1
  expiter <- tryCatch(nwsIFindTry(nws, 'exported'), error=function(e) NULL)

  # put these into global environment so both worker loop and worker
  # code have access
  assign('SleighName', displayName, globalenv())
  assign('SleighNws', nws, globalenv())
  assign('SleighUserNws', userNws, globalenv())
  assign('SleighRank', rank, globalenv())

  # this needs to be delayed so that we don't force an immediate join phase
  delayedAssign('SleighWorkerCount', as.integer(nwsFind(nws, 'workerCount')),
                assign.env=globalenv())

  # initialize for random number generation.  Returns TRUE on success and
  # FALSE (after logging the reason) on failure; the outcome is returned
  # rather than assigned because an assignment made in here, or in an
  # error handler, would only create a local variable.
  setRNGSeed <- function(rngType, rngSeed) {
    if (rngType == 'legacy') {
      logDebug('using legacy random number generation')
      seedval <- (as.integer(rank) + as.numeric(rngSeed))
      set.seed(seedval)
      return(TRUE)
    }

    if (substr(rngType, 1, 5) != 'sprng') {
      logError(sprintf('ERROR: this rngType is not supported'))
      return(FALSE)
    }

    if (!require(sprngNWS, quietly=TRUE)) {
      logError(sprintf('ERROR: sprngNWS not availible - shutting down'))
      return(FALSE)
    }

    logDebug('using sprngNWS for random number generation')
    gtype <- switch(rngType,
                    sprngLFG=0,
                    sprngLCG=1,
                    sprngLCG64=2,
                    sprngCMRG=3,
                    sprngMLFG=4,
                    NULL)
    if (is.null(gtype)) {
      logError(sprintf('ERROR: This sprng generator type is not supported - shutting down'))
      return(FALSE)
    }

    streamno <- rank
    nstream <- workerCount
    seed <- as.numeric(rngSeed)   # XXX should be parameterizable
    param <- 0  # XXX (probably) should be parameterizable
    tryCatch({
      init.nwssprng(gtype, streamno, nstream, seed, param)
      TRUE
    },
    error=function(e) {
      logError(sprintf('Error calling init_nwssprng: %s - shutting down',
                       as.character(e)))
      FALSE
    })
  }

  # shut down if the generator could not be set up
  if (!setRNGSeed(rngType, rngSeed))
    return(invisible(NULL))

  # values of fixed arguments already looked up for the current job
  fixedArgCache <- new.env()

  repeat {
    # wait for a task to execute
    t <- tryCatch({
           nwsFetch(nws, 'task')
         }, error=function(e) {
           if (e$message != 'retrieval failed')
             logError(sprintf("Error getting task: %s - shutting down",
                              as.character(e)))
           NULL
         })

    if (is.null(t))
      break

    # sanity check
    if (!is.list(t) || t$type != 'EXEC') {
      logError("Bad task: ignoring", tostr(t))
      next
    }

    if (verbose)
      logDebug(sprintf("Got task %s", t$tag))

    # first task of a new job: start with an empty cache and pick up any
    # newly exported variables.  lastJob is updated whether or not an
    # export iterator exists, so that the cache lasts for the whole job.
    if (t$job != lastJob) {
      fixedArgCache <- new.env()
      if (!is.null(expiter)) {
        importVars(expiter)
      }
      lastJob <- t$job
    }

    # Insert any fixedargs as needed
    arg <- t$data$args
    inum <- 0
    for (i in arg) { # Per task in chunk
      inum <- inum + 1
      jnum <- 0
      for (j in i) { # Per arg in task
        jnum <- jnum + 1
        if (is(j, 'fixedargHolder')) {
          # look the value up in the workspace only once per job
          if (is.null(fixedArgCache[[j@name]])) {
            fixedArgCache[[j@name]] <- nwsFindTry(nws, j@name)
          }
          arg[[inum]][jnum] <- list(fixedArgCache[[j@name]])
        }
      }
    }

    # execute the task; an error is logged and returned as the task's value
    dotask <- function(i) {
      tryCatch({
        docall(t$data$fun, arg[[i]])
      },
      error = function(e) {
        logError(as.character(e))
        ## would like to figure out a way to log a useful traceback
        ## but that information seems to be lost at this point
        # calls <- sys.calls()
        # logError(paste(limitedLabels(calls), collapse='\n'))
        # rm(calls)
        e
      })
    }

    tm <- system.time(value <- lapply(seq_along(arg), dotask))

    if (verbose)
      logDebug(sprintf("Task %s completed", t$tag))

    # send back the task results
    tryCatch({
      nwsStore(nws, 'result', list(type='VALUE', value=value, tag=t$tag,
               job=t$job, resubmitted=t$resubmitted, time=tm, rank=rank))
    },
    error=function(e) {
      # try to store the error object in case the failure was due to
      # a serialization error.  this will fail also if the workspace
      # has been deleted, or the server has crashed.
      logError(sprintf('Error returning result: %s', as.character(e)))
      nwsStore(nws, 'result', list(type='VALUE', value=e, tag=t$tag,
               job=t$job, resubmitted=t$resubmitted, time=tm, rank=rank))
    })

    # NOTE(review): 'barrierNames' is not defined in this function; it is
    # presumably a package-level variable -- confirm
    if (t$barrier) {
      nwsFind(nws, barrierNames[[bx]])
      bx <- bx%%2 + 1
    }
  }
}
#line 1 "/tmp/buildd/r-cran-nws-2.0.0.3/R/zzz.R"
#
# Copyright (c) 2005-2008, REvolution Computing, Inc.
#
# NetWorkSpaces is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#

# package-level storage; .onLoad records the package name here as
# 'pkgName' and nwsPkgInfo reads it back
.nwsGlobals <- new.env()

# Return a character vector with elements 'name' (the package name
# recorded in .nwsGlobals) and 'version' (the Version field of that
# package's description).
nwsPkgInfo <- function() {
  pkgName <- .nwsGlobals$pkgName
  pkgVersion <- packageDescription(pkgName, fields='Version')
  c(name=pkgName, version=pkgVersion)
}

# Package load hook.
#
# lib: directory of the library the package is loaded from
# pkg: name of the package
#
# Attaches 'methods' and 'utils', initializes the server environment for
# the installed package directory, passes each default*Options object
# together with its freshly computed counterpart to blendOptions, and
# finally records the package name for nwsPkgInfo.
.onLoad <- function(lib, pkg) {
  require(methods)
  require(utils)
  # directory the package is installed in
  pkgpath <- file.path(lib, pkg)

  initServerEnv(pkgpath)

  # NOTE(review): blendOptions presumably merges the computed values into
  # the default option objects -- confirm against its definition
  blendOptions(defaultSleighOptions, computeDefaultSleighOptions(pkgpath))
  blendOptions(defaultServerInfoOptions,
               computeDefaultServerInfoOptions())
  blendOptions(defaultManagedServerInfoOptions,
               computeDefaultManagedServerInfoOptions())
  blendOptions(defaultNwsServerOptions, computeDefaultNwsServerOptions())
  blendOptions(defaultNetWorkSpaceOptions, computeDefaultNetWorkSpaceOptions())

  # remembered for nwsPkgInfo
  .nwsGlobals$pkgName <- pkg
}



