/*
 * Logo Search packages:
 * Sourcecode: postgresql-8.4 version File versions
 *
 * async.c
 */

/*-------------------------------------------------------------------------
 *
 * async.c
 *      Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *      $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.146 2009/02/13 17:12:04 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * New Async Notification Model:
 * 1. Multiple backends on same machine.  Multiple backends listening on
 *      one relation.  (Note: "listening on a relation" is not really the
 *      right way to think about it, since the notify names need not have
 *      anything to do with the names of relations actually in the database.
 *      But this terminology is all over the code and docs, and I don't feel
 *      like trying to replace it.)
 *
 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
 *      ie, each relname/listenerPID pair.  The "notification" field of the
 *      tuple is zero when no NOTIFY is pending for that listener, or the PID
 *      of the originating backend when a cross-backend NOTIFY is pending.
 *      (We skip writing to pg_listener when doing a self-NOTIFY, so the
 *      notification field should never be equal to the listenerPID field.)
 *
 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
 *      relname to a list of outstanding NOTIFY requests.  Actual processing
 *      happens if and only if we reach transaction commit.  At that time (in
 *      routine AtCommit_Notify) we scan pg_listener for matching relnames.
 *      If the listenerPID in a matching tuple is ours, we just send a notify
 *      message to our own front end.  If it is not ours, and "notification"
 *      is not already nonzero, we set notification to our own PID and send a
 *      SIGUSR2 signal to the receiving process (indicated by listenerPID).
 *      BTW: if the signal operation fails, we presume that the listener backend
 *      crashed without removing this tuple, and remove the tuple for it.
 *
 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
 *      notify processing immediately if this backend is idle (ie, it is
 *      waiting for a frontend command and is not within a transaction block).
 *      Otherwise the handler may only set a flag, which will cause the
 *      processing to occur just before we next go idle.
 *
 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
 *      matching our own listenerPID and having nonzero notification fields.
 *      For each such tuple, we send a message to our frontend and clear the
 *      notification field.  BTW: this routine has to start/commit its own
 *      transaction, since by assumption it is only called from outside any
 *      transaction.
 *
 * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
 * of pending actions.  If we reach transaction commit, the changes are
 * applied to pg_listener just before executing any pending NOTIFYs.  This
 * approach is necessary because, to avoid race conditions, we must hold a
 * lock on pg_listener from the time we insert a new listener tuple until we
 * commit.
 * To do that and not create undue hazard of deadlock, we don't want to
 * touch pg_listener until we are otherwise done with the transaction;
 * in particular it'd be uncool to still be taking user-commanded locks
 * while holding the pg_listener lock.
 *
 * Although we grab ExclusiveLock on pg_listener for any operation,
 * the lock is never held very long, so it shouldn't cause too much of
 * a performance problem.  (Previously we used AccessExclusiveLock, but
 * there's no real reason to forbid concurrent reads.)
 *
 * An application that listens on the same relname it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.  Note,
 * however, that we do *not* guarantee that a separate frontend message will
 * be sent for every outside NOTIFY.  Since there is only room for one
 * originating PID in pg_listener, outside notifies occurring at about the
 * same time may be collapsed into a single message bearing the PID of the
 * first outside backend to perform the NOTIFY.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <unistd.h>
#include <signal.h>

#include "access/heapam.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/tqual.h"


/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction.  As explained above,
 * we don't actually modify pg_listener until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
typedef enum
{
      LISTEN_LISTEN,                /* LISTEN condname */
      LISTEN_UNLISTEN,              /* UNLISTEN condname */
      LISTEN_UNLISTEN_ALL           /* UNLISTEN * (queued with an empty condname) */
} ListenActionKind;

/*
 * One queued LISTEN/UNLISTEN request.  condname is a variable-length
 * trailing array: queue_listen allocates sizeof(ListenAction) +
 * strlen(name) bytes, so the declared [1] element supplies the room for
 * the terminating NUL.
 */
typedef struct
{
      ListenActionKind action;
      char        condname[1];                        /* actually, as long as needed */
} ListenAction;

/* Actions queued at the current (sub)transaction level, in issue order */
static List *pendingActions = NIL;              /* list of ListenAction */

/*
 * Saved pendingActions lists of the enclosing transaction levels, innermost
 * first: AtSubStart_Notify pushes with lcons, and AtSubCommit_Notify /
 * AtSubAbort_Notify pop from the front.
 */
static List *upperPendingActions = NIL;         /* list of upper-xact lists */

/*
 * State for outbound notifies consists of a list of all relnames NOTIFYed
 * in the current transaction.  We do not actually perform a NOTIFY until
 * and unless the transaction commits.  pendingNotifies is NIL if no
 * NOTIFYs have been done in the current transaction.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit.  This is a bit odd
 * but is consistent with our historical behavior.
 */
