/*
 * Web listing header (not part of the original source file):
 * Sourcecode: postgresql-8.4, file tablespace.c
 */

/*-------------------------------------------------------------------------
 *
 * tablespace.c
 *      Commands to manipulate table spaces
 *
 * Tablespaces in PostgreSQL are designed to allow users to determine
 * where the data file(s) for a given database object reside on the file
 * system.
 *
 * A tablespace represents a directory on the file system. At tablespace
 * creation time, the directory must be empty. To simplify things and
 * remove the possibility of having file name conflicts, we isolate
 * files within a tablespace into database-specific subdirectories.
 *
 * To support file access via the information given in RelFileNode, we
 * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
 * named by tablespace OIDs and point to the actual tablespace directories.
 * Thus the full path to an arbitrary file is
 *                $PGDATA/pg_tblspc/spcoid/dboid/relfilenode
 *
 * There are two tablespaces created at initdb time: pg_global (for shared
 * tables) and pg_default (for everything else).  For backwards compatibility
 * and to remain functional on platforms without symlinks, these tablespaces
 * are accessed specially: they are respectively
 *                $PGDATA/global/relfilenode
 *                $PGDATA/base/dboid/relfilenode
 *
 * To allow CREATE DATABASE to give a new database a default tablespace
 * that's different from the template database's default, we make the
 * provision that a zero in pg_class.reltablespace means the database's
 * default tablespace.  Without this, CREATE DATABASE would have to go in
 * and munge the system catalogs of the new database.
 *
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>
#include <dirent.h>
#include <sys/types.h>
#include <sys/stat.h>

#include "access/heapam.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_tablespace.h"
#include "commands/comment.h"
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"


/*
 * GUC variables
 *
 * default_tablespace: name of the tablespace to use for new non-temporary
 * objects.  NULL or an empty string means "use the database's default
 * tablespace" (see GetDefaultTablespace).
 *
 * temp_tablespaces: presumably the list of tablespaces considered for
 * temporary objects; its handling is not visible in this part of the file.
 */
char     *default_tablespace = NULL;
char     *temp_tablespaces = NULL;


/* Forward declarations of file-local helpers defined further down */
static bool remove_tablespace_directories(Oid tablespaceoid, bool redo);
static void set_short_version(const char *path);


/*
 * TablespaceCreateDbspace
 *
 * Make sure the per-database subdirectory for database dbNode exists inside
 * tablespace spcNode.  Every database that uses a tablespace gets its own
 * subdirectory, named after the database OID; it is created the first time
 * an object of that database is placed in the tablespace.  If the
 * subdirectory is already present we simply return.
 *
 * isRedo says that we are being called during WAL replay.  In that case the
 * tablespace directory itself may be missing too (for example when replaying
 * an operation on a table whose tablespace was dropped later on).  We then
 * create a plain directory where the tablespace symlink would normally be.
 * That is not an exact replay, but it is the best we can do with the
 * information at hand.
 *
 * This must not be turned into a no-op on platforms without tablespace
 * support: it is still needed to re-create a database subdirectory of
 * $PGDATA/base during WAL replay.
 */
void
TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
{
      struct stat statbuf;
      char     *dbpath;

      /* The global tablespace has no per-database subdirectories at all */
      if (spcNode == GLOBALTABLESPACE_OID)
            return;

      Assert(OidIsValid(spcNode));
      Assert(OidIsValid(dbNode));

      dbpath = GetDatabasePath(dbNode, spcNode);

      /* Usual case: something already exists at that path */
      if (stat(dbpath, &statbuf) == 0)
      {
            /* be paranoid: it had better be a directory */
            if (!S_ISDIR(statbuf.st_mode))
                  ereport(ERROR,
                              (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                               errmsg("\"%s\" exists but is not a directory",
                                      dbpath)));
            pfree(dbpath);
            return;
      }

      /* Any stat failure other than "does not exist" is fatal */
      if (errno != ENOENT)
            ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not stat directory \"%s\": %m", dbpath)));

      /*
       * The directory is missing.  Take TablespaceCreateLock so that neither
       * DROP TABLESPACE nor another TablespaceCreateDbspace can run
       * concurrently with us.
       */
      LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);

      /*
       * Somebody may have created the directory while we waited for the
       * lock, so look again before trying to create it ourselves.
       */
      if ((stat(dbpath, &statbuf) != 0 || !S_ISDIR(statbuf.st_mode)) &&
            mkdir(dbpath, S_IRWXU) < 0)
      {
            char     *parentpath;

            /*
             * Only a missing parent during WAL replay is recoverable; every
             * other failure is reported as is.
             */
            if (errno != ENOENT || !isRedo)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not create directory \"%s\": %m",
                                      dbpath)));

            /* Create the parent directory, then retry the original mkdir */
            parentpath = pstrdup(dbpath);
            get_parent_directory(parentpath);
            if (mkdir(parentpath, S_IRWXU) < 0)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not create directory \"%s\": %m",
                                      parentpath)));
            pfree(parentpath);

            if (mkdir(dbpath, S_IRWXU) < 0)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not create directory \"%s\": %m",
                                      dbpath)));
      }

      LWLockRelease(TablespaceCreateLock);

      pfree(dbpath);
}

/*
 * Create a table space
 *
 * Only superusers can create a tablespace. This seems a reasonable restriction
 * since we're determining the system layout and, anyway, we probably have
 * root if we're doing this kind of activity.
 *
 * stmt supplies the tablespace name, the directory location and, optionally,
 * an owner; when no owner is given the current user becomes the owner.
 *
 * Order of operations: validate the caller, the location and the name;
 * insert the pg_tablespace row and record the owner dependency; coerce the
 * target directory's permissions and check that it is empty; write
 * PG_VERSION into it; create the pg_tblspc/<oid> symlink; log the creation
 * in XLOG; force a synchronous commit.
 *
 * When HAVE_SYMLINK is not defined this function only raises a
 * "not supported" error.
 */
