User Tools

Site Tools


r-connection-api

R Connection API

This document contains information about a proposed R connection API.

Rationale

  • Replacement for existing connections internals
  • Provide a mechanism to interface R with arbitrary data streams:
    • hardware interfaces (serial, USB, etc.)
    • novel/non-standard software interfaces (binary file formats)
  • Further modularize R core by pushing some (all?) connections into packages
  • Allow the R community to write new connections, taking advantage of the existing functionality:
    • R level read/write routines
    • Automagic character re-encoding

Connection API

The API designs presented below are partially motivated by R-devel posts (2006) by Jeff Horner.

Creation

The connection creation API consists of two functions, R_InitXConnection and R_RegisterXConnection, to be called in sequence. The first function allocates (R_alloc) and initializes the public (struct Xconn) connection structure. User code then sets function pointers to I/O methods. Finally, the second function is used to register the connection. Registration involves checking the validity of the connection structure, setting up garbage collection, and making the connection accessible in the R session. This setup has several good qualities:

  • extremely flexible for connection developers
  • connection code has no access to other connections
  • connection code has no access to private variables (encoding, etc.)
  • API has 'final say' before connections are registered

Code

Xconnections.h
/** maximum number of Xconnections that can be registered at the same time;
    this is the size of the XConnections slot table in Xconnections.c **/
#define MAXXCONNS 128
 
/** file operation mode of an Xconnection.
    XMODE_NULL (zero) doubles as the "no mode / invalid mode" marker: it is
    the value a fresh private structure starts with and the value that
    R_RegisterXConnection() rejects.
    NOTE(review): the other names presumably stand for read, write,
    read/write, append and read/append -- confirm against parse_mode(). **/
typedef enum {
    XMODE_NULL = 0,
    XMODE_R    = 1,
    XMODE_W    = 2,
    XMODE_RW   = 3,
    XMODE_A    = 4,
    XMODE_RA   = 5
} Xmode;
 
/** public connection structure, filled in by the connection developer.
    Xconnection is a POINTER to struct Xconn.
    R_InitXConnection() hands out an instance with every member set to NULL;
    user code then installs the methods it supports and passes the structure
    to R_RegisterXConnection(), which keeps its own copy of it. **/
typedef struct Xconn  *Xconnection;
struct Xconn {
    /** NOTE(review): presumably opens the stream in the given mode; the meaning
        of the string argument and of the return value is not defined in this
        file -- confirm **/
    int      (*open)(Xconnection, Xmode, const char *);
    /** called on teardown if the connection is marked open; may be NULL **/
    void     (*close)(Xconnection);
    /** called on teardown after close (whether or not the connection was
        open); may be NULL **/
    void     (*destroy)(Xconnection);
    /** NOTE(review): argument meanings (offset, origin, read/write selector?)
        are not defined in this file -- confirm **/
    int      (*seek)(Xconnection, long, int, int);
    int      (*flush)(Xconnection);
    void     (*truncate)(Xconnection);
    /** read/write take the same parameter list shape as fread()/fwrite():
        buffer, two size_t values, then the connection.
        NOTE(review): presumably (item size, item count) and a count return
        value, as in stdio -- confirm **/
    size_t   (*read)(void *, size_t, size_t, Xconnection);
    size_t   (*write)(const void *, size_t, size_t, Xconnection);
    /** NOTE(review): presumably a catch-all for connection-specific requests
        passed in and answered as R objects -- confirm **/
    SEXP     (*control)(Xconnection, SEXP);
    /** untyped pointer; the code in Xconnections.c only sets it to NULL and
        copies it.  NOTE(review): presumably the developer's own state **/
    void     *data;
};
Xconnections.c
#include "Xconnections.h"

#include <stdlib.h>
#include <string.h>
 
/** Xprivate is a POINTER to the private connection structure.
    Everything in this file dereferences it with '->', compares it with
    NULL and obtains it from malloc(), so it cannot be the struct type
    itself (an array of a not-yet-complete struct type would not even
    compile). **/
typedef struct Xconn_private *Xprivate;
 
/** slot table of registered connections, indexed by connection number.
    A NULL entry is a free slot; static storage means every slot starts
    out NULL. **/
