Logo Search packages:      
Sourcecode: postgresql-8.4 version File versions

nodeWindowAgg.c

/*-------------------------------------------------------------------------
 *
 * nodeWindowAgg.c
 *      routines to handle WindowAgg nodes.
 *
 * A WindowAgg node evaluates "window functions" across suitable partitions
 * of the input tuple set.  Any one WindowAgg works for just a single window
 * specification, though it can evaluate multiple window functions sharing
 * identical window specifications.  The input tuples are required to be
 * delivered in sorted order, with the PARTITION BY columns (if any) as
 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
 * (The planner generates a stack of WindowAggs with intervening Sort nodes
 * as needed, if a query involves more than one window specification.)
 *
 * Since window functions can require access to any or all of the rows in
 * the current partition, we accumulate rows of the partition into a
 * tuplestore.  The window functions are called using the WindowObject API
 * so that they can access those rows as needed.
 *
 * We also support using plain aggregate functions as window functions.
 * For these, the regular Agg-node environment is emulated for each partition.
 * As required by the SQL spec, the output represents the value of the
 * aggregate function over all rows in the current row's window frame.
 *
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *      $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/nodeWindowAgg.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "windowapi.h"

/*
 * All the window function APIs are called with this object, which is passed
 * to window functions as fcinfo->context.
 */
typedef struct WindowObjectData
{
      NodeTag           type;
      WindowAggState *winstate;     /* parent WindowAggState */
      List     *argstates;          /* ExprState trees for fn's arguments */
      void     *localmem;           /* WinGetPartitionLocalMemory's chunk */
      int               markptr;          /* tuplestore mark pointer for this fn */
      int               readptr;          /* tuplestore read pointer for this fn */
      int64       markpos;          /* row that markptr is positioned on */
      int64       seekpos;          /* row that readptr is positioned on */
} WindowObjectData;

/*
 * We have one WindowStatePerFunc struct for each window function and
 * window aggregate handled by this node.
 */
typedef struct WindowStatePerFuncData
{
      /* Links to WindowFunc expr and state nodes this working state is for */
      WindowFuncExprState *wfuncstate;
      WindowFunc     *wfunc;

      int               numArguments;     /* number of arguments */

      FmgrInfo    flinfo;                 /* fmgr lookup data for window function */

      /*
       * We need the len and byval info for the result of each function
       * in order to know how to copy/delete values.
       */
      int16       resulttypeLen;
      bool        resulttypeByVal;

      bool        plain_agg;        /* is it just a plain aggregate function? */
      int               aggno;                  /* if so, index of its PerAggData */

      WindowObject      winobj;           /* object used in window function API */
} WindowStatePerFuncData;

/*
 * For plain aggregate window functions, we also have one of these.
 */
typedef struct WindowStatePerAggData
{
      /* Oids of transfer functions */
      Oid               transfn_oid;
      Oid               finalfn_oid;      /* may be InvalidOid */

      /*
       * fmgr lookup data for transfer functions --- only valid when
       * corresponding oid is not InvalidOid.  Note in particular that fn_strict
       * flags are kept here.
       */
      FmgrInfo    transfn;
      FmgrInfo    finalfn;

      /*
       * initial value from pg_aggregate entry
       */
      Datum       initValue;
      bool        initValueIsNull;

      /*
       * cached value for current frame boundaries
       */
      Datum       resultValue;
      bool        resultValueIsNull;

      /*
       * We need the len and byval info for the agg's input, result, and
       * transition data types in order to know how to copy/delete values.
       */
      int16       inputtypeLen,
                        resulttypeLen,
                        transtypeLen;
      bool        inputtypeByVal,
                        resulttypeByVal,
                        transtypeByVal;

      int               wfuncno;          /* index of associated PerFuncData */

      /* Current transition value */
      Datum       transValue;       /* current transition value */
      bool        transValueIsNull;

      bool        noTransValue;     /* true if transValue not set yet */
} WindowStatePerAggData;

static void initialize_windowaggregate(WindowAggState *winstate,
                                                         WindowStatePerFunc perfuncstate,
                                                         WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
                                                      WindowStatePerFunc perfuncstate,
                                                      WindowStatePerAgg peraggstate);
static void finalize_windowaggregate(WindowAggState *winstate,
                                                       WindowStatePerFunc perfuncstate,
                                                       WindowStatePerAgg peraggstate,
                                                       Datum *result, bool *isnull);

static void eval_windowaggregates(WindowAggState *winstate);
static void eval_windowfunction(WindowAggState *winstate,
                                                WindowStatePerFunc perfuncstate,
                                                Datum *result, bool *isnull);

static void begin_partition(WindowAggState *winstate);
static void spool_tuples(WindowAggState *winstate, int64 pos);
static void release_partition(WindowAggState *winstate);

static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
                                          TupleTableSlot *slot);
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);

static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
                                                                        WindowFunc *wfunc,
                                                                        WindowStatePerAgg peraggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);

static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
                                TupleTableSlot *slot2);
static bool window_gettupleslot(WindowObject winobj, int64 pos,
                                                TupleTableSlot *slot);


/*
 * initialize_windowaggregate
 * parallel to initialize_aggregate in nodeAgg.c
 */
static void
initialize_windowaggregate(WindowAggState *winstate,
                                       WindowStatePerFunc perfuncstate,
                                       WindowStatePerAgg peraggstate)
{
      MemoryContext           oldContext;

      if (peraggstate->initValueIsNull)
            peraggstate->transValue = peraggstate->initValue;
      else
      {
            oldContext = MemoryContextSwitchTo(winstate->wincontext);
            peraggstate->transValue = datumCopy(peraggstate->initValue,
                                                                  peraggstate->transtypeByVal,
                                                                  peraggstate->transtypeLen);
            MemoryContextSwitchTo(oldContext);
      }
      peraggstate->transValueIsNull = peraggstate->initValueIsNull;
      peraggstate->noTransValue = peraggstate->initValueIsNull;
      peraggstate->resultValueIsNull = true;
}

/*
 * advance_windowaggregate
 * parallel to advance_aggregate in nodeAgg.c
 */