void
CreateTableSpace(CreateTableSpaceStmt *stmt)
{
#ifdef HAVE_SYMLINK
      Relation    rel;
      Datum       values[Natts_pg_tablespace];
      bool        nulls[Natts_pg_tablespace];
      HeapTuple   tuple;
      Oid               tablespaceoid;
      char     *location;
      char     *linkloc;
      Oid               ownerId;

      /* Must be super user */
      if (!superuser())
            ereport(ERROR,
                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                         errmsg("permission denied to create tablespace \"%s\"",
                                    stmt->tablespacename),
                         errhint("Must be superuser to create a tablespace.")));

      /* However, the eventual owner of the tablespace need not be */
      if (stmt->owner)
            ownerId = get_roleid_checked(stmt->owner);
      else
            ownerId = GetUserId();

      /*
       * Unix-ify the offered path, and strip any trailing slashes.  We work
       * on a private copy so that the statement's string is left untouched.
       */
      location = pstrdup(stmt->location);
      canonicalize_path(location);

      /* disallow quotes, else CREATE DATABASE would be at risk */
      if (strchr(location, '\''))
            ereport(ERROR,
                        (errcode(ERRCODE_INVALID_NAME),
                         errmsg("tablespace location cannot contain single quotes")));

      /*
       * Allowing relative paths seems risky
       *
       * this also helps us ensure that location is not empty or whitespace
       */
      if (!is_absolute_path(location))
            ereport(ERROR,
                        (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                         errmsg("tablespace location must be an absolute path")));

      /*
       * Check that location isn't too long. Remember that we're going to append
       * '/<dboid>/<relid>.<nnn>'  (XXX but do we ever form the whole path
       * explicitly?    This may be overly conservative.)
       *
       * In the limit below, each "1" presumably stands for a separator
       * character and each "10" for one of the three numeric components.
       */
      if (strlen(location) >= (MAXPGPATH - 1 - 10 - 1 - 10 - 1 - 10))
            ereport(ERROR,
                        (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                         errmsg("tablespace location \"%s\" is too long",
                                    location)));

      /*
       * Disallow creation of tablespaces named "pg_xxx"; we reserve this
       * namespace for system purposes.
       */
      if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
            ereport(ERROR,
                        (errcode(ERRCODE_RESERVED_NAME),
                         errmsg("unacceptable tablespace name \"%s\"",
                                    stmt->tablespacename),
            errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));

      /*
       * Check that there is no other tablespace by this name.  (The unique
       * index would catch this anyway, but might as well give a friendlier
       * message.)
       */
      if (OidIsValid(get_tablespace_oid(stmt->tablespacename)))
            ereport(ERROR,
                        (errcode(ERRCODE_DUPLICATE_OBJECT),
                         errmsg("tablespace \"%s\" already exists",
                                    stmt->tablespacename)));

      /*
       * Insert tuple into pg_tablespace.  The purpose of doing this first is to
       * lock the proposed tablename against other would-be creators. The
       * insertion will roll back if we find problems below.
       */
      rel = heap_open(TableSpaceRelationId, RowExclusiveLock);

      MemSet(nulls, false, sizeof(nulls));

      /* Fill in name, owner and location; the ACL column starts out NULL */
      values[Anum_pg_tablespace_spcname - 1] =
            DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
      values[Anum_pg_tablespace_spcowner - 1] =
            ObjectIdGetDatum(ownerId);
      values[Anum_pg_tablespace_spclocation - 1] =
            CStringGetTextDatum(location);
      nulls[Anum_pg_tablespace_spcacl - 1] = true;

      tuple = heap_form_tuple(rel->rd_att, values, nulls);

      /* The value returned by the insertion is used as the tablespace's OID */
      tablespaceoid = simple_heap_insert(rel, tuple);

      CatalogUpdateIndexes(rel, tuple);

      heap_freetuple(tuple);

      /* Record dependency on owner */
      recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);

      /*
       * Attempt to coerce target directory to safe permissions.  If this fails,
       * it doesn't exist or has the wrong owner.
       */
      if (chmod(location, 0700) != 0)
            ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not set permissions on directory \"%s\": %m",
                                    location)));

      /*
       * Check the target directory is empty.
       */
      if (!directory_is_empty(location))
            ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                         errmsg("directory \"%s\" is not empty",
                                    location)));

      /*
       * Create the PG_VERSION file in the target directory.      This has several
       * purposes: to make sure we can write in the directory, to prevent
       * someone from creating another tablespace pointing at the same directory
       * (the emptiness check above will fail), and to label tablespace
       * directories by PG version.
       */
      set_short_version(location);

      /*
       * All seems well, create the symlink
       *
       * The buffer holds "pg_tblspc/" (10 characters), the OID printed with
       * %u (room for 10 digits) and the terminating NUL.
       */
      linkloc = (char *) palloc(10 + 10 + 1);
      sprintf(linkloc, "pg_tblspc/%u", tablespaceoid);

      if (symlink(location, linkloc) < 0)
            ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not create symbolic link \"%s\": %m",
                                    linkloc)));

      /* Record the filesystem change in XLOG */
      {
            xl_tblspc_create_rec xlrec;
            XLogRecData rdata[2];

            /*
             * The record is assembled from two chunks: the fixed part of the
             * struct up to (not including) ts_path, followed by the location
             * string with its terminating NUL.
             */
            xlrec.ts_id = tablespaceoid;
            rdata[0].data = (char *) &xlrec;
            rdata[0].len = offsetof(xl_tblspc_create_rec, ts_path);
            rdata[0].buffer = InvalidBuffer;
            rdata[0].next = &(rdata[1]);

            rdata[1].data = (char *) location;
            rdata[1].len = strlen(location) + 1;
            rdata[1].buffer = InvalidBuffer;
            rdata[1].next = NULL;

            (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE, rdata);
      }

      /*
       * Force synchronous commit, to minimize the window between creating the
       * symlink on-disk and marking the transaction committed.  It's not great
       * that there is any window at all, but definitely we don't want to make
       * it larger than necessary.
       */
      ForceSyncCommit();

      pfree(linkloc);
      pfree(location);

      /* We keep the lock on pg_tablespace until commit */
      heap_close(rel, NoLock);
