Logo Search packages:      
Sourcecode: postgresql-8.4 version File versions

heap.c

/*-------------------------------------------------------------------------
 *
 * heap.c
 *      code to create and destroy POSTGRES heap relations
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.354 2009/06/11 14:48:54 momjian Exp $
 *
 *
 * INTERFACE ROUTINES
 *          heap_create()                 - Create an uncataloged heap relation
 *          heap_create_with_catalog() - Create a cataloged relation
 *          heap_drop_with_catalog() - Removes named relation from catalogs
 *
 * NOTES
 *      this code taken from access/heap/create.c, which contains
 *      the old heap_create_with_catalog, amcreate, and amdestroy.
 *      those routines will soon call these routines using the function
 *      manager,
 *      just like the poorly named "NewXXX" routines do.    The
 *      "New" routines are all going to die soon, once and for all!
 *          -cim 1/13/91
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "catalog/pg_type_fn.h"
#include "catalog/storage.h"
#include "commands/tablecmds.h"
#include "commands/typecmds.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/var.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"


/*
 * Prototypes for the file-local (static) helper routines used in this file.
 */
static void AddNewRelationTuple(Relation pg_class_desc,
                              Relation new_rel_desc,
                              Oid new_rel_oid, Oid new_type_oid,
                              Oid relowner,
                              char relkind,
                              Datum reloptions);
static Oid AddNewRelationType(const char *typeName,
                           Oid typeNamespace,
                           Oid new_rel_oid,
                           char new_rel_kind,
                           Oid ownerid,
                           Oid new_array_type);
static void RelationRemoveInheritance(Oid relid);
static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
                    bool is_local, int inhcount);
static void StoreConstraints(Relation rel, List *cooked_constraints);
static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
                                          bool allow_merge, bool is_local);
static void SetRelationNumChecks(Relation rel, int numchecks);
static Node *cookConstraint(ParseState *pstate,
                     Node *raw_constraint,
                     char *relname);
static List *insert_ordered_unique_oid(List *list, Oid datum);


/* ----------------------------------------------------------------
 *                      XXX UGLY HARD CODED BADNESS FOLLOWS XXX
 *
 *          these should all be moved to someplace in the lib/catalog
 *          module, if not obliterated first.
 * ----------------------------------------------------------------
 */


/*
 * Note:
 *          Should the system special case these attributes in the future?
 *          Advantage:  consume much less space in the ATTRIBUTE relation.
 *          Disadvantage:  special cases will be all over the place.
 */

/*
 * Prototype pg_attribute entries for the system columns, one static struct
 * per column (a1 = ctid ... a7 = tableoid).
 *
 * The initializers are positional.  NOTE(review): the values presumably
 * follow the FormData_pg_attribute field order (attrelid, attname, atttypid,
 * ...) -- confirm against catalog/pg_attribute.h before editing any of them.
 * The leading 0 is presumably attrelid; AddNewAttributeTuples() copies each
 * prototype and overwrites attrelid with the real relation OID before
 * inserting it.
 */
static FormData_pg_attribute a1 = {
      0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
      SelfItemPointerAttributeNumber, 0, -1, -1,
      false, 'p', 's', true, false, false, true, 0, {0}
};

static FormData_pg_attribute a2 = {
      0, {"oid"}, OIDOID, 0, sizeof(Oid),
      ObjectIdAttributeNumber, 0, -1, -1,
      true, 'p', 'i', true, false, false, true, 0, {0}
};

static FormData_pg_attribute a3 = {
      0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
      MinTransactionIdAttributeNumber, 0, -1, -1,
      true, 'p', 'i', true, false, false, true, 0, {0}
};

static FormData_pg_attribute a4 = {
      0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
      MinCommandIdAttributeNumber, 0, -1, -1,
      true, 'p', 'i', true, false, false, true, 0, {0}
};

static FormData_pg_attribute a5 = {
      0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
      MaxTransactionIdAttributeNumber, 0, -1, -1,
      true, 'p', 'i', true, false, false, true, 0, {0}
};

static FormData_pg_attribute a6 = {
      0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
      MaxCommandIdAttributeNumber, 0, -1, -1,
      true, 'p', 'i', true, false, false, true, 0, {0}
};

/*
 * We decided to call this attribute "tableoid" rather than say
 * "classoid" on the basis that in the future there may be more than one
 * table of a particular class/type. In any case table is still the word
 * used in SQL.
 */
static FormData_pg_attribute a7 = {
      0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
      TableOidAttributeNumber, 0, -1, -1,
      true, 'p', 'i', true, false, false, true, 0, {0}
};

/*
 * Lookup table of the prototypes above.  SystemAttributeDefinition() indexes
 * it as SysAtt[-attno - 1], so the order of the entries here has to agree
 * with the numbering of the system attribute numbers (presumably -1 for
 * ctid through -7 for tableoid -- confirm against access/sysattr.h).
 */
static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};

/*
 * SystemAttributeDefinition
 *          Return the prototype pg_attribute entry for a system column.
 *
 * attno must be a negative attribute number covered by SysAtt[], and the
 * OID column is accepted only when relhasoids is true.  Any other request
 * is reported via elog(ERROR); that should only happen if there is a
 * problem upstream.
 */
Form_pg_attribute
SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
{
      int         nsysatts = (int) lengthof(SysAtt);
      bool        out_of_range = (attno >= 0 || attno < -nsysatts);
      bool        oid_unavailable = (attno == ObjectIdAttributeNumber && !relhasoids);

      if (out_of_range || oid_unavailable)
            elog(ERROR, "invalid system attribute number %d", attno);

      /* attno -1 selects SysAtt[0], -2 selects SysAtt[1], and so on */
      return SysAtt[-attno - 1];
}

/*
 * SystemAttributeByName
 *          Look up a system column prototype by column name.
 *
 * Returns the matching Form_pg_attribute prototype, or NULL when attname is
 * not the name of a system column.  The "oid" column is only considered
 * when relhasoids is true.
 */
Form_pg_attribute
SystemAttributeByName(const char *attname, bool relhasoids)
{
      int               idx = 0;

      while (idx < (int) lengthof(SysAtt))
      {
            Form_pg_attribute candidate = SysAtt[idx++];

            /* the OID column does not exist on relations without OIDs */
            if (!relhasoids && candidate->attnum == ObjectIdAttributeNumber)
                  continue;

            if (strcmp(NameStr(candidate->attname), attname) == 0)
                  return candidate;
      }

      return NULL;
}


/* ----------------------------------------------------------------
 *                      XXX END OF UGLY HARD CODED BADNESS XXX
 * ---------------------------------------------------------------- */


/* ----------------------------------------------------------------
 *          heap_create       - Create an uncataloged heap relation
 *
 *          Builds a local relcache entry for the new relation and, unless
 *          the relkind is one without physical storage (view or composite
 *          type), requests creation of its main-fork storage.
 *
 *          The caller must always provide the OID to use for the
 *          relation (relid).
 *
 *          rel->rd_rel is initialized by RelationBuildLocalRelation,
 *          and is mostly zeroes at return.
 * ----------------------------------------------------------------
 */
Relation
heap_create(const char *relname,
                  Oid relnamespace,
                  Oid reltablespace,
                  Oid relid,
                  TupleDesc tupDesc,
                  char relkind,
                  bool shared_relation,
                  bool allow_system_table_mods)
{
      Relation    new_rel;
      bool        needs_storage;

      /* A valid relation OID is a precondition. */
      Assert(OidIsValid(relid));

      /*
       * Refuse to create anything in the system or toast namespaces during
       * normal processing unless the caller explicitly allows it.
       */
      if (!allow_system_table_mods)
      {
            bool        protected_nsp = IsSystemNamespace(relnamespace) ||
                                                      IsToastNamespace(relnamespace);

            if (protected_nsp && IsNormalProcessingMode())
                  ereport(ERROR,
                              (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                               errmsg("permission denied to create \"%s.%s\"",
                                          get_namespace_name(relnamespace), relname),
                               errdetail("System catalog modifications are currently disallowed.")));
      }

      /*
       * Views and composite types have no physical storage; every other
       * relkind does.
       */
      needs_storage = !(relkind == RELKIND_VIEW ||
                                relkind == RELKIND_COMPOSITE_TYPE);

      /*
       * Force reltablespace to zero for storage-less relations (mainly for
       * cleanliness' sake) and for sequences (we don't support moving them
       * around into different tablespaces).
       */
      if (!needs_storage || relkind == RELKIND_SEQUENCE)
            reltablespace = InvalidOid;

      /*
       * A pg_class entry must never name the database's default tablespace
       * explicitly; store zero instead.  That way, if the database is cloned
       * with a different default tablespace, the entry still matches where
       * CREATE DATABASE puts the physically copied relation.  (Yes, this is
       * a bit of a hack.)
       */
      if (reltablespace == MyDatabaseTableSpace)
            reltablespace = InvalidOid;

      /* Construct the relcache entry. */
      new_rel = RelationBuildLocalRelation(relname,
                                                             relnamespace,
                                                             tupDesc,
                                                             relid,
                                                             reltablespace,
                                                             shared_relation);

      /*
       * Create the relation's disk file if it needs one.  Only the main fork
       * is created here; other forks are created on demand.
       */
      if (needs_storage)
      {
            RelationOpenSmgr(new_rel);
            RelationCreateStorage(new_rel->rd_node, new_rel->rd_istemp);
      }

      return new_rel;
}

/* ----------------------------------------------------------------
 *          heap_create_with_catalog            - Create a cataloged relation
 *
 *          this is done in multiple steps:
 *
 *          1) CheckAttributeNamesTypes() is used to make certain the tuple
 *             descriptor contains a valid set of attribute names and types
 *
 *          2) pg_class is opened and get_relname_relid()
 *             performs a scan to ensure that no relation with the
 *             same name already exists.
 *
 *          3) heap_create() is called to create the new relation on disk.
 *
 *          4) TypeCreate() is called to define a new type corresponding
 *             to the new relation.
 *
 *          5) AddNewRelationTuple() is called to register the
 *             relation in pg_class.
 *
 *          6) AddNewAttributeTuples() is called to register the
 *             new relation's schema in pg_attribute.
 *
 *          7) StoreConstraints() is called          - vadim 08/22/97
 *
 *          8) the relations are closed and the new relation's oid
 *             is returned.
 *
 * ----------------------------------------------------------------
 */

/* --------------------------------
 *          CheckAttributeNamesTypes
 *
 *          Validate the column names and datatypes in a tuple descriptor
 *          for a relation of the given relkind.  Checks, in order: the
 *          column count, collisions with system column names (skipped for
 *          views and composite types), repeated column names, and the
 *          legality of each column's datatype.  Any problem is reported
 *          with ereport(ERROR), which aborts the current transaction.
 * --------------------------------
 */