static void
advance_windowaggregate(WindowAggState *winstate,
                                    WindowStatePerFunc perfuncstate,
                                    WindowStatePerAgg peraggstate)
{
      WindowFuncExprState        *wfuncstate = perfuncstate->wfuncstate;
      int                                 numArguments = perfuncstate->numArguments;
      FunctionCallInfoData    fcinfodata;
      FunctionCallInfo        fcinfo = &fcinfodata;
      Datum                         newVal;
      ListCell                   *arg;
      int                                 i;
      MemoryContext                 oldContext;
      ExprContext *econtext = winstate->tmpcontext;

      oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

      /* We start from 1, since the 0th arg will be the transition value */
      i = 1;
      foreach(arg, wfuncstate->args)
      {
            ExprState      *argstate = (ExprState *) lfirst(arg);

            fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
                                                        &fcinfo->argnull[i], NULL);
            i++;
      }

      if (peraggstate->transfn.fn_strict)
      {
            /*
             * For a strict transfn, nothing happens when there's a NULL input; we
             * just keep the prior transValue.
             */
            for (i = 1; i <= numArguments; i++)
            {
                  if (fcinfo->argnull[i])
                  {
                        MemoryContextSwitchTo(oldContext);
                        return;
                  }
            }
            if (peraggstate->noTransValue)
            {
                  /*
                   * transValue has not been initialized. This is the first non-NULL
                   * input value. We use it as the initial value for transValue. (We
                   * already checked that the agg's input type is binary-compatible
                   * with its transtype, so straight copy here is OK.)
                   *
                   * We must copy the datum into wincontext if it is pass-by-ref. We
                   * do not need to pfree the old transValue, since it's NULL.
                   */
                  MemoryContextSwitchTo(winstate->wincontext);
                  peraggstate->transValue = datumCopy(fcinfo->arg[1],
                                                                   peraggstate->transtypeByVal,
                                                                   peraggstate->transtypeLen);
                  peraggstate->transValueIsNull = false;
                  peraggstate->noTransValue = false;
                  MemoryContextSwitchTo(oldContext);
                  return;
            }
            if (peraggstate->transValueIsNull)
            {
                  /*
                   * Don't call a strict function with NULL inputs.  Note it is
                   * possible to get here despite the above tests, if the transfn is
                   * strict *and* returned a NULL on a prior cycle. If that happens
                   * we will propagate the NULL all the way to the end.
                   */
                  MemoryContextSwitchTo(oldContext);
                  return;
            }
      }

      /*
       * OK to call the transition function
       */
      InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
                                           numArguments + 1,
                                           (void *) winstate, NULL);
      fcinfo->arg[0] = peraggstate->transValue;
      fcinfo->argnull[0] = peraggstate->transValueIsNull;
      newVal = FunctionCallInvoke(fcinfo);

      /*
       * If pass-by-ref datatype, must copy the new value into wincontext and
       * pfree the prior transValue.      But if transfn returned a pointer to its
       * first input, we don't need to do anything.
       */
      if (!peraggstate->transtypeByVal &&
            DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
      {
            if (!fcinfo->isnull)
            {
                  MemoryContextSwitchTo(winstate->wincontext);
                  newVal = datumCopy(newVal,
                                             peraggstate->transtypeByVal,
                                             peraggstate->transtypeLen);
            }
            if (!peraggstate->transValueIsNull)
                  pfree(DatumGetPointer(peraggstate->transValue));
      }

      MemoryContextSwitchTo(oldContext);
      peraggstate->transValue = newVal;
      peraggstate->transValueIsNull = fcinfo->isnull;
}

/*
 * finalize_windowaggregate
 * parallel to finalize_aggregate in nodeAgg.c
 */
static void
finalize_windowaggregate(WindowAggState *winstate,
                                     WindowStatePerFunc perfuncstate,
                                     WindowStatePerAgg peraggstate,
                                     Datum *result, bool *isnull)
{
      MemoryContext                 oldContext;

      oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

      /*
       * Apply the agg's finalfn if one is provided, else return transValue.
       */
      if (OidIsValid(peraggstate->finalfn_oid))
      {
            FunctionCallInfoData    fcinfo;

            InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
                                                 (void *) winstate, NULL);
            fcinfo.arg[0] = peraggstate->transValue;
            fcinfo.argnull[0] = peraggstate->transValueIsNull;
            if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
            {
                  /* don't call a strict function with NULL inputs */
                  *result = (Datum) 0;
                  *isnull = true;
            }
            else
            {
                  *result = FunctionCallInvoke(&fcinfo);
                  *isnull = fcinfo.isnull;
            }
      }
      else
      {
            *result = peraggstate->transValue;
            *isnull = peraggstate->transValueIsNull;
      }

      /*
       * If result is pass-by-ref, make sure it is in the right context.
       */
      if (!peraggstate->resulttypeByVal && !*isnull &&
            !MemoryContextContains(CurrentMemoryContext,
                                             DatumGetPointer(*result)))
            *result = datumCopy(*result,
                                          peraggstate->resulttypeByVal,
                                          peraggstate->resulttypeLen);
      MemoryContextSwitchTo(oldContext);
}

/*
 * eval_windowaggregates
 * evaluate plain aggregates being used as window functions
 *
 * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
 * able to call aggregate final functions repeatedly after aggregating more
 * data onto the same transition value.  This is not a behavior required by
 * nodeAgg.c.
 */