#else                                     /* !HAVE_SYMLINK */
      ereport(ERROR,
                  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                   errmsg("tablespaces are not supported on this platform")));
#endif   /* HAVE_SYMLINK */
}

/*
 * Drop a table space
 *
 * Be careful to check that the tablespace is empty.
 *
 * If the named tablespace does not exist, an error is raised unless
 * stmt->missing_ok is set; in that case a NOTICE is emitted and the function
 * returns without doing anything else.  The caller must own the tablespace,
 * and the standard tablespaces (GLOBALTABLESPACE_OID and
 * DEFAULTTABLESPACE_OID) can never be dropped.
 *
 * Order of operations: delete the pg_tablespace row, its comments and its
 * owner dependency; take TablespaceCreateLock; remove the on-disk
 * directories and link (retrying once after a forced checkpoint); log the
 * drop in XLOG; force a synchronous commit; release the lock.
 *
 * When HAVE_SYMLINK is not defined this function only raises a
 * "not supported" error.
 */
void
DropTableSpace(DropTableSpaceStmt *stmt)
{
#ifdef HAVE_SYMLINK
      char     *tablespacename = stmt->tablespacename;
      HeapScanDesc scandesc;
      Relation    rel;
      HeapTuple   tuple;
      ScanKeyData entry[1];
      Oid               tablespaceoid;

      /*
       * Find the target tuple
       */
      rel = heap_open(TableSpaceRelationId, RowExclusiveLock);

      ScanKeyInit(&entry[0],
                        Anum_pg_tablespace_spcname,
                        BTEqualStrategyNumber, F_NAMEEQ,
                        CStringGetDatum(tablespacename));
      scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
      tuple = heap_getnext(scandesc, ForwardScanDirection);

      if (!HeapTupleIsValid(tuple))
      {
            if (!stmt->missing_ok)
            {
                  ereport(ERROR,
                              (errcode(ERRCODE_UNDEFINED_OBJECT),
                               errmsg("tablespace \"%s\" does not exist",
                                          tablespacename)));
            }
            else
            {
                  ereport(NOTICE,
                              (errmsg("tablespace \"%s\" does not exist, skipping",
                                          tablespacename)));
                  /* XXX I assume I need one or both of these next two calls */
                  /* End the scan and close the relation before the early return */
                  heap_endscan(scandesc);
                  heap_close(rel, NoLock);
            }
            return;
      }

      tablespaceoid = HeapTupleGetOid(tuple);

      /* Must be tablespace owner */
      if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
            aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
                                 tablespacename);

      /* Disallow drop of the standard tablespaces, even by superuser */
      if (tablespaceoid == GLOBALTABLESPACE_OID ||
            tablespaceoid == DEFAULTTABLESPACE_OID)
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE,
                                 tablespacename);

      /*
       * Remove the pg_tablespace tuple (this will roll back if we fail below)
       */
      simple_heap_delete(rel, &tuple->t_self);

      /* The scan's tuple is not referenced after this point */
      heap_endscan(scandesc);

      /*
       * Remove any comments on this tablespace.
       */
      DeleteSharedComments(tablespaceoid, TableSpaceRelationId);

      /*
       * Remove dependency on owner.
       */
      deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);

      /*
       * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
       * is running concurrently.
       */
      LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);

      /*
       * Try to remove the physical infrastructure.
       */
      if (!remove_tablespace_directories(tablespaceoid, false))
      {
            /*
             * Not all files deleted?  However, there can be lingering empty files
             * in the directories, left behind by for example DROP TABLE, that
             * have been scheduled for deletion at next checkpoint (see comments
             * in mdunlink() for details).      We could just delete them immediately,
             * but we can't tell them apart from important data files that we
             * mustn't delete.  So instead, we force a checkpoint which will clean
             * out any lingering files, and try again.
             */
            RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
            if (!remove_tablespace_directories(tablespaceoid, false))
            {
                  /* Still not empty, the files must be important then */
                  ereport(ERROR,
                              (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               errmsg("tablespace \"%s\" is not empty",
                                          tablespacename)));
            }
      }

      /* Record the filesystem change in XLOG */
      {
            xl_tblspc_drop_rec xlrec;
            XLogRecData rdata[1];

            /* The drop record consists of the tablespace OID only */
            xlrec.ts_id = tablespaceoid;
            rdata[0].data = (char *) &xlrec;
            rdata[0].len = sizeof(xl_tblspc_drop_rec);
            rdata[0].buffer = InvalidBuffer;
            rdata[0].next = NULL;

            (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP, rdata);
      }

      /*
       * Note: because we checked that the tablespace was empty, there should be
       * no need to worry about flushing shared buffers or free space map
       * entries for relations in the tablespace.
       */

      /*
       * Force synchronous commit, to minimize the window between removing the
       * files on-disk and marking the transaction committed.  It's not great
       * that there is any window at all, but definitely we don't want to make
       * it larger than necessary.
       */
      ForceSyncCommit();

      /*
       * Allow TablespaceCreateDbspace again.
       */
      LWLockRelease(TablespaceCreateLock);

      /* We keep the lock on pg_tablespace until commit */
      heap_close(rel, NoLock);
#else                                     /* !HAVE_SYMLINK */
      ereport(ERROR,
                  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                   errmsg("tablespaces are not supported on this platform")));