static List *pendingNotifies = NIL;                   /* list of C strings */

/*
 * Saved pendingNotifies lists of the enclosing transaction levels, innermost
 * first (pushed by AtSubStart_Notify, popped by AtSubCommit_Notify and
 * AtSubAbort_Notify).
 */
static List *upperPendingNotifies = NIL;        /* list of upper-xact lists */

/*
 * State for inbound notifies consists of two flags: one saying whether
 * the signal handler is currently allowed to call ProcessIncomingNotify
 * directly, and one saying whether the signal has occurred but the handler
 * was not allowed to call ProcessIncomingNotify at the time.
 *
 * notifyInterruptEnabled is set by EnableNotifyInterrupt and cleared by
 * DisableNotifyInterrupt.  notifyInterruptOccurred is set by the SIGUSR2
 * handler and cleared by ProcessIncomingNotify once it starts servicing.
 *
 * NB: the "volatile" on these declarations is critical!  If your compiler
 * does not grok "volatile", you'd be best advised to compile this file
 * with all optimization turned off.
 */
static volatile sig_atomic_t notifyInterruptEnabled = 0;
static volatile sig_atomic_t notifyInterruptOccurred = 0;

/*
 * True if we've registered an on_shmem_exit cleanup (done by Exec_Listen
 * the first time it inserts a pg_listener row).  Also consulted when
 * deciding whether an UNLISTEN could possibly have any effect.
 */
static bool unlistenExitRegistered = false;

/* When true, the notify code emits DEBUG1 trace messages */
bool        Trace_notify = false;


/* Forward declarations of file-local routines */
static void queue_listen(ListenActionKind action, const char *condname);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_Listen(Relation lRel, const char *relname);
static void Exec_Unlisten(Relation lRel, const char *relname);
static void Exec_UnlistenAll(Relation lRel);
static void Send_Notify(Relation lRel);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
static bool AsyncExistsPendingNotify(const char *relname);
static void ClearPendingActionsAndNotifies(void);


/*
 * Async_Notify
 *
 *          Entry point for the SQL NOTIFY command.
 *
 *          Remembers relname in pendingNotifies (unless it is already there).
 *          Nothing is sent now: the notification is delivered only if and
 *          when the current transaction commits (see AtCommit_Notify).
 */
void
Async_Notify(const char *relname)
{
      MemoryContext callerContext;

      if (Trace_notify)
            elog(DEBUG1, "Async_Notify(%s)", relname);

      /* A second entry for the same name would be pointless */
      if (AsyncExistsPendingNotify(relname))
            return;

      /*
       * The entry must survive until end of transaction, so copy it into
       * CurTransactionContext.  List order is irrelevant; pushing onto the
       * front may make the duplicate check a bit cheaper when one name is
       * notified many times in a row.
       */
      callerContext = MemoryContextSwitchTo(CurTransactionContext);
      pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
      MemoryContextSwitchTo(callerContext);
}

/*
 * queue_listen
 *          Shared implementation of LISTEN, UNLISTEN and UNLISTEN *.
 *
 *          Appends an (action, condname) record to pendingActions.  The
 *          pg_listener table itself is only updated later, by
 *          AtCommit_Notify, when the transaction commits.
 */
static void
queue_listen(ListenActionKind action, const char *condname)
{
      MemoryContext savedContext;
      ListenAction *rec;

      /*
       * Duplicates are deliberately kept, unlike in Async_Notify: getting the
       * interplay of LISTEN/UNLISTEN/UNLISTEN * right while collapsing them
       * would be complicated, and sane applications would gain nothing.
       *
       * The record must live until the (sub)transaction ends.
       */
      savedContext = MemoryContextSwitchTo(CurTransactionContext);

      /* sizeof(ListenAction) already accounts for the name's trailing NUL */
      rec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
      rec->action = action;
      strcpy(rec->condname, condname);

      pendingActions = lappend(pendingActions, rec);

      MemoryContextSwitchTo(savedContext);
}

/*
 * Async_Listen
 *
 *          Entry point for the SQL LISTEN command: queue a LISTEN action for
 *          relname, to be applied to pg_listener at commit.
 */
void
Async_Listen(const char *relname)
{
      if (Trace_notify)
      {
            elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
      }
      queue_listen(LISTEN_LISTEN, relname);
}

/*
 * Async_Unlisten
 *
 *          This is executed by the SQL unlisten command.
 */
void
Async_Unlisten(const char *relname)
{
      if (Trace_notify)
            elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);

      /* If we couldn't possibly be listening, no need to queue anything */
      if (pendingActions == NIL && !unlistenExitRegistered)
            return;

      queue_listen(LISTEN_UNLISTEN, relname);
}

/*
 * Async_UnlistenAll
 *
 *          This is invoked by UNLISTEN * command, and also at backend exit.
 */
