/*-------------------------------------------------------------------------
 *
 * vacuumlazy.c
 *      Concurrent ("lazy") vacuuming.
 *
 *
 * The major space usage for LAZY VACUUM is storage for the array of dead
 * tuple TIDs, with the next biggest need being storage for per-disk-page
 * free space info.  We want to ensure we can vacuum even the very largest
 * relations with finite memory space usage.  To do that, we set upper bounds
 * on the number of tuples and pages we will keep track of at once.
 *
 * We are willing to use at most maintenance_work_mem memory space to keep
 * track of dead tuples.  We initially allocate an array of TIDs of that size,
 * with an upper limit that depends on table size (this limit ensures we don't
 * allocate a huge area uselessly for vacuuming small tables).  If the array
 * threatens to overflow, we suspend the heap scan phase and perform a pass of
 * index cleanup and page compaction, then resume the heap scan with an empty
 * TID array.
 *
 * If we're processing a table with no indexes, we can just vacuum each page
 * as we go; there's no need to save up multiple tuples to minimize the number
 * of index scans performed.  So we don't use maintenance_work_mem memory for
 * the TID array, just enough to hold as many heap tuples as fit on one page.
 *
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      $PostgreSQL: pgsql/src/backend/commands/vacuumlazy.c,v 1.121.2.2 2009/11/10 18:00:28 alvherre Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
#include "catalog/storage.h"
#include "commands/dbcommands.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/tqual.h"


/*
 * Space/time tradeoff parameters: do these need to be user-tunable?
 *
 * To consider truncating the relation, we want there to be at least
 * REL_TRUNCATE_MINIMUM or (relsize / REL_TRUNCATE_FRACTION) (whichever
 * is less) potentially-freeable pages.
 */
#define REL_TRUNCATE_MINIMUM  1000
#define REL_TRUNCATE_FRACTION 16
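
/*
 * Worked example (illustrative, not part of the upstream file): for a
 * relation of 64000 pages, truncation is considered once at least
 *          Min(REL_TRUNCATE_MINIMUM, 64000 / REL_TRUNCATE_FRACTION)
 *            = Min(1000, 4000) = 1000
 * pages look potentially freeable; for a small 800-page relation the bar
 * drops to 800 / 16 = 50 pages.
 */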

/*
 * Guesstimation of number of dead tuples per page.  This is used to
 * provide an upper limit to memory allocated when vacuuming small
 * tables.
 */
#define LAZY_ALLOC_TUPLES           MaxHeapTuplesPerPage

/*
 * Before we consider skipping a page that's marked as clean in
 * visibility map, we must've seen at least this many clean pages.
 */
#define SKIP_PAGES_THRESHOLD  ((BlockNumber) 32)
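
/*
 * Illustrative note (an assumption about typical configurations, not
 * upstream text): with the default 8kB BLCKSZ, 32 consecutive pages is
 * 256kB of sequential reading forgone per skip, on the order of a
 * typical OS readahead window, so skipping any shorter run is unlikely
 * to save physical I/O.
 */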

typedef struct LVRelStats
{
      /* hasindex = true means two-pass strategy; false means one-pass */
      bool        hasindex;
      /* Overall statistics about rel */
      BlockNumber old_rel_pages;    /* previous value of pg_class.relpages */
      BlockNumber rel_pages;        /* total number of pages */
      BlockNumber scanned_pages;    /* number of pages we examined */
      double            scanned_tuples;   /* counts only tuples on scanned pages */
      double            old_rel_tuples; /* previous value of pg_class.reltuples */
      double            new_rel_tuples; /* new estimated total # of tuples */
      BlockNumber pages_removed;
      double            tuples_deleted;
      BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
      /* List of TIDs of tuples we intend to delete */
      /* NB: this list is ordered by TID address */
      int               num_dead_tuples;  /* current # of entries */
      int               max_dead_tuples;  /* # slots allocated in array */
      ItemPointer dead_tuples;      /* array of ItemPointerData */
      int               num_index_scans;
} LVRelStats;


/* A few variables that don't seem worth passing around as parameters */
static int  elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;

static BufferAccessStrategy vac_strategy;


/* non-export function prototypes */
static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                     Relation *Irel, int nindexes, bool scan_all);
static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
static void lazy_vacuum_index(Relation indrel,
                          IndexBulkDeleteResult **stats,
                          LVRelStats *vacrelstats);
static void lazy_cleanup_index(Relation indrel,
                           IndexBulkDeleteResult *stats,
                           LVRelStats *vacrelstats);
static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
                         int tupindex, LVRelStats *vacrelstats);
static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
static BlockNumber count_nondeletable_pages(Relation onerel,
                                     LVRelStats *vacrelstats);
static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
                                 ItemPointer itemptr);
static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static int  vac_cmp_itemptr(const void *left, const void *right);


/*
 *    lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
 *
 *          This routine vacuums a single heap, cleans out its indexes, and
 *          updates its relpages and reltuples statistics.
 *
 *          At entry, we have already established a transaction and opened
 *          and locked the relation.
 *
 *          The return value indicates whether this function has held off
 *          interrupts -- caller must RESUME_INTERRUPTS() after commit if true.
 */
bool
lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
                        BufferAccessStrategy bstrategy)
{
      LVRelStats *vacrelstats;
      Relation   *Irel;
      int               nindexes;
      BlockNumber possibly_freeable;
      PGRUsage    ru0;
      TimestampTz starttime = 0;
      bool        scan_all;
      TransactionId freezeTableLimit;
      BlockNumber new_rel_pages;
      double            new_rel_tuples;
      TransactionId new_frozen_xid;
      bool        heldoff = false;

      pg_rusage_init(&ru0);

      /* measure elapsed time iff autovacuum logging requires it */
      if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration > 0)
            starttime = GetCurrentTimestamp();

      if (vacstmt->verbose)
            elevel = INFO;
      else
            elevel = DEBUG2;

      vac_strategy = bstrategy;

      vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age,
                                      onerel->rd_rel->relisshared,
                                      &OldestXmin, &FreezeLimit, &freezeTableLimit);
      scan_all = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid,
                                                                   freezeTableLimit);

      vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));

      vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
      vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples;
      vacrelstats->num_index_scans = 0;

      /* Open all indexes of the relation */
      vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
      vacrelstats->hasindex = (nindexes > 0);

      /* Do the vacuuming */
      lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);

      /* Done with indexes */
      vac_close_indexes(nindexes, Irel, NoLock);

      /*
       * Optionally truncate the relation.
       *
       * Don't even think about it unless we have a shot at releasing a goodly
       * number of pages.  Otherwise, the time taken isn't worth it.
       *
       * Note that after we've truncated the heap, it's too late to abort the
       * transaction; doing so would lose the sinval messages needed to tell
       * the other backends about the table being shrunk.  We prevent interrupts
       * in that case; caller is responsible for re-enabling them after
       * committing the transaction.
       */
      possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
      if (possibly_freeable > 0 &&
            (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
             possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
      {
            HOLD_INTERRUPTS();
            heldoff = true;
            lazy_truncate_heap(onerel, vacrelstats);
      }

      /* Vacuum the Free Space Map */
      FreeSpaceMapVacuum(onerel);

      /*
       * Update statistics in pg_class.
       *
       * A corner case here is that if we scanned no pages at all because every
       * page is all-visible, we should not update relpages/reltuples, because
       * we have no new information to contribute.  In particular this keeps
       * us from replacing relpages=reltuples=0 (which means "unknown tuple
       * density") with nonzero relpages and reltuples=0 (which means "zero
       * tuple density") unless there's some actual evidence for the latter.
       *
       * Also, don't change relfrozenxid if we skipped any pages, since then
       * we don't know for certain that all tuples have a newer xmin.
       */
      new_rel_pages = vacrelstats->rel_pages;
      new_rel_tuples = vacrelstats->new_rel_tuples;
      if (vacrelstats->scanned_pages == 0 && new_rel_pages > 0)
      {
            new_rel_pages = vacrelstats->old_rel_pages;
            new_rel_tuples = vacrelstats->old_rel_tuples;
      }

      new_frozen_xid = FreezeLimit;
      if (vacrelstats->scanned_pages < vacrelstats->rel_pages)
            new_frozen_xid = InvalidTransactionId;

      vac_update_relstats(onerel,
                                    new_rel_pages, new_rel_tuples,
                                    vacrelstats->hasindex,
                                    new_frozen_xid);

      /* report results to the stats collector, too */
      pgstat_report_vacuum(RelationGetRelid(onerel),
                                     onerel->rd_rel->relisshared,
                                     vacstmt->analyze,
                                     new_rel_tuples);

      /* and log the action if appropriate */
      if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
      {
            if (Log_autovacuum_min_duration == 0 ||
                  TimestampDifferenceExceeds(starttime, GetCurrentTimestamp(),
                                                         Log_autovacuum_min_duration))
                  ereport(LOG,
                              (errmsg("automatic vacuum of table \"%s.%s.%s\": index scans: %d\n"
                                          "pages: %d removed, %d remain\n"
                                          "tuples: %.0f removed, %.0f remain\n"
                                          "system usage: %s",
                                          get_database_name(MyDatabaseId),
                                          get_namespace_name(RelationGetNamespace(onerel)),
                                          RelationGetRelationName(onerel),
                                          vacrelstats->num_index_scans,
                                          vacrelstats->pages_removed,
                                          vacrelstats->rel_pages,
                                          vacrelstats->tuples_deleted,
                                          new_rel_tuples,
                                          pg_rusage_show(&ru0))));
      }

      return heldoff;
}
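
/*
 * Caller-side sketch (illustrative assumption, not upstream code) of the
 * interrupt-holdoff contract described above: when lazy_vacuum_rel()
 * returns true, interrupts stay held until the transaction commits.
 *
 *      heldoff = lazy_vacuum_rel(onerel, vacstmt, bstrategy);
 *      ... close the relation, commit the transaction ...
 *      if (heldoff)
 *            RESUME_INTERRUPTS();
 */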


/*
 *    lazy_scan_heap() -- scan an open heap relation
 *
 *          This routine sets commit status bits, builds lists of dead tuples
 *          and pages with free space, and calculates statistics on the number
 *          of live tuples in the heap.  When done, or when we run low on space
 *          for dead-tuple TIDs, invoke vacuuming of indexes and heap.
 *
 *          If there are no indexes then we just vacuum each dirty page as we
 *          process it, since there's no point in gathering many tuples.
 */
static void
lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                     Relation *Irel, int nindexes, bool scan_all)
{
      BlockNumber nblocks,
                        blkno;
      HeapTupleData tuple;
      char     *relname;
      BlockNumber empty_pages,
                        vacuumed_pages;
      double            num_tuples,
                        tups_vacuumed,
                        nkeep,
                        nunused;
      IndexBulkDeleteResult **indstats;
      int               i;
      PGRUsage    ru0;
      Buffer            vmbuffer = InvalidBuffer;
      BlockNumber next_not_all_visible_block;
      bool        skipping_all_visible_blocks;

      pg_rusage_init(&ru0);

      relname = RelationGetRelationName(onerel);
      ereport(elevel,
                  (errmsg("vacuuming \"%s.%s\"",
                              get_namespace_name(RelationGetNamespace(onerel)),
                              relname)));

      empty_pages = vacuumed_pages = 0;
      num_tuples = tups_vacuumed = nkeep = nunused = 0;

      indstats = (IndexBulkDeleteResult **)
            palloc0(nindexes * sizeof(IndexBulkDeleteResult *));

      nblocks = RelationGetNumberOfBlocks(onerel);
      vacrelstats->rel_pages = nblocks;
      vacrelstats->scanned_pages = 0;
      vacrelstats->nonempty_pages = 0;

      lazy_space_alloc(vacrelstats, nblocks);

      /*
       * We want to skip pages that don't require vacuuming according to the
       * visibility map, but only when we can skip at least SKIP_PAGES_THRESHOLD
       * consecutive pages.  Since we're reading sequentially, the OS should be
       * doing readahead for us, so there's no gain in skipping a page now and
       * then; that's likely to disable readahead and so be counterproductive.
       * Also, skipping even a single page means that we can't update
       * relfrozenxid, so we only want to do it if we can skip a goodly number
       * of pages.
       *
       * Before entering the main loop, establish the invariant that
       * next_not_all_visible_block is the next block number >= blkno that's
       * not all-visible according to the visibility map, or nblocks if there's
       * no such block.  Also, we set up the skipping_all_visible_blocks flag,
       * which is needed because we need hysteresis in the decision: once we've
       * started skipping blocks, we may as well skip everything up to the next
       * not-all-visible block.
       *
       * Note: if scan_all is true, we won't actually skip any pages; but we
       * maintain next_not_all_visible_block anyway, so as to set up the
       * all_visible_according_to_vm flag correctly for each page.
       */
      for (next_not_all_visible_block = 0;
             next_not_all_visible_block < nblocks;
             next_not_all_visible_block++)
      {
            if (!visibilitymap_test(onerel, next_not_all_visible_block, &vmbuffer))
                  break;
            vacuum_delay_point();
      }
      if (next_not_all_visible_block >= SKIP_PAGES_THRESHOLD)
            skipping_all_visible_blocks = true;
      else
            skipping_all_visible_blocks = false;

      for (blkno = 0; blkno < nblocks; blkno++)
      {
            Buffer            buf;
            Page        page;
            OffsetNumber offnum,
                              maxoff;
            bool        tupgone,
                              hastup;
            int               prev_dead_count;
            OffsetNumber frozen[MaxOffsetNumber];
            int               nfrozen;
            Size        freespace;
            bool        all_visible_according_to_vm;
            bool        all_visible;
            bool        has_dead_tuples;

            if (blkno == next_not_all_visible_block)
            {
                  /* Time to advance next_not_all_visible_block */
                  for (next_not_all_visible_block++;
                         next_not_all_visible_block < nblocks;
                         next_not_all_visible_block++)
                  {
                        if (!visibilitymap_test(onerel, next_not_all_visible_block,
                                                            &vmbuffer))
                              break;
                        vacuum_delay_point();
                  }

                  /*
                   * We know we can't skip the current block.  But set up
                   * skipping_all_visible_blocks to do the right thing at the
                   * following blocks.
                   */
                  if (next_not_all_visible_block - blkno > SKIP_PAGES_THRESHOLD)
                        skipping_all_visible_blocks = true;
                  else
                        skipping_all_visible_blocks = false;
                  all_visible_according_to_vm = false;
            }
            else
            {
                  /* Current block is all-visible */
                  if (skipping_all_visible_blocks && !scan_all)
                        continue;
                  all_visible_according_to_vm = true;
            }

            vacuum_delay_point();

            vacrelstats->scanned_pages++;

            /*
             * If we are close to overrunning the available space for dead-tuple
             * TIDs, pause and do a cycle of vacuuming before we tackle this page.
             */
            if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
                  vacrelstats->num_dead_tuples > 0)
            {
                  /* Remove index entries */
                  for (i = 0; i < nindexes; i++)
                        lazy_vacuum_index(Irel[i],
                                                  &indstats[i],
                                                  vacrelstats);
                  /* Remove tuples from heap */
                  lazy_vacuum_heap(onerel, vacrelstats);
                  /* Forget the now-vacuumed tuples, and press on */
                  vacrelstats->num_dead_tuples = 0;
                  vacrelstats->num_index_scans++;
            }

            buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
                                                 RBM_NORMAL, vac_strategy);

            /* We need buffer cleanup lock so that we can prune HOT chains. */
            LockBufferForCleanup(buf);

            page = BufferGetPage(buf);

            if (PageIsNew(page))
            {
                  /*
                   * An all-zeroes page could be left over if a backend extends the
                   * relation but crashes before initializing the page. Reclaim such
                   * pages for use.
                   *
                   * We have to be careful here because we could be looking at a
                   * page that someone has just added to the relation and not yet
                   * been able to initialize (see RelationGetBufferForTuple). To
                   * protect against that, release the buffer lock, grab the
                   * relation extension lock momentarily, and re-lock the buffer. If
                   * the page is still uninitialized by then, it must be left over
                   * from a crashed backend, and we can initialize it.
                   *
                   * We don't really need the relation lock when this is a new or
                   * temp relation, but it's probably not worth the code space to
                   * check that, since this surely isn't a critical path.
                   *
                   * Note: the comparable code in vacuum.c need not worry because
                   * it's got exclusive lock on the whole relation.
                   */
                  LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                  LockRelationForExtension(onerel, ExclusiveLock);
                  UnlockRelationForExtension(onerel, ExclusiveLock);
                  LockBufferForCleanup(buf);
                  if (PageIsNew(page))
                  {
                        ereport(WARNING,
                        (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
                                    relname, blkno)));
                        PageInit(page, BufferGetPageSize(buf), 0);
                        empty_pages++;
                  }
                  freespace = PageGetHeapFreeSpace(page);
                  MarkBufferDirty(buf);
                  UnlockReleaseBuffer(buf);

                  RecordPageWithFreeSpace(onerel, blkno, freespace);
                  continue;
            }

            if (PageIsEmpty(page))
            {
                  empty_pages++;
                  freespace = PageGetHeapFreeSpace(page);

                  if (!PageIsAllVisible(page))
                  {
                        PageSetAllVisible(page);
                        SetBufferCommitInfoNeedsSave(buf);
                  }

                  LockBuffer(buf, BUFFER_LOCK_UNLOCK);

                  /* Update the visibility map */
                  if (!all_visible_according_to_vm)
                  {
                        visibilitymap_pin(onerel, blkno, &vmbuffer);
                        LockBuffer(buf, BUFFER_LOCK_SHARE);
                        if (PageIsAllVisible(page))
                              visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                  }

                  ReleaseBuffer(buf);
                  RecordPageWithFreeSpace(onerel, blkno, freespace);
                  continue;
            }

            /*
             * Prune all HOT-update chains in this page.
             *
             * We count tuples removed by the pruning step as removed by VACUUM.
             */
            tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
                                                             false, false);

            /*
             * Now scan the page to collect vacuumable items and check for tuples
             * requiring freezing.
             */
            all_visible = true;
            has_dead_tuples = false;
            nfrozen = 0;
            hastup = false;
            prev_dead_count = vacrelstats->num_dead_tuples;
            maxoff = PageGetMaxOffsetNumber(page);
            for (offnum = FirstOffsetNumber;
                   offnum <= maxoff;
                   offnum = OffsetNumberNext(offnum))
            {
                  ItemId            itemid;

                  itemid = PageGetItemId(page, offnum);

                  /* Unused items require no processing, but we count 'em */
                  if (!ItemIdIsUsed(itemid))
                  {
                        nunused += 1;
                        continue;
                  }

                  /* Redirect items mustn't be touched */
                  if (ItemIdIsRedirected(itemid))
                  {
                        hastup = true;    /* this page won't be truncatable */
                        continue;
                  }

                  ItemPointerSet(&(tuple.t_self), blkno, offnum);

                  /*
                   * DEAD item pointers are to be vacuumed normally; but we don't
                   * count them in tups_vacuumed, else we'd be double-counting (at
                   * least in the common case where heap_page_prune() just freed up
                   * a non-HOT tuple).
                   */
                  if (ItemIdIsDead(itemid))
                  {
                        lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
                        all_visible = false;
                        continue;
                  }

                  Assert(ItemIdIsNormal(itemid));

                  tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                  tuple.t_len = ItemIdGetLength(itemid);

                  tupgone = false;

                  switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
                  {
                        case HEAPTUPLE_DEAD:

                              /*
                               * Ordinarily, DEAD tuples would have been removed by
                               * heap_page_prune(), but it's possible that the tuple
                               * state changed since heap_page_prune() looked.  In
                               * particular an INSERT_IN_PROGRESS tuple could have
                               * changed to DEAD if the inserter aborted.  So this
                               * cannot be considered an error condition.
                               *
                               * If the tuple is HOT-updated then it must only be
                               * removed by a prune operation; so we keep it just as if
                               * it were RECENTLY_DEAD.  Also, if it's a heap-only
                               * tuple, we choose to keep it, because it'll be a lot
                               * cheaper to get rid of it in the next pruning pass than
                               * to treat it like an indexed tuple.
                               */
                              if (HeapTupleIsHotUpdated(&tuple) ||
                                    HeapTupleIsHeapOnly(&tuple))
                                    nkeep += 1;
                              else
                                    tupgone = true; /* we can delete the tuple */
                              all_visible = false;
                              break;
                        case HEAPTUPLE_LIVE:
                              /* Tuple is good --- but let's do some validity checks */
                              if (onerel->rd_rel->relhasoids &&
                                    !OidIsValid(HeapTupleGetOid(&tuple)))
                                    elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
                                           relname, blkno, offnum);

                              /*
                               * Is the tuple definitely visible to all transactions?
                               *
                               * NB: Like with per-tuple hint bits, we can't set the
                               * PD_ALL_VISIBLE flag if the inserter committed
                               * asynchronously. See SetHintBits for more info. Check
                               * that the HEAP_XMIN_COMMITTED hint bit is set because of
                               * that.
                               */
                              if (all_visible)
                              {
                                    TransactionId xmin;

                                    if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                                    {
                                          all_visible = false;
                                          break;
                                    }

                                    /*
                                     * The inserter definitely committed. But is it old
                                     * enough that everyone sees it as committed?
                                     */
                                    xmin = HeapTupleHeaderGetXmin(tuple.t_data);
                                    if (!TransactionIdPrecedes(xmin, OldestXmin))
                                    {
                                          all_visible = false;
                                          break;
                                    }
                              }
                              break;
                        case HEAPTUPLE_RECENTLY_DEAD:

                              /*
                               * If tuple is recently deleted then we must not remove it
                               * from relation.
                               */
                              nkeep += 1;
                              all_visible = false;
                              break;
                        case HEAPTUPLE_INSERT_IN_PROGRESS:
                              /* This is an expected case during concurrent vacuum */
                              all_visible = false;
                              break;
                        case HEAPTUPLE_DELETE_IN_PROGRESS:
                              /* This is an expected case during concurrent vacuum */
                              all_visible = false;
                              break;
                        default:
                              elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
                              break;
                  }

                  if (tupgone)
                  {
                        lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
                        tups_vacuumed += 1;
                        has_dead_tuples = true;
                  }
                  else
                  {
                        num_tuples += 1;
                        hastup = true;

                        /*
                         * Each non-removable tuple must be checked to see if it needs
                         * freezing.  Note we already have exclusive buffer lock.
                         */
                        if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
                                                        InvalidBuffer))
                              frozen[nfrozen++] = offnum;
                  }
            }                                   /* scan along page */

            /*
             * If we froze any tuples, mark the buffer dirty, and write a WAL
             * record recording the changes.  We must log the changes to be
             * crash-safe against future truncation of CLOG.
             */
            if (nfrozen > 0)
            {
                  MarkBufferDirty(buf);
                  /* no XLOG for temp tables, though */
                  if (!onerel->rd_istemp)
                  {
                        XLogRecPtr  recptr;

                        recptr = log_heap_freeze(onerel, buf, FreezeLimit,
                                                             frozen, nfrozen);
                        PageSetLSN(page, recptr);
                        PageSetTLI(page, ThisTimeLineID);
                  }
            }

            /*
             * If there are no indexes then we can vacuum the page right now
             * instead of doing a second scan.
             */
            if (nindexes == 0 &&
                  vacrelstats->num_dead_tuples > 0)
            {
                  /* Remove tuples from heap */
                  lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
                  /* Forget the now-vacuumed tuples, and press on */
                  vacrelstats->num_dead_tuples = 0;
                  vacuumed_pages++;
            }

            freespace = PageGetHeapFreeSpace(page);

            /* Update the all-visible flag on the page */
            if (!PageIsAllVisible(page) && all_visible)
            {
                  PageSetAllVisible(page);
                  SetBufferCommitInfoNeedsSave(buf);
            }
            /*
             * It's possible for the value returned by GetOldestXmin() to move
             * backwards, so it's not wrong for us to see tuples that appear to
             * not be visible to everyone yet, while PD_ALL_VISIBLE is already
             * set. The real safe xmin value never moves backwards, but
             * GetOldestXmin() is conservative and sometimes returns a value
             * that's unnecessarily small, so if we see that contradiction it
             * just means that the tuples that we think are not visible to
             * everyone yet actually are, and the PD_ALL_VISIBLE flag is correct.
             *
             * There should never be dead tuples on a page with PD_ALL_VISIBLE
             * set, however.
             */
            else if (PageIsAllVisible(page) && has_dead_tuples)
            {
                  elog(WARNING, "page containing dead tuples is marked as all-visible in relation \"%s\" page %u",
                         relname, blkno);
                  PageClearAllVisible(page);
                  SetBufferCommitInfoNeedsSave(buf);

                  /*
                   * Normally, we would drop the lock on the heap page before
                   * updating the visibility map, but since this case shouldn't
                   * happen anyway, don't worry about that.
                   */
                  visibilitymap_clear(onerel, blkno);
            }

            LockBuffer(buf, BUFFER_LOCK_UNLOCK);

            /* Update the visibility map */
            if (!all_visible_according_to_vm && all_visible)
            {
                  visibilitymap_pin(onerel, blkno, &vmbuffer);
                  LockBuffer(buf, BUFFER_LOCK_SHARE);
                  if (PageIsAllVisible(page))
                        visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
                  LockBuffer(buf, BUFFER_LOCK_UNLOCK);
            }

            ReleaseBuffer(buf);

            /* Remember the location of the last page with nonremovable tuples */
            if (hastup)
                  vacrelstats->nonempty_pages = blkno + 1;

            /*
             * If we remembered any tuples for deletion, then the page will be
             * visited again by lazy_vacuum_heap, which will compute and record
             * its post-compaction free space.  If not, then we're done with this
             * page, so remember its free space as-is.  (This path will always be
             * taken if there are no indexes.)
             */
            if (vacrelstats->num_dead_tuples == prev_dead_count)
                  RecordPageWithFreeSpace(onerel, blkno, freespace);
      }

      /* save stats for use later */
      vacrelstats->scanned_tuples = num_tuples;
      vacrelstats->tuples_deleted = tups_vacuumed;

      /* now we can compute the new value for pg_class.reltuples */
      vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
                                                                                     nblocks,
                                                                                     vacrelstats->scanned_pages,
                                                                                     num_tuples);

      /* If any tuples need to be deleted, perform final vacuum cycle */
      /* XXX put a threshold on min number of tuples here? */
      if (vacrelstats->num_dead_tuples > 0)
      {
            /* Remove index entries */
            for (i = 0; i < nindexes; i++)
                  lazy_vacuum_index(Irel[i],
                                            &indstats[i],
                                            vacrelstats);
            /* Remove tuples from heap */
            lazy_vacuum_heap(onerel, vacrelstats);
            vacrelstats->num_index_scans++;
      }

      /* Release the pin on the visibility map page */
      if (BufferIsValid(vmbuffer))
      {
            ReleaseBuffer(vmbuffer);
            vmbuffer = InvalidBuffer;
      }

      /* Do post-vacuum cleanup and statistics update for each index */
      for (i = 0; i < nindexes; i++)
            lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);

      /* If no indexes, make log report that lazy_vacuum_heap would've made */
      if (vacuumed_pages)
            ereport(elevel,
                        (errmsg("\"%s\": removed %.0f row versions in %u pages",
                                    RelationGetRelationName(onerel),
                                    tups_vacuumed, vacuumed_pages)));

      ereport(elevel,
                  (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages",
                              RelationGetRelationName(onerel),
                              tups_vacuumed, num_tuples,
                              vacrelstats->scanned_pages, nblocks),
                   errdetail("%.0f dead row versions cannot be removed yet.\n"
                                 "There were %.0f unused item pointers.\n"
                                 "%u pages are entirely empty.\n"
                                 "%s.",
                                 nkeep,
                                 nunused,
                                 empty_pages,
                                 pg_rusage_show(&ru0))));
}
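
/*
 * Note (descriptive, not upstream text): each index-vacuum cycle above
 * bumps num_index_scans, so a run whose dead TIDs all fit in memory
 * reports "index scans: 1" in the autovacuum log line, or 0 when no
 * dead tuples were found at all.
 */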


/*
 *    lazy_vacuum_heap() -- second pass over the heap
 *
 *          This routine marks dead tuples as unused and compacts out free
 *          space on their pages.  Pages not having dead tuples recorded from
 *          lazy_scan_heap are not visited at all.
 *
 * Note: the reason for doing this as a second pass is we cannot remove
 * the tuples until we've removed their index entries, and we want to
 * process index entry removal in batches as large as possible.
 */
static void
lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
{
      int               tupindex;
      int               npages;
      PGRUsage    ru0;

      pg_rusage_init(&ru0);
      npages = 0;

      tupindex = 0;
      while (tupindex < vacrelstats->num_dead_tuples)
      {
            BlockNumber tblk;
            Buffer            buf;
            Page        page;
            Size        freespace;

            vacuum_delay_point();

            tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
            buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
                                                 vac_strategy);
            LockBufferForCleanup(buf);
            tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);

            /* Now that we've compacted the page, record its available space */
            page = BufferGetPage(buf);
            freespace = PageGetHeapFreeSpace(page);

            UnlockReleaseBuffer(buf);
            RecordPageWithFreeSpace(onerel, tblk, freespace);
            npages++;
      }

      ereport(elevel,
                  (errmsg("\"%s\": removed %d row versions in %d pages",
                              RelationGetRelationName(onerel),
                              tupindex, npages),
                   errdetail("%s.",
                                 pg_rusage_show(&ru0))));
}

/*
 *    lazy_vacuum_page() -- free dead tuples on a page
 *                             and repair its fragmentation.
 *
 * Caller must hold pin and buffer cleanup lock on the buffer.
 *
 * tupindex is the index in vacrelstats->dead_tuples of the first dead
 * tuple for this page.  We assume the rest follow sequentially.
 * The return value is the first tupindex after the tuples of this page.
 */
static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
                         int tupindex, LVRelStats *vacrelstats)
{
      Page        page = BufferGetPage(buffer);
      OffsetNumber unused[MaxOffsetNumber];
      int               uncnt = 0;

      START_CRIT_SECTION();

      for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
      {
            BlockNumber tblk;
            OffsetNumber toff;
            ItemId            itemid;

            tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
            if (tblk != blkno)
                  break;                        /* past end of tuples for this block */
            toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
            itemid = PageGetItemId(page, toff);
            ItemIdSetUnused(itemid);
            unused[uncnt++] = toff;
      }

      PageRepairFragmentation(page);

      MarkBufferDirty(buffer);

      /* XLOG stuff */
      if (!onerel->rd_istemp)
      {
            XLogRecPtr  recptr;

            recptr = log_heap_clean(onerel, buffer,
                                                NULL, 0, NULL, 0,
                                                unused, uncnt,
                                                false);
            PageSetLSN(page, recptr);
            PageSetTLI(page, ThisTimeLineID);
      }

      END_CRIT_SECTION();

      return tupindex;
}

/*
 *    lazy_vacuum_index() -- vacuum one index relation.
 *
 *          Delete all the index entries pointing to tuples listed in
 *          vacrelstats->dead_tuples, and update running statistics.
 */
static void
lazy_vacuum_index(Relation indrel,
                          IndexBulkDeleteResult **stats,
                          LVRelStats *vacrelstats)
{
      IndexVacuumInfo ivinfo;
      PGRUsage    ru0;

      pg_rusage_init(&ru0);

      ivinfo.index = indrel;
      ivinfo.vacuum_full = false;
      ivinfo.analyze_only = false;
      ivinfo.estimated_count = true;
      ivinfo.message_level = elevel;
      ivinfo.num_heap_tuples = vacrelstats->old_rel_tuples;
      ivinfo.strategy = vac_strategy;

      /* Do bulk deletion */
      *stats = index_bulk_delete(&ivinfo, *stats,
                                             lazy_tid_reaped, (void *) vacrelstats);

      ereport(elevel,
                  (errmsg("scanned index \"%s\" to remove %d row versions",
                              RelationGetRelationName(indrel),
                              vacrelstats->num_dead_tuples),
                   errdetail("%s.", pg_rusage_show(&ru0))));
}

/*
 *    lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
 */
static void
lazy_cleanup_index(Relation indrel,
                           IndexBulkDeleteResult *stats,
                           LVRelStats *vacrelstats)
{
      IndexVacuumInfo ivinfo;
      PGRUsage    ru0;

      pg_rusage_init(&ru0);

      ivinfo.index = indrel;
      ivinfo.vacuum_full = false;
      ivinfo.analyze_only = false;
      ivinfo.estimated_count = (vacrelstats->scanned_pages < vacrelstats->rel_pages);
      ivinfo.message_level = elevel;
      ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
      ivinfo.strategy = vac_strategy;

      stats = index_vacuum_cleanup(&ivinfo, stats);

      if (!stats)
            return;

      /*
       * Now update statistics in pg_class, but only if the index says the count
       * is accurate.
       */
      if (!stats->estimated_count)
            vac_update_relstats(indrel,
                                          stats->num_pages, stats->num_index_tuples,
                                          false, InvalidTransactionId);

      ereport(elevel,
                  (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
                              RelationGetRelationName(indrel),
                              stats->num_index_tuples,
                              stats->num_pages),
                   errdetail("%.0f index row versions were removed.\n"
                   "%u index pages have been deleted, %u are currently reusable.\n"
                                 "%s.",
                                 stats->tuples_removed,
                                 stats->pages_deleted, stats->pages_free,
                                 pg_rusage_show(&ru0))));

      pfree(stats);
}

/*
 * lazy_truncate_heap - try to truncate off any empty pages at the end
 */
static void
lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
{
      BlockNumber old_rel_pages = vacrelstats->rel_pages;
      BlockNumber new_rel_pages;
      PGRUsage    ru0;

      pg_rusage_init(&ru0);

      /*
       * We need full exclusive lock on the relation in order to do truncation.
       * If we can't get it, give up rather than waiting --- we don't want to
       * block other backends, and we don't want to deadlock (which is quite
       * possible considering we already hold a lower-grade lock).
       */
      if (!ConditionalLockRelation(onerel, AccessExclusiveLock))
            return;

      /*
       * Now that we have exclusive lock, look to see if the rel has grown
       * whilst we were vacuuming with non-exclusive lock.  If so, give up; the
       * newly added pages presumably contain non-deletable tuples.
       */
      new_rel_pages = RelationGetNumberOfBlocks(onerel);
      if (new_rel_pages != old_rel_pages)
      {
            /*
             * Note: we intentionally don't update vacrelstats->rel_pages with
             * the new rel size here.  If we did, it would amount to assuming that
             * the new pages are empty, which is unlikely.  Leaving the numbers
             * alone amounts to assuming that the new pages have the same tuple
             * density as existing ones, which is less unlikely.
             */
            UnlockRelation(onerel, AccessExclusiveLock);
            return;
      }

      /*
       * Scan backwards from the end to verify that the end pages actually
       * contain no tuples.  This is *necessary*, not optional, because other
       * backends could have added tuples to these pages whilst we were
       * vacuuming.
       */
      new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);

      if (new_rel_pages >= old_rel_pages)
      {
            /* can't do anything after all */
            UnlockRelation(onerel, AccessExclusiveLock);
            return;
      }

      /*
       * Okay to truncate.
       */
      RelationTruncate(onerel, new_rel_pages);

      /* force relcache inval so all backends reset their rd_targblock */
      CacheInvalidateRelcache(onerel);

      /*
       * Note: once we have truncated, we *must* keep the exclusive lock until
       * commit.  The sinval message won't be sent until commit, and other
       * backends must see it and reset their rd_targblock values before they
       * can safely access the table again.
       */

      /*
       * Update statistics.  Here, it *is* correct to adjust rel_pages without
       * also touching reltuples, since the tuple count wasn't changed by the
       * truncation.
       */
      vacrelstats->rel_pages = new_rel_pages;
      vacrelstats->pages_removed = old_rel_pages - new_rel_pages;

      ereport(elevel,
                  (errmsg("\"%s\": truncated %u to %u pages",
                              RelationGetRelationName(onerel),
                              old_rel_pages, new_rel_pages),
                   errdetail("%s.",
                                 pg_rusage_show(&ru0))));
}

/*
 * Rescan end pages to verify that they are (still) empty of tuples.
 *
 * Returns number of nondeletable pages (last nonempty page + 1).
 */
static BlockNumber
count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
{
      BlockNumber blkno;

      /* Strange coding of loop control is needed because blkno is unsigned */
      blkno = vacrelstats->rel_pages;
      while (blkno > vacrelstats->nonempty_pages)
      {
            Buffer            buf;
            Page        page;
            OffsetNumber offnum,
                              maxoff;
            bool        hastup;

            /*
             * We don't insert a vacuum delay point here, because we have an
             * exclusive lock on the table which we want to hold for as short a
             * time as possible.  We still need to check for interrupts however.
             */
            CHECK_FOR_INTERRUPTS();

            blkno--;

            buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
                                                 RBM_NORMAL, vac_strategy);

            /* In this phase we only need shared access to the buffer */
            LockBuffer(buf, BUFFER_LOCK_SHARE);

            page = BufferGetPage(buf);

            if (PageIsNew(page) || PageIsEmpty(page))
            {
                  /* PageIsNew probably shouldn't happen... */
                  UnlockReleaseBuffer(buf);
                  continue;
            }

            hastup = false;
            maxoff = PageGetMaxOffsetNumber(page);
            for (offnum = FirstOffsetNumber;
                   offnum <= maxoff;
                   offnum = OffsetNumberNext(offnum))
            {
                  ItemId            itemid;

                  itemid = PageGetItemId(page, offnum);

                  /*
                   * Note: any non-unused item should be taken as a reason to keep
                   * this page.  We formerly thought that DEAD tuples could be
                   * thrown away, but that's not so, because we'd not have cleaned
                   * out their index entries.
                   */
                  if (ItemIdIsUsed(itemid))
                  {
                        hastup = true;
                        break;                  /* can stop scanning */
                  }
            }                                   /* scan along page */

            UnlockReleaseBuffer(buf);

            /* Done scanning if we found a tuple here */
            if (hastup)
                  return blkno + 1;
      }

      /*
       * If we fall out of the loop, all the previously-thought-to-be-empty
       * pages still are; we need not bother to look at the last known-nonempty
       * page.
       */
      return vacrelstats->nonempty_pages;
}

/*
 * lazy_space_alloc - space allocation decisions for lazy vacuum
 *
 * See the comments at the head of this file for rationale.
 */
static void
lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
{
      long        maxtuples;

      if (vacrelstats->hasindex)
      {
            maxtuples = (maintenance_work_mem * 1024L) / sizeof(ItemPointerData);
            maxtuples = Min(maxtuples, INT_MAX);
            maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));

            /* curious coding here to ensure the multiplication can't overflow */
            if ((BlockNumber) (maxtuples / LAZY_ALLOC_TUPLES) > relblocks)
                  maxtuples = relblocks * LAZY_ALLOC_TUPLES;

            /* stay sane if small maintenance_work_mem */
            maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
      }
      else
      {
            maxtuples = MaxHeapTuplesPerPage;
      }

      vacrelstats->num_dead_tuples = 0;
      vacrelstats->max_dead_tuples = (int) maxtuples;
      vacrelstats->dead_tuples = (ItemPointer)
            palloc(maxtuples * sizeof(ItemPointerData));
}
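
/*
 * Worked example (illustrative, not upstream text): with
 * maintenance_work_mem = 65536 (64MB) and sizeof(ItemPointerData) == 6,
 * the initial bound is 65536 * 1024 / 6 = 11184810 TIDs.  A table of
 * 1000 blocks clamps that to 1000 * LAZY_ALLOC_TUPLES dead-tuple slots,
 * so small tables never allocate anywhere near the full 64MB.
 */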

/*
 * lazy_record_dead_tuple - remember one deletable tuple
 */
static void
lazy_record_dead_tuple(LVRelStats *vacrelstats,
                                 ItemPointer itemptr)
{
      /*
       * The array shouldn't overflow under normal behavior, but perhaps it
       * could if we are given a really small maintenance_work_mem. In that
       * case, just forget the last few tuples (we'll get 'em next time).
       */
      if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
      {
            vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
            vacrelstats->num_dead_tuples++;
      }
}
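
/*
 * Note (descriptive, not upstream text): because lazy_scan_heap() walks
 * the heap in physical block order and offsets within each page in
 * ascending order, simply appending here keeps dead_tuples sorted by TID
 * address, which is the invariant that lets lazy_tid_reaped() use
 * bsearch().
 */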

/*
 *    lazy_tid_reaped() -- is a particular tid deletable?
 *
 *          This has the right signature to be an IndexBulkDeleteCallback.
 *
 *          Assumes dead_tuples array is in sorted order.
 */
static bool
lazy_tid_reaped(ItemPointer itemptr, void *state)
{
      LVRelStats *vacrelstats = (LVRelStats *) state;
      ItemPointer res;

      res = (ItemPointer) bsearch((void *) itemptr,
                                                (void *) vacrelstats->dead_tuples,
                                                vacrelstats->num_dead_tuples,
                                                sizeof(ItemPointerData),
                                                vac_cmp_itemptr);

      return (res != NULL);
}

/*
 * Comparator routines for use with qsort() and bsearch().
 */
static int
vac_cmp_itemptr(const void *left, const void *right)
{
      BlockNumber lblk,
                        rblk;
      OffsetNumber loff,
                        roff;

      lblk = ItemPointerGetBlockNumber((ItemPointer) left);
      rblk = ItemPointerGetBlockNumber((ItemPointer) right);

      if (lblk < rblk)
            return -1;
      if (lblk > rblk)
            return 1;

      loff = ItemPointerGetOffsetNumber((ItemPointer) left);
      roff = ItemPointerGetOffsetNumber((ItemPointer) right);

      if (loff < roff)
            return -1;
      if (loff > roff)
            return 1;

      return 0;
}
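
/*
 * Usage sketch (illustrative assumption; in this file the array is
 * already built in sorted order): sorting a TID array with the
 * comparator above, as its header comment anticipates for qsort().
 * "tids" and "ntids" are hypothetical names.
 *
 *      qsort((void *) tids, ntids, sizeof(ItemPointerData),
 *              vac_cmp_itemptr);
 */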
