Logo Search packages:      
Sourcecode: postgresql-8.4 version File versions  Download package

txid.c

/*-------------------------------------------------------------------------
 * txid.c
 *
 *    Export internal transaction IDs to user level.
 *
 * Note that only top-level transaction IDs are ever converted to TXID.
 * This is important because TXIDs frequently persist beyond the global
 * xmin horizon, or may even be shipped to other machines, so we cannot
 * rely on being able to correlate subtransaction IDs with their parents
 * via functions such as SubTransGetTopmostTransaction().
 *
 *
 *    Copyright (c) 2003-2009, PostgreSQL Global Development Group
 *    Author: Jan Wieck, Afilias USA INC.
 *    64-bit txids: Marko Kreen, Skype Technologies
 *
 *    $PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.8 2009/01/01 17:23:50 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/transam.h"
#include "access/xact.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"


#ifndef INT64_IS_BUSTED
/* txid will be signed int8 in database, so must limit to 63 bits */
#define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
#else
/* we only really have 32 bits to work with :-( */
#define MAX_TXID   UINT64CONST(0x7FFFFFFF)
#endif

/* Use unsigned variant internally */
typedef uint64 txid;

/* sprintf format code for uint64 */
#define TXID_FMT UINT64_FORMAT

/*
 * If defined, use bsearch() function for searching for txids in snapshots
 * that have more than the specified number of values.
 */
#define USE_BSEARCH_IF_NXIP_GREATER 30


/*
 * Snapshot containing 8byte txids.
 */
00056 typedef struct
{
      /*
       * 4-byte length hdr, should not be touched directly.
       *
       * Explicit embedding is ok as we want always correct alignment anyway.
       */
      int32       __varsz;

      uint32            nxip;             /* number of txids in xip array */
      txid        xmin;
      txid        xmax;
      txid        xip[1];                 /* in-progress txids, xmin <= xip[i] < xmax */
} TxidSnapshot;

#define TXID_SNAPSHOT_SIZE(nxip) \
      (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))

/*
 * Epoch values from xact.c
 */
00077 typedef struct
{
      TransactionId last_xid;
      uint32            epoch;
} TxidEpoch;


/*
 * Fetch epoch data from xact.c.
 */
static void
load_xid_epoch(TxidEpoch *state)
{
      GetNextXidAndEpoch(&state->last_xid, &state->epoch);
}

/*
 * do a TransactionId -> txid conversion for an XID near the given epoch
 */
static txid
convert_xid(TransactionId xid, const TxidEpoch *state)
{
#ifndef INT64_IS_BUSTED
      uint64            epoch;

      /* return special xid's as-is */
      if (!TransactionIdIsNormal(xid))
            return (txid) xid;

      /* xid can be on either side when near wrap-around */
      epoch = (uint64) state->epoch;
      if (xid > state->last_xid &&
            TransactionIdPrecedes(xid, state->last_xid))
            epoch--;
      else if (xid < state->last_xid &&
                   TransactionIdFollows(xid, state->last_xid))
            epoch++;

      return (epoch << 32) | xid;
#else                                     /* INT64_IS_BUSTED */
      /* we can't do anything with the epoch, so ignore it */
      return (txid) xid & MAX_TXID;
#endif   /* INT64_IS_BUSTED */
}

/*
 * txid comparator for qsort/bsearch
 */
static int
cmp_txid(const void *aa, const void *bb)
{
      txid        a = *(const txid *) aa;
      txid        b = *(const txid *) bb;

      if (a < b)
            return -1;
      if (a > b)
            return 1;
      return 0;
}

/*
 * sort a snapshot's txids, so we can use bsearch() later.
 *
 * For consistency of on-disk representation, we always sort even if bsearch
 * will not be used.
 */
static void
sort_snapshot(TxidSnapshot *snap)
{
      if (snap->nxip > 1)
            qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
}

/*
 * check txid visibility.
 */
static bool
is_visible_txid(txid value, const TxidSnapshot *snap)
{
      if (value < snap->xmin)
            return true;
      else if (value >= snap->xmax)
            return false;
#ifdef USE_BSEARCH_IF_NXIP_GREATER
      else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
      {
            void     *res;

            res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
            /* if found, transaction is still in progress */
            return (res) ? false : true;
      }
#endif
      else
      {
            uint32            i;

            for (i = 0; i < snap->nxip; i++)
            {
                  if (value == snap->xip[i])
                        return false;
            }
            return true;
      }
}

/*
 * helper functions to use StringInfo for TxidSnapshot creation.
 */