void
Async_UnlistenAll(void)
{
      if (Trace_notify)
            elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);

      /* If we couldn't possibly be listening, no need to queue anything */
      if (pendingActions == NIL && !unlistenExitRegistered)
            return;

      queue_listen(LISTEN_UNLISTEN_ALL, "");
}

/*
 * Async_UnlistenOnExit
 *
 *          on_shmem_exit callback that removes this backend's pg_listener
 *          rows at backend exit.
 *
 *          Exec_Listen registers it (once per backend) the first time it
 *          inserts a pg_listener row.  It still runs if the user has since
 *          UNLISTENed everything; we don't try to detect that case, and
 *          an UNLISTEN * that finds nothing to delete is harmless.
 *
 *          code, arg: standard on_shmem_exit callback arguments; unused here.
 */
static void
Async_UnlistenOnExit(int code, Datum arg)
{
      /*
       * We need to start/commit a transaction for the unlisten, but if there is
       * already an active transaction we had better abort that one first.
       * Otherwise we'd end up committing changes that probably ought to be
       * discarded.
       */
      AbortOutOfAnyTransaction();
      /* Now we can do the unlisten: queue UNLISTEN * and apply it at commit */
      StartTransactionCommand();
      Async_UnlistenAll();
      CommitTransactionCommand();
}

/*
 * AtPrepare_Notify
 *
 *          This is called at the prepare phase of a two-phase
 *          transaction.  Save the state for possible commit later.
 */
void
AtPrepare_Notify(void)
{
      ListCell   *p;

      /* It's not sensible to have any pending LISTEN/UNLISTEN actions */
      if (pendingActions)
            ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));

      /* We can deal with pending NOTIFY though */
      foreach(p, pendingNotifies)
      {
            const char *relname = (const char *) lfirst(p);

            RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
                                             relname, strlen(relname) + 1);
      }

      /*
       * We can clear the state immediately, rather than needing a separate
       * PostPrepare call, because if the transaction fails we'd just discard
       * the state anyway.
       */
      ClearPendingActionsAndNotifies();
}

/*
 * AtCommit_Notify
 *
 *          Transaction-commit hook for LISTEN/UNLISTEN/NOTIFY.
 *
 *          First applies every queued LISTEN/UNLISTEN action to pg_listener,
 *          in the order issued.  Then, if any NOTIFYs are pending, scans
 *          pg_listener and either signals the matching backends or sends a
 *          message to our own frontend.
 *
 *          We are still inside the committing transaction, so the
 *          pg_listener changes made here are committed along with it.
 */
void
AtCommit_Notify(void)
{
      Relation    listenerRel;
      ListCell   *cell;

      /* Fast path: this transaction did no LISTEN/UNLISTEN/NOTIFY */
      if (pendingActions == NIL && pendingNotifies == NIL)
            return;

      /*
       * Outside normal processing mode NOTIFY is disabled; just forget the
       * queued work.  (This test used to live in xact.c.)
       */
      if (!IsNormalProcessingMode())
      {
            ClearPendingActionsAndNotifies();
            return;
      }

      if (Trace_notify)
            elog(DEBUG1, "AtCommit_Notify");

      /* ExclusiveLock still lets other sessions read pg_listener */
      listenerRel = heap_open(ListenerRelationId, ExclusiveLock);

      foreach(cell, pendingActions)
      {
            ListenAction *rec = (ListenAction *) lfirst(cell);

            if (rec->action == LISTEN_LISTEN)
                  Exec_Listen(listenerRel, rec->condname);
            else if (rec->action == LISTEN_UNLISTEN)
                  Exec_Unlisten(listenerRel, rec->condname);
            else if (rec->action == LISTEN_UNLISTEN_ALL)
                  Exec_UnlistenAll(listenerRel);

            /* Make this action visible before a possibly conflicting next one */
            CommandCounterIncrement();
      }

      if (pendingNotifies != NIL)
            Send_Notify(listenerRel);

      /*
       * Close the relation but keep the lock until end of transaction (which
       * is imminent).  Notified backends must not read pg_listener before our
       * updates are committed, or they could ignore the signal.  Holding the
       * lock also prevents races against freshly inserted listener rows.
       */
      heap_close(listenerRel, NoLock);

      ClearPendingActionsAndNotifies();

      if (Trace_notify)
            elog(DEBUG1, "AtCommit_Notify: done");
}

/*
 * Exec_Listen --- subroutine for AtCommit_Notify
 *
 *          Insert a pg_listener row (relname, MyProcPid, notification = 0)
 *          unless this backend already has one for relname.  The first insert
 *          also registers Async_UnlistenOnExit so the rows are removed when
 *          the backend exits.
 */