#endif   /* HAVE_SYMLINK */
}

/*
 * remove_tablespace_directories: attempt to remove filesystem infrastructure
 *
 * tablespaceoid identifies the tablespace; its entry point on disk is
 * pg_tblspc/<tablespaceoid>.
 *
 * redo indicates we are redoing a drop from XLOG; okay if nothing there
 *
 * Returns TRUE if successful (including the case where pg_tblspc/<oid> does
 * not exist at all), FALSE if some subdirectory is not empty.  Other
 * filesystem failures are reported with ereport(ERROR).
 *
 * All path strings allocated here are released on every non-error return
 * path, including the early FALSE return.
 */
static bool
remove_tablespace_directories(Oid tablespaceoid, bool redo)
{
      char     *location;
      DIR            *dirdesc;
      struct dirent *de;
      char     *subfile;
      struct stat st;

      /* "pg_tblspc/" is 10 characters, the OID needs up to 10 digits, plus NUL */
      location = (char *) palloc(10 + 10 + 1);
      sprintf(location, "pg_tblspc/%u", tablespaceoid);

      /*
       * Check if the tablespace still contains any files.  We try to rmdir each
       * per-database directory we find in it.  rmdir failure implies there are
       * still files in that subdirectory, so give up.  (We do not have to worry
       * about undoing any already completed rmdirs, since the next attempt to
       * use the tablespace from that database will simply recreate the
       * subdirectory via TablespaceCreateDbspace.)
       *
       * Since we hold TablespaceCreateLock, no one else should be creating any
       * fresh subdirectories in parallel. It is possible that new files are
       * being created within subdirectories, though, so the rmdir call could
       * fail.  Worst consequence is a less friendly error message.
       *
       * If redo is true then ENOENT is a likely outcome here, and we allow it
       * to pass without comment.  In normal operation we still allow it, but
       * with a warning.  This is because even though ProcessUtility disallows
       * DROP TABLESPACE in a transaction block, it's possible that a previous
       * DROP failed and rolled back after removing the tablespace directories
       * and symlink.  We want to allow a new DROP attempt to succeed at
       * removing the catalog entries, so we should not give a hard error here.
       */
      dirdesc = AllocateDir(location);
      if (dirdesc == NULL)
      {
            if (errno == ENOENT)
            {
                  if (!redo)
                        ereport(WARNING,
                                    (errcode_for_file_access(),
                                     errmsg("could not open directory \"%s\": %m",
                                                location)));
                  pfree(location);
                  return true;
            }
            /* else let ReadDir report the error */
      }

      while ((de = ReadDir(dirdesc, location)) != NULL)
      {
            /* Note we ignore PG_VERSION for the nonce */
            if (strcmp(de->d_name, ".") == 0 ||
                  strcmp(de->d_name, "..") == 0 ||
                  strcmp(de->d_name, "PG_VERSION") == 0)
                  continue;

            subfile = palloc(strlen(location) + 1 + strlen(de->d_name) + 1);
            sprintf(subfile, "%s/%s", location, de->d_name);

            /* This check is just to deliver a friendlier error message */
            if (!directory_is_empty(subfile))
            {
                  /*
                   * Give up, but clean up after ourselves first: the caller
                   * may call us again after forcing a checkpoint, so don't
                   * leave the path strings behind.
                   */
                  FreeDir(dirdesc);
                  pfree(subfile);
                  pfree(location);
                  return false;
            }

            /* Do the real deed */
            if (rmdir(subfile) < 0)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not remove directory \"%s\": %m",
                                          subfile)));

            pfree(subfile);
      }

      FreeDir(dirdesc);

      /*
       * Okay, try to unlink PG_VERSION (we allow it to not be there, even in
       * non-REDO case, for robustness).  "/PG_VERSION" is 11 characters.
       */
      subfile = palloc(strlen(location) + 11 + 1);
      sprintf(subfile, "%s/PG_VERSION", location);

      if (unlink(subfile) < 0)
      {
            if (errno != ENOENT)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not remove file \"%s\": %m",
                                          subfile)));
      }

      pfree(subfile);

      /*
       * Okay, try to remove the symlink.  We must however deal with the
       * possibility that it's a directory instead of a symlink --- this could
       * happen during WAL replay (see TablespaceCreateDbspace), and it is also
       * the normal case on Windows.
       */
      if (lstat(location, &st) == 0 && S_ISDIR(st.st_mode))
      {
            if (rmdir(location) < 0)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not remove directory \"%s\": %m",
                                          location)));
      }
      else
      {
            if (unlink(location) < 0)
                  ereport(ERROR,
                              (errcode_for_file_access(),
                               errmsg("could not remove symbolic link \"%s\": %m",
                                          location)));
      }

      pfree(location);

      return true;
}

/*
 * set_short_version: write the PG_VERSION file into the directory "path"
 *
 * The file contains the short form of the server version: the leading
 * "digits.digits" part of PG_VERSION, followed by a newline.  Failure to
 * create or write the file is reported with ereport(ERROR).
 */
static void
set_short_version(const char *path)
{
      char     *verstr;
      char     *verfile;
      FILE     *fp;
      int               pos = 0;
      bool        seen_dot = false;

      /* Construct short version string (should match initdb.c) */
      verstr = pstrdup(PG_VERSION);

      /*
       * Advance over digits and at most one dot; stop at the second dot, at
       * the first character that is neither, or at the end of the string.
       */
      while (verstr[pos] != '\0')
      {
            char        ch = verstr[pos];

            if (ch == '.')
            {
                  Assert(pos != 0);
                  if (seen_dot)
                        break;
                  seen_dot = true;
            }
            else if (ch < '0' || ch > '9')
            {
                  /* gone past digits and dots */
                  break;
            }
            pos++;
      }
      Assert(pos > 0 && verstr[pos - 1] != '.' && seen_dot);
      verstr[pos] = '\0';

      /* Build "<path>/PG_VERSION"; the suffix is 11 characters long */
      verfile = palloc(strlen(path) + 11 + 1);
      sprintf(verfile, "%s/PG_VERSION", path);

      /* Now write the file */
      fp = AllocateFile(verfile, PG_BINARY_W);
      if (fp == NULL)
            ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not write to file \"%s\": %m",
                                verfile)));
      fprintf(fp, "%s\n", verstr);
      if (FreeFile(fp))
            ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not write to file \"%s\": %m",
                                verfile)));

      pfree(verfile);
      pfree(verstr);
}