static StringInfo
buf_init(txid xmin, txid xmax)
{
      TxidSnapshot snap;
      StringInfo  buf;

      snap.xmin = xmin;
      snap.xmax = xmax;
      snap.nxip = 0;

      buf = makeStringInfo();
      appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
      return buf;
}

static void
buf_add_txid(StringInfo buf, txid xid)
{
      TxidSnapshot *snap = (TxidSnapshot *) buf->data;

      /* do this before possible realloc */
      snap->nxip++;

      appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
}

static TxidSnapshot *
buf_finalize(StringInfo buf)
{
      TxidSnapshot *snap = (TxidSnapshot *) buf->data;

      SET_VARSIZE(snap, buf->len);

      /* buf is not needed anymore */
      buf->data = NULL;
      pfree(buf);

      return snap;
}

/*
 * simple number parser.
 *
 * We return 0 on error, which is invalid value for txid.
 */
static txid
str2txid(const char *s, const char **endp)
{
      txid        val = 0;
      txid        cutoff = MAX_TXID / 10;
      txid        cutlim = MAX_TXID % 10;

      for (; *s; s++)
      {
            unsigned    d;

            if (*s < '0' || *s > '9')
                  break;
            d = *s - '0';

            /*
             * check for overflow
             */
            if (val > cutoff || (val == cutoff && d > cutlim))
            {
                  val = 0;
                  break;
            }

            val = val * 10 + d;
      }
      if (endp)
            *endp = s;
      return val;
}

/*
 * parse snapshot from cstring
 */
static TxidSnapshot *
parse_snapshot(const char *str)
{
      txid        xmin;
      txid        xmax;
      txid        last_val = 0,
                        val;
      const char *str_start = str;
      const char *endp;
      StringInfo  buf;

      xmin = str2txid(str, &endp);
      if (*endp != ':')
            goto bad_format;
      str = endp + 1;

      xmax = str2txid(str, &endp);
      if (*endp != ':')
            goto bad_format;
      str = endp + 1;

      /* it should look sane */
      if (xmin == 0 || xmax == 0 || xmin > xmax)
            goto bad_format;

      /* allocate buffer */
      buf = buf_init(xmin, xmax);

      /* loop over values */
      while (*str != '\0')
      {
            /* read next value */
            val = str2txid(str, &endp);
            str = endp;

            /* require the input to be in order */
            if (val < xmin || val >= xmax || val <= last_val)
                  goto bad_format;

            buf_add_txid(buf, val);
            last_val = val;

            if (*str == ',')
                  str++;
            else if (*str != '\0')
                  goto bad_format;
      }

      return buf_finalize(buf);

bad_format:
      elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
      return NULL;
}

/*
 * Public functions.
 *
 * txid_current() and txid_current_snapshot() are the only ones that
 * communicate with core xid machinery.  All the others work on data
 * returned by them.
 */

/*
 * txid_current() returns int8
 *
 *          Return the current toplevel transaction ID as TXID
 */
Datum
txid_current(PG_FUNCTION_ARGS)
{
      txid        val;
      TxidEpoch   state;

      load_xid_epoch(&state);

      val = convert_xid(GetTopTransactionId(), &state);

      PG_RETURN_INT64(val);
}

/*
 * txid_current_snapshot() returns txid_snapshot
 *
 *          Return current snapshot in TXID format
 *
 * Note that only top-transaction XIDs are included in the snapshot.
 */
Datum
txid_current_snapshot(PG_FUNCTION_ARGS)
{
      TxidSnapshot *snap;
      uint32            nxip,
                        i,
                        size;
      TxidEpoch   state;
      Snapshot    cur;

      cur = GetActiveSnapshot();
      if (cur == NULL)
            elog(ERROR, "no active snapshot set");

      load_xid_epoch(&state);

      /* allocate */
      nxip = cur->xcnt;
      size = TXID_SNAPSHOT_SIZE(nxip);
      snap = palloc(size);
      SET_VARSIZE(snap, size);

      /* fill */
      snap->xmin = convert_xid(cur->xmin, &state);
      snap->xmax = convert_xid(cur->xmax, &state);
      snap->nxip = nxip;
      for (i = 0; i < nxip; i++)
            snap->xip[i] = convert_xid(cur->xip[i], &state);

      /* we want them guaranteed to be in ascending order */
      sort_snapshot(snap);

      PG_RETURN_POINTER(snap);
}

/*
 * txid_snapshot_in(cstring) returns txid_snapshot
 *
 *          input function for type txid_snapshot
 */
Datum
txid_snapshot_in(PG_FUNCTION_ARGS)
{
      char     *str = PG_GETARG_CSTRING(0);
      TxidSnapshot *snap;

      snap = parse_snapshot(str);

      PG_RETURN_POINTER(snap);
}