static void
eval_windowaggregates(WindowAggState *winstate)
{
      WindowStatePerAgg   peraggstate;
      int                           wfuncno, numaggs;
      int                           i;
      MemoryContext           oldContext;
      ExprContext          *econtext;
      TupleTableSlot       *agg_row_slot;

      numaggs = winstate->numaggs;
      if (numaggs == 0)
            return;                             /* nothing to do */

      /* final output execution is in ps_ExprContext */
      econtext = winstate->ss.ps.ps_ExprContext;

      /*
       * Currently, we support only a subset of the SQL-standard window framing
       * rules.  In all the supported cases, the window frame always consists
       * of a contiguous group of rows extending forward from the start of the
       * partition, and rows only enter the frame, never exit it, as the
       * current row advances forward.  This makes it possible to use an
       * incremental strategy for evaluating aggregates: we run the transition
       * function for each row added to the frame, and run the final function
       * whenever we need the current aggregate value.  This is considerably
       * more efficient than the naive approach of re-running the entire
       * aggregate calculation for each current row.  It does assume that the
       * final function doesn't damage the running transition value.  (Some
       * C-coded aggregates do that for efficiency's sake --- but they are
       * supposed to do so only when their fcinfo->context is an AggState, not
       * a WindowAggState.)
       *
       * In many common cases, multiple rows share the same frame and hence
       * the same aggregate value. (In particular, if there's no ORDER BY in
       * a RANGE window, then all rows are peers and so they all have window
       * frame equal to the whole partition.)  We optimize such cases by
       * calculating the aggregate value once when we reach the first row of a
       * peer group, and then returning the saved value for all subsequent rows.
       *
       * 'aggregatedupto' keeps track of the first row that has not yet been
       * accumulated into the aggregate transition values.  Whenever we start a
       * new peer group, we accumulate forward to the end of the peer group.
       *
       * TODO: In the future, we should implement the full SQL-standard set
       * of framing rules.  We could implement the other cases by recalculating
       * the aggregates whenever a row exits the frame.  That would be pretty
       * slow, though.  For aggregates like SUM and COUNT we could implement a
       * "negative transition function" that would be called for each row as it
       * exits the frame.  We'd have to think about avoiding recalculation of
       * volatile arguments of aggregate functions, too.
       */

      /*
       * If we've already aggregated up through current row, reuse the
       * saved result values.  NOTE: this test works for the currently
       * supported framing rules, but will need fixing when more are added.
       */
      if (winstate->aggregatedupto > winstate->currentpos)
      {
            for (i = 0; i < numaggs; i++)
            {
                  peraggstate = &winstate->peragg[i];
                  wfuncno = peraggstate->wfuncno;
                  econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
                  econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
            }
            return;
      }

      /* Initialize aggregates on first call for partition */
      if (winstate->currentpos == 0)
      {
            for (i = 0; i < numaggs; i++)
            {
                  peraggstate = &winstate->peragg[i];
                  wfuncno = peraggstate->wfuncno;
                  initialize_windowaggregate(winstate,
                                                         &winstate->perfunc[wfuncno],
                                                         peraggstate);
            }
      }

      /*
       * Advance until we reach a row not in frame (or end of partition).
       *
       * Note the loop invariant: agg_row_slot is either empty or holds the
       * row at position aggregatedupto.  The agg_ptr read pointer must always
       * point to the next row to read into agg_row_slot.
       */
      agg_row_slot = winstate->agg_row_slot;
      for (;;)
      {
            /* Fetch next row if we didn't already */
            if (TupIsNull(agg_row_slot))
            {
                  spool_tuples(winstate, winstate->aggregatedupto);
                  tuplestore_select_read_pointer(winstate->buffer,
                                                               winstate->agg_ptr);
                  if (!tuplestore_gettupleslot(winstate->buffer, true, true,
                                                             agg_row_slot))
                        break;                  /* must be end of partition */
            }

            /* Exit loop (for now) if not in frame */
            if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
                  break;

            /* Set tuple context for evaluation of aggregate arguments */
            winstate->tmpcontext->ecxt_outertuple = agg_row_slot;

            /* Accumulate row into the aggregates */
            for (i = 0; i < numaggs; i++)
            {
                  peraggstate = &winstate->peragg[i];
                  wfuncno = peraggstate->wfuncno;
                  advance_windowaggregate(winstate,
                                                      &winstate->perfunc[wfuncno],
                                                      peraggstate);
            }

            /* Reset per-input-tuple context after each tuple */
            ResetExprContext(winstate->tmpcontext);

            /* And advance the aggregated-row state */
            winstate->aggregatedupto++;
            ExecClearTuple(agg_row_slot);
      }

      /*
       * finalize aggregates and fill result/isnull fields.
       */
      for (i = 0; i < numaggs; i++)
      {
            Datum    *result;
            bool     *isnull;

            peraggstate = &winstate->peragg[i];
            wfuncno = peraggstate->wfuncno;
            result = &econtext->ecxt_aggvalues[wfuncno];
            isnull = &econtext->ecxt_aggnulls[wfuncno];
            finalize_windowaggregate(winstate,
                                                 &winstate->perfunc[wfuncno],
                                                 peraggstate,
                                                 result, isnull);

            /*
             * save the result in case next row shares the same frame.
             *
             * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know
             * in advance that the next row can't possibly share the same frame.
             * Is it worth detecting that and skipping this code?
             */
            if (!peraggstate->resulttypeByVal)
            {
                  /*
                   * clear old resultValue in order not to leak memory.  (Note:
                   * the new result can't possibly be the same datum as old
                   * resultValue, because we never passed it to the trans function.)
                   */
                  if (!peraggstate->resultValueIsNull)
                        pfree(DatumGetPointer(peraggstate->resultValue));

                  /*
                   * If pass-by-ref, copy it into our global context.
                   */
                  if (!*isnull)
                  {
                        oldContext = MemoryContextSwitchTo(winstate->wincontext);
                        peraggstate->resultValue =
                              datumCopy(*result,
                                            peraggstate->resulttypeByVal,
                                            peraggstate->resulttypeLen);
                        MemoryContextSwitchTo(oldContext);
                  }
            }
            else
            {
                  peraggstate->resultValue = *result;
            }
            peraggstate->resultValueIsNull = *isnull;
      }
}

/*
 * eval_windowfunction
 *
 * Arguments of window functions are not evaluated here, because a window
 * function can need random access to arbitrary rows in the partition.
 * The window function uses the special WinGetFuncArgInPartition and
 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
 * it wants.
 */
static void
eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
                              Datum *result, bool *isnull)
{
      FunctionCallInfoData fcinfo;
      MemoryContext           oldContext;

      oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

      /*
       * We don't pass any normal arguments to a window function, but we do
       * pass it the number of arguments, in order to permit window function
       * implementations to support varying numbers of arguments.  The real
       * info goes through the WindowObject, which is passed via fcinfo->context.
       */
      InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
                                           perfuncstate->numArguments,
                                           (void *) perfuncstate->winobj, NULL);
      /* Just in case, make all the regular argument slots be null */
      memset(fcinfo.argnull, true, perfuncstate->numArguments);

      *result = FunctionCallInvoke(&fcinfo);
      *isnull = fcinfo.isnull;

      /*
       * Make sure pass-by-ref data is allocated in the appropriate context.
       * (We need this in case the function returns a pointer into some
       * short-lived tuple, as is entirely possible.)
       */
      if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
            !MemoryContextContains(CurrentMemoryContext,
                                             DatumGetPointer(*result)))
            *result = datumCopy(*result,
                                          perfuncstate->resulttypeByVal,
                                          perfuncstate->resulttypeLen);

      MemoryContextSwitchTo(oldContext);
}

/*
 * begin_partition
 * Start buffering rows of the next partition.
 */
static void
begin_partition(WindowAggState *winstate)
{
      PlanState      *outerPlan = outerPlanState(winstate);
      int                     numfuncs = winstate->numfuncs;
      int                     i;

      winstate->partition_spooled = false;
      winstate->frametail_valid = false;
      winstate->spooled_rows = 0;
      winstate->currentpos = 0;
      winstate->frametailpos = -1;
      winstate->aggregatedupto = 0;
      ExecClearTuple(winstate->agg_row_slot);

      /*
       * If this is the very first partition, we need to fetch the first
       * input row to store in first_part_slot.
       */
      if (TupIsNull(winstate->first_part_slot))
      {
            TupleTableSlot *outerslot = ExecProcNode(outerPlan);

            if (!TupIsNull(outerslot))
                   ExecCopySlot(winstate->first_part_slot, outerslot);
            else
            {
                  /* outer plan is empty, so we have nothing to do */
                  winstate->partition_spooled = true;
                  winstate->more_partitions = false;
                  return;
            }
      }

      /* Create new tuplestore for this partition */
      winstate->buffer = tuplestore_begin_heap(false, false, work_mem);

      /*
       * Set up read pointers for the tuplestore.  The current and agg pointers
       * don't need BACKWARD capability, but the per-window-function read
       * pointers do.
       */
      winstate->current_ptr = 0;    /* read pointer 0 is pre-allocated */

      /* reset default REWIND capability bit for current ptr */
      tuplestore_set_eflags(winstate->buffer, 0);

      /* create a read pointer for aggregates, if needed */
      if (winstate->numaggs > 0)
            winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);

      /* create mark and read pointers for each real window function */
      for (i = 0; i < numfuncs; i++)
      {
            WindowStatePerFunc      perfuncstate = &(winstate->perfunc[i]);

            if (!perfuncstate->plain_agg)
            {
                  WindowObject      winobj = perfuncstate->winobj;

                  winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
                                                                                          0);
                  winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
                                                                                          EXEC_FLAG_BACKWARD);
                  winobj->markpos = -1;
                  winobj->seekpos = -1;
            }
      }

      /*
       * Store the first tuple into the tuplestore (it's always available now;
       * we either read it above, or saved it at the end of previous partition)
       */
      tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
      winstate->spooled_rows++;
}