void
CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
{
      int               ncolumns = tupdesc->natts;
      int               cur;
      int               prev;

      /* The column count must be within the allowed range. */
      if (ncolumns < 0 || ncolumns > MaxHeapAttributeNumber)
            ereport(ERROR,
                        (errcode(ERRCODE_TOO_MANY_COLUMNS),
                         errmsg("tables can have at most %d columns",
                                    MaxHeapAttributeNumber)));

      /*
       * User column names must not collide with system column names.  Views
       * and composite types don't have system attributes, so they are exempt.
       */
      if (!(relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE))
      {
            for (cur = 0; cur < ncolumns; cur++)
            {
                  const char *colname = NameStr(tupdesc->attrs[cur]->attname);

                  if (SystemAttributeByName(colname, tupdesc->tdhasoid) == NULL)
                        continue;

                  ereport(ERROR,
                              (errcode(ERRCODE_DUPLICATE_COLUMN),
                               errmsg("column name \"%s\" conflicts with a system column name",
                                          NameStr(tupdesc->attrs[cur]->attname))));
            }
      }

      /*
       * No column name may appear twice: compare each column against all
       * the columns that precede it.
       */
      for (cur = 1; cur < ncolumns; cur++)
      {
            const char *curname = NameStr(tupdesc->attrs[cur]->attname);

            for (prev = 0; prev < cur; prev++)
            {
                  const char *prevname = NameStr(tupdesc->attrs[prev]->attname);

                  if (strcmp(prevname, curname) != 0)
                        continue;

                  ereport(ERROR,
                              (errcode(ERRCODE_DUPLICATE_COLUMN),
                               errmsg("column name \"%s\" specified more than once",
                                          prevname)));
            }
      }

      /* Finally, every column's datatype must be acceptable. */
      for (cur = 0; cur < ncolumns; cur++)
      {
            Form_pg_attribute column = tupdesc->attrs[cur];

            CheckAttributeType(NameStr(column->attname), column->atttypid);
      }
}

/* --------------------------------
 *          CheckAttributeType
 *
 *          Verify that the proposed datatype of an attribute is legal.
 *          This is needed because there are types (and pseudo-types)
 *          in the catalogs that we do not support as elements of real tuples.
 *
 *          attname is used only for error/warning messages; atttypid is
 *          the type OID to check.
 *
 *          - type "unknown" draws a WARNING but is accepted;
 *          - pseudo-types are rejected with ERROR, except ANYARRAY when
 *            not running under the postmaster (pg_statistic during initdb);
 *          - composite types are checked recursively, column by column,
 *            skipping dropped columns.
 *
 *          Because of that recursion, a rowtype that directly or indirectly
 *          contains itself could otherwise drive this function into a
 *          stack overflow, so the stack depth is checked on every entry.
 * --------------------------------
 */
void
CheckAttributeType(const char *attname, Oid atttypid)
{
      char        att_typtype = get_typtype(atttypid);

      /*
       * Since this function recurses, it could be driven to stack overflow;
       * fail with a clean error instead of crashing the backend.
       * (check_stack_depth() is declared in miscadmin.h.)
       */
      check_stack_depth();

      if (atttypid == UNKNOWNOID)
      {
            /*
             * Warn user, but don't fail, if column to be created has UNKNOWN type
             * (usually as a result of a 'retrieve into' - jolly)
             */
            ereport(WARNING,
                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                         errmsg("column \"%s\" has type \"unknown\"", attname),
                         errdetail("Proceeding with relation creation anyway.")));
      }
      else if (att_typtype == TYPTYPE_PSEUDO)
      {
            /*
             * Refuse any attempt to create a pseudo-type column, except for a
             * special hack for pg_statistic: allow ANYARRAY during initdb
             */
            if (atttypid != ANYARRAYOID || IsUnderPostmaster)
                  ereport(ERROR,
                              (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                               errmsg("column \"%s\" has pseudo-type %s",
                                          attname, format_type_be(atttypid))));
      }
      else if (att_typtype == TYPTYPE_COMPOSITE)
      {
            /*
             * For a composite type, recurse into its attributes.  You might think
             * this isn't necessary, but since we allow system catalogs to break
             * the rule, we have to guard against the case.
             */
            Relation    relation;
            TupleDesc   tupdesc;
            int               i;

            /* Open the relation backing the composite type to read its columns */
            relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);

            tupdesc = RelationGetDescr(relation);

            for (i = 0; i < tupdesc->natts; i++)
            {
                  Form_pg_attribute attr = tupdesc->attrs[i];

                  /* dropped columns no longer matter */
                  if (attr->attisdropped)
                        continue;
                  CheckAttributeType(NameStr(attr->attname), attr->atttypid);
            }

            relation_close(relation, AccessShareLock);
      }
}

/*
 * InsertPgAttributeTuple
 *          Build a pg_attribute row from new_attribute and insert it.
 *
 * The caller must already have opened and locked pg_attribute
 * (pg_attribute_rel).  Every fixed-width field is copied from new_attribute;
 * any attacl it might carry is ignored, and the stored attacl is always
 * NULL (i.e., default permissions).
 *
 * indstate is the index state for CatalogIndexInsert.  Passing NULL is
 * allowed, in which case the index info is fetched here; don't do that when
 * inserting multiple attributes, because it's a tad more expensive.
 */
void
InsertPgAttributeTuple(Relation pg_attribute_rel,
                                 Form_pg_attribute new_attribute,
                                 CatalogIndexState indstate)
{
      Datum       colvals[Natts_pg_attribute] = {0};
      bool        colnull[Natts_pg_attribute] = {false};
      HeapTuple   newtup;

      /* attacl is the only column stored as NULL: start with empty permissions */
      colnull[Anum_pg_attribute_attacl - 1] = true;

      /* Copy each remaining column out of the caller's struct, one by one. */
      colvals[Anum_pg_attribute_attrelid - 1] =
            ObjectIdGetDatum(new_attribute->attrelid);
      colvals[Anum_pg_attribute_attname - 1] =
            NameGetDatum(&new_attribute->attname);
      colvals[Anum_pg_attribute_atttypid - 1] =
            ObjectIdGetDatum(new_attribute->atttypid);
      colvals[Anum_pg_attribute_attstattarget - 1] =
            Int32GetDatum(new_attribute->attstattarget);
      colvals[Anum_pg_attribute_attlen - 1] =
            Int16GetDatum(new_attribute->attlen);
      colvals[Anum_pg_attribute_attnum - 1] =
            Int16GetDatum(new_attribute->attnum);
      colvals[Anum_pg_attribute_attndims - 1] =
            Int32GetDatum(new_attribute->attndims);
      colvals[Anum_pg_attribute_attcacheoff - 1] =
            Int32GetDatum(new_attribute->attcacheoff);
      colvals[Anum_pg_attribute_atttypmod - 1] =
            Int32GetDatum(new_attribute->atttypmod);
      colvals[Anum_pg_attribute_attbyval - 1] =
            BoolGetDatum(new_attribute->attbyval);
      colvals[Anum_pg_attribute_attstorage - 1] =
            CharGetDatum(new_attribute->attstorage);
      colvals[Anum_pg_attribute_attalign - 1] =
            CharGetDatum(new_attribute->attalign);
      colvals[Anum_pg_attribute_attnotnull - 1] =
            BoolGetDatum(new_attribute->attnotnull);
      colvals[Anum_pg_attribute_atthasdef - 1] =
            BoolGetDatum(new_attribute->atthasdef);
      colvals[Anum_pg_attribute_attisdropped - 1] =
            BoolGetDatum(new_attribute->attisdropped);
      colvals[Anum_pg_attribute_attislocal - 1] =
            BoolGetDatum(new_attribute->attislocal);
      colvals[Anum_pg_attribute_attinhcount - 1] =
            Int32GetDatum(new_attribute->attinhcount);

      newtup = heap_form_tuple(RelationGetDescr(pg_attribute_rel),
                                           colvals, colnull);

      /* Store the row, then bring the catalog indexes up to date. */
      simple_heap_insert(pg_attribute_rel, newtup);

      if (indstate == NULL)
            CatalogUpdateIndexes(pg_attribute_rel, newtup);
      else
            CatalogIndexInsert(indstate, newtup);

      heap_freetuple(newtup);
}

/* --------------------------------
 *          AddNewAttributeTuples
 *
 *          Register the new relation's schema by adding one pg_attribute
 *          row per user column, plus rows for the system columns where the
 *          relkind has them.  Also records a normal dependency of each
 *          user column on its datatype.
 *
 *          oidislocal and oidinhcount supply the inheritance info stored
 *          for the OID system column, if the relation has one.
 *
 *          Note that the attrelid, attstattarget and attcacheoff fields of
 *          the attributes in tupdesc are overwritten in place.
 * --------------------------------
 */
static void
AddNewAttributeTuples(Oid new_rel_oid,
                                TupleDesc tupdesc,
                                char relkind,
                                bool oidislocal,
                                int oidinhcount)
{
      Relation    pg_attr_rel;
      CatalogIndexState idx_state;
      ObjectAddress column_addr;
      ObjectAddress type_addr;
      int               ncolumns = tupdesc->natts;
      int               colno;
      int               sysno;

      /* Open pg_attribute together with its indexes. */
      pg_attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
      idx_state = CatalogOpenIndexes(pg_attr_rel);

      /* These parts of the dependency addresses are the same for every column. */
      column_addr.classId = RelationRelationId;
      column_addr.objectId = new_rel_oid;
      type_addr.classId = TypeRelationId;
      type_addr.objectSubId = 0;

      /*
       * User attributes come first.  While we are at it, make each column
       * depend on its datatype.
       */
      for (colno = 0; colno < ncolumns; colno++)
      {
            Form_pg_attribute column = tupdesc->attrs[colno];

            /* Stamp in the relation OID and reset the fields that must be -1 */
            column->attrelid = new_rel_oid;
            column->attstattarget = -1;
            column->attcacheoff = -1;

            InsertPgAttributeTuple(pg_attr_rel, column, idx_state);

            /* column (1-based sub-object of the relation) depends on its type */
            column_addr.objectSubId = colno + 1;
            type_addr.objectId = column->atttypid;
            recordDependencyOn(&column_addr, &type_addr, DEPENDENCY_NORMAL);
      }

      /*
       * System attributes follow, except that views and composite types get
       * none at all and the OID column is skipped when the rel has no OIDs.
       * No datatype dependencies are made here, since presumably all these
       * types are pinned.
       */
      if (!(relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE))
      {
            for (sysno = 0; sysno < (int) lengthof(SysAtt); sysno++)
            {
                  FormData_pg_attribute sysattr;
                  bool        is_oid_column;

                  is_oid_column = (SysAtt[sysno]->attnum == ObjectIdAttributeNumber);

                  if (is_oid_column && !tupdesc->tdhasoid)
                        continue;

                  /* Work on a private copy so the shared prototype stays untouched */
                  memcpy(&sysattr, SysAtt[sysno], sizeof(FormData_pg_attribute));
                  sysattr.attrelid = new_rel_oid;

                  /* Only the OID column carries caller-supplied inheritance info */
                  if (is_oid_column)
                  {
                        sysattr.attislocal = oidislocal;
                        sysattr.attinhcount = oidinhcount;
                  }

                  InsertPgAttributeTuple(pg_attr_rel, &sysattr, idx_state);
            }
      }

      /* Release the index state and close pg_attribute. */
      CatalogCloseIndexes(idx_state);
      heap_close(pg_attr_rel, RowExclusiveLock);
}