static void
Exec_Listen(Relation lRel, const char *relname)
{
      HeapScanDesc scan;
      HeapTuple   tup;
      Datum       values[Natts_pg_listener];
      bool        nulls[Natts_pg_listener];
      NameData    condname;
      bool        found = false;

      if (Trace_notify)
            elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);

      /* Look for an existing row of ours; stop at the first match */
      scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
      while (!found &&
               (tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
      {
            Form_pg_listener entry = (Form_pg_listener) GETSTRUCT(tup);

            found = (entry->listenerpid == MyProcPid &&
                         strncmp(NameStr(entry->relname), relname, NAMEDATALEN) == 0);
      }
      heap_endscan(scan);

      /* Already listening on this name: nothing to do */
      if (found)
            return;

      /* Build the new row; no notification is pending for it yet */
      memset(nulls, false, sizeof(nulls));
      namestrcpy(&condname, relname);
      values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
      values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid);
      values[Anum_pg_listener_notify - 1] = Int32GetDatum(0);

      tup = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
      simple_heap_insert(lRel, tup);

#ifdef NOT_USED                           /* currently there are no indexes */
      CatalogUpdateIndexes(lRel, tup);
#endif

      heap_freetuple(tup);

      /* We now own a row, so make sure it gets removed before we die */
      if (!unlistenExitRegistered)
      {
            on_shmem_exit(Async_UnlistenOnExit, 0);
            unlistenExitRegistered = true;
      }
}

/*
 * Exec_Unlisten --- subroutine for AtCommit_Notify
 *
 *          Delete this backend's pg_listener row for relname, if any.
 *          Unlistening a name we are not listening on is silently ignored.
 */
static void
Exec_Unlisten(Relation lRel, const char *relname)
{
      HeapScanDesc scan;
      HeapTuple   tup;

      if (Trace_notify)
            elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);

      scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
      for (;;)
      {
            Form_pg_listener entry;

            tup = heap_getnext(scan, ForwardScanDirection);
            if (tup == NULL)
                  break;            /* no matching row; not an error */

            entry = (Form_pg_listener) GETSTRUCT(tup);
            if (entry->listenerpid != MyProcPid)
                  continue;
            if (strncmp(NameStr(entry->relname), relname, NAMEDATALEN) != 0)
                  continue;

            /* We assume at most one match, so stop after deleting it */
            simple_heap_delete(lRel, &tup->t_self);
            break;
      }
      heap_endscan(scan);
}

/*
 * Exec_UnlistenAll --- subroutine for AtCommit_Notify
 *
 *          Delete every pg_listener row belonging to this backend.
 */
static void
Exec_UnlistenAll(Relation lRel)
{
      ScanKeyData pidKey;
      HeapScanDesc scan;
      HeapTuple   tup;

      if (Trace_notify)
            elog(DEBUG1, "Exec_UnlistenAll");

      /* The scan returns only rows whose listenerpid equals MyProcPid */
      ScanKeyInit(&pidKey,
                        Anum_pg_listener_pid,
                        BTEqualStrategyNumber, F_INT4EQ,
                        Int32GetDatum(MyProcPid));
      scan = heap_beginscan(lRel, SnapshotNow, 1, &pidKey);

      for (tup = heap_getnext(scan, ForwardScanDirection);
             tup != NULL;
             tup = heap_getnext(scan, ForwardScanDirection))
            simple_heap_delete(lRel, &tup->t_self);

      heap_endscan(scan);
}

/*
 * Send_Notify --- subroutine for AtCommit_Notify
 *
 *          Scan pg_listener for rows whose relname has a pending NOTIFY in
 *          this transaction.
 *
 *          For our own rows, send the message straight to our frontend.
 *          For another backend's row, send it SIGUSR2.  If the signal cannot
 *          be delivered, delete the row as stale.  Otherwise, record
 *          MyProcPid in the row's notification column, unless another
 *          notifier has already set it.
 *
 *          lRel must already be open; AtCommit_Notify opens it with
 *          ExclusiveLock and holds the lock until commit.
 */