/*
 * Read tuples from the outer node, up to position 'pos', and store them
 * into the tuplestore. If pos is -1, reads the whole partition.
 */
static void
spool_tuples(WindowAggState *winstate, int64 pos)
{
      WindowAgg      *node = (WindowAgg *) winstate->ss.ps.plan;
      PlanState      *outerPlan;
      TupleTableSlot *outerslot;
      MemoryContext oldcontext;

      if (!winstate->buffer)
            return;                             /* just a safety check */
      if (winstate->partition_spooled)
            return;                             /* whole partition done already */

      /*
       * If the tuplestore has spilled to disk, alternate reading and writing
       * becomes quite expensive due to frequent buffer flushes.  It's cheaper
       * to force the entire partition to get spooled in one go.
       *
       * XXX this is a horrid kluge --- it'd be better to fix the performance
       * problem inside tuplestore.  FIXME
       */
      if (!tuplestore_in_memory(winstate->buffer))
            pos = -1;

      outerPlan = outerPlanState(winstate);

      /* Must be in query context to call outerplan or touch tuplestore */
      oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);

      while (winstate->spooled_rows <= pos || pos == -1)
      {
            outerslot = ExecProcNode(outerPlan);
            if (TupIsNull(outerslot))
            {
                  /* reached the end of the last partition */
                  winstate->partition_spooled = true;
                  winstate->more_partitions = false;
                  break;
            }

            if (node->partNumCols > 0)
            {
                  /* Check if this tuple still belongs to the current partition */
                  if (!execTuplesMatch(winstate->first_part_slot,
                                                 outerslot,
                                                 node->partNumCols, node->partColIdx,
                                                 winstate->partEqfunctions,
                                                 winstate->tmpcontext->ecxt_per_tuple_memory))
                  {
                        /*
                         * end of partition; copy the tuple for the next cycle.
                         */
                        ExecCopySlot(winstate->first_part_slot, outerslot);
                        winstate->partition_spooled = true;
                        winstate->more_partitions = true;
                        break;
                  }
            }

            /* Still in partition, so save it into the tuplestore */
            tuplestore_puttupleslot(winstate->buffer, outerslot);
            winstate->spooled_rows++;
      }

      MemoryContextSwitchTo(oldcontext);
}

/*
 * release_partition
 * clear information kept within a partition, including
 * tuplestore and aggregate results.
 */
static void
release_partition(WindowAggState *winstate)
{
      int                           i;

      for (i = 0; i < winstate->numfuncs; i++)
      {
            WindowStatePerFunc            perfuncstate = &(winstate->perfunc[i]);

            /* Release any partition-local state of this window function */
            if (perfuncstate->winobj)
                  perfuncstate->winobj->localmem = NULL;
      }

      /*
       * Release all partition-local memory (in particular, any partition-local
       * state that we might have trashed our pointers to in the above loop, and
       * any aggregate temp data).  We don't rely on retail pfree because some
       * aggregates might have allocated data we don't have direct pointers to.
       */
      MemoryContextResetAndDeleteChildren(winstate->wincontext);

      if (winstate->buffer)
            tuplestore_end(winstate->buffer);
      winstate->buffer = NULL;
      winstate->partition_spooled = false;
}

/*
 * row_is_in_frame
 * Determine whether a row is in the current row's window frame according
 * to our window framing rule
 *
 * The caller must have already determined that the row is in the partition
 * and fetched it into a slot.  This function just encapsulates the framing
 * rules.
 */
static bool
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
{
      WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
      int               frameOptions = node->frameOptions;

      Assert(pos >= 0);             /* else caller error */

      /* We only support frame start mode UNBOUNDED PRECEDING for now */
      Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);

      /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
      if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
            return true;

      /* Else frame tail mode must be CURRENT ROW */
      Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);

      /* if row is current row or a predecessor, it must be in frame */
      if (pos <= winstate->currentpos)
            return true;

      /* In ROWS mode, *only* such rows are in frame */
      if (frameOptions & FRAMEOPTION_ROWS)
            return false;

      /* Else must be RANGE mode */
      Assert(frameOptions & FRAMEOPTION_RANGE);

      /* In frame iff it's a peer of current row */
      return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot);
}

/*
 * update_frametailpos
 * make frametailpos valid for the current row
 *
 * Uses the winobj's read pointer for any required fetches; the winobj's
 * mark must not be past the currently known frame tail.  Also uses the
 * specified slot for any required fetches.
 */
static void
update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
{
      WindowAggState *winstate = winobj->winstate;
      WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
      int               frameOptions = node->frameOptions;
      int64       ftnext;

      if (winstate->frametail_valid)
            return;                             /* already known for current row */

      /* We only support frame start mode UNBOUNDED PRECEDING for now */
      Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);

      /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
      if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
      {
            spool_tuples(winstate, -1);
            winstate->frametailpos = winstate->spooled_rows - 1;
            winstate->frametail_valid = true;
            return;
      }

      /* Else frame tail mode must be CURRENT ROW */
      Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);

      /* In ROWS mode, exactly the rows up to current are in frame */
      if (frameOptions & FRAMEOPTION_ROWS)
      {
            winstate->frametailpos = winstate->currentpos;
            winstate->frametail_valid = true;
            return;
      }

      /* Else must be RANGE mode */
      Assert(frameOptions & FRAMEOPTION_RANGE);

      /* If no ORDER BY, all rows are peers with each other */
      if (node->ordNumCols == 0)
      {
            spool_tuples(winstate, -1);
            winstate->frametailpos = winstate->spooled_rows - 1;
            winstate->frametail_valid = true;
            return;
      }

      /*
       * Else we have to search for the first non-peer of the current row.
       * We assume the current value of frametailpos is a lower bound on the
       * possible frame tail location, ie, frame tail never goes backward, and
       * that currentpos is also a lower bound, ie, current row is always in
       * frame.
       */
      ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
      for (;;)
      {
            if (!window_gettupleslot(winobj, ftnext, slot))
                  break;                        /* end of partition */
            if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
                  break;                        /* not peer of current row */
            ftnext++;
      }
      winstate->frametailpos = ftnext - 1;
      winstate->frametail_valid = true;
}


/* -----------------
 * ExecWindowAgg
 *
 *    ExecWindowAgg receives tuples from its outer subplan and
 *    stores them into a tuplestore, then processes window functions.
 *    This node doesn't reduce nor qualify any row so the number of
 *    returned rows is exactly the same as its outer subplan's result
 *    (ignoring the case of SRFs in the targetlist, that is).
 * -----------------
 */