/* --------------------------------
 *          InsertPgClassTuple
 *
 *          Build a pg_class row for a new relation and insert it.
 *
 * The caller must already have opened and locked pg_class (pg_class_desc).
 * The fixed-width columns are copied from new_rel_desc->rd_rel; the
 * variable-width ones, which a cached reldesc does not contain, are filled
 * in here: relacl is always NULL (i.e., default permissions), and
 * reloptions is the passed-in text array, or NULL if none was passed
 * (reloptions == (Datum) 0).  The inserted tuple is given the OID
 * new_rel_oid.
 * --------------------------------
 */
void
InsertPgClassTuple(Relation pg_class_desc,
                           Relation new_rel_desc,
                           Oid new_rel_oid,
                           Datum reloptions)
{
      Form_pg_class classform = new_rel_desc->rd_rel;
      Datum       colvals[Natts_pg_class] = {0};
      bool        colnull[Natts_pg_class] = {false};
      HeapTuple   newtup;

      /* Fixed-width columns, copied one by one from the relcache entry. */
      colvals[Anum_pg_class_relname - 1] =
            NameGetDatum(&classform->relname);
      colvals[Anum_pg_class_relnamespace - 1] =
            ObjectIdGetDatum(classform->relnamespace);
      colvals[Anum_pg_class_reltype - 1] =
            ObjectIdGetDatum(classform->reltype);
      colvals[Anum_pg_class_relowner - 1] =
            ObjectIdGetDatum(classform->relowner);
      colvals[Anum_pg_class_relam - 1] =
            ObjectIdGetDatum(classform->relam);
      colvals[Anum_pg_class_relfilenode - 1] =
            ObjectIdGetDatum(classform->relfilenode);
      colvals[Anum_pg_class_reltablespace - 1] =
            ObjectIdGetDatum(classform->reltablespace);
      colvals[Anum_pg_class_relpages - 1] =
            Int32GetDatum(classform->relpages);
      colvals[Anum_pg_class_reltuples - 1] =
            Float4GetDatum(classform->reltuples);
      colvals[Anum_pg_class_reltoastrelid - 1] =
            ObjectIdGetDatum(classform->reltoastrelid);
      colvals[Anum_pg_class_reltoastidxid - 1] =
            ObjectIdGetDatum(classform->reltoastidxid);
      colvals[Anum_pg_class_relhasindex - 1] =
            BoolGetDatum(classform->relhasindex);
      colvals[Anum_pg_class_relisshared - 1] =
            BoolGetDatum(classform->relisshared);
      colvals[Anum_pg_class_relistemp - 1] =
            BoolGetDatum(classform->relistemp);
      colvals[Anum_pg_class_relkind - 1] =
            CharGetDatum(classform->relkind);
      colvals[Anum_pg_class_relnatts - 1] =
            Int16GetDatum(classform->relnatts);
      colvals[Anum_pg_class_relchecks - 1] =
            Int16GetDatum(classform->relchecks);
      colvals[Anum_pg_class_relhasoids - 1] =
            BoolGetDatum(classform->relhasoids);
      colvals[Anum_pg_class_relhaspkey - 1] =
            BoolGetDatum(classform->relhaspkey);
      colvals[Anum_pg_class_relhasrules - 1] =
            BoolGetDatum(classform->relhasrules);
      colvals[Anum_pg_class_relhastriggers - 1] =
            BoolGetDatum(classform->relhastriggers);
      colvals[Anum_pg_class_relhassubclass - 1] =
            BoolGetDatum(classform->relhassubclass);
      colvals[Anum_pg_class_relfrozenxid - 1] =
            TransactionIdGetDatum(classform->relfrozenxid);

      /* relacl: start out with empty permissions */
      colnull[Anum_pg_class_relacl - 1] = true;

      /* reloptions: store the caller's array if there is one, else NULL */
      if (reloptions == (Datum) 0)
            colnull[Anum_pg_class_reloptions - 1] = true;
      else
            colvals[Anum_pg_class_reloptions - 1] = reloptions;

      newtup = heap_form_tuple(RelationGetDescr(pg_class_desc),
                                           colvals, colnull);

      /*
       * The row must carry the OID that was already chosen for the relation,
       * so force it into the tuple before inserting.
       */
      HeapTupleSetOid(newtup, new_rel_oid);

      /* Store the row, update the catalog indexes, and clean up. */
      simple_heap_insert(pg_class_desc, newtup);
      CatalogUpdateIndexes(pg_class_desc, newtup);
      heap_freetuple(newtup);
}

/* --------------------------------
 *          AddNewRelationTuple
 *
 *          Register the new relation in the catalogs by adding a tuple to
 *          pg_class.
 *
 *          Before the row is built, the relation descriptor's rd_rel is
 *          updated with the initial size estimates, relfrozenxid, owner,
 *          row type OID and relkind, and the tuple descriptor's tdtypeid
 *          is set to new_type_oid.
 * --------------------------------
 */
static void
AddNewRelationTuple(Relation pg_class_desc,
                              Relation new_rel_desc,
                              Oid new_rel_oid,
                              Oid new_type_oid,
                              Oid relowner,
                              char relkind,
                              Datum reloptions)
{
      /* The uncataloged relation's pg_class form, which we fill in below. */
      Form_pg_class classform = new_rel_desc->rd_rel;

      /*
       * Initial size estimates.  Sequences always have a known size of one
       * page holding one tuple.  Everything else starts at zero: tables,
       * indexes and toast tables are real but as yet empty, while views etc.
       * have no disk storage.
       */
      if (relkind == RELKIND_SEQUENCE)
      {
            classform->relpages = 1;
            classform->reltuples = 1;
      }
      else
      {
            classform->relpages = 0;
            classform->reltuples = 0;
      }

      /*
       * Initialize relfrozenxid.  Only plain tables and toast tables will
       * contain XIDs; for those, use the minimum XID that could put tuples
       * in the table.  No xacts older than RecentXmin are still running, so
       * that will do.  Other relation types get InvalidTransactionId.  (A
       * sequence does contain a tuple, but its xmin is always forced to
       * FrozenTransactionId; see commands/sequence.c.)
       */
      if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
            classform->relfrozenxid = InvalidTransactionId;
      else
            classform->relfrozenxid = RecentXmin;

      classform->relkind = relkind;
      classform->reltype = new_type_oid;
      classform->relowner = relowner;

      new_rel_desc->rd_att->tdtypeid = new_type_oid;

      /* Everything is set up; build the pg_class row and insert it. */
      InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid, reloptions);
}


/* --------------------------------
 *          AddNewRelationType -
 *
 *          Create the composite (row) type that accompanies a new relation
 *          and hand back the OID assigned to that type.
 *
 *          typeName / typeNamespace name the type, new_rel_oid and
 *          new_rel_kind identify the relation it describes, ownerid is its
 *          owner, and new_array_type is recorded as the type's array type
 *          (pass InvalidOid when there is none).
 * --------------------------------
 */
static Oid
AddNewRelationType(const char *typeName,
                           Oid typeNamespace,
                           Oid new_rel_oid,
                           char new_rel_kind,
                           Oid ownerid,
                           Oid new_array_type)
{
      Oid               rowtype_oid;

      rowtype_oid = TypeCreate(InvalidOid,            /* OID is not predetermined */
                                           typeName,            /* name of the type */
                                           typeNamespace, /* namespace of the type */
                                           new_rel_oid,   /* OID of the relation */
                                           new_rel_kind,  /* kind of the relation */
                                           ownerid,             /* ID of the owner */
                                           -1,                        /* varlena, so no fixed size */
                                           TYPTYPE_COMPOSITE,   /* typtype: composite */
                                           TYPCATEGORY_COMPOSITE,     /* category: composite */
                                           false,               /* never a preferred type */
                                           DEFAULT_TYPDELIM,    /* standard array delimiter */
                                           F_RECORD_IN,   /* input function */
                                           F_RECORD_OUT,  /* output function */
                                           F_RECORD_RECV, /* binary receive function */
                                           F_RECORD_SEND, /* binary send function */
                                           InvalidOid,          /* no typmodin function */
                                           InvalidOid,          /* no typmodout function */
                                           InvalidOid,          /* default analyze function */
                                           InvalidOid,          /* element type: not relevant */
                                           false,               /* not an array type itself */
                                           new_array_type,      /* its array type, if any */
                                           InvalidOid,          /* domain base type: not relevant */
                                           NULL,                /* no default value */
                                           NULL,                /* no binary default either */
                                           false,               /* not pass-by-value */
                                           'd',                       /* must use the largest alignment */
                                           'x',                       /* fully TOASTable storage */
                                           -1,                        /* typmod */
                                           0,                         /* array dimensions for typBaseType */
                                           false);              /* not declared NOT NULL */

      return rowtype_oid;
}

/* --------------------------------
 *          heap_create_with_catalog
 *
 *          Create a new relation and enter it into the system catalogs.
 *
 *          The steps, in order: validate the column definitions, check for
 *          relation-name and type-name collisions, validate the tablespace
 *          choice, pick an OID unless the caller supplied one, build the
 *          relcache entry and disk file via heap_create(), define the
 *          rowtype (and an array type over it when appropriate), insert the
 *          pg_class and pg_attribute rows, record namespace and owner
 *          dependencies, store the supplied constraints and defaults, and
 *          remember any ON COMMIT action.
 *
 *          Returns the OID of the new relation.  The new relation is closed
 *          with NoLock, so whatever lock is held on it is kept until end
 *          of transaction.
 * --------------------------------
 */