/*
 * txid_snapshot_out(txid_snapshot) returns cstring
 *
 *          output function for type txid_snapshot
 */
Datum
txid_snapshot_out(PG_FUNCTION_ARGS)
{
      TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
      StringInfoData str;
      uint32            i;

      initStringInfo(&str);

      appendStringInfo(&str, TXID_FMT ":", snap->xmin);
      appendStringInfo(&str, TXID_FMT ":", snap->xmax);

      for (i = 0; i < snap->nxip; i++)
      {
            if (i > 0)
                  appendStringInfoChar(&str, ',');
            appendStringInfo(&str, TXID_FMT, snap->xip[i]);
      }

      PG_RETURN_CSTRING(str.data);
}

/*
 * txid_snapshot_recv(internal) returns txid_snapshot
 *
 *          binary input function for type txid_snapshot
 *
 *          format: int4 nxip, int8 xmin, int8 xmax, int8 xip
 */
Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)
{
      StringInfo  buf = (StringInfo) PG_GETARG_POINTER(0);
      TxidSnapshot *snap;
      txid        last = 0;
      int               nxip;
      int               i;
      int               avail;
      int               expect;
      txid        xmin,
                        xmax;

      /*
       * load nxip and check for nonsense.
       *
       * (nxip > avail) check is against int overflows in 'expect'.
       */
      nxip = pq_getmsgint(buf, 4);
      avail = buf->len - buf->cursor;
      expect = 8 + 8 + nxip * 8;
      if (nxip < 0 || nxip > avail || expect > avail)
            goto bad_format;

      xmin = pq_getmsgint64(buf);
      xmax = pq_getmsgint64(buf);
      if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
            goto bad_format;

      snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
      snap->xmin = xmin;
      snap->xmax = xmax;
      snap->nxip = nxip;
      SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));

      for (i = 0; i < nxip; i++)
      {
            txid        cur = pq_getmsgint64(buf);

            if (cur <= last || cur < xmin || cur >= xmax)
                  goto bad_format;
            snap->xip[i] = cur;
            last = cur;
      }
      PG_RETURN_POINTER(snap);

bad_format:
      elog(ERROR, "invalid snapshot data");
      return (Datum) NULL;
}

/*
 * txid_snapshot_send(txid_snapshot) returns bytea
 *
 *          binary output function for type txid_snapshot
 *
 *          format: int4 nxip, int8 xmin, int8 xmax, int8 xip
 */
Datum
txid_snapshot_send(PG_FUNCTION_ARGS)
{
      TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
      StringInfoData buf;
      uint32            i;

      pq_begintypsend(&buf);
      pq_sendint(&buf, snap->nxip, 4);
      pq_sendint64(&buf, snap->xmin);
      pq_sendint64(&buf, snap->xmax);
      for (i = 0; i < snap->nxip; i++)
            pq_sendint64(&buf, snap->xip[i]);
      PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}

/*
 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
 *
 *          is txid visible in snapshot ?
 */
Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)
{
      txid        value = PG_GETARG_INT64(0);
      TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);

      PG_RETURN_BOOL(is_visible_txid(value, snap));
}

/*
 * txid_snapshot_xmin(txid_snapshot) returns int8
 *
 *          return snapshot's xmin
 */
Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)
{
      TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);

      PG_RETURN_INT64(snap->xmin);
}

/*
 * txid_snapshot_xmax(txid_snapshot) returns int8
 *
 *          return snapshot's xmax
 */
Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)
{
      TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);

      PG_RETURN_INT64(snap->xmax);
}

/*
 * txid_snapshot_xip(txid_snapshot) returns setof int8
 *
 *          return in-progress TXIDs in snapshot.
 */
Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)
{
      FuncCallContext *fctx;
      TxidSnapshot *snap;
      txid        value;

      /* on first call initialize snap_state and get copy of snapshot */
      if (SRF_IS_FIRSTCALL())
      {
            TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);

            fctx = SRF_FIRSTCALL_INIT();

            /* make a copy of user snapshot */
            snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
            memcpy(snap, arg, VARSIZE(arg));

            fctx->user_fctx = snap;
      }

      /* return values one-by-one */
      fctx = SRF_PERCALL_SETUP();
      snap = fctx->user_fctx;
      if (fctx->call_cntr < snap->nxip)
      {
            value = snap->xip[fctx->call_cntr];
            SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
      }
      else
      {
            SRF_RETURN_DONE(fctx);
      }
}

Generated by  Doxygen 1.6.0   Back to index