TupleTableSlot *
ExecWindowAgg(WindowAggState *winstate)
{
      TupleTableSlot *result;
      ExprDoneCond      isDone;
      ExprContext    *econtext;
      int                     i;
      int                     numfuncs;

      if (winstate->all_done)
            return NULL;

      /*
       * Check to see if we're still projecting out tuples from a previous output
       * tuple (because there is a function-returning-set in the projection
       * expressions).  If so, try to project another one.
       */
      if (winstate->ss.ps.ps_TupFromTlist)
      {
            TupleTableSlot *result;
            ExprDoneCond isDone;

            result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
            if (isDone == ExprMultipleResult)
                  return result;
            /* Done with that source tuple... */
            winstate->ss.ps.ps_TupFromTlist = false;
      }

restart:
      if (winstate->buffer == NULL)
      {
            /* Initialize for first partition and set current row = 0 */
            begin_partition(winstate);
            /* If there are no input rows, we'll detect that and exit below */
      }
      else
      {
            /* Advance current row within partition */
            winstate->currentpos++;
            /* This might mean that the frame tail moves, too */
            winstate->frametail_valid = false;
      }

      /*
       * Spool all tuples up to and including the current row, if we haven't
       * already
       */
      spool_tuples(winstate, winstate->currentpos);

      /* Move to the next partition if we reached the end of this partition */
      if (winstate->partition_spooled &&
            winstate->currentpos >= winstate->spooled_rows)
      {
            release_partition(winstate);

            if (winstate->more_partitions)
            {
                  begin_partition(winstate);
                  Assert(winstate->spooled_rows > 0);
            }
            else
            {
                  winstate->all_done = true;
                  return NULL;
            }
      }

      /* final output execution is in ps_ExprContext */
      econtext = winstate->ss.ps.ps_ExprContext;

      /* Clear the per-output-tuple context for current row */
      ResetExprContext(econtext);

      /*
       * Read the current row from the tuplestore, and save in ScanTupleSlot.
       * (We can't rely on the outerplan's output slot because we may have to
       * read beyond the current row.  Also, we have to actually copy the row
       * out of the tuplestore, since window function evaluation might cause
       * the tuplestore to dump its state to disk.)
       *
       * Current row must be in the tuplestore, since we spooled it above.
       */
      tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
      if (!tuplestore_gettupleslot(winstate->buffer, true, true,
                                                 winstate->ss.ss_ScanTupleSlot))
            elog(ERROR, "unexpected end of tuplestore");

      /*
       * Evaluate true window functions
       */
      numfuncs = winstate->numfuncs;
      for (i = 0; i < numfuncs; i++)
      {
            WindowStatePerFunc      perfuncstate = &(winstate->perfunc[i]);

            if (perfuncstate->plain_agg)
                  continue;
            eval_windowfunction(winstate, perfuncstate,
                                          &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
                                          &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
      }

      /*
       * Evaluate aggregates
       */
      if (winstate->numaggs > 0)
            eval_windowaggregates(winstate);

      /*
       * Truncate any no-longer-needed rows from the tuplestore.
       */
      tuplestore_trim(winstate->buffer);

      /*
       * Form and return a projection tuple using the windowfunc results
       * and the current row.  Setting ecxt_outertuple arranges that any
       * Vars will be evaluated with respect to that row.
       */
      econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
      result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);

      if (isDone == ExprEndResult)
      {
            /* SRF in tlist returned no rows, so advance to next input tuple */
            goto restart;
      }

      winstate->ss.ps.ps_TupFromTlist =
            (isDone == ExprMultipleResult);
      return result;
}

/* -----------------
 * ExecInitWindowAgg
 *
 *    Creates the run-time information for the WindowAgg node produced by the
 *    planner and initializes its outer subtree
 * -----------------
 */
WindowAggState *
ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
{
      WindowAggState *winstate;
      Plan     *outerPlan;
      ExprContext *econtext;
      ExprContext *tmpcontext;
      WindowStatePerFunc  perfunc;
      WindowStatePerAgg   peragg;
      int               numfuncs,
                        wfuncno,
                        numaggs,
                        aggno;
      ListCell   *l;

      /* check for unsupported flags */
      Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

      /*
       * create state structure
       */
      winstate = makeNode(WindowAggState);
      winstate->ss.ps.plan = (Plan *) node;
      winstate->ss.ps.state = estate;

      /*
       * Create expression contexts.      We need two, one for per-input-tuple
       * processing and one for per-output-tuple processing.      We cheat a little
       * by using ExecAssignExprContext() to build both.
       */
      ExecAssignExprContext(estate, &winstate->ss.ps);
      tmpcontext = winstate->ss.ps.ps_ExprContext;
      winstate->tmpcontext = tmpcontext;
      ExecAssignExprContext(estate, &winstate->ss.ps);

      /* Create long-lived context for storage of aggregate transvalues etc */
      winstate->wincontext =
            AllocSetContextCreate(CurrentMemoryContext,
                                            "WindowAggContext",
                                            ALLOCSET_DEFAULT_MINSIZE,
                                            ALLOCSET_DEFAULT_INITSIZE,
                                            ALLOCSET_DEFAULT_MAXSIZE);

#define WINDOWAGG_NSLOTS 6

      /*
       * tuple table initialization
       */
      ExecInitScanTupleSlot(estate, &winstate->ss);
      ExecInitResultTupleSlot(estate, &winstate->ss.ps);
      winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
      winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
      winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
      winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);

      winstate->ss.ps.targetlist = (List *)
            ExecInitExpr((Expr *) node->plan.targetlist,
                               (PlanState *) winstate);

      /*
       * WindowAgg nodes never have quals, since they can only occur at the
       * logical top level of a query (ie, after any WHERE or HAVING filters)
       */
      Assert(node->plan.qual == NIL);
      winstate->ss.ps.qual = NIL;

      /*
       * initialize child nodes
       */
      outerPlan = outerPlan(node);
      outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);

      /*
       * initialize source tuple type (which is also the tuple type that we'll
       * store in the tuplestore and use in all our working slots).
       */
      ExecAssignScanTypeFromOuterPlan(&winstate->ss);

      ExecSetSlotDescriptor(winstate->first_part_slot,
                                      winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
      ExecSetSlotDescriptor(winstate->agg_row_slot,
                                      winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
      ExecSetSlotDescriptor(winstate->temp_slot_1,
                                      winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
      ExecSetSlotDescriptor(winstate->temp_slot_2,
                                      winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);

      /*
       * Initialize result tuple type and projection info.
       */
      ExecAssignResultTypeFromTL(&winstate->ss.ps);
      ExecAssignProjectionInfo(&winstate->ss.ps, NULL);

      winstate->ss.ps.ps_TupFromTlist = false;

      /* Set up data for comparing tuples */
      if (node->partNumCols > 0)
            winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
                                                                                      node->partOperators);
      if (node->ordNumCols > 0)
            winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
                                                                                      node->ordOperators);

      /*
       * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
       */
      numfuncs = winstate->numfuncs;
      numaggs = winstate->numaggs;
      econtext = winstate->ss.ps.ps_ExprContext;
      econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
      econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);

      /*
       * allocate per-wfunc/per-agg state information.
       */
      perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
      peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
      winstate->perfunc = perfunc;
      winstate->peragg = peragg;

      wfuncno = -1;
      aggno = -1;
      foreach(l, winstate->funcs)
      {
            WindowFuncExprState        *wfuncstate = (WindowFuncExprState *) lfirst(l);
            WindowFunc                 *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
            WindowStatePerFunc perfuncstate;
            AclResult   aclresult;
            int               i;

            if (wfunc->winref != node->winref)              /* planner screwed up? */
                  elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
                         wfunc->winref, node->winref);

            /* Look for a previous duplicate window function */
            for (i = 0; i <= wfuncno; i++)
            {
                  if (equal(wfunc, perfunc[i].wfunc) &&
                        !contain_volatile_functions((Node *) wfunc))
                        break;
            }
            if (i <= wfuncno)
            {
                  /* Found a match to an existing entry, so just mark it */
                  wfuncstate->wfuncno = i;
                  continue;
            }

            /* Nope, so assign a new PerAgg record */
            perfuncstate = &perfunc[++wfuncno];

            /* Mark WindowFunc state node with assigned index in the result array */
            wfuncstate->wfuncno = wfuncno;

            /* Check permission to call window function */
            aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
                                                       ACL_EXECUTE);
            if (aclresult != ACLCHECK_OK)
                  aclcheck_error(aclresult, ACL_KIND_PROC,
                                       get_func_name(wfunc->winfnoid));

            /* Fill in the perfuncstate data */
            perfuncstate->wfuncstate = wfuncstate;
            perfuncstate->wfunc = wfunc;
            perfuncstate->numArguments = list_length(wfuncstate->args);

            fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
                                tmpcontext->ecxt_per_query_memory);
            perfuncstate->flinfo.fn_expr = (Node *) wfunc;
            get_typlenbyval(wfunc->wintype,
                                    &perfuncstate->resulttypeLen,
                                    &perfuncstate->resulttypeByVal);

            /*
             * If it's really just a plain aggregate function,
             * we'll emulate the Agg environment for it.
             */
            perfuncstate->plain_agg = wfunc->winagg;
            if (wfunc->winagg)
            {
                  WindowStatePerAgg peraggstate;

                  perfuncstate->aggno = ++aggno;
                  peraggstate = &winstate->peragg[aggno];
                  initialize_peragg(winstate, wfunc, peraggstate);
                  peraggstate->wfuncno = wfuncno;
            }
            else
            {
                  WindowObject winobj = makeNode(WindowObjectData);

                  winobj->winstate = winstate;
                  winobj->argstates = wfuncstate->args;
                  winobj->localmem = NULL;
                  perfuncstate->winobj = winobj;
            }
      }

      /* Update numfuncs, numaggs to match number of unique functions found */
      winstate->numfuncs = wfuncno + 1;
      winstate->numaggs = aggno + 1;

      winstate->partition_spooled = false;
      winstate->more_partitions = false;

      return winstate;
}