Oid
heap_create_with_catalog(const char *relname,
                                     Oid relnamespace,
                                     Oid reltablespace,
                                     Oid relid,
                                     Oid ownerid,
                                     TupleDesc tupdesc,
                                     List *cooked_constraints,
                                     char relkind,
                                     bool shared_relation,
                                     bool oidislocal,
                                     int oidinhcount,
                                     OnCommitAction oncommit,
                                     Datum reloptions,
                                     bool allow_system_table_mods)
{
      Relation    pg_class_rel;
      Relation    new_rel;
      Oid               clashing_type_oid;
      Oid               rowtype_oid;
      Oid               arraytype_oid = InvalidOid;
      bool        want_array_type;

      pg_class_rel = heap_open(RelationRelationId, RowExclusiveLock);

      /*
       * Sanity checks: processing mode, column names/types, and name
       * collisions with an existing relation.
       */
      Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());

      CheckAttributeNamesTypes(tupdesc, relkind);

      if (OidIsValid(get_relname_relid(relname, relnamespace)))
            ereport(ERROR,
                        (errcode(ERRCODE_DUPLICATE_TABLE),
                         errmsg("relation \"%s\" already exists", relname)));

      /*
       * A rowtype of the same name will be created too, so look for a type
       * that already uses the name.  An autogenerated array type can be
       * renamed out of the way; anything else is reported as an error.
       */
      clashing_type_oid = GetSysCacheOid(TYPENAMENSP,
                                                         CStringGetDatum(relname),
                                                         ObjectIdGetDatum(relnamespace),
                                                         0, 0);
      if (OidIsValid(clashing_type_oid) &&
            !moveArrayTypeName(clashing_type_oid, relname, relnamespace))
            ereport(ERROR,
                        (errcode(ERRCODE_DUPLICATE_OBJECT),
                         errmsg("type \"%s\" already exists", relname),
                         errhint("A relation has an associated type of the same name, "
                                     "so you must use a name that doesn't conflict "
                                     "with any existing type.")));

      /*
       * Shared relations must live in pg_global and nothing else may.  This
       * has to be verified before GetNewRelFileNode is called, to avoid
       * tripping an Assert in there.
       */
      if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
      {
            /* elog since this is not a user-facing error */
            elog(ERROR,
                   "shared relations must be placed in pg_global tablespace");
      }
      if (!shared_relation && reltablespace == GLOBALTABLESPACE_OID)
      {
            ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("only shared relations can be placed in pg_global tablespace")));
      }

      /*
       * Choose an OID for the relation if the caller did not dictate one.
       * The same value serves as the relfilenode, so it must not collide
       * with pg_class OIDs or with existing physical files.
       */
      if (!OidIsValid(relid))
            relid = GetNewRelFileNode(reltablespace, shared_relation,
                                                  pg_class_rel);

      /*
       * Build the (still mostly dummy) relcache entry and the physical disk
       * file.  Should we fail later on, removing the file again is the
       * smgr's job.
       */
      new_rel = heap_create(relname,
                                      relnamespace,
                                      reltablespace,
                                      relid,
                                      tupdesc,
                                      relkind,
                                      shared_relation,
                                      allow_system_table_mods);

      Assert(relid == RelationGetRelid(new_rel));

      /*
       * Work out whether an array type over the rowtype is wanted.  None are
       * made for system catalogs (those created during initdb).  Regular
       * relations, views and composite types get one; toast tables and
       * sequences, for example, do not.
       */
      want_array_type = IsUnderPostmaster &&
            (relkind == RELKIND_RELATION ||
             relkind == RELKIND_VIEW ||
             relkind == RELKIND_COMPOSITE_TYPE);

      if (want_array_type)
      {
            /* Reserve the array type's OID up front */
            Relation    pg_type_rel = heap_open(TypeRelationId, AccessShareLock);

            arraytype_oid = GetNewOid(pg_type_rel);
            heap_close(pg_type_rel, AccessShareLock);
      }

      /*
       * Defining a relation also defines a complex type, so add the system
       * type that corresponds to the new relation.
       *
       * NOTE: a unique-index failure is possible here if someone else is
       * concurrently creating the same type name and had not yet committed
       * when the duplicate-name check above ran.
       */
      rowtype_oid = AddNewRelationType(relname,
                                                       relnamespace,
                                                       relid,
                                                       relkind,
                                                       ownerid,
                                                       arraytype_oid);

      /*
       * Create the array type now, if we decided to have one.
       */
      if (OidIsValid(arraytype_oid))
      {
            char     *array_type_name = makeArrayTypeName(relname, relnamespace);

            TypeCreate(arraytype_oid,           /* use the pre-assigned OID */
                           array_type_name,     /* name of the array type */
                           relnamespace,  /* namespace matches the parent's */
                           InvalidOid,    /* no relation OID: not composite */
                           0,                   /* relkind does not apply either */
                           ownerid,       /* ID of the owner */
                           -1,                  /* varlena, so no fixed size */
                           TYPTYPE_BASE,  /* not composite; its typelem is */
                           TYPCATEGORY_ARRAY,   /* category: array */
                           false,         /* never a preferred type */
                           DEFAULT_TYPDELIM,    /* standard array delimiter */
                           F_ARRAY_IN,    /* array input function */
                           F_ARRAY_OUT,   /* array output function */
                           F_ARRAY_RECV,  /* array binary receive function */
                           F_ARRAY_SEND,  /* array binary send function */
                           InvalidOid,    /* no typmodin function */
                           InvalidOid,    /* no typmodout function */
                           InvalidOid,    /* default analyze function */
                           rowtype_oid,   /* element type is the rowtype */
                           true,          /* this one is an array type */
                           InvalidOid,    /* it has no array type of its own */
                           InvalidOid,    /* domain base type: not relevant */
                           NULL,          /* no default value */
                           NULL,          /* no binary default either */
                           false,         /* not pass-by-value */
                           'd',                 /* must use the largest alignment */
                           'x',                 /* fully TOASTable storage */
                           -1,                  /* typmod */
                           0,                   /* array dimensions for typBaseType */
                           false);        /* not declared NOT NULL */

            pfree(array_type_name);
      }

      /*
       * Enter the relation into pg_class.
       *
       * NOTE: a unique-index failure is possible here if someone else is
       * concurrently creating the same relation name and had not yet
       * committed when the duplicate-name check above ran.
       */
      AddNewRelationTuple(pg_class_rel,
                                    new_rel,
                                    relid,
                                    rowtype_oid,
                                    ownerid,
                                    relkind,
                                    reloptions);

      /*
       * Enter the relation's columns into pg_attribute.
       */
      AddNewAttributeTuples(relid, new_rel->rd_att, relkind,
                                      oidislocal, oidinhcount);

      /*
       * Record a dependency on the namespace, so the relation goes away with
       * it, and a dependency on the owner.
       *
       * Composite types have these tracked on their pg_type entry instead.
       * TOAST tables need neither: they live in a pinned namespace and depend
       * on their owner indirectly through the parent table.  Nothing is
       * recorded in bootstrap mode, where dependencies are not made at all.
       */
      if (!IsBootstrapProcessingMode() &&
            relkind != RELKIND_TOASTVALUE &&
            relkind != RELKIND_COMPOSITE_TYPE)
      {
            ObjectAddress rel_addr;
            ObjectAddress nsp_addr;

            rel_addr.classId = RelationRelationId;
            rel_addr.objectId = relid;
            rel_addr.objectSubId = 0;

            nsp_addr.classId = NamespaceRelationId;
            nsp_addr.objectId = relnamespace;
            nsp_addr.objectSubId = 0;

            recordDependencyOn(&rel_addr, &nsp_addr, DEPENDENCY_NORMAL);

            recordDependencyOnOwner(RelationRelationId, relid, ownerid);
      }

      /*
       * Store whatever constraints and defaults were supplied.
       *
       * NB: this may do a CommandCounterIncrement and rebuild the relcache
       * entry, so the relation has to be valid and self-consistent by now.
       * In particular, no constraints or defaults exist anywhere yet.
       */
      StoreConstraints(new_rel, cooked_constraints);

      /*
       * Remember a special on-commit action, if one was requested.
       */
      if (oncommit != ONCOMMIT_NOOP)
            register_on_commit_action(relid, oncommit);

      /*
       * The relation is now cataloged: close both relations and report the
       * new relation's OID.  The new relation stays locked till end of xact.
       */
      heap_close(new_rel, NoLock);
      heap_close(pg_class_rel, RowExclusiveLock);

      return relid;
}


/*
 *          RelationRemoveInheritance
 *
 * Delete every pg_inherits row whose inhrelid is the given relation, i.e.
 * the rows linking this relation to its parent(s).
 *
 * This routine used to check for child relations and refuse the deletion
 * if any existed.  That is now left to the dependency mechanism, so by the
 * time we are called there are no children left to worry about.
 */
static void
RelationRemoveInheritance(Oid relid)
{
      Relation    inhrel;
      SysScanDesc inhscan;
      ScanKeyData skey;
      HeapTuple   inhtup;

      inhrel = heap_open(InheritsRelationId, RowExclusiveLock);

      /* Match rows on inhrelid = relid */
      ScanKeyInit(&skey,
                        Anum_pg_inherits_inhrelid,
                        BTEqualStrategyNumber, F_OIDEQ,
                        ObjectIdGetDatum(relid));

      inhscan = systable_beginscan(inhrel, InheritsRelidSeqnoIndexId, true,
                                                 SnapshotNow, 1, &skey);

      /* Delete each matching row until the scan is exhausted */
      for (;;)
      {
            inhtup = systable_getnext(inhscan);
            if (!HeapTupleIsValid(inhtup))
                  break;

            simple_heap_delete(inhrel, &inhtup->t_self);
      }

      systable_endscan(inhscan);
      heap_close(inhrel, RowExclusiveLock);
}

/*
 *          DeleteRelationTuple
 *
 * Delete the pg_class row belonging to relid.  Raises an error if the
 * syscache lookup for that row fails.
 *
 * Note: relation deletion and index deletion both use this.  It is not
 * meant to be called from anywhere else.
 */
void
DeleteRelationTuple(Oid relid)
{
      /* Take a suitable lock on pg_class, then look the row up */
      Relation    classrel = heap_open(RelationRelationId, RowExclusiveLock);
      HeapTuple   classtup = SearchSysCache(RELOID,
                                                            ObjectIdGetDatum(relid),
                                                            0, 0, 0);

      if (!HeapTupleIsValid(classtup))
            elog(ERROR, "cache lookup failed for relation %u", relid);

      /* Remove the row, then drop the cache reference and close pg_class */
      simple_heap_delete(classrel, &classtup->t_self);
      ReleaseSysCache(classtup);
      heap_close(classrel, RowExclusiveLock);
}

/*
 *          DeleteAttributeTuples
 *
 * Delete all pg_attribute rows whose attrelid is the given relid.
 *
 * Note: relation deletion and index deletion both use this.  It is not
 * meant to be called from anywhere else.
 */
void
DeleteAttributeTuples(Oid relid)
{
      Relation    pg_attribute_rel;
      SysScanDesc attscan;
      ScanKeyData skey;
      HeapTuple   atttup;

      /* Take a suitable lock on pg_attribute */
      pg_attribute_rel = heap_open(AttributeRelationId, RowExclusiveLock);

      /* Restrict the index scan to the target relation's attributes */
      ScanKeyInit(&skey,
                        Anum_pg_attribute_attrelid,
                        BTEqualStrategyNumber, F_OIDEQ,
                        ObjectIdGetDatum(relid));

      attscan = systable_beginscan(pg_attribute_rel, AttributeRelidNumIndexId,
                                                 true, SnapshotNow, 1, &skey);

      /* Walk the scan, deleting every row it returns */
      for (atttup = systable_getnext(attscan);
             atttup != NULL;
             atttup = systable_getnext(attscan))
      {
            simple_heap_delete(pg_attribute_rel, &atttup->t_self);
      }

      /* Finish the scan and close pg_attribute */
      systable_endscan(attscan);
      heap_close(pg_attribute_rel, RowExclusiveLock);
}

/*
 *          RemoveAttributeById
 *
 * The working part of ALTER TABLE DROP COLUMN.  For a user column the
 * pg_attribute row is kept but marked dropped, and its pg_statistic
 * entries are removed; for a system column (attnum < 0) the pg_attribute
 * row is simply deleted.  Other cleanup, such as removing a pg_attrdef
 * entry, is the business of dependency.c.
 */