/*
 * directory_is_empty: does the directory "path" contain no entries other
 * than "." and ".."?
 *
 * Returns true if so, false as soon as any other entry is seen.  The
 * directory handle is always released before returning.
 *
 * This probably belongs somewhere else, but not sure where...
 */
bool
directory_is_empty(const char *path)
{
      DIR            *dir;
      struct dirent *entry;
      bool        empty = true;

      dir = AllocateDir(path);

      /* Stop reading as soon as one real entry has been found */
      while (empty && (entry = ReadDir(dir, path)) != NULL)
      {
            if (strcmp(entry->d_name, ".") != 0 &&
                  strcmp(entry->d_name, "..") != 0)
                  empty = false;
      }

      FreeDir(dir);
      return empty;
}

/*
 * Rename a tablespace
 *
 * Looks up the tablespace called oldname in pg_tablespace and changes its
 * name to newname.  Errors out if oldname does not exist, if the caller
 * fails the ownership check, if newname is a reserved name (unless
 * allowSystemTableMods is set), or if a tablespace called newname already
 * exists.
 */
void
RenameTableSpace(const char *oldname, const char *newname)
{
      Relation    catalog;
      ScanKeyData key[1];
      HeapScanDesc scandesc;
      HeapTuple   found;
      HeapTuple   modtuple;
      Form_pg_tablespace modform;

      catalog = heap_open(TableSpaceRelationId, RowExclusiveLock);

      /* Locate the existing entry by its current name */
      ScanKeyInit(&key[0],
                  Anum_pg_tablespace_spcname,
                  BTEqualStrategyNumber, F_NAMEEQ,
                  CStringGetDatum(oldname));
      scandesc = heap_beginscan(catalog, SnapshotNow, 1, key);
      found = heap_getnext(scandesc, ForwardScanDirection);
      if (!HeapTupleIsValid(found))
            ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_OBJECT),
                         errmsg("tablespace \"%s\" does not exist",
                                oldname)));

      /* Work on a private copy, so the scan can be ended right away */
      modtuple = heap_copytuple(found);
      modform = (Form_pg_tablespace) GETSTRUCT(modtuple);

      heap_endscan(scandesc);

      /* Must be owner */
      if (!pg_tablespace_ownercheck(HeapTupleGetOid(modtuple), GetUserId()))
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, oldname);

      /* The "pg_" prefix is reserved unless system table mods are allowed */
      if (!allowSystemTableMods && IsReservedName(newname))
            ereport(ERROR,
                        (errcode(ERRCODE_RESERVED_NAME),
                         errmsg("unacceptable tablespace name \"%s\"", newname),
                         errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));

      /* Refuse to collide with an existing tablespace of the new name */
      ScanKeyInit(&key[0],
                  Anum_pg_tablespace_spcname,
                  BTEqualStrategyNumber, F_NAMEEQ,
                  CStringGetDatum(newname));
      scandesc = heap_beginscan(catalog, SnapshotNow, 1, key);
      found = heap_getnext(scandesc, ForwardScanDirection);
      if (HeapTupleIsValid(found))
            ereport(ERROR,
                        (errcode(ERRCODE_DUPLICATE_OBJECT),
                         errmsg("tablespace \"%s\" already exists",
                                newname)));

      heap_endscan(scandesc);

      /* All checks passed: store the new name and update the catalog */
      namestrcpy(&(modform->spcname), newname);

      simple_heap_update(catalog, &modtuple->t_self, modtuple);
      CatalogUpdateIndexes(catalog, modtuple);

      heap_close(catalog, NoLock);
}

/*
 * Change tablespace owner
 *
 * Makes newOwnerId the owner of the tablespace called name.  Errors out if
 * no such tablespace exists.  If the tablespace already belongs to
 * newOwnerId nothing is changed and no permission checks are made.
 * Otherwise the caller must pass the ownership check on the tablespace and
 * the role-membership check for the new owner; the owner column, the ACL
 * (when it is not null) and the owner dependency are then updated.
 */