static void
Send_Notify(Relation lRel)
{
      TupleDesc   tdesc = RelationGetDescr(lRel);
      HeapScanDesc scan;
      HeapTuple   lTuple,
                        rTuple;
      Datum       value[Natts_pg_listener];
      bool        repl[Natts_pg_listener],
                        nulls[Natts_pg_listener];

      /* preset data to update notify column (only) to MyProcPid */
      memset(nulls, false, sizeof(nulls));
      memset(repl, false, sizeof(repl));
      repl[Anum_pg_listener_notify - 1] = true;
      memset(value, 0, sizeof(value));
      value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);

      scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);

      while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
      {
            Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
            char     *relname = NameStr(listener->relname);
            int32       listenerPID = listener->listenerpid;

            if (!AsyncExistsPendingNotify(relname))
                  continue;

            if (listenerPID == MyProcPid)
            {
                  /*
                   * Self-notify: no need to bother with table update. Indeed, we
                   * *must not* clear the notification field in this path, or we
                   * could lose an outside notify, which'd be bad for applications
                   * that ignore self-notify messages.
                   */
                  if (Trace_notify)
                        elog(DEBUG1, "AtCommit_Notify: notifying self");

                  NotifyMyFrontEnd(relname, listenerPID);
            }
            else
            {
                  if (Trace_notify)
                        elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
                               listenerPID);

                  /*
                   * If someone has already notified this listener, we don't bother
                   * modifying the table, but we do still send a SIGUSR2 signal,
                   * just in case that backend missed the earlier signal for some
                   * reason.  It's OK to send the signal first, because the other
                   * guy can't read pg_listener until we unlock it.
                   */
                  if (kill(listenerPID, SIGUSR2) < 0)
                  {
                        /*
                         * Get rid of pg_listener entry if it refers to a PID that no
                         * longer exists.  Presumably, that backend crashed without
                         * deleting its pg_listener entries.  We do this for any
                         * kill() failure, not only errno == ESRCH (EPERM should be
                         * treated the same way).
                         */
                        simple_heap_delete(lRel, &lTuple->t_self);
                  }
                  else if (listener->notification == 0)
                  {
                        /* Rewrite the tuple with my PID in notification column */
                        rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
                        simple_heap_update(lRel, &lTuple->t_self, rTuple);

#ifdef NOT_USED                           /* currently there are no indexes */
                        CatalogUpdateIndexes(lRel, rTuple);
#endif

                        /*
                         * simple_heap_update has stored the new row version, so
                         * free our palloc'd copy (as Exec_Listen does after its
                         * insert).  Otherwise one modified tuple per notified
                         * listener would pile up in memory for the rest of the
                         * transaction.
                         */
                        heap_freetuple(rTuple);
                  }
            }
      }

      heap_endscan(scan);
}

/*
 * AtAbort_Notify
 *
 *          Transaction-abort hook.  Throws away every queued LISTEN/UNLISTEN
 *          action and outbound NOTIFY, since none of them may take effect
 *          once the transaction has failed.
 */
void
AtAbort_Notify(void)
{
      /* pg_listener is only touched at commit, so dropping the queues suffices */
      ClearPendingActionsAndNotifies();
}

/*
 * AtSubStart_Notify() --- Take care of subtransaction start.
 *
 * Saves the parent's pending action and notify lists on their respective
 * stacks, then starts the new subtransaction with empty lists.
 */
void
AtSubStart_Notify(void)
{
      /* The stacks themselves live in TopTransactionContext for simplicity */
      MemoryContext prevContext = MemoryContextSwitchTo(TopTransactionContext);

      /* Save the parent's actions; the stack depth must track nesting level */
      upperPendingActions = lcons(pendingActions, upperPendingActions);
      Assert(list_length(upperPendingActions) ==
               GetCurrentTransactionNestLevel() - 1);
      pendingActions = NIL;

      /* Same treatment for the outbound notify names */
      upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
      Assert(list_length(upperPendingNotifies) ==
               GetCurrentTransactionNestLevel() - 1);
      pendingNotifies = NIL;

      MemoryContextSwitchTo(prevContext);
}

/*
 * AtSubCommit_Notify() --- Take care of subtransaction commit.
 *
 * Reassign all items in the pending lists to the parent transaction.
 */
void
AtSubCommit_Notify(void)
{
      List     *parentPendingActions;
      List     *parentPendingNotifies;

      parentPendingActions = (List *) linitial(upperPendingActions);
      upperPendingActions = list_delete_first(upperPendingActions);

      Assert(list_length(upperPendingActions) ==
               GetCurrentTransactionNestLevel() - 2);

      /*
       * Mustn't try to eliminate duplicates here --- see queue_listen()
       */
      pendingActions = list_concat(parentPendingActions, pendingActions);

      parentPendingNotifies = (List *) linitial(upperPendingNotifies);
      upperPendingNotifies = list_delete_first(upperPendingNotifies);

      Assert(list_length(upperPendingNotifies) ==
               GetCurrentTransactionNestLevel() - 2);

      /*
       * We could try to eliminate duplicates here, but it seems not worthwhile.
       */
      pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
}

/*
 * AtSubAbort_Notify() --- Take care of subtransaction abort.
 *
 * Restores the parent's pending lists from the stacks.  The aborted
 * subtransaction's lists are simply abandoned; their memory goes away when
 * CurTransactionContext is recycled.
 */