void
RemoveAttributeById(Oid relid, AttrNumber attnum)
{
      Relation    target_rel;
      Relation    pg_attribute_rel;
      HeapTuple   atttup;
      Form_pg_attribute attform;
      char        dropped_name[NAMEDATALEN];

      /*
       * Take an exclusive lock on the target table and hold it until end of
       * transaction.  When the column is dropped directly,
       * AlterTableDropColumn has already done so; but when we get here by
       * cascading from the drop of some other object, there may be no lock
       * at all yet.
       */
      target_rel = relation_open(relid, AccessExclusiveLock);

      pg_attribute_rel = heap_open(AttributeRelationId, RowExclusiveLock);

      /* Fetch a modifiable copy of the column's pg_attribute row */
      atttup = SearchSysCacheCopy(ATTNUM,
                                              ObjectIdGetDatum(relid),
                                              Int16GetDatum(attnum),
                                              0, 0);
      if (!HeapTupleIsValid(atttup))      /* shouldn't happen */
            elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                   attnum, relid);
      attform = (Form_pg_attribute) GETSTRUCT(atttup);

      if (attnum >= 0)
      {
            /* A user attribute: considerably more work */

            /* Flag the column as dropped */
            attform->attisdropped = true;

            /*
             * Invalidate the type OID.  Once a column is dropped its type
             * may be dropped as well, so the link cannot be trusted.  The
             * type row is not needed anyway: the essential facts are typlen
             * and typalign, which survive in attlen and attalign.  Zeroing
             * atttypid helps catch code that wrongly assumes it is valid.
             */
            attform->atttypid = InvalidOid;

            /* Drop any NOT NULL marking on the column */
            attform->attnotnull = false;

            /* Statistics are no longer wanted for it */
            attform->attstattarget = 0;

            /*
             * Rename the column to something unlikely to clash with a real
             * column name.
             */
            snprintf(dropped_name, sizeof(dropped_name),
                         "........pg.dropped.%d........", attnum);
            namestrcpy(&(attform->attname), dropped_name);

            simple_heap_update(pg_attribute_rel, &atttup->t_self, atttup);

            /* keep the system catalog indexes current */
            CatalogUpdateIndexes(pg_attribute_rel, atttup);
      }
      else
      {
            /* A system attribute (probably OID): just delete its row */
            simple_heap_delete(pg_attribute_rel, &atttup->t_self);
      }

      /*
       * Updating the pg_attribute row triggers a relcache flush for the
       * target relation, so other backends need no further notification.
       */

      heap_close(pg_attribute_rel, RowExclusiveLock);

      /* Only user columns have pg_statistic entries to remove */
      if (attnum > 0)
            RemoveStatistics(relid, attnum);

      /* Close the table but keep the lock */
      relation_close(target_rel, NoLock);
}

/*
 *          RemoveAttrDefault
 *
 * Remove the default of the given relation/attribute, if it has one, by
 * calling performDeletion on each matching pg_attrdef entry.
 * When no default exists, an error is raised if complain is true;
 * otherwise the routine returns quietly.
 */
void
RemoveAttrDefault(Oid relid, AttrNumber attnum,
                          DropBehavior behavior, bool complain)
{
      Relation    pg_attrdef_rel;
      ScanKeyData skeys[2];
      SysScanDesc defscan;
      HeapTuple   deftup;
      bool        found = false;

      pg_attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);

      /* Match on adrelid = relid AND adnum = attnum */
      ScanKeyInit(&skeys[0],
                        Anum_pg_attrdef_adrelid,
                        BTEqualStrategyNumber, F_OIDEQ,
                        ObjectIdGetDatum(relid));
      ScanKeyInit(&skeys[1],
                        Anum_pg_attrdef_adnum,
                        BTEqualStrategyNumber, F_INT2EQ,
                        Int16GetDatum(attnum));

      defscan = systable_beginscan(pg_attrdef_rel, AttrDefaultIndexId, true,
                                                 SnapshotNow, 2, skeys);

      /* At most one row should match, but process all that turn up */
      deftup = systable_getnext(defscan);
      while (HeapTupleIsValid(deftup))
      {
            ObjectAddress defaddr;

            defaddr.classId = AttrDefaultRelationId;
            defaddr.objectId = HeapTupleGetOid(deftup);
            defaddr.objectSubId = 0;

            performDeletion(&defaddr, behavior);

            found = true;

            deftup = systable_getnext(defscan);
      }

      systable_endscan(defscan);
      heap_close(pg_attrdef_rel, RowExclusiveLock);

      if (!found && complain)
            elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
                   relid, attnum);
}

/*
 *          RemoveAttrDefaultById
 *
 * Delete the pg_attrdef entry with the given OID and clear atthasdef in
 * the pg_attribute row of the column it belonged to.  This is the working
 * part of attribute-default removal.  Note that it should be reached
 * through performDeletion rather than called directly.
 */
void
RemoveAttrDefaultById(Oid attrdefId)
{
      Relation    pg_attrdef_rel;
      Relation    pg_attribute_rel;
      Relation    owning_rel;
      ScanKeyData skey;
      SysScanDesc defscan;
      HeapTuple   deftup;
      HeapTuple   atttup;
      Form_pg_attrdef defform;
      Form_pg_attribute attform;
      Oid               owning_relid;
      AttrNumber  owning_attnum;

      /* Take a suitable lock on pg_attrdef */
      pg_attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);

      /* Locate the pg_attrdef row by its OID */
      ScanKeyInit(&skey,
                        ObjectIdAttributeNumber,
                        BTEqualStrategyNumber, F_OIDEQ,
                        ObjectIdGetDatum(attrdefId));

      defscan = systable_beginscan(pg_attrdef_rel, AttrDefaultOidIndexId, true,
                                                 SnapshotNow, 1, &skey);

      deftup = systable_getnext(defscan);
      if (!HeapTupleIsValid(deftup))
            elog(ERROR, "could not find tuple for attrdef %u", attrdefId);

      /* Note which column the default belongs to */
      defform = (Form_pg_attrdef) GETSTRUCT(deftup);
      owning_relid = defform->adrelid;
      owning_attnum = defform->adnum;

      /* Lock the relation that owns the column exclusively */
      owning_rel = relation_open(owning_relid, AccessExclusiveLock);

      /* With that lock held, the pg_attrdef row can be deleted */
      simple_heap_delete(pg_attrdef_rel, &deftup->t_self);

      systable_endscan(defscan);
      heap_close(pg_attrdef_rel, RowExclusiveLock);

      /* Next, correct the column's pg_attribute row */
      pg_attribute_rel = heap_open(AttributeRelationId, RowExclusiveLock);

      atttup = SearchSysCacheCopy(ATTNUM,
                                              ObjectIdGetDatum(owning_relid),
                                              Int16GetDatum(owning_attnum),
                                              0, 0);
      if (!HeapTupleIsValid(atttup))      /* shouldn't happen */
            elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                   owning_attnum, owning_relid);

      attform = (Form_pg_attribute) GETSTRUCT(atttup);
      attform->atthasdef = false;

      simple_heap_update(pg_attribute_rel, &atttup->t_self, atttup);

      /* keep the system catalog indexes current */
      CatalogUpdateIndexes(pg_attribute_rel, atttup);

      /*
       * The pg_attribute update forces a relcache rebuild, so nothing more
       * is required here.
       */
      heap_close(pg_attribute_rel, RowExclusiveLock);

      /* The lock on the column's relation is kept until end of xact */
      relation_close(owning_rel, NoLock);
}

/*
 * heap_drop_with_catalog     - remove the given relation from the catalogs
 *
 * Objects tied to the pg_class entry through dependencies (indexes and
 * constraints, for instance) are not this routine's concern: the
 * dependency-tracing logic in dependency.c deletes them before control
 * arrives here.  As a rule, then, do not call this directly; go through
 * performDeletion() instead.
 */
void
heap_drop_with_catalog(Oid relid)
{
      Relation    doomed_rel;
      char        relkind;

      /*
       * Open the relation under an exclusive lock.
       */
      doomed_rel = relation_open(relid, AccessExclusiveLock);

      /*
       * Nobody *else* can be touching the relation any longer, but our own
       * session might still have queries or cursors open on it.
       */
      if (doomed_rel->rd_refcnt != 1)
            ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_IN_USE),
                         errmsg("cannot drop \"%s\" because "
                                    "it is being used by active queries in this session",
                                    RelationGetRelationName(doomed_rel))));

      /*
       * Arrange for the physical files to be unlinked at commit.  Views and
       * composite types are skipped.
       */
      relkind = doomed_rel->rd_rel->relkind;
      if (!(relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE))
            RelationDropStorage(doomed_rel);

      /*
       * Close the relcache entry while *keeping* AccessExclusiveLock until
       * transaction commit, so that no one else tries to do anything with
       * the doomed relation.
       */
      relation_close(doomed_rel, NoLock);

      /*
       * Discard any ON COMMIT action registered for the rel.
       */
      remove_on_commit_action(relid);

      /*
       * Flush the relation out of the relcache before any catalog entries are
       * removed, to be sure no relcache rebuild happens partway through.
       * (It should not really matter, given that no CommandCounterIncrement
       * is done here, but better safe than sorry.)
       */
      RelationForgetRelation(relid);

      /* Remove inheritance information */
      RelationRemoveInheritance(relid);

      /* Remove statistics */
      RemoveStatistics(relid, 0);

      /* Remove the attribute tuples */
      DeleteAttributeTuples(relid);

      /* Remove the relation tuple */
      DeleteRelationTuple(relid);
}


/*
 * Store a default expression for column attnum of relation rel.
 *
 * rel		relation owning the column
 * attnum	number of the column receiving the default
 * expr		the default expression, as a node tree
 *
 * This inserts a pg_attrdef row holding the expression in both
 * nodeToString and deparsed form, sets atthasdef in the column's
 * pg_attribute row if it is not already set, and records dependencies of
 * the new pg_attrdef entry on the column and on the objects used in the
 * expression.  An error is raised if the column's pg_attribute row cannot
 * be found.
 */