void
AlterTableSpaceOwner(const char *name, Oid newOwnerId)
{
      Relation    catalog;
      ScanKeyData key[1];
      HeapScanDesc scan;
      HeapTuple   oldtuple;
      Form_pg_tablespace oldform;

      /* Search pg_tablespace */
      catalog = heap_open(TableSpaceRelationId, RowExclusiveLock);

      ScanKeyInit(&key[0],
                  Anum_pg_tablespace_spcname,
                  BTEqualStrategyNumber, F_NAMEEQ,
                  CStringGetDatum(name));
      scan = heap_beginscan(catalog, SnapshotNow, 1, key);
      oldtuple = heap_getnext(scan, ForwardScanDirection);
      if (!HeapTupleIsValid(oldtuple))
            ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_OBJECT),
                         errmsg("tablespace \"%s\" does not exist", name)));

      oldform = (Form_pg_tablespace) GETSTRUCT(oldtuple);

      /*
       * Assigning the current owner again counts as success without further
       * ado.  This is for dump restoration purposes.
       */
      if (oldform->spcowner != newOwnerId)
      {
            Oid               spcoid = HeapTupleGetOid(oldtuple);
            Datum       new_values[Natts_pg_tablespace];
            bool        new_nulls[Natts_pg_tablespace];
            bool        do_replace[Natts_pg_tablespace];
            Datum       oldacl;
            bool        acl_is_null;
            HeapTuple   modtuple;

            /* Otherwise, must be owner of the existing object */
            if (!pg_tablespace_ownercheck(spcoid, GetUserId()))
                  aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
                                 name);

            /* Must be able to become new owner */
            check_is_member_of_role(GetUserId(), newOwnerId);

            /*
             * Normally a check for create permissions would follow here, but
             * tablespaces have none, so like rename tablespace we omit it.
             *
             * NOTE: Only superusers may create tablespaces to begin with and
             * so initially only a superuser would be able to change its
             * ownership anyway.
             */

            /* Start with "replace nothing", then mark the owner column */
            memset(new_nulls, false, sizeof(new_nulls));
            memset(do_replace, false, sizeof(do_replace));

            do_replace[Anum_pg_tablespace_spcowner - 1] = true;
            new_values[Anum_pg_tablespace_spcowner - 1] = ObjectIdGetDatum(newOwnerId);

            /*
             * A non-null ACL has to be rewritten for the new owner as well;
             * a null ACL needs no change.
             */
            oldacl = heap_getattr(oldtuple,
                                  Anum_pg_tablespace_spcacl,
                                  RelationGetDescr(catalog),
                                  &acl_is_null);
            if (!acl_is_null)
            {
                  Acl            *ownerAcl;

                  ownerAcl = aclnewowner(DatumGetAclP(oldacl),
                                         oldform->spcowner, newOwnerId);
                  do_replace[Anum_pg_tablespace_spcacl - 1] = true;
                  new_values[Anum_pg_tablespace_spcacl - 1] = PointerGetDatum(ownerAcl);
            }

            modtuple = heap_modify_tuple(oldtuple, RelationGetDescr(catalog),
                                         new_values, new_nulls, do_replace);

            simple_heap_update(catalog, &modtuple->t_self, modtuple);
            CatalogUpdateIndexes(catalog, modtuple);

            heap_freetuple(modtuple);

            /* Update owner dependency reference */
            changeDependencyOnOwner(TableSpaceRelationId, spcoid,
                                    newOwnerId);
      }

      heap_endscan(scan);
      heap_close(catalog, NoLock);
}


/*
 * Routines for handling the GUC variable 'default_tablespace'.
 */

/*
 * assign_hook: validate new default_tablespace, do extra actions as needed
 *
 * Returns newval when the value is acceptable, or NULL after reporting (at
 * the level given by GUC_complaint_elevel(source)) that the named
 * tablespace does not exist.  The doit argument is not used here.
 */
const char *
assign_default_tablespace(const char *newval, bool doit, GucSource source)
{
      /*
       * Outside a transaction we cannot do database access, so the name
       * cannot be verified.  Must accept the value on faith.
       */
      if (!IsTransactionState())
            return newval;

      /* The empty string is accepted without a lookup */
      if (newval[0] == '\0')
            return newval;

      /* Anything else has to name an existing tablespace */
      if (OidIsValid(get_tablespace_oid(newval)))
            return newval;

      ereport(GUC_complaint_elevel(source),
                  (errcode(ERRCODE_UNDEFINED_OBJECT),
                   errmsg("tablespace \"%s\" does not exist",
                          newval)));
      return NULL;
}

/*
 * GetDefaultTablespace -- report the tablespace new objects should go into
 *
 * forTemp selects between the default for temporary objects and the
 * default for regular objects; the two are determined differently, so the
 * caller has to say which one it wants.
 *
 * The result may be InvalidOid, which means "use the database's default
 * tablespace".  For any other result the caller is expected to perform the
 * appropriate permissions check itself.
 *
 * Callers use this function instead of looking at the default_tablespace
 * GUC variable directly, which leaves room to optimize its use here.
 */
Oid
GetDefaultTablespace(bool forTemp)
{
      Oid               spcoid;

      if (forTemp)
      {
            /* Temporary objects are placed via the temp tablespace list */
            PrepareTempTablespaces();
            return GetNextTempTableSpace();
      }

      if (default_tablespace != NULL && default_tablespace[0] != '\0')
      {
            /*
             * The lookup is repeated on every call rather than cached, since
             * a cached answer would not notice that the tablespace had been
             * dropped after the GUC variable was set.  A name that matches
             * no tablespace is not an error here: the lookup's InvalidOid
             * result is simply passed back, so the new object lands in the
             * database's tablespace.
             */
            spcoid = get_tablespace_oid(default_tablespace);

            /*
             * Naming the database's own default tablespace explicitly is
             * reported as InvalidOid (below), so that it does not trigger
             * permissions checks.
             */
            if (spcoid != MyDatabaseTableSpace)
                  return spcoid;
      }

      /* default_tablespace is unset/empty, or names the database default */
      return InvalidOid;
}


/*
 * Routines for handling the GUC variable 'temp_tablespaces'.
 */

/*
 * assign_temp_tablespaces -- GUC assign hook for temp_tablespaces
 *
 * newval is a comma-separated list of tablespace names.  Returns newval if
 * the list is accepted, or NULL if it cannot be parsed into identifiers.
 *
 * Inside a transaction each name is looked up and checked for ACL_CREATE
 * permission.  An empty name, or the name of the database's own default
 * tablespace, is recorded as InvalidOid without a permissions check.  When
 * source >= PGC_S_INTERACTIVE a name that fails either test is reported
 * through ereport(ERROR) or aclcheck_error(); for other sources the bad
 * entry is silently left out.  If doit is true the resulting OID array is
 * handed to SetTempTablespaces(), otherwise it is freed again.
 */
