Logo Search packages:      
Sourcecode: postgresql-8.4 version File versions

view.c

/*-------------------------------------------------------------------------
 *
 * view.c
 *      use rewrite rules to construct views
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/view.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/analyze.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/rewriteSupport.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"


static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
static bool isViewOnTempTable_walker(Node *node, void *context);

/*---------------------------------------------------------------------
 * isViewOnTempTable
 *
 * Returns true iff any of the relations underlying this view are
 * temporary tables.
 *---------------------------------------------------------------------
 */
static bool
isViewOnTempTable(Query *viewParse)
{
      return isViewOnTempTable_walker((Node *) viewParse, NULL);
}

static bool
isViewOnTempTable_walker(Node *node, void *context)
{
      if (node == NULL)
            return false;

      if (IsA(node, Query))
      {
            Query    *query = (Query *) node;
            ListCell   *rtable;

            foreach(rtable, query->rtable)
            {
                  RangeTblEntry *rte = lfirst(rtable);

                  if (rte->rtekind == RTE_RELATION)
                  {
                        Relation    rel = heap_open(rte->relid, AccessShareLock);
                        bool        istemp = rel->rd_istemp;

                        heap_close(rel, AccessShareLock);
                        if (istemp)
                              return true;
                  }
            }

            return query_tree_walker(query,
                                                 isViewOnTempTable_walker,
                                                 context,
                                                 QTW_IGNORE_JOINALIASES);
      }

      return expression_tree_walker(node,
                                                  isViewOnTempTable_walker,
                                                  context);
}

/*---------------------------------------------------------------------
 * DefineVirtualRelation
 *
 * Create the "view" relation. `DefineRelation' does all the work,
 * we just provide the correct arguments ... at least when we're
 * creating a view.  If we're updating an existing view, we have to
 * work harder.
 *---------------------------------------------------------------------
 */
static Oid
DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace)
{
      Oid               viewOid,
                        namespaceId;
      CreateStmt *createStmt = makeNode(CreateStmt);
      List     *attrList;
      ListCell   *t;

      /*
       * create a list of ColumnDef nodes based on the names and types of the
       * (non-junk) targetlist items from the view's SELECT list.
       */
      attrList = NIL;
      foreach(t, tlist)
      {
            TargetEntry *tle = lfirst(t);

            if (!tle->resjunk)
            {
                  ColumnDef  *def = makeNode(ColumnDef);

                  def->colname = pstrdup(tle->resname);
                  def->typename = makeTypeNameFromOid(exprType((Node *) tle->expr),
                                                                   exprTypmod((Node *) tle->expr));
                  def->inhcount = 0;
                  def->is_local = true;
                  def->is_not_null = false;
                  def->raw_default = NULL;
                  def->cooked_default = NULL;
                  def->constraints = NIL;

                  attrList = lappend(attrList, def);
            }
      }

      if (attrList == NIL)
            ereport(ERROR,
                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                         errmsg("view must have at least one column")));

      /*
       * Check to see if we want to replace an existing view.
       */
      namespaceId = RangeVarGetCreationNamespace(relation);
      viewOid = get_relname_relid(relation->relname, namespaceId);

      if (OidIsValid(viewOid) && replace)
      {
            Relation    rel;
            TupleDesc   descriptor;

            /*
             * Yes.  Get exclusive lock on the existing view ...
             */
            rel = relation_open(viewOid, AccessExclusiveLock);

            /*
             * Make sure it *is* a view, and do permissions checks.
             */
            if (rel->rd_rel->relkind != RELKIND_VIEW)
                  ereport(ERROR,
                              (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                               errmsg("\"%s\" is not a view",
                                          RelationGetRelationName(rel))));

            if (!pg_class_ownercheck(viewOid, GetUserId()))
                  aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
                                       RelationGetRelationName(rel));

            /* Also check it's not in use already */
            CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");

            /*
             * Due to the namespace visibility rules for temporary objects, we
             * should only end up replacing a temporary view with another
             * temporary view, and vice versa.
             */
            Assert(relation->istemp == rel->rd_istemp);

            /*
             * Create a tuple descriptor to compare against the existing view, and
             * verify that the old column list is an initial prefix of the new
             * column list.
             */
            descriptor = BuildDescForRelation(attrList);
            checkViewTupleDesc(descriptor, rel->rd_att);

            /*
             * If new attributes have been added, we must add pg_attribute entries
             * for them.  It is convenient (although overkill) to use the ALTER
             * TABLE ADD COLUMN infrastructure for this.
             */
            if (list_length(attrList) > rel->rd_att->natts)
            {
                  List        *atcmds = NIL;
                  ListCell    *c;
                  int               skip = rel->rd_att->natts;

                  foreach(c, attrList)
                  {
                        AlterTableCmd *atcmd;

                        if (skip > 0)
                        {
                              skip--;
                              continue;
                        }
                        atcmd = makeNode(AlterTableCmd);
                        atcmd->subtype = AT_AddColumnToView;
                        atcmd->def = (Node *) lfirst(c);
                        atcmds = lappend(atcmds, atcmd);
                  }
                  AlterTableInternal(viewOid, atcmds, true);
            }

            /*
             * Seems okay, so return the OID of the pre-existing view.
             */
            relation_close(rel, NoLock);  /* keep the lock! */

            return viewOid;
      }
      else
      {
            /*
             * now set the parameters for keys/inheritance etc. All of these are
             * uninteresting for views...
             */
            createStmt->relation = (RangeVar *) relation;
            createStmt->tableElts = attrList;
            createStmt->inhRelations = NIL;
            createStmt->constraints = NIL;
            createStmt->options = list_make1(defWithOids(false));
            createStmt->oncommit = ONCOMMIT_NOOP;
            createStmt->tablespacename = NULL;

            /*
             * finally create the relation (this will error out if there's an
             * existing view, so we don't need more code to complain if "replace"
             * is false).
             */
            return DefineRelation(createStmt, RELKIND_VIEW);
      }
}