void
StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
{
      char     *adbin;
      char     *adsrc;
      Relation    adrel;
      HeapTuple   tuple;
      Datum       values[4];
      static bool nulls[4] = {false, false, false, false};
      Relation    attrrel;
      HeapTuple   atttup;
      Form_pg_attribute attStruct;
      Oid               attrdefOid;
      ObjectAddress colobject,
                        defobject;

      /*
       * Flatten expression to string form for storage.
       */
      adbin = nodeToString(expr);

      /*
       * Also deparse it to form the mostly-obsolete adsrc field.
       */
      adsrc = deparse_expression(expr,
                                          deparse_context_for(RelationGetRelationName(rel),
                                                                        RelationGetRelid(rel)),
                                             false, false);

      /*
       * Make the pg_attrdef entry.  Every column value goes through the
       * matching Datum conversion macro, the same way the syscache lookup
       * below builds its keys, rather than relying on implicit integer
       * conversion into Datum.
       */
      values[Anum_pg_attrdef_adrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
      values[Anum_pg_attrdef_adnum - 1] = Int16GetDatum(attnum);
      values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
      values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);

      adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);

      tuple = heap_form_tuple(adrel->rd_att, values, nulls);
      attrdefOid = simple_heap_insert(adrel, tuple);

      CatalogUpdateIndexes(adrel, tuple);

      /* Remember the new entry's address for the dependency records below */
      defobject.classId = AttrDefaultRelationId;
      defobject.objectId = attrdefOid;
      defobject.objectSubId = 0;

      heap_close(adrel, RowExclusiveLock);

      /* now can free some of the stuff allocated above */
      pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
      pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
      heap_freetuple(tuple);
      pfree(adbin);
      pfree(adsrc);

      /*
       * Update the pg_attribute entry for the column to show that a default
       * exists.
       */
      attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
      atttup = SearchSysCacheCopy(ATTNUM,
                                                ObjectIdGetDatum(RelationGetRelid(rel)),
                                                Int16GetDatum(attnum),
                                                0, 0);
      if (!HeapTupleIsValid(atttup))
            elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                   attnum, RelationGetRelid(rel));
      attStruct = (Form_pg_attribute) GETSTRUCT(atttup);

      /* Skip the catalog update when the flag is already set */
      if (!attStruct->atthasdef)
      {
            attStruct->atthasdef = true;
            simple_heap_update(attrrel, &atttup->t_self, atttup);
            /* keep catalog indexes current */
            CatalogUpdateIndexes(attrrel, atttup);
      }
      heap_close(attrrel, RowExclusiveLock);
      heap_freetuple(atttup);

      /*
       * Make a dependency so that the pg_attrdef entry goes away if the column
       * (or whole table) is deleted.
       */
      colobject.classId = RelationRelationId;
      colobject.objectId = RelationGetRelid(rel);
      colobject.objectSubId = attnum;

      recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);

      /*
       * Record dependencies on objects used in the expression, too.
       */
      recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
}

/*
 * StoreRelCheck
 *
 * Record one CHECK constraint expression for the given relation by
 * creating a constraint entry of type CONSTRAINT_CHECK.
 *
 * The expression is handed over in three forms: the node tree itself, its
 * nodeToString() serialization, and deparsed text.  The distinct column
 * numbers of the Vars it references are passed along as well.
 *
 * The constraint count in the relation's pg_class entry is NOT touched
 * here; updating it is the caller's job.
 */
static void
StoreRelCheck(Relation rel, char *ccname, Node *expr,
                    bool is_local, int inhcount)
{
      char     *binform;
      char     *srcform;
      List     *vars;
      int16    *attNos = NULL;
      int               nkeys = 0;
      int               nvars;

      /* Serialize the expression tree to its string representation. */
      binform = nodeToString(expr);

      /* Deparsed text, kept for the mostly-obsolete consrc field. */
      srcform = deparse_expression(expr,
                                   deparse_context_for(RelationGetRelationName(rel),
                                                       RelationGetRelid(rel)),
                                   false, false);

      /*
       * Gather the Vars appearing in the expression.
       *
       * NB: relying on pull_var_clause is safe only because subselects are
       * not permitted in check constraints; it would not look inside one.
       */
      vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
      nvars = list_length(vars);

      if (nvars > 0)
      {
            ListCell   *lc;

            /* Worst case every Var names a different column. */
            attNos = (int16 *) palloc(nvars * sizeof(int16));

            foreach(lc, vars)
            {
                  Var            *var = (Var *) lfirst(lc);
                  int               k = 0;

                  /* Linear search of the columns recorded so far. */
                  while (k < nkeys && attNos[k] != var->varattno)
                        k++;

                  /* Ran off the end: first sighting of this column. */
                  if (k == nkeys)
                        attNos[nkeys++] = var->varattno;
            }
      }

      /*
       * Create the Check Constraint
       */
      CreateConstraintEntry(ccname,       /* Constraint Name */
                            RelationGetNamespace(rel),      /* namespace */
                            CONSTRAINT_CHECK,   /* Constraint Type */
                            false,        /* Is Deferrable */
                            false,        /* Is Deferred */
                            RelationGetRelid(rel),    /* relation */
                            attNos,       /* attrs in the constraint */
                            nkeys,        /* # attrs in the constraint */
                            InvalidOid,   /* not a domain constraint */
                            InvalidOid,   /* Foreign key fields */
                            NULL,
                            NULL,
                            NULL,
                            NULL,
                            0,
                            ' ',
                            ' ',
                            ' ',
                            InvalidOid,   /* no associated index */
                            expr,         /* Tree form check constraint */
                            binform,      /* Binary form check constraint */
                            srcform,      /* Source form check constraint */
                            is_local,     /* conislocal */
                            inhcount);    /* coninhcount */

      pfree(binform);
      pfree(srcform);
}

/*
 * StoreConstraints
 *
 * Store a list of already-cooked defaults and check constraints (given as
 * CookedConstraint structs) for the relation.
 *
 * NOTE: only pre-cooked expressions arrive here, i.e. expressions that
 * were inherited from an existing relation.  Freshly parsed expressions
 * can be added afterwards through direct calls to StoreAttrDefault and
 * StoreRelCheck (see AddRelationNewConstraints()).
 */
static void
StoreConstraints(Relation rel, List *cooked_constraints)
{
      ListCell   *item;
      int               nchecks = 0;

      /* An empty list leaves nothing to store. */
      if (!cooked_constraints)
            return;

      /*
       * The pg_attribute rows just created for this relation must be
       * visible, or deparsing the constraint expressions will fail; hence
       * the command counter bump.  CAUTION: this will cause a relcache
       * entry rebuild.
       */
      CommandCounterIncrement();

      foreach(item, cooked_constraints)
      {
            CookedConstraint *cc = (CookedConstraint *) lfirst(item);

            if (cc->contype == CONSTR_DEFAULT)
                  StoreAttrDefault(rel, cc->attnum, cc->expr);
            else if (cc->contype == CONSTR_CHECK)
            {
                  StoreRelCheck(rel, cc->name, cc->expr,
                                cc->is_local, cc->inhcount);
                  nchecks++;
            }
            else
                  elog(ERROR, "unrecognized constraint type: %d",
                       (int) cc->contype);
      }

      /* Record the number of checks only if at least one was stored. */
      if (nchecks > 0)
            SetRelationNumChecks(rel, nchecks);
}

/*
 * AddRelationNewConstraints
 *
 * Add new column default expressions and/or constraint check expressions
 * to an existing relation.  This is defined to do both for efficiency in
 * DefineRelation, but of course you can do just one or the other by passing
 * empty lists.
 *
 * rel: relation to be modified
 * newColDefaults: list of RawColumnDefault structures
 * newConstraints: list of Constraint nodes
 * allow_merge: TRUE if check constraints may be merged with existing ones
 * is_local: TRUE if definition is local, FALSE if it's inherited
 *
 * All entries in newColDefaults will be processed, except that a default
 * which cooks to NULL or to a bare null Const is skipped: nothing is stored
 * for it and it does not appear in the result.  Entries in newConstraints
 * will be processed only if they are CONSTR_CHECK type; each such entry is
 * expected to carry exactly one of raw_expr and cooked_expr.
 *
 * Returns a list of CookedConstraint nodes that shows the cooked form of
 * the default and constraint expressions added to the relation.  A named
 * check constraint that was merged into a pre-existing one is omitted from
 * this list and is not counted as a new check.
 *
 * NB: caller should have opened rel with AccessExclusiveLock, and should
 * hold that lock till end of transaction.  Also, we assume the caller has
 * done a CommandCounterIncrement if necessary to make the relation's catalog
 * tuples visible.
 */
List *
AddRelationNewConstraints(Relation rel,
                                      List *newColDefaults,
                                      List *newConstraints,
                                      bool allow_merge,
                                      bool is_local)
{
      List     *cookedConstraints = NIL;
      TupleDesc   tupleDesc;
      TupleConstr *oldconstr;
      int               numoldchecks;
      ParseState *pstate;
      RangeTblEntry *rte;
      int               numchecks;
      List     *checknames;
      ListCell   *cell;
      Node     *expr;
      CookedConstraint *cooked;

      /*
       * Get info about existing constraints.  A relation with no constraint
       * info at all simply has zero existing checks.
       */
      tupleDesc = RelationGetDescr(rel);
      oldconstr = tupleDesc->constr;
      if (oldconstr)
            numoldchecks = oldconstr->num_check;
      else
            numoldchecks = 0;

      /*
       * Create a dummy ParseState and insert the target relation as its sole
       * rangetable entry.  We need a ParseState for transformExpr.
       */
      pstate = make_parsestate(NULL);
      rte = addRangeTableEntryForRelation(pstate,
                                                            rel,
                                                            NULL,
                                                            false,
                                                            true);
      addRTEtoQuery(pstate, rte, true, true, true);

      /*
       * Process column default expressions.
       */
      foreach(cell, newColDefaults)
      {
            RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
            /* attnum is 1-based, the attrs array is 0-based */
            Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];

            /* cook the default, coercing it to the column's type and typmod */
            expr = cookDefault(pstate, colDef->raw_default,
                                       atp->atttypid, atp->atttypmod,
                                       NameStr(atp->attname));

            /*
             * If the expression is just a NULL constant, we do not bother to make
             * an explicit pg_attrdef entry, since the default behavior is
             * equivalent.
             *
             * Note a nonobvious property of this test: if the column is of a
             * domain type, what we'll get is not a bare null Const but a
             * CoerceToDomain expr, so we will not discard the default.  This is
             * critical because the column default needs to be retained to
             * override any default that the domain might have.
             */
            if (expr == NULL ||
                  (IsA(expr, Const) &&((Const *) expr)->constisnull))
                  continue;

            StoreAttrDefault(rel, colDef->attnum, expr);

            /* Report the stored default to the caller. */
            cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
            cooked->contype = CONSTR_DEFAULT;
            cooked->name = NULL;
            cooked->attnum = colDef->attnum;
            cooked->expr = expr;
            cooked->is_local = is_local;
            /* a non-local definition counts as inherited once */
            cooked->inhcount = is_local ? 0 : 1;
            cookedConstraints = lappend(cookedConstraints, cooked);
      }

      /*
       * Process constraint expressions.
       *
       * numchecks starts at the pre-existing count and is bumped for every
       * check actually stored below.  checknames accumulates the names used
       * by constraints handled in this call.
       */
      numchecks = numoldchecks;
      checknames = NIL;
      foreach(cell, newConstraints)
      {
            Constraint *cdef = (Constraint *) lfirst(cell);
            char     *ccname;

            /* anything other than a CHECK constraint is silently ignored */
            if (cdef->contype != CONSTR_CHECK)
                  continue;

            if (cdef->raw_expr != NULL)
            {
                  Assert(cdef->cooked_expr == NULL);

                  /*
                   * Transform raw parsetree to executable expression, and verify
                   * it's valid as a CHECK constraint.
                   */
                  expr = cookConstraint(pstate, cdef->raw_expr,
                                                  RelationGetRelationName(rel));
            }
            else
            {
                  Assert(cdef->cooked_expr != NULL);

                  /*
                   * Here, we assume the parser will only pass us valid CHECK
                   * expressions, so we do no particular checking.
                   */
                  expr = stringToNode(cdef->cooked_expr);
            }

            /*
             * Check name uniqueness, or generate a name if none was given.
             */
            if (cdef->name != NULL)
            {
                  ListCell   *cell2;

                  ccname = cdef->name;
                  /* Check against other new constraints */
                  /* Needed because we don't do CommandCounterIncrement in loop */
                  foreach(cell2, checknames)
                  {
                        if (strcmp((char *) lfirst(cell2), ccname) == 0)
                              ereport(ERROR,
                                          (errcode(ERRCODE_DUPLICATE_OBJECT),
                                           errmsg("check constraint \"%s\" already exists",
                                                      ccname)));
                  }

                  /* save name for future checks */
                  checknames = lappend(checknames, ccname);

                  /*
                   * Check against pre-existing constraints.  If we are allowed to
                   * merge with an existing constraint, there's no more to do here.
                   * (We omit the duplicate constraint from the result, which is
                   * what ATAddCheckConstraint wants.)
                   */
                  if (MergeWithExistingConstraint(rel, ccname, expr,
                                                                  allow_merge, is_local))
                        continue;
            }
            else
            {
                  /*
                   * When generating a name, we want to create "tab_col_check" for a
                   * column constraint and "tab_check" for a table constraint.  We
                   * no longer have any info about the syntactic positioning of the
                   * constraint phrase, so we approximate this by seeing whether the
                   * expression references more than one column.  (If the user
                   * played by the rules, the result is the same...)
                   *
                   * Note: pull_var_clause() doesn't descend into sublinks, but we
                   * eliminated those above; and anyway this only needs to be an
                   * approximate answer.
                   */
                  List     *vars;
                  char     *colname;

                  vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);

                  /* eliminate duplicates */
                  vars = list_union(NIL, vars);

                  /* exactly one distinct Var: name the constraint after it */
                  if (list_length(vars) == 1)
                        colname = get_attname(RelationGetRelid(rel),
                                                        ((Var *) linitial(vars))->varattno);
                  else
                        colname = NULL;

                  /* checknames is passed so names chosen earlier in this call are avoided */
                  ccname = ChooseConstraintName(RelationGetRelationName(rel),
                                                              colname,
                                                              "check",
                                                              RelationGetNamespace(rel),
                                                              checknames);

                  /* save name for future checks */
                  checknames = lappend(checknames, ccname);
            }

            /*
             * OK, store it.  The inheritance count is 0 for a local definition
             * and 1 for an inherited one.
             */
            StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);

            numchecks++;

            /* Report the stored check constraint to the caller. */
            cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
            cooked->contype = CONSTR_CHECK;
            cooked->name = ccname;
            cooked->attnum = 0;
            cooked->expr = expr;
            cooked->is_local = is_local;
            cooked->inhcount = is_local ? 0 : 1;
            cookedConstraints = lappend(cookedConstraints, cooked);
      }

      /*
       * Update the count of constraints in the relation's pg_class tuple. We do
       * this even if there was no change, in order to ensure that an SI update
       * message is sent out for the pg_class tuple, which will force other
       * backends to rebuild their relcache entries for the rel. (This is
       * critical if we added defaults but not constraints.)
       */
      SetRelationNumChecks(rel, numchecks);

      return cookedConstraints;
}