const char *
assign_temp_tablespaces(const char *newval, bool doit, GucSource source)
{
      char     *namebuf;
      List     *names;

      /* The splitter needs a string it may modify, so give it a copy */
      namebuf = pstrdup(newval);

      if (!SplitIdentifierString(namebuf, ',', &names))
      {
            /* the name list is syntactically invalid */
            pfree(namebuf);
            list_free(names);
            return NULL;
      }

      /*
       * Checking the individual names means database access, which is not
       * possible outside a transaction; then the list is accepted on faith.
       * Fortunately there is also no need to pass anything to fd.c in that
       * case.
       */
      if (IsTransactionState())
      {
            /*
             * The array lives in TopTransactionContext.  Erroring out below,
             * or being called several times within one transaction, leaks a
             * little of that memory; this is not considered worth fixing.
             */
            bool        complain = (source >= PGC_S_INTERACTIVE);
            Oid            *spcoids;
            int               nspcoids = 0;
            ListCell   *cell;

            spcoids = (Oid *) MemoryContextAlloc(TopTransactionContext,
                                                             list_length(names) * sizeof(Oid));

            foreach(cell, names)
            {
                  char     *name = (char *) lfirst(cell);
                  Oid               entry;      /* value to record for this name */

                  if (name[0] == '\0')
                  {
                        /* an empty name stands for the database default */
                        entry = InvalidOid;
                  }
                  else
                  {
                        Oid               spcoid = get_tablespace_oid(name);

                        if (spcoid == InvalidOid)
                        {
                              /*
                               * Unknown tablespace: an interactive SET gets an
                               * error, any other source just loses the entry.
                               */
                              if (complain)
                                    ereport(ERROR,
                                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                                 errmsg("tablespace \"%s\" does not exist",
                                                            name)));
                              continue;
                        }

                        if (spcoid == MyDatabaseTableSpace)
                        {
                              /*
                               * The database's default tablespace may be named
                               * explicitly without triggering the permissions
                               * check.
                               */
                              entry = InvalidOid;
                        }
                        else
                        {
                              AclResult   aclresult = pg_tablespace_aclcheck(spcoid,
                                                                                                 GetUserId(),
                                                                                                 ACL_CREATE);

                              if (aclresult != ACLCHECK_OK)
                              {
                                    /* same reporting policy as for unknown names */
                                    if (complain)
                                          aclcheck_error(aclresult, ACL_KIND_TABLESPACE, name);
                                    continue;
                              }
                              entry = spcoid;
                        }
                  }

                  spcoids[nspcoids++] = entry;
            }

            /* Only an actual assignment hands the list over to fd.c */
            if (doit)
                  SetTempTablespaces(spcoids, nspcoids);
            else
                  pfree(spcoids);
      }

      pfree(namebuf);
      list_free(names);

      return newval;
}

/*
 * PrepareTempTablespaces -- make the temp_tablespaces setting usable
 *
 * Unless this was already done in the current transaction, split the
 * temp_tablespaces GUC variable into names, resolve them to tablespace
 * OIDs, and pass the resulting array to fd.c via SetTempTablespaces() so
 * that it knows which tablespace(s) to use for temp files.
 *
 * Entries that name no existing tablespace, or one on which the current
 * user lacks ACL_CREATE permission, are skipped without complaint.
 */
void
PrepareTempTablespaces(void)
{
      char     *namebuf;
      List     *names;
      Oid            *spcoids;
      int               nspcoids = 0;
      ListCell   *cell;

      /*
       * Nothing to do if the list was already set up in this transaction.
       * Otherwise, catalog access requires being within a transaction; the
       * second test is only a safety check in case low-level code calls this
       * outside of one.  In such a scenario fd.c falls back to the current
       * database's default tablespace, which should always be OK.
       */
      if (TempTablespacesAreSet() || !IsTransactionState())
            return;

      /* The splitter needs a string it may modify, so give it a copy */
      namebuf = pstrdup(temp_tablespaces);

      if (!SplitIdentifierString(namebuf, ',', &names))
      {
            /* the name list is syntactically invalid: register an empty list */
            SetTempTablespaces(NULL, 0);
            pfree(namebuf);
            list_free(names);
            return;
      }

      /* The OID array is kept in TopTransactionContext */
      spcoids = (Oid *) MemoryContextAlloc(TopTransactionContext,
                                                             list_length(names) * sizeof(Oid));

      foreach(cell, names)
      {
            char     *name = (char *) lfirst(cell);
            Oid               entry;      /* value to record for this name */

            if (name[0] == '\0')
            {
                  /* an empty name stands for the database default */
                  entry = InvalidOid;
            }
            else
            {
                  Oid               spcoid = get_tablespace_oid(name);

                  /* names that match no tablespace are silently dropped */
                  if (spcoid == InvalidOid)
                        continue;

                  /*
                   * The database's default tablespace may be named explicitly
                   * without triggering the permissions check; everything else
                   * needs CREATE permission or is dropped as well.
                   */
                  if (spcoid == MyDatabaseTableSpace)
                        entry = InvalidOid;
                  else if (pg_tablespace_aclcheck(spcoid, GetUserId(),
                                                                  ACL_CREATE) != ACLCHECK_OK)
                        continue;
                  else
                        entry = spcoid;
            }

            spcoids[nspcoids++] = entry;
      }

      SetTempTablespaces(spcoids, nspcoids);

      pfree(namebuf);
      list_free(names);
}


/*
 * get_tablespace_oid - map a tablespace name to its OID
 *
 * The result is InvalidOid when pg_tablespace has no row with that name.
 */