/*
 * Verify that tupledesc associated with proposed new view definition
 * matches tupledesc of old view.  This is basically a cut-down version
 * of equalTupleDescs(), with code added to generate specific complaints.
 * Also, we allow the new tupledesc to have more columns than the old.
 */
static void
checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc)
{
      int               i;

      if (newdesc->natts < olddesc->natts)
            ereport(ERROR,
                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                         errmsg("cannot drop columns from view")));
      /* we can ignore tdhasoid */

      for (i = 0; i < olddesc->natts; i++)
      {
            Form_pg_attribute newattr = newdesc->attrs[i];
            Form_pg_attribute oldattr = olddesc->attrs[i];

            /* XXX msg not right, but we don't support DROP COL on view anyway */
            if (newattr->attisdropped != oldattr->attisdropped)
                  ereport(ERROR,
                              (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                               errmsg("cannot drop columns from view")));

            if (strcmp(NameStr(newattr->attname), NameStr(oldattr->attname)) != 0)
                  ereport(ERROR,
                              (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                               errmsg("cannot change name of view column \"%s\" to \"%s\"",
                                          NameStr(oldattr->attname),
                                          NameStr(newattr->attname))));
            /* XXX would it be safe to allow atttypmod to change?  Not sure */
            if (newattr->atttypid != oldattr->atttypid ||
                  newattr->atttypmod != oldattr->atttypmod)
                  ereport(ERROR,
                              (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                               errmsg("cannot change data type of view column \"%s\" from %s to %s",
                                          NameStr(oldattr->attname),
                                          format_type_with_typemod(oldattr->atttypid,
                                                                               oldattr->atttypmod),
                                          format_type_with_typemod(newattr->atttypid,
                                                                               newattr->atttypmod))));
            /* We can ignore the remaining attributes of an attribute... */
      }

      /*
       * We ignore the constraint fields.  The new view desc can't have any
       * constraints, and the only ones that could be on the old view are
       * defaults, which we are happy to leave in place.
       */
}

static void
DefineViewRules(Oid viewOid, Query *viewParse, bool replace)
{
      /*
       * Set up the ON SELECT rule.  Since the query has already been through
       * parse analysis, we use DefineQueryRewrite() directly.
       */
      DefineQueryRewrite(pstrdup(ViewSelectRuleName),
                                 viewOid,
                                 NULL,
                                 CMD_SELECT,
                                 true,
                                 replace,
                                 list_make1(viewParse));

      /*
       * Someday: automatic ON INSERT, etc
       */
}

/*---------------------------------------------------------------
 * UpdateRangeTableOfViewParse
 *
 * Update the range table of the given parsetree.
 * This update consists of adding two new entries IN THE BEGINNING
 * of the range table (otherwise the rule system will die a slow,
 * horrible and painful death, and we do not want that now, do we?)
 * one for the OLD relation and one for the NEW one (both of
 * them refer in fact to the "view" relation).
 *
 * Of course we must also increase the 'varnos' of all the Var nodes
 * by 2...
 *
 * These extra RT entries are not actually used in the query,
 * except for run-time permission checking.
 *---------------------------------------------------------------
 */