void
AtSubAbort_Notify(void)
{
      /*
       * This can run more than once at the same nesting level if subxact
       * abort itself runs into trouble.  Popping down to a depth derived from
       * the current nesting level makes repeated calls harmless.
       */
      int               targetDepth = GetCurrentTransactionNestLevel() - 2;

      for (; list_length(upperPendingActions) > targetDepth;
             upperPendingActions = list_delete_first(upperPendingActions))
            pendingActions = (List *) linitial(upperPendingActions);

      for (; list_length(upperPendingNotifies) > targetDepth;
             upperPendingNotifies = list_delete_first(upperPendingNotifies))
            pendingNotifies = (List *) linitial(upperPendingNotifies);
}

/*
 * NotifyInterruptHandler
 *
 *          This is the signal handler for SIGUSR2.
 *
 *          If we are idle (notifyInterruptEnabled is set), we can safely invoke
 *          ProcessIncomingNotify directly.  Otherwise, just set a flag
 *          to do it later.
 *
 *          errno is saved on entry and restored before returning, so the
 *          interrupted code never sees it change.  ImmediateInterruptOK is
 *          likewise saved, forced off while notifies are processed, and
 *          restored afterwards.
 */
void
NotifyInterruptHandler(SIGNAL_ARGS)
{
      int               save_errno = errno;

      /*
       * Note: this is a SIGNAL HANDLER.  You must be very wary what you do
       * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
       * would likely lead to corruption of stdio buffers if they were ever
       * turned on.
       */

      /*
       * Don't joggle the elbow of proc_exit.  (Nothing has touched errno yet,
       * so returning without restoring it is fine.)
       */
      if (proc_exit_inprogress)
            return;

      if (notifyInterruptEnabled)
      {
            bool        save_ImmediateInterruptOK = ImmediateInterruptOK;

            /*
             * We may be called while ImmediateInterruptOK is true; turn it off
             * while messing with the NOTIFY state.  (We would have to save and
             * restore it anyway, because PGSemaphore operations inside
             * ProcessIncomingNotify() might reset it.)
             */
            ImmediateInterruptOK = false;

            /*
             * I'm not sure whether some flavors of Unix might allow another
             * SIGUSR2 occurrence to recursively interrupt this routine. To cope
             * with the possibility, we do the same sort of dance that
             * EnableNotifyInterrupt must do --- see that routine for comments.
             * Forcing notifyInterruptOccurred to 1 guarantees at least one
             * ProcessIncomingNotify call; the loop exits only with the enable
             * flag set and no unserviced interrupt pending.
             */
            notifyInterruptEnabled = 0;         /* disable any recursive signal */
            notifyInterruptOccurred = 1;  /* do at least one iteration */
            for (;;)
            {
                  notifyInterruptEnabled = 1;
                  if (!notifyInterruptOccurred)
                        break;
                  /* Work pending: disable again so a nested signal only sets the flag */
                  notifyInterruptEnabled = 0;
                  if (notifyInterruptOccurred)
                  {
                        /* Here, it is finally safe to do stuff. */
                        if (Trace_notify)
                              elog(DEBUG1, "NotifyInterruptHandler: perform async notify");

                        ProcessIncomingNotify();

                        if (Trace_notify)
                              elog(DEBUG1, "NotifyInterruptHandler: done");
                  }
            }

            /*
             * Restore ImmediateInterruptOK, and check for interrupts if needed.
             */
            ImmediateInterruptOK = save_ImmediateInterruptOK;
            if (save_ImmediateInterruptOK)
                  CHECK_FOR_INTERRUPTS();
      }
      else
      {
            /*
             * In this path it is NOT SAFE to do much of anything, except this.
             * The pending work is picked up by the next EnableNotifyInterrupt.
             */
            notifyInterruptOccurred = 1;
      }

      errno = save_errno;
}

/*
 * EnableNotifyInterrupt
 *
 *          This is called by the PostgresMain main loop just before waiting
 *          for a frontend command.  If we are truly idle (ie, *not* inside
 *          a transaction block), then process any pending inbound notifies,
 *          and enable the signal handler to process future notifies directly.
 *
 *          On return (when idle), notifyInterruptEnabled is set and no
 *          unserviced notify interrupt is pending.  Inside a transaction it
 *          returns immediately and leaves both flags unchanged.
 *
 *          NOTE: the signal handler starts out disabled, and stays so until
 *          PostgresMain calls this the first time.
 */