/*
 * MergeWithExistingConstraint
 *
 * Look for a constraint already attached to rel that has the same name as a
 * proposed new check constraint.
 *
 * If there is none, return FALSE: the name is unique so far.
 *
 * If there is one, it must be a check constraint whose stored expression
 * is equal() to expr, and allow_merge must be set; otherwise an error is
 * raised.  When merging is acceptable the existing row is updated (either
 * conislocal is set, or coninhcount is incremented, depending on is_local),
 * a NOTICE is issued, and TRUE is returned.
 */
static bool
MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
                                          bool allow_merge, bool is_local)
{
      bool        found = false;
      Relation    conDesc;
      SysScanDesc scan;
      ScanKeyData keys[2];
      HeapTuple   tup;

      conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);

      /* Scan by constraint name within the relation's namespace. */
      ScanKeyInit(&keys[0],
                  Anum_pg_constraint_conname,
                  BTEqualStrategyNumber, F_NAMEEQ,
                  CStringGetDatum(ccname));

      ScanKeyInit(&keys[1],
                  Anum_pg_constraint_connamespace,
                  BTEqualStrategyNumber, F_OIDEQ,
                  ObjectIdGetDatum(RelationGetNamespace(rel)));

      scan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
                                SnapshotNow, 2, keys);

      while (HeapTupleIsValid(tup = systable_getnext(scan)))
      {
            Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
            HeapTuple   newtup;
            Form_pg_constraint newcon;

            /* Same name, but it belongs to some other relation: skip. */
            if (con->conrelid != RelationGetRelid(rel))
                  continue;

            /*
             * This is the one.  Only a check constraint with an identical
             * expression can be treated as a duplicate.
             */
            if (con->contype == CONSTRAINT_CHECK)
            {
                  Datum       conbin;
                  bool        isnull;

                  conbin = fastgetattr(tup,
                                       Anum_pg_constraint_conbin,
                                       conDesc->rd_att, &isnull);
                  if (isnull)
                        elog(ERROR, "null conbin for rel %s",
                             RelationGetRelationName(rel));
                  if (equal(expr, stringToNode(TextDatumGetCString(conbin))))
                        found = true;
            }

            /* Anything short of "identical and merging allowed" conflicts. */
            if (!(found && allow_merge))
                  ereport(ERROR,
                          (errcode(ERRCODE_DUPLICATE_OBJECT),
                           errmsg("constraint \"%s\" for relation \"%s\" already exists",
                                  ccname, RelationGetRelationName(rel))));

            ereport(NOTICE,
                    (errmsg("merging constraint \"%s\" with inherited definition",
                            ccname)));

            /* Work on a private copy of the tuple, then write it back. */
            newtup = heap_copytuple(tup);
            newcon = (Form_pg_constraint) GETSTRUCT(newtup);
            if (is_local)
                  newcon->conislocal = true;
            else
                  newcon->coninhcount++;
            simple_heap_update(conDesc, &newtup->t_self, newtup);
            CatalogUpdateIndexes(conDesc, newtup);

            /* Stop scanning once the matching row has been handled. */
            break;
      }

      systable_endscan(scan);
      heap_close(conDesc, RowExclusiveLock);

      return found;
}

/*
 * SetRelationNumChecks
 *
 * Set relchecks, the check-constraint count, in the relation's pg_class
 * tuple.
 *
 * The caller had better hold exclusive lock on the relation.
 *
 * Just as important as the stored number is the side effect: an SI update
 * message goes out for the pg_class tuple, forcing other backends to
 * rebuild their relcache entries for the rel, and this backend rebuilds
 * its own entry at the next CommandCounterIncrement.  That is why an
 * explicit relcache invalidation is issued even when the count is already
 * correct.
 */
static void
SetRelationNumChecks(Relation rel, int numchecks)
{
      Relation    classRel;
      HeapTuple   classTup;
      Form_pg_class classForm;

      classRel = heap_open(RelationRelationId, RowExclusiveLock);

      /* Fetch a modifiable copy of the relation's pg_class row. */
      classTup = SearchSysCacheCopy(RELOID,
                                    ObjectIdGetDatum(RelationGetRelid(rel)),
                                    0, 0, 0);
      if (!HeapTupleIsValid(classTup))
            elog(ERROR, "cache lookup failed for relation %u",
                 RelationGetRelid(rel));
      classForm = (Form_pg_class) GETSTRUCT(classTup);

      if (classForm->relchecks == numchecks)
      {
            /* Nothing to write to disk, but force relcache inval anyway */
            CacheInvalidateRelcache(rel);
      }
      else
      {
            classForm->relchecks = numchecks;

            simple_heap_update(classRel, &classTup->t_self, classTup);

            /* the catalog's indexes must follow the heap update */
            CatalogUpdateIndexes(classRel, classTup);
      }

      heap_freetuple(classTup);
      heap_close(classRel, RowExclusiveLock);
}

/*
 * cookDefault
 *
 * Convert a raw (just-parsed) column default into cooked form, ready to be
 * stored.
 *
 * pstate should be set up to recognize any vars the expression might
 * mention.  Vars get rejected below regardless, but resolving them first
 * allows the proper error message to be given instead of "unknown var".
 *
 * When atttypid is not InvalidOid the result is also coerced to that type
 * with typmod atttypmod.  attname matters only in that case: it appears in
 * the error message raised if the coercion cannot be done.
 */
Node *
cookDefault(ParseState *pstate,
                  Node *raw_default,
                  Oid atttypid,
                  int32 atttypmod,
                  char *attname)
{
      Node     *cooked;
      Oid               exprtype;

      Assert(raw_default != NULL);

      /* Turn the raw parse tree into an executable expression. */
      cooked = transformExpr(pstate, raw_default);

      /* A default may not reference any column. */
      if (contain_var_clause(cooked))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                     errmsg("cannot use column references in default expression")));

      /* Nor may it produce a set. */
      if (expression_returns_set(cooked))
            ereport(ERROR,
                    (errcode(ERRCODE_DATATYPE_MISMATCH),
                     errmsg("default expression must not return a set")));

      /*
       * Subqueries, aggregates and window functions are all rejected as
       * well, going by the flags left in the parse state.
       */
      if (pstate->p_hasSubLinks)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("cannot use subquery in default expression")));
      if (pstate->p_hasAggs)
            ereport(ERROR,
                    (errcode(ERRCODE_GROUPING_ERROR),
                     errmsg("cannot use aggregate function in default expression")));
      if (pstate->p_hasWindowFuncs)
            ereport(ERROR,
                    (errcode(ERRCODE_WINDOWING_ERROR),
                     errmsg("cannot use window function in default expression")));

      /* Without a target type there is nothing left to do. */
      if (!OidIsValid(atttypid))
            return cooked;

      /*
       * Coerce to the requested type and typmod.  This should match what
       * the parser does for non-defaulted expressions --- see
       * transformAssignedExpr().
       */
      exprtype = exprType(cooked);
      cooked = coerce_to_target_type(pstate, cooked, exprtype,
                                     atttypid, atttypmod,
                                     COERCION_ASSIGNMENT,
                                     COERCE_IMPLICIT_CAST,
                                     -1);
      if (cooked == NULL)
            ereport(ERROR,
                    (errcode(ERRCODE_DATATYPE_MISMATCH),
                     errmsg("column \"%s\" is of type %s"
                            " but default expression is of type %s",
                            attname,
                            format_type_be(atttypid),
                            format_type_be(exprtype)),
                     errhint("You will need to rewrite or cast the expression.")));

      return cooked;
}

/*
 * cookConstraint
 *
 * Convert a raw CHECK constraint expression into cooked form, ready to be
 * stored.
 *
 * pstate must be set up to recognize any vars that might appear in the
 * expression.  relname is used only in the error message raised when the
 * expression refers to some other table.
 */