static Query *
UpdateRangeTableOfViewParse(Oid viewOid, Query *viewParse)
{
      Relation    viewRel;
      List     *new_rt;
      RangeTblEntry *rt_entry1,
                     *rt_entry2;

      /*
       * Make a copy of the given parsetree.    It's not so much that we don't
       * want to scribble on our input, it's that the parser has a bad habit of
       * outputting multiple links to the same subtree for constructs like
       * BETWEEN, and we mustn't have OffsetVarNodes increment the varno of a
       * Var node twice.      copyObject will expand any multiply-referenced subtree
       * into multiple copies.
       */
      viewParse = (Query *) copyObject(viewParse);

      /* need to open the rel for addRangeTableEntryForRelation */
      viewRel = relation_open(viewOid, AccessShareLock);

      /*
       * Create the 2 new range table entries and form the new range table...
       * OLD first, then NEW....
       */
      rt_entry1 = addRangeTableEntryForRelation(NULL, viewRel,
                                                                    makeAlias("*OLD*", NIL),
                                                                    false, false);
      rt_entry2 = addRangeTableEntryForRelation(NULL, viewRel,
                                                                    makeAlias("*NEW*", NIL),
                                                                    false, false);
      /* Must override addRangeTableEntry's default access-check flags */
      rt_entry1->requiredPerms = 0;
      rt_entry2->requiredPerms = 0;

      new_rt = lcons(rt_entry1, lcons(rt_entry2, viewParse->rtable));

      viewParse->rtable = new_rt;

      /*
       * Now offset all var nodes by 2, and jointree RT indexes too.
       */
      OffsetVarNodes((Node *) viewParse, 2, 0);

      relation_close(viewRel, AccessShareLock);

      return viewParse;
}

/*
 * DefineView
 *          Execute a CREATE VIEW command.
 */
void
DefineView(ViewStmt *stmt, const char *queryString)
{
      Query    *viewParse;
      Oid               viewOid;
      RangeVar   *view;

      /*
       * Run parse analysis to convert the raw parse tree to a Query.  Note this
       * also acquires sufficient locks on the source table(s).
       *
       * Since parse analysis scribbles on its input, copy the raw parse tree;
       * this ensures we don't corrupt a prepared statement, for example.
       */
      viewParse = parse_analyze((Node *) copyObject(stmt->query),
                                            queryString, NULL, 0);

      /*
       * The grammar should ensure that the result is a single SELECT Query.
       */
      if (!IsA(viewParse, Query) ||
            viewParse->commandType != CMD_SELECT)
            elog(ERROR, "unexpected parse analysis result");

      /*
       * If a list of column names was given, run through and insert these into
       * the actual query tree. - thomas 2000-03-08
       */
      if (stmt->aliases != NIL)
      {
            ListCell   *alist_item = list_head(stmt->aliases);
            ListCell   *targetList;

            foreach(targetList, viewParse->targetList)
            {
                  TargetEntry *te = (TargetEntry *) lfirst(targetList);

                  Assert(IsA(te, TargetEntry));
                  /* junk columns don't get aliases */
                  if (te->resjunk)
                        continue;
                  te->resname = pstrdup(strVal(lfirst(alist_item)));
                  alist_item = lnext(alist_item);
                  if (alist_item == NULL)
                        break;                  /* done assigning aliases */
            }

            if (alist_item != NULL)
                  ereport(ERROR,
                              (errcode(ERRCODE_SYNTAX_ERROR),
                               errmsg("CREATE VIEW specifies more column "
                                          "names than columns")));
      }

      /*
       * If the user didn't explicitly ask for a temporary view, check whether
       * we need one implicitly.    We allow TEMP to be inserted automatically as
       * long as the CREATE command is consistent with that --- no explicit
       * schema name.
       */
      view = stmt->view;
      if (!view->istemp && isViewOnTempTable(viewParse))
      {
            view = copyObject(view);      /* don't corrupt original command */
            view->istemp = true;
            ereport(NOTICE,
                        (errmsg("view \"%s\" will be a temporary view",
                                    view->relname)));
      }

      /*
       * Create the view relation
       *
       * NOTE: if it already exists and replace is false, the xact will be
       * aborted.
       */
      viewOid = DefineVirtualRelation(view, viewParse->targetList,
                                                      stmt->replace);

      /*
       * The relation we have just created is not visible to any other commands
       * running with the same transaction & command id. So, increment the
       * command id counter (but do NOT pfree any memory!!!!)
       */
      CommandCounterIncrement();

      /*
       * The range table of 'viewParse' does not contain entries for the "OLD"
       * and "NEW" relations. So... add them!
       */
      viewParse = UpdateRangeTableOfViewParse(viewOid, viewParse);

      /*
       * Now create the rules associated with the view.
       */
      DefineViewRules(viewOid, viewParse, stmt->replace);
}

Generated by  Doxygen 1.6.0   Back to index