static Xprivate XConnections[MAXXCONNS];
 
/** private connection structure.
    One instance is malloc'ed per registered connection and released by
    free_Xprivate(); connection developers never see it. **/
 
struct Xconn_private {
    /** unique id; finalizer() finds the owning slot by comparing the
        external pointer's address with the address of this field **/
    int id;
    /** class name (malloc'ed copy of the string given at registration) **/
    char* class;
    /** filename, url, etc. converted to CE_NATIVE (translateChar);
        malloc'ed copy of the string given at registration **/
    char* description;
    /** character encoding name (iconv); NULL when no encoding was given.
        init_iconv() currently frees it and resets it to NULL because
        re-encoding is not implemented yet **/
    char* encname;
    /** file operation mode **/
    Xmode mode;
    /** is connection open? (decides whether the user's close method is
        called on teardown) **/
    Rboolean open;
    /** stuff for character encoding **/
    void *iiconv, *oiconv; /* iconv_t */
    /* NOTE(review): presumably input/output conversion buffers and the
       current positions inside them; nothing in this file uses them yet */
    char *iiconvbuff, *oiconvbuff, *inext, *onext;
    /** the R external pointer the finalizer is attached to, stored as an
        untyped pointer **/
    void *exptr;
    /** pointer to user connection structure; this is the malloc'ed copy
        made at registration, and free_Xprivate() releases it **/
    Xconnection conn;
};
 
/** private functions **/
/** return index of next available member of XConnections, 
    return non-negative on success, negative on failure **/
static int NextXConnection(void);
 
/** cleanup a connection **/
static void DestroyXConnection(int);
 
/** initialize a struct Xconn_private **/
static void init_Xprivate(Xprivate);
 
/** initialize a struct Xconn **/
static void init_Xconnection(Xconnection);
 
/** cleanup memory associated with struct Xconn_private **/
static void free_Xprivate(Xprivate);
 
/** parse an R mode string **/ 
static Xmode parse_mode(const char *);
 
/** initialize character re-encoding return 0 on success, non-zero on fail**/
static int init_iconv(Xprivate);
 
/** C finalizer for struct Xconn_private **/
static void finalizer(SEXP exptr);
 
 
/** public functions **/
Xconnection R_InitXConnection(void);
SEXP        R_RegisterXConnection(char *, char *, char *, char *, Xconnection);
 
/** scan the slot table from the start and report the first free (NULL)
    slot; -1 means every slot is taken **/
static int NextXConnection(void)
{
    int slot = 0;

    while(slot < MAXXCONNS) {
        if(XConnections[slot] == NULL)
            return slot;
        slot++;
    }
    return -1;
}
 
/** tear down the connection held in slot nconn; an empty slot is a no-op.
    The slot is released first, then the user's close method runs (only if
    the connection is marked open), then the user's destroy method, and
    finally the private structure is freed.  Either user method may be
    NULL. **/
static void DestroyXConnection(int nconn)
{
    Xprivate victim = XConnections[nconn];

    if(victim == NULL)
        return;

    /* free the slot before calling back into user code */
    XConnections[nconn] = NULL;

    if(victim->open && victim->conn->close)
        victim->conn->close(victim->conn);

    if(victim->conn->destroy)
        victim->conn->destroy(victim->conn);

    free_Xprivate(victim);
}
 
/** put every field of a freshly malloc'ed private structure into a known
    "empty" state.  Registration may bail out at any point and hand the
    half-built structure to free_Xprivate(), so no pointer member may be
    left holding garbage. **/
static void init_Xprivate(Xprivate priv)
{
    priv->id = -1;                 /* no slot assigned yet */
    priv->class = NULL;
    priv->description = NULL;
    priv->encname = NULL;
    priv->mode = XMODE_NULL;
    priv->open = FALSE;
    priv->iiconv = NULL;
    priv->oiconv = NULL;
    priv->iiconvbuff = NULL;
    priv->oiconvbuff = NULL;       /* was left uninitialised */
    priv->inext = NULL;
    priv->onext = NULL;
    priv->exptr = NULL;
    priv->conn = NULL;             /* the field is 'conn'; 'xconn' does not exist */
}
 