Oid
get_tablespace_oid(const char *tablespacename)
{
      Oid               spcoid = InvalidOid;
      Relation    catalog;
      HeapScanDesc scan;
      HeapTuple   tup;
      ScanKeyData key;

      /*
       * pg_tablespace is read with a plain heap scan although an index on the
       * name exists: the catalog is expected to hold only a handful of rows,
       * for which an indexed lookup would be wasted effort.
       */
      catalog = heap_open(TableSpaceRelationId, AccessShareLock);

      ScanKeyInit(&key,
                        Anum_pg_tablespace_spcname,
                        BTEqualStrategyNumber, F_NAMEEQ,
                        CStringGetDatum(tablespacename));
      scan = heap_beginscan(catalog, SnapshotNow, 1, &key);

      /* At most one row is assumed to match, so the first hit settles it */
      tup = heap_getnext(scan, ForwardScanDirection);
      if (HeapTupleIsValid(tup))
            spcoid = HeapTupleGetOid(tup);

      heap_endscan(scan);
      heap_close(catalog, AccessShareLock);

      return spcoid;
}

/*
 * get_tablespace_name - map a tablespace OID to its name
 *
 * The result is a freshly palloc'd copy of the name, or NULL when
 * pg_tablespace has no row with that OID.
 */
char *
get_tablespace_name(Oid spc_oid)
{
      char     *spcname = NULL;
      Relation    catalog;
      HeapScanDesc scan;
      HeapTuple   tup;
      ScanKeyData key;

      /*
       * pg_tablespace is read with a plain heap scan although an index on the
       * OID exists: the catalog is expected to hold only a handful of rows,
       * for which an indexed lookup would be wasted effort.
       */
      catalog = heap_open(TableSpaceRelationId, AccessShareLock);

      ScanKeyInit(&key,
                        ObjectIdAttributeNumber,
                        BTEqualStrategyNumber, F_OIDEQ,
                        ObjectIdGetDatum(spc_oid));
      scan = heap_beginscan(catalog, SnapshotNow, 1, &key);

      /* At most one row is assumed to match, so the first hit settles it */
      tup = heap_getnext(scan, ForwardScanDirection);
      if (HeapTupleIsValid(tup))
      {
            Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tup);

            /* copy the name before the scan is shut down */
            spcname = pstrdup(NameStr(spcform->spcname));
      }

      heap_endscan(scan);
      heap_close(catalog, AccessShareLock);

      return spcname;
}


/*
 * TABLESPACE resource manager's routines
 */

/*
 * tblspc_redo -- replay one tablespace WAL record
 *
 * An XLOG_TBLSPC_CREATE record re-establishes the tablespace directory's
 * permissions, its PG_VERSION file and its symlink under pg_tblspc; an
 * XLOG_TBLSPC_DROP record removes the tablespace's directories.  Any other
 * op code is reported at PANIC level.  The lsn argument is not used.
 */
void
tblspc_redo(XLogRecPtr lsn, XLogRecord *record)
{
      uint8       opcode = record->xl_info & ~XLR_INFO_MASK;

      /* tblspc records do not use backup blocks */
      Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

      switch (opcode)
      {
            case XLOG_TBLSPC_CREATE:
                  {
                        xl_tblspc_create_rec *createrec =
                        (xl_tblspc_create_rec *) XLogRecGetData(record);
                        char     *target = createrec->ts_path;
                        char     *linkpath;

                        /*
                         * Try to force safe permissions onto the target
                         * directory.  Failure means it doesn't exist or has
                         * the wrong owner.
                         */
                        if (chmod(target, 0700) != 0)
                              ereport(ERROR,
                                          (errcode_for_file_access(),
                                           errmsg("could not set permissions on directory \"%s\": %m",
                                                      target)));

                        /* (Re-)create the PG_VERSION file in the target directory */
                        set_short_version(target);

                        /*
                         * Build the symlink name.  The buffer holds the
                         * 10-character prefix "pg_tblspc/", up to 10 digits
                         * for the id (assuming ts_id is a 32-bit value), and
                         * the terminating NUL.
                         */
                        linkpath = (char *) palloc(10 + 10 + 1);
                        sprintf(linkpath, "pg_tblspc/%u", createrec->ts_id);

                        /* An already-existing link is not treated as an error */
                        if (symlink(target, linkpath) < 0 && errno != EEXIST)
                              ereport(ERROR,
                                          (errcode_for_file_access(),
                                           errmsg("could not create symbolic link \"%s\": %m",
                                                      linkpath)));

                        pfree(linkpath);
                        break;
                  }

            case XLOG_TBLSPC_DROP:
                  {
                        xl_tblspc_drop_rec *droprec =
                        (xl_tblspc_drop_rec *) XLogRecGetData(record);

                        if (!remove_tablespace_directories(droprec->ts_id, true))
                              ereport(ERROR,
                                          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                           errmsg("tablespace %u is not empty",
                                                      droprec->ts_id)));
                        break;
                  }

            default:
                  elog(PANIC, "tblspc_redo: unknown op code %u", opcode);
      }
}

/*
 * tblspc_desc -- append a textual description of a tablespace WAL record
 *
 * buf receives the text, xl_info is the record's info byte, and rec points
 * at the record's data.  Create and drop records are described by their
 * tablespace id (plus the path for create); any other op code is written
 * as "UNKNOWN".
 */
void
tblspc_desc(StringInfo buf, uint8 xl_info, char *rec)
{
      uint8       opcode = xl_info & ~XLR_INFO_MASK;

      switch (opcode)
      {
            case XLOG_TBLSPC_CREATE:
                  {
                        xl_tblspc_create_rec *createrec = (xl_tblspc_create_rec *) rec;

                        appendStringInfo(buf, "create ts: %u \"%s\"",
                                                 createrec->ts_id, createrec->ts_path);
                        break;
                  }

            case XLOG_TBLSPC_DROP:
                  {
                        xl_tblspc_drop_rec *droprec = (xl_tblspc_drop_rec *) rec;

                        appendStringInfo(buf, "drop ts: %u", droprec->ts_id);
                        break;
                  }

            default:
                  appendStringInfo(buf, "UNKNOWN");
                  break;
      }
}

Generated by  Doxygen 1.6.0   Back to index