/* -----------------
 * ExecCountSlotsWindowAgg
 * -----------------
 */
int
ExecCountSlotsWindowAgg(WindowAgg *node)
{
      return ExecCountSlotsNode(outerPlan(node)) +
            ExecCountSlotsNode(innerPlan(node)) +
            WINDOWAGG_NSLOTS;
}

/* -----------------
 * ExecEndWindowAgg
 * -----------------
 */
void
ExecEndWindowAgg(WindowAggState *node)
{
      PlanState  *outerPlan;

      release_partition(node);

      pfree(node->perfunc);
      pfree(node->peragg);

      ExecClearTuple(node->ss.ss_ScanTupleSlot);
      ExecClearTuple(node->first_part_slot);
      ExecClearTuple(node->agg_row_slot);
      ExecClearTuple(node->temp_slot_1);
      ExecClearTuple(node->temp_slot_2);

      /*
       * Free both the expr contexts.
       */
      ExecFreeExprContext(&node->ss.ps);
      node->ss.ps.ps_ExprContext = node->tmpcontext;
      ExecFreeExprContext(&node->ss.ps);

      MemoryContextDelete(node->wincontext);

      outerPlan = outerPlanState(node);
      ExecEndNode(outerPlan);
}

/* -----------------
 * ExecRescanWindowAgg
 * -----------------
 */
void
ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
{
      ExprContext    *econtext = node->ss.ps.ps_ExprContext;

      node->all_done = false;

      node->ss.ps.ps_TupFromTlist = false;

      /* release tuplestore et al */
      release_partition(node);

      /* release all temp tuples, but especially first_part_slot */
      ExecClearTuple(node->ss.ss_ScanTupleSlot);
      ExecClearTuple(node->first_part_slot);
      ExecClearTuple(node->agg_row_slot);
      ExecClearTuple(node->temp_slot_1);
      ExecClearTuple(node->temp_slot_2);

      /* Forget current wfunc values */
      MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
      MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);

      /*
       * if chgParam of subnode is not null then plan will be re-scanned by
       * first ExecProcNode.
       */
      if (((PlanState *) node)->lefttree->chgParam == NULL)
            ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
}

/*
 * initialize_peragg
 *
 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
 */