/** blank a public connection structure: no method installed, no user
    data attached **/
static void init_Xconnection(Xconnection conn)
{
    /* lifecycle */
    conn->destroy  = NULL;
    conn->close    = NULL;
    conn->open     = NULL;

    /* data transfer */
    conn->write    = NULL;
    conn->read     = NULL;

    /* positioning and housekeeping */
    conn->truncate = NULL;
    conn->flush    = NULL;
    conn->seek     = NULL;

    /* everything else */
    conn->control  = NULL;
    conn->data     = NULL;
}
 
/** release a private structure together with the strings and the copy of
    the user structure it owns.  Accepts NULL.  The iconv members and the
    external pointer are not touched here. **/
static void free_Xprivate(Xprivate priv)
{
     if(priv == NULL)
         return;

     /* free(NULL) does nothing, so unset members need no individual test */
     free(priv->encname);
     free(priv->description);
     free(priv->class);
     free(priv->conn);
     free(priv);
}
 
/** set up character re-encoding for a connection.
    Re-encoding is not implemented yet, so this simply discards the
    requested encoding name and reports success.
    Returns 0 on success, non-zero on failure (currently always 0). **/
static int init_iconv(Xprivate priv)
{
    /* not yet implemented, disable */
    free(priv->encname);           /* free(NULL) is a no-op */
    priv->encname = NULL;
    return 0;
}
 
/** C finalizer for the external pointer attached to a connection object.
    The external pointer's address is the address of the 'id' field of the
    owning private structure; the slot table is searched for that address.
    If the connection is still registered it is reported with a warning
    and torn down; otherwise nothing happens. **/
static void finalizer(SEXP exptr)
{
    int i, nconn = -1;
    Xprivate priv = NULL;
    int *idptr = (int *) R_ExternalPtrAddr(exptr);
 
    for(i = 0; i < MAXXCONNS; i++) {
        if(XConnections[i] && &XConnections[i]->id == idptr) {
            nconn = i;
            priv = XConnections[i];
            break;
        }
    }
 
    if(nconn < 0) return;
    /* description lives in the private structure, not in the user's one */
    warning("closing unused Xconnection %d (%s)", priv->id, priv->description);
    DestroyXConnection(nconn);
    R_ClearExternalPtr(exptr); /* not really needed */
}
 
Xconnection R_InitXConnection(void) 
{   
     Xconnection conn = NULL
     conn = (Xconnection) R_alloc(1, sizeof(struct Xconn));
     init_Xconnection(conn);
     return conn;
}
 
/** abort a registration: release whatever has been built of the private
    structure so far (NULL is accepted), then raise an R error that
    carries the reason **/
static void register_fail(Xprivate partial, const char *reason)
{
     free_Xprivate(partial);
     error("failed to register Xconnection: %s", reason);
}
 
/** malloc'ed copy of a NUL-terminated string, or NULL if allocation
    fails.  The caller owns the result and releases it with free(). **/
static char *xconn_strdup(const char *src)
{
     size_t len = strlen(src) + 1;
     char *copy = (char *) malloc(len);

     if(copy) memcpy(copy, src, len);
     return copy;
}

/** Register a connection and make it visible to R.

    class, description, mode : NUL-terminated strings; copied, never
                               modified.  Must not be NULL.
    encname                  : encoding name, or NULL for none.
    conn                     : structure obtained from R_InitXConnection()
                               with its methods filled in; copied, so the
                               caller's instance is not retained.

    Returns an integer vector of length one holding the connection number,
    with class c(<class>, "Xconnection") and an "exptr" attribute holding
    the external pointer whose finalizer tears the connection down once
    the returned object is garbage collected.

    Any failure raises an R error via register_fail(), which does not
    return; whatever had been allocated with malloc() is released first.

    only encname may be NULL **/