void
EnableNotifyInterrupt(void)
{
      if (IsTransactionOrTransactionBlock())
            return;                             /* not really idle */

      /*
       * This code is tricky because we are communicating with a signal handler
       * that could interrupt us at any point.  If we just checked
       * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
       * fail to respond promptly to a signal that happens in between those two
       * steps.  (A very small time window, perhaps, but Murphy's Law says you
       * can hit it...)  Instead, we first set the enable flag, then test the
       * occurred flag.  If we see an unserviced interrupt has occurred, we
       * re-clear the enable flag before going off to do the service work. (That
       * prevents re-entrant invocation of ProcessIncomingNotify() if another
       * interrupt occurs.) If an interrupt comes in between the setting and
       * clearing of notifyInterruptEnabled, then it will have done the service
       * work and left notifyInterruptOccurred zero, so we have to check again
       * after clearing enable.  The whole thing has to be in a loop in case
       * another interrupt occurs while we're servicing the first. Once we get
       * out of the loop, enable is set and we know there is no unserviced
       * interrupt.
       *
       * NB: an overenthusiastic optimizing compiler could easily break this
       * code. Hopefully, they all understand what "volatile" means these days.
       */
      for (;;)
      {
            /* Step 1: allow the handler to do the work itself from now on */
            notifyInterruptEnabled = 1;
            /* Step 2: nothing missed?  Then we're done, with enable left set */
            if (!notifyInterruptOccurred)
                  break;
            /* Step 3: work is pending; disable so we can't be re-entered */
            notifyInterruptEnabled = 0;
            /* Step 4: re-check; the handler may have serviced it between 1 and 3 */
            if (notifyInterruptOccurred)
            {
                  if (Trace_notify)
                        elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");

                  ProcessIncomingNotify();

                  if (Trace_notify)
                        elog(DEBUG1, "EnableNotifyInterrupt: done");
            }
      }
}

/*
 * DisableNotifyInterrupt
 *
 *          This is called by the PostgresMain main loop just after receiving
 *          a frontend command.  Signal handler execution of inbound notifies
 *          is disabled until the next EnableNotifyInterrupt call.
 *
 *          The SIGUSR1 signal handler also needs to call this, so as to
 *          prevent conflicts if one signal interrupts the other.  So we
 *          must return the previous state of the flag.
 */
bool
DisableNotifyInterrupt(void)
{
      bool        result = (notifyInterruptEnabled != 0);

      notifyInterruptEnabled = 0;

      return result;
}

/*
 * ProcessIncomingNotify
 *
 *          Deal with arriving NOTIFYs from other backends.
 *          This is called either directly from the SIGUSR2 signal handler,
 *          or the next time control reaches the outer idle loop.
 *          Scan pg_listener for arriving notifies, report them to my front end,
 *          and clear the notification field in pg_listener until next time.
 *
 *          Sequence of work:
 *            1. Disable the catchup (SIGUSR1) interrupt, remembering whether it
 *               was enabled, and clear notifyInterruptOccurred.
 *            2. In a transaction of our own, take ExclusiveLock on pg_listener
 *               and scan the rows whose listenerpid equals MyProcPid.
 *            3. For each such row with a nonzero notification field, send a
 *               NOTIFY message to the frontend and rewrite the row with 0 in
 *               that field.
 *            4. Commit (the lock is held until then), flush the queued
 *               messages to the frontend, and re-enable the catchup interrupt
 *               only if it was enabled on entry.
 *
 *          NOTE: since we are outside any transaction, we must create our own.
 */
static void
ProcessIncomingNotify(void)
{
      Relation    lRel;
      TupleDesc   tdesc;
      ScanKeyData key[1];
      HeapScanDesc scan;
      HeapTuple   lTuple,
                        rTuple;
      Datum       value[Natts_pg_listener];
      bool        repl[Natts_pg_listener],
                        nulls[Natts_pg_listener];
      bool        catchup_enabled;

      /* Must prevent SIGUSR1 interrupt while I am running */
      catchup_enabled = DisableCatchupInterrupt();

      if (Trace_notify)
            elog(DEBUG1, "ProcessIncomingNotify");

      set_ps_display("notify interrupt", false);

      /*
       * Clear the "interrupt pending" flag before scanning.  If another notify
       * signal sets it again while we work, EnableNotifyInterrupt's retry loop
       * will see it and call us once more.
       */
      notifyInterruptOccurred = 0;

      StartTransactionCommand();

      /* ExclusiveLock; deliberately not released until commit (see below). */
      lRel = heap_open(ListenerRelationId, ExclusiveLock);
      tdesc = RelationGetDescr(lRel);

      /* Scan only entries with my listenerPID */
      ScanKeyInit(&key[0],
                        Anum_pg_listener_pid,
                        BTEqualStrategyNumber, F_INT4EQ,
                        Int32GetDatum(MyProcPid));
      scan = heap_beginscan(lRel, SnapshotNow, 1, key);

      /*
       * Prepare data for rewriting 0 into notification field.  Only the
       * notification column is marked for replacement in repl[]; every other
       * column is copied unchanged from the old tuple by heap_modify_tuple.
       */
      memset(nulls, false, sizeof(nulls));
      memset(repl, false, sizeof(repl));
      repl[Anum_pg_listener_notify - 1] = true;
      memset(value, 0, sizeof(value));
      value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);

      while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
      {
            Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
            char     *relname = NameStr(listener->relname);
            int32       sourcePID = listener->notification;

            /*
             * A nonzero notification field holds the PID of the backend whose
             * NOTIFY is pending for us; zero means nothing is pending.
             */
            if (sourcePID != 0)
            {
                  /* Notify the frontend */

                  if (Trace_notify)
                        elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
                               relname, (int) sourcePID);

                  /* Queued only; pq_flush() below sends everything at once. */
                  NotifyMyFrontEnd(relname, sourcePID);

                  /*
                   * Rewrite the tuple with 0 in notification column.
                   */
                  rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
                  simple_heap_update(lRel, &lTuple->t_self, rTuple);

#ifdef NOT_USED                           /* currently there are no indexes */
                  CatalogUpdateIndexes(lRel, rTuple);
#endif
            }
      }
      heap_endscan(scan);

      /*
       * We do NOT release the lock on pg_listener here; we need to hold it
       * until end of transaction (which is about to happen, anyway) to ensure
       * that other backends see our tuple updates when they look. Otherwise, a
       * transaction started after this one might mistakenly think it doesn't
       * need to send this backend a new NOTIFY.
       */
      heap_close(lRel, NoLock);

      CommitTransactionCommand();

      /*
       * Must flush the notify messages to ensure frontend gets them promptly.
       */
      pq_flush();

      set_ps_display("idle", false);

      if (Trace_notify)
            elog(DEBUG1, "ProcessIncomingNotify: done");

      /* Restore the catchup interrupt only if it was on when we started. */
      if (catchup_enabled)
            EnableCatchupInterrupt();
}