static WindowStatePerAggData *
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
                          WindowStatePerAgg peraggstate)
{
      Oid               inputTypes[FUNC_MAX_ARGS];
      int               numArguments;
      HeapTuple   aggTuple;
      Form_pg_aggregate aggform;
      Oid               aggtranstype;
      AclResult   aclresult;
      Oid               transfn_oid,
                        finalfn_oid;
      Expr     *transfnexpr,
                     *finalfnexpr;
      Datum       textInitVal;
      int               i;
      ListCell   *lc;

      numArguments = list_length(wfunc->args);

      i = 0;
      foreach(lc, wfunc->args)
      {
            inputTypes[i++] = exprType((Node *) lfirst(lc));
      }

      aggTuple = SearchSysCache(AGGFNOID,
                                            ObjectIdGetDatum(wfunc->winfnoid),
                                            0, 0, 0);
      if (!HeapTupleIsValid(aggTuple))
            elog(ERROR, "cache lookup failed for aggregate %u",
                   wfunc->winfnoid);
      aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);

      /*
       * ExecInitWindowAgg already checked permission to call aggregate function
       * ... but we still need to check the component functions
       */

      peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
      peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;

      /* Check that aggregate owner has permission to call component fns */
      {
            HeapTuple   procTuple;
            Oid               aggOwner;

            procTuple = SearchSysCache(PROCOID,
                                                   ObjectIdGetDatum(wfunc->winfnoid),
                                                   0, 0, 0);
            if (!HeapTupleIsValid(procTuple))
                  elog(ERROR, "cache lookup failed for function %u",
                         wfunc->winfnoid);
            aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
            ReleaseSysCache(procTuple);

            aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
                                                       ACL_EXECUTE);
            if (aclresult != ACLCHECK_OK)
                  aclcheck_error(aclresult, ACL_KIND_PROC,
                                       get_func_name(transfn_oid));
            if (OidIsValid(finalfn_oid))
            {
                  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
                                                             ACL_EXECUTE);
                  if (aclresult != ACLCHECK_OK)
                        aclcheck_error(aclresult, ACL_KIND_PROC,
                                             get_func_name(finalfn_oid));
            }
      }

      /* resolve actual type of transition state, if polymorphic */
      aggtranstype = aggform->aggtranstype;
      if (IsPolymorphicType(aggtranstype))
      {
            /* have to fetch the agg's declared input types... */
            Oid            *declaredArgTypes;
            int               agg_nargs;

            get_func_signature(wfunc->winfnoid,
                                       &declaredArgTypes, &agg_nargs);
            Assert(agg_nargs == numArguments);
            aggtranstype = enforce_generic_type_consistency(inputTypes,
                                                                                    declaredArgTypes,
                                                                                    agg_nargs,
                                                                                    aggtranstype,
                                                                                    false);
            pfree(declaredArgTypes);
      }

      /* build expression trees using actual argument & result types */
      build_aggregate_fnexprs(inputTypes,
                                          numArguments,
                                          aggtranstype,
                                          wfunc->wintype,
                                          transfn_oid,
                                          finalfn_oid,
                                          &transfnexpr,
                                          &finalfnexpr);

      fmgr_info(transfn_oid, &peraggstate->transfn);
      peraggstate->transfn.fn_expr = (Node *) transfnexpr;

      if (OidIsValid(finalfn_oid))
      {
            fmgr_info(finalfn_oid, &peraggstate->finalfn);
            peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
      }

      get_typlenbyval(wfunc->wintype,
                              &peraggstate->resulttypeLen,
                              &peraggstate->resulttypeByVal);
      get_typlenbyval(aggtranstype,
                              &peraggstate->transtypeLen,
                              &peraggstate->transtypeByVal);

      /*
       * initval is potentially null, so don't try to access it as a struct
       * field. Must do it the hard way with SysCacheGetAttr.
       */
      textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
                                                  Anum_pg_aggregate_agginitval,
                                                  &peraggstate->initValueIsNull);

      if (peraggstate->initValueIsNull)
            peraggstate->initValue = (Datum) 0;
      else
            peraggstate->initValue = GetAggInitVal(textInitVal,
                                                                     aggtranstype);

      /*
       * If the transfn is strict and the initval is NULL, make sure input
       * type and transtype are the same (or at least binary-compatible), so
       * that it's OK to use the first input value as the initial
       * transValue.    This should have been checked at agg definition time,
       * but just in case...
       */
      if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
      {
            if (numArguments < 1 ||
                  !IsBinaryCoercible(inputTypes[0], aggtranstype))
                  ereport(ERROR,
                              (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
                               errmsg("aggregate %u needs to have compatible input type and transition type",
                                          wfunc->winfnoid)));
      }

      ReleaseSysCache(aggTuple);

      return peraggstate;
}

static Datum
GetAggInitVal(Datum textInitVal, Oid transtype)
{
      Oid               typinput,
                        typioparam;
      char     *strInitVal;
      Datum       initVal;

      getTypeInputInfo(transtype, &typinput, &typioparam);
      strInitVal = TextDatumGetCString(textInitVal);
      initVal = OidInputFunctionCall(typinput, strInitVal,
                                                   typioparam, -1);
      pfree(strInitVal);
      return initVal;
}

/*
 * are_peers
 * compare two rows to see if they are equal according to the ORDER BY clause
 *
 * NB: this does not consider the window frame mode.
 */
static bool
are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
              TupleTableSlot *slot2)
{
      WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;

      /* If no ORDER BY, all rows are peers with each other */
      if (node->ordNumCols == 0)
            return true;

      return execTuplesMatch(slot1, slot2,
                                       node->ordNumCols, node->ordColIdx,
                                       winstate->ordEqfunctions,
                                       winstate->tmpcontext->ecxt_per_tuple_memory);
}

/*
 * window_gettupleslot
 *    Fetch the pos'th tuple of the current partition into the slot,
 *    using the winobj's read pointer
 *
 * Returns true if successful, false if no such row
 */
static bool
window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
{
      WindowAggState *winstate = winobj->winstate;
      MemoryContext oldcontext;

      /* Don't allow passing -1 to spool_tuples here */
      if (pos < 0)
            return false;

      /* If necessary, fetch the tuple into the spool */
      spool_tuples(winstate, pos);

      if (pos >= winstate->spooled_rows)
            return false;

      if (pos < winobj->markpos)
            elog(ERROR, "cannot fetch row before WindowObject's mark position");

      oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);

      tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);

      /*
       * There's no API to refetch the tuple at the current position. We
       * have to move one tuple forward, and then one backward.  (We don't
       * do it the other way because we might try to fetch the row before
       * our mark, which isn't allowed.)
       */
      if (winobj->seekpos == pos)
      {
            tuplestore_advance(winstate->buffer, true);
            winobj->seekpos++;
      }

      while (winobj->seekpos > pos)
      {
            if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
                  elog(ERROR, "unexpected end of tuplestore");
            winobj->seekpos--;
      }

      while (winobj->seekpos < pos)
      {
            if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
                  elog(ERROR, "unexpected end of tuplestore");
            winobj->seekpos++;
      }

      MemoryContextSwitchTo(oldcontext);

      return true;
}


/***********************************************************************
 * API exposed to window functions
 ***********************************************************************/


/*
 * WinGetPartitionLocalMemory
 *          Get working memory that lives till end of partition processing
 *
 * On first call within a given partition, this allocates and zeroes the
 * requested amount of space.  Subsequent calls just return the same chunk.
 *
 * Memory obtained this way is normally used to hold state that should be
 * automatically reset for each new partition.  If a window function wants
 * to hold state across the whole query, fcinfo->fn_extra can be used in the
 * usual way for that.
 */
void *
WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
{
      Assert(WindowObjectIsValid(winobj));
      if (winobj->localmem == NULL)
            winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
                                                                          sz);
      return winobj->localmem;
}

/*
 * WinGetCurrentPosition
 *          Return the current row's position (counting from 0) within the current
 *          partition.
 */
int64
WinGetCurrentPosition(WindowObject winobj)
{
      Assert(WindowObjectIsValid(winobj));
      return winobj->winstate->currentpos;
}

/*
 * WinGetPartitionRowCount
 *          Return total number of rows contained in the current partition.
 *
 * Note: this is a relatively expensive operation because it forces the
 * whole partition to be "spooled" into the tuplestore at once.  Once
 * executed, however, additional calls within the same partition are cheap.
 */
int64
WinGetPartitionRowCount(WindowObject winobj)
{
      Assert(WindowObjectIsValid(winobj));
      spool_tuples(winobj->winstate, -1);
      return winobj->winstate->spooled_rows;
}

/*
 * WinSetMarkPosition
 *          Set the "mark" position for the window object, which is the oldest row
 *          number (counting from 0) it is allowed to fetch during all subsequent
 *          operations within the current partition.
 *
 * Window functions do not have to call this, but are encouraged to move the
 * mark forward when possible to keep the tuplestore size down and prevent
 * having to spill rows to disk.
 */