SEXP R_RegisterXConnection(char *class, char *description, char *mode, char *encname, Xconnection conn)
{
     Xprivate priv = NULL;
     SEXP ans, sclass, exptr;
     int nconn;
 
     /* FIXME check Xconnection (e.g. which methods are mandatory) */
     if(!conn || !class || !description || !mode)
         register_fail(priv, "invalid NULL argument");
 
     /* check for space in XConnections */
     nconn = NextXConnection();
     if(nconn < 0) register_fail(priv, "too many Xconnections");
 
     /* check conn, class, description, encname, mode, copy to priv */
     priv = (Xprivate) malloc(sizeof(struct Xconn_private));
     if(!priv) register_fail(priv, "memory allocation failed");
     init_Xprivate(priv);
     priv->id = nconn;
 
     priv->conn = (Xconnection) malloc(sizeof(struct Xconn));
     if(!priv->conn) register_fail(priv, "memory allocation failed");
     memcpy(priv->conn, conn, sizeof(struct Xconn));
 
     priv->class = xconn_strdup(class);
     if(!priv->class) register_fail(priv, "memory allocation failed");
 
     priv->description = xconn_strdup(description);
     if(!priv->description) register_fail(priv, "memory allocation failed");
 
     if(encname) {
         priv->encname = xconn_strdup(encname);
         if(!priv->encname) register_fail(priv, "memory allocation failed");
         /* initialize character re-encoding */
         if(init_iconv(priv)) 
             register_fail(priv, "iconv initialization failed");
     }
 
     priv->mode = parse_mode(mode);
     if(priv->mode == XMODE_NULL) 
         register_fail(priv, "invalid mode argument");
 
     /* prepare return value; it has to exist before the external pointer
        can be attached to it.
        NOTE: if one of the R allocations below raises an error, priv is
        leaked, but no slot is left pointing at a half-registered
        connection because the slot is only filled at the very end. */
     PROTECT(ans = allocVector(INTSXP, 1));
     INTEGER(ans)[0] = nconn;
     PROTECT(sclass = allocVector(STRSXP, 2));
     SET_STRING_ELT(sclass, 0, mkChar(priv->class));
     SET_STRING_ELT(sclass, 1, mkChar("Xconnection"));
     classgets(ans, sclass);
 
     /* initialize finalizer code: the external pointer carries the address
        of priv->id, which is what finalizer() looks for in the slot table */
     PROTECT(exptr = R_MakeExternalPtr(&priv->id, install("Xconnection"), R_NilValue));
     setAttrib(ans, install("exptr"), exptr);
     R_RegisterCFinalizerEx(exptr, finalizer, FALSE);
     priv->exptr = exptr;
 
     /* store Xprivate pointer */
     XConnections[nconn] = priv;
 
     UNPROTECT(3);
     return ans;
}

Example

/** .Call entry point creating a 'null' connection: the arguments are
    validated, a blank connection structure is obtained and registered
    without any I/O method installed.

    description : character vector of length one
    mode        : character vector of length one (an R mode string)
    blocking    : logical vector of length one (validated, not used yet)
    data        : not used yet

    Returns the object produced by R_RegisterXConnection(); raises an R
    error if an argument is invalid or registration fails. **/
SEXP nullConnection(SEXP description, SEXP mode, SEXP blocking, SEXP data)
{   
     Xconnection conn;
 
     if(!isString(description) || length(description)!=1)
          error("invalid %s argument", "description");
     if(!isString(mode) || length(mode)!=1)
          error("invalid %s argument", "mode");
     if(!isLogical(blocking) || length(blocking)!=1)
          error("invalid %s argument", "blocking");
 
     conn =  R_InitXConnection();
 
     /* 
        - set blocking 
        - set I/O operation pointers
        - set encname
        - set private
     */
 
     /* CHAR() yields 'const char *' while the registration function takes
        'char *'; it only copies these strings, so dropping the qualifier
        is harmless.  encname is NULL: no re-encoding is requested. */
     return R_RegisterXConnection("null",
                (char *) CHAR(STRING_ELT(description, 0)),
                (char *) CHAR(STRING_ELT(mode, 0)),
                NULL, conn);
}

This is the 'null' connection. As a test case, the example code should compile as is and should not cause any problems other than R-level errors.

r-connection-api.txt · Last modified: 2017/09/20 19:05 (external edit)