static Node *
cookConstraint(ParseState *pstate,
                     Node *raw_constraint,
                     char *relname)
{
      Node     *cooked;

      /*
       * Transform the raw parse tree into an executable expression, then
       * make sure it yields a boolean result.
       */
      cooked = coerce_to_boolean(pstate,
                                 transformExpr(pstate, raw_constraint),
                                 "CHECK");

      /*
       * More than one rangetable entry means some outside relation got
       * referenced, which is not allowed.
       */
      if (list_length(pstate->p_rtable) != 1)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                     errmsg("only table \"%s\" can be referenced in check constraint",
                            relname)));

      /*
       * Subqueries, aggregates and window functions are rejected, going by
       * the flags left in the parse state.
       */
      if (pstate->p_hasSubLinks)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("cannot use subquery in check constraint")));
      if (pstate->p_hasAggs)
            ereport(ERROR,
                    (errcode(ERRCODE_GROUPING_ERROR),
                     errmsg("cannot use aggregate function in check constraint")));
      if (pstate->p_hasWindowFuncs)
            ereport(ERROR,
                    (errcode(ERRCODE_WINDOWING_ERROR),
                     errmsg("cannot use window function in check constraint")));

      return cooked;
}


/*
 * RemoveStatistics --- delete pg_statistic entries for a rel or one column
 *
 * With attnum zero every entry belonging to the rel is removed; with any
 * other attnum only the entry for that column is.
 */
void
RemoveStatistics(Oid relid, AttrNumber attnum)
{
      Relation    statrel;
      SysScanDesc statscan;
      ScanKeyData skey[2];
      int               nkeys = 1;
      HeapTuple   stattup;

      statrel = heap_open(StatisticRelationId, RowExclusiveLock);

      /* The relation OID is always part of the search key. */
      ScanKeyInit(&skey[0],
                  Anum_pg_statistic_starelid,
                  BTEqualStrategyNumber, F_OIDEQ,
                  ObjectIdGetDatum(relid));

      /* Narrow the scan to a single column when one was specified. */
      if (attnum != 0)
      {
            ScanKeyInit(&skey[1],
                        Anum_pg_statistic_staattnum,
                        BTEqualStrategyNumber, F_INT2EQ,
                        Int16GetDatum(attnum));
            nkeys = 2;
      }

      statscan = systable_beginscan(statrel, StatisticRelidAttnumIndexId, true,
                                    SnapshotNow, nkeys, skey);

      /* Delete every row the scan returns. */
      for (stattup = systable_getnext(statscan);
           HeapTupleIsValid(stattup);
           stattup = systable_getnext(statscan))
            simple_heap_delete(statrel, &stattup->t_self);

      systable_endscan(statscan);

      heap_close(statrel, RowExclusiveLock);
}


/*
 * RelationTruncateIndexes - empty out every index of a heap relation.
 *
 * Each index reported by the relcache for the heap is truncated to zero
 * length and then built again.  Caller must hold exclusive lock on rel.
 */
static void
RelationTruncateIndexes(Relation heapRelation)
{
      ListCell   *lc;

      /* Walk the relcache's list of index OIDs for the heap. */
      foreach(lc, RelationGetIndexList(heapRelation))
      {
            Oid               indexoid = lfirst_oid(lc);
            Relation    indexRel;
            IndexInfo  *info;

            /* Exclusive lock on the index too, just to be sure. */
            indexRel = index_open(indexoid, AccessExclusiveLock);

            /* index_build needs this, so fetch it before truncating. */
            info = BuildIndexInfo(indexRel);

            /* Truncate the actual file (and discard buffers). */
            RelationTruncate(indexRel, 0);

            /*
             * Initialize the index and rebuild it.  Note: we do not need to
             * re-establish pkey setting.
             */
            index_build(heapRelation, indexRel, info, false);

            /* Finished with this index; it is closed with NoLock. */
            index_close(indexRel, NoLock);
      }
}

/*
 *     heap_truncate
 *
 *     Delete all data within all of the specified relations.
 *
 * This is not transaction-safe!  A transaction-safe implementation exists
 * in commands/tablecmds.c.  This one is now used only for ON COMMIT
 * truncation of temporary tables, where it doesn't matter.
 */
void
heap_truncate(List *relids)
{
      List     *relations = NIL;
      ListCell   *cell;

      /*
       * Open every relation with exclusive access, pulling in each one's
       * toast table (if it has one) right behind it.
       */
      foreach(cell, relids)
      {
            Relation    rel = heap_open(lfirst_oid(cell), AccessExclusiveLock);
            Oid               toastrelid = rel->rd_rel->reltoastrelid;

            relations = lappend(relations, rel);

            if (OidIsValid(toastrelid))
                  relations = lappend(relations,
                                      heap_open(toastrelid, AccessExclusiveLock));
      }

      /* Tables referenced by foreign keys must not be truncated. */
      heap_truncate_check_FKs(relations, true);

      /* All clear: do the work, one relation at a time. */
      foreach(cell, relations)
      {
            Relation    rel = (Relation) lfirst(cell);

            /* Truncate the actual file (and discard buffers) */
            RelationTruncate(rel, 0);

            /* Whatever indexes the relation has get truncated as well. */
            RelationTruncateIndexes(rel);

            /* Close the relation, keeping the exclusive lock until commit. */
            heap_close(rel, NoLock);
      }
}

/*
 * heap_truncate_check_FKs
 *          Raise an error if any relation outside the given list has a
 *          foreign key referencing a relation in the list.
 *
 * Such FKs are disallowed (FKs among the listed relations, including
 * self-referential ones, are fine) since the whole point of TRUNCATE is
 * to not scan the individual rows to be thrown away.
 *
 * This is a separate routine so that both implementations of truncate can
 * share it.  Caller should already hold a suitable lock on the relations.
 *
 * tempTables only selects which error message is produced.
 */
void
heap_truncate_check_FKs(List *relations, bool tempTables)
{
      List     *oids = NIL;
      List     *dependents;
      ListCell   *cell;

      /*
       * Collect the OIDs of the relations worth checking.
       *
       * A relation without triggers can neither have FKs nor be referenced
       * by a FK from another table, so it is left out.
       */
      foreach(cell, relations)
      {
            Relation    rel = lfirst(cell);

            if (!rel->rd_rel->relhastriggers)
                  continue;
            oids = lappend_oid(oids, RelationGetRelid(rel));
      }

      /* Fast path: no relation has triggers, hence none has FKs. */
      if (oids == NIL)
            return;

      /*
       * Otherwise pg_constraint must be scanned.  One pass covering all the
       * relations comes first; if it turns up nothing, all is well.
       */
      dependents = heap_truncate_find_FKs(oids);
      if (dependents == NIL)
            return;

      /*
       * There is a problem somewhere.  Repeat the scan once per relation so
       * that a specific pair of relations can be named.  This is slow, but
       * performance shouldn't matter much in a failure path, and doing it
       * this way keeps the message independent of chance row locations
       * within pg_constraint.
       */
      foreach(cell, oids)
      {
            Oid               referenced = lfirst_oid(cell);
            ListCell   *depcell;

            dependents = heap_truncate_find_FKs(list_make1_oid(referenced));

            foreach(depcell, dependents)
            {
                  Oid               referencing = lfirst_oid(depcell);
                  char     *referenced_name;
                  char     *referencing_name;

                  /* A referencer that is itself being truncated is fine. */
                  if (list_member_oid(oids, referencing))
                        continue;

                  referenced_name = get_rel_name(referenced);
                  referencing_name = get_rel_name(referencing);

                  if (tempTables)
                        ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("unsupported ON COMMIT and foreign key combination"),
                                 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
                                           referencing_name, referenced_name)));
                  else
                        ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("cannot truncate a table referenced in a foreign key constraint"),
                                 errdetail("Table \"%s\" references \"%s\".",
                                           referencing_name, referenced_name),
                                 errhint("Truncate table \"%s\" at the same time, "
                                         "or use TRUNCATE ... CASCADE.",
                                         referencing_name)));
            }
      }
}

/*
 * heap_truncate_find_FKs
 *          Find relations having foreign keys that reference any of the
 *          given rels
 *
 * Both the argument and the result are lists of relation OIDs.  The result
 * has no duplicates, never includes a rel that was in the input list, and
 * is sorted in OID order.  (The ordering is enforced mainly to guarantee
 * consistent behavior in the regression tests; behavior should not depend
 * on chance locations of rows in pg_constraint.)
 *
 * Note: caller should already have appropriate lock on all rels mentioned
 * in relationIds.  Since adding or dropping an FK requires exclusive lock
 * on both rels, this ensures that the answer will be stable.
 */
List *
heap_truncate_find_FKs(List *relationIds)
{
      List     *referencers = NIL;
      Relation    conRel;
      SysScanDesc conScan;
      HeapTuple   conTup;

      /*
       * pg_constraint has to be scanned.  Right now this is a seqscan
       * because there is no available index on confrelid.
       */
      conRel = heap_open(ConstraintRelationId, AccessShareLock);

      conScan = systable_beginscan(conRel, InvalidOid, false,
                                   SnapshotNow, 0, NULL);

      while (HeapTupleIsValid(conTup = systable_getnext(conScan)))
      {
            Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(conTup);

            /*
             * Keep the referencing rel only when this row is a foreign key,
             * it references one of our tables, and the referencer is not
             * itself in the input list.  Duplicates within the result are
             * taken care of by the ordered insert.
             */
            if (con->contype == CONSTRAINT_FOREIGN &&
                list_member_oid(relationIds, con->confrelid) &&
                !list_member_oid(relationIds, con->conrelid))
                  referencers = insert_ordered_unique_oid(referencers,
                                                          con->conrelid);
      }

      systable_endscan(conScan);
      heap_close(conRel, AccessShareLock);

      return referencers;
}

/*
 * insert_ordered_unique_oid
 *          Add an Oid to a sorted list of Oids, keeping the list sorted
 *          and free of duplicates
 *
 * Building an ordered list this way is O(N^2), but the constant is pretty
 * small, so for the number of entries we expect it will probably beat
 * applying qsort().  It seems unlikely someone would be trying to truncate
 * a table with thousands of dependent tables ...
 */
static List *
insert_ordered_unique_oid(List *list, Oid datum)
{
      ListCell   *prev;
      ListCell   *next;

      /* Empty list: the datum goes at the front. */
      if (list == NIL)
            return lcons_oid(datum, list);

      /* Smaller than the first entry: it goes at the front as well. */
      if (datum < linitial_oid(list))
            return lcons_oid(datum, list);

      /* Equal to the first entry: duplicate, so don't insert. */
      if (datum == linitial_oid(list))
            return list;

      /*
       * Advance 'prev' for as long as the following entry is not larger
       * than the datum, bailing out if an equal entry is met.
       */
      prev = list_head(list);
      while ((next = lnext(prev)) != NULL && lfirst_oid(next) <= datum)
      {
            if (lfirst_oid(next) == datum)
                  return list;            /* duplicate, so don't insert */

            prev = next;
      }

      /* The datum belongs right after 'prev'. */
      lappend_cell_oid(list, prev, datum);
      return list;
}

Generated by  Doxygen 1.6.0   Back to index