/*
 * NotifyMyFrontEnd
 *
 *          Send a NOTIFY message to my front end.
 *
 *          relname: the notify condition name being reported.
 *          srcPid:  PID of the backend that issued the NOTIFY.  This is what the
 *                   client sees as the notifier's PID; ProcessIncomingNotify
 *                   passes the pg_listener "notification" value here.  (The
 *                   parameter was formerly named listenerPID, which wrongly
 *                   suggested the receiving backend's PID.)
 *
 *          For a remote client the message is 'A': an int32 PID, the condition
 *          name, and (protocol 3.0 and later) an extra string that is currently
 *          always empty.  Otherwise the notify is reported via elog(INFO).
 */
static void
NotifyMyFrontEnd(char *relname, int32 srcPid)
{
      if (whereToSendOutput == DestRemote)
      {
            StringInfoData buf;

            pq_beginmessage(&buf, 'A');
            pq_sendint(&buf, srcPid, sizeof(int32));
            pq_sendstring(&buf, relname);
            if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
            {
                  /* XXX Add parameter string here later */
                  pq_sendstring(&buf, "");
            }
            pq_endmessage(&buf);

            /*
             * NOTE: we do not do pq_flush() here.    For a self-notify, it will
             * happen at the end of the transaction, and for incoming notifies
             * ProcessIncomingNotify will do it after finding all the notifies.
             */
      }
      else
            elog(INFO, "NOTIFY for %s", relname);
}

/*
 * AsyncExistsPendingNotify
 *
 *          Report whether relname is already queued in pendingNotifies, using
 *          exact (strcmp) string equality.  Returns false for an empty list.
 */
static bool
AsyncExistsPendingNotify(const char *relname)
{
      ListCell   *cell;

      foreach(cell, pendingNotifies)
      {
            if (strcmp((const char *) lfirst(cell), relname) != 0)
                  continue;

            /* first match is enough */
            return true;
      }

      return false;
}

/*
 * ClearPendingActionsAndNotifies
 *
 *          Forget every queued LISTEN/UNLISTEN action and every pending NOTIFY
 *          name by resetting both list heads to NIL.
 *
 *          No per-element freeing is done: the list cells are palloc'd in
 *          CurTransactionContext and are released automatically at transaction
 *          exit.  (Older versions malloc'd them and had to free them by hand.)
 */
static void
ClearPendingActionsAndNotifies(void)
{
      pendingNotifies = NIL;
      pendingActions = NIL;
}

/*
 * notify_twophase_postcommit
 *
 *          2PC callback for COMMIT PREPARED.  recdata holds the notify condition
 *          name that was saved when the transaction was prepared; it is re-queued
 *          with Async_Notify so that it fires when the current transaction
 *          commits.  xid, info and len are not used.
 *
 *          Nothing is needed for ROLLBACK PREPARED.
 *
 *          Caveat: if the current transaction later rolls back, or there is a
 *          significant delay before it commits, the notify is affected
 *          accordingly.  This is acceptable for now because COMMIT PREPARED is
 *          not allowed inside a transaction block.
 */
void
notify_twophase_postcommit(TransactionId xid, uint16 info,
                                       void *recdata, uint32 len)
{
      char     *condname = (char *) recdata;

      Async_Notify(condname);
}

Generated by  Doxygen 1.6.0   Back to index