void
WinSetMarkPosition(WindowObject winobj, int64 markpos)
{
      WindowAggState *winstate;

      Assert(WindowObjectIsValid(winobj));
      winstate = winobj->winstate;

      if (markpos < winobj->markpos)
            elog(ERROR, "cannot move WindowObject's mark position backward");
      tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
      while (markpos > winobj->markpos)
      {
            tuplestore_advance(winstate->buffer, true);
            winobj->markpos++;
      }
      tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
      while (markpos > winobj->seekpos)
      {
            tuplestore_advance(winstate->buffer, true);
            winobj->seekpos++;
      }
}

/*
 * WinRowsArePeers
 *          Compare two rows (specified by absolute position in window) to see
 *          if they are equal according to the ORDER BY clause.
 *
 * NB: this does not consider the window frame mode.
 */
bool
WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
{
      WindowAggState *winstate;
      WindowAgg      *node;
      TupleTableSlot *slot1;
      TupleTableSlot *slot2;
      bool              res;

      Assert(WindowObjectIsValid(winobj));
      winstate = winobj->winstate;
      node = (WindowAgg *) winstate->ss.ps.plan;

      /* If no ORDER BY, all rows are peers; don't bother to fetch them */
      if (node->ordNumCols == 0)
            return true;

      slot1 = winstate->temp_slot_1;
      slot2 = winstate->temp_slot_2;

      if (!window_gettupleslot(winobj, pos1, slot1))
            elog(ERROR, "specified position is out of window: " INT64_FORMAT,
                   pos1);
      if (!window_gettupleslot(winobj, pos2, slot2))
            elog(ERROR, "specified position is out of window: " INT64_FORMAT,
                   pos2);

      res = are_peers(winstate, slot1, slot2);

      ExecClearTuple(slot1);
      ExecClearTuple(slot2);

      return res;
}

/*
 * WinGetFuncArgInPartition
 *          Evaluate a window function's argument expression on a specified
 *          row of the partition.  The row is identified in lseek(2) style,
 *          i.e. relative to the current, first, or last row.
 *
 * argno: argument number to evaluate (counted from 0)
 * relpos: signed rowcount offset from the seek position
 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
 * set_mark: If the row is found and set_mark is true, the mark is moved to
 *          the row as a side-effect.
 * isnull: output argument, receives isnull status of result
 * isout: output argument, set to indicate whether target row position
 *          is out of partition (can pass NULL if caller doesn't care about this)
 *
 * Specifying a nonexistent row is not an error, it just causes a null result
 * (plus setting *isout true, if isout isn't NULL).
 */
Datum
WinGetFuncArgInPartition(WindowObject winobj, int argno,
                                     int relpos, int seektype, bool set_mark,
                                     bool *isnull, bool *isout)
{
      WindowAggState *winstate;
      ExprContext *econtext;
      TupleTableSlot *slot;
      bool        gottuple;
      int64       abs_pos;

      Assert(WindowObjectIsValid(winobj));
      winstate = winobj->winstate;
      econtext = winstate->ss.ps.ps_ExprContext;
      slot = winstate->temp_slot_1;

      switch (seektype)
      {
            case WINDOW_SEEK_CURRENT:
                  abs_pos = winstate->currentpos + relpos;
                  break;
            case WINDOW_SEEK_HEAD:
                  abs_pos = relpos;
                  break;
            case WINDOW_SEEK_TAIL:
                  spool_tuples(winstate, -1);
                  abs_pos = winstate->spooled_rows - 1 + relpos;
                  break;
            default:
                  elog(ERROR, "unrecognized window seek type: %d", seektype);
                  abs_pos = 0; /* keep compiler quiet */
                  break;
      }

      gottuple = window_gettupleslot(winobj, abs_pos, slot);

      if (!gottuple)
      {
            if (isout)
                  *isout = true;
            *isnull = true;
            return (Datum) 0;
      }
      else
      {
            if (isout)
                  *isout = false;
            if (set_mark)
                  WinSetMarkPosition(winobj, abs_pos);
            econtext->ecxt_outertuple = slot;
            return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
                                          econtext, isnull, NULL);
      }
}

/*
 * WinGetFuncArgInFrame
 *          Evaluate a window function's argument expression on a specified
 *          row of the window frame.  The row is identified in lseek(2) style,
 *          i.e. relative to the current, first, or last row.
 *
 * argno: argument number to evaluate (counted from 0)
 * relpos: signed rowcount offset from the seek position
 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
 * set_mark: If the row is found and set_mark is true, the mark is moved to
 *          the row as a side-effect.
 * isnull: output argument, receives isnull status of result
 * isout: output argument, set to indicate whether target row position
 *          is out of frame (can pass NULL if caller doesn't care about this)
 *
 * Specifying a nonexistent row is not an error, it just causes a null result
 * (plus setting *isout true, if isout isn't NULL).
 */
Datum
WinGetFuncArgInFrame(WindowObject winobj, int argno,
                               int relpos, int seektype, bool set_mark,
                               bool *isnull, bool *isout)
{
      WindowAggState *winstate;
      ExprContext *econtext;
      TupleTableSlot *slot;
      bool        gottuple;
      int64       abs_pos;

      Assert(WindowObjectIsValid(winobj));
      winstate = winobj->winstate;
      econtext = winstate->ss.ps.ps_ExprContext;
      slot = winstate->temp_slot_1;

      switch (seektype)
      {
            case WINDOW_SEEK_CURRENT:
                  abs_pos = winstate->currentpos + relpos;
                  break;
            case WINDOW_SEEK_HEAD:
                  abs_pos = relpos;
                  break;
            case WINDOW_SEEK_TAIL:
                  update_frametailpos(winobj, slot);
                  abs_pos = winstate->frametailpos + relpos;
                  break;
            default:
                  elog(ERROR, "unrecognized window seek type: %d", seektype);
                  abs_pos = 0; /* keep compiler quiet */
                  break;
      }

      gottuple = window_gettupleslot(winobj, abs_pos, slot);
      if (gottuple)
            gottuple = row_is_in_frame(winstate, abs_pos, slot);

      if (!gottuple)
      {
            if (isout)
                  *isout = true;
            *isnull = true;
            return (Datum) 0;
      }
      else
      {
            if (isout)
                  *isout = false;
            if (set_mark)
                  WinSetMarkPosition(winobj, abs_pos);
            econtext->ecxt_outertuple = slot;
            return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
                                          econtext, isnull, NULL);
      }
}

/*
 * WinGetFuncArgCurrent
 *          Evaluate a window function's argument expression on the current row.
 *
 * argno: argument number to evaluate (counted from 0)
 * isnull: output argument, receives isnull status of result
 *
 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
 * WinGetFuncArgInFrame targeting the current row, because it will succeed
 * even if the WindowObject's mark has been set beyond the current row.
 * This should generally be used for "ordinary" arguments of a window
 * function, such as the offset argument of lead() or lag().
 */
Datum
WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
{
      WindowAggState *winstate;
      ExprContext *econtext;

      Assert(WindowObjectIsValid(winobj));
      winstate = winobj->winstate;

      econtext = winstate->ss.ps.ps_ExprContext;

      econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
      return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
                                    econtext, isnull, NULL);
}

Generated by  Doxygen 1.6.0   Back to index