diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index ec7b6f53624f2..b0c4cdcdf24f9 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -118,7 +118,8 @@ CATALOG_HEADERS := \ pg_publication_namespace.h \ pg_publication_rel.h \ pg_subscription.h \ - pg_subscription_rel.h + pg_subscription_rel.h \ + pg_copy_handler.h \ GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h @@ -150,6 +151,7 @@ POSTGRES_BKI_DATA = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_ts_parser.dat \ pg_ts_template.dat \ pg_type.dat \ + pg_copy_handler.dat \ ) all: generated-header-symlinks diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cfad47b562249..d5c1cb50a3c6f 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -18,10 +18,12 @@ #include #include +#include "access/genam.h" #include "access/sysattr.h" #include "access/table.h" #include "access/xact.h" #include "catalog/pg_authid.h" +#include "catalog/pg_copy_handler.h" #include "commands/copy.h" #include "commands/defrem.h" #include "executor/executor.h" @@ -36,6 +38,8 @@ #include "rewrite/rewriteHandler.h" #include "utils/acl.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/formatting.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -427,6 +431,8 @@ ProcessCopyOptions(ParseState *pstate, opts_out->file_encoding = -1; + /* Text is the default format. 
*/ + opts_out->handler = GetCopyRoutineByName(DEFAULT_COPY_HANDLER); /* Extract options from the statement node tree */ foreach(option, options) { @@ -439,17 +445,11 @@ ProcessCopyOptions(ParseState *pstate, if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) + opts_out->handler = GetCopyRoutineByName(fmt); + if (strcmp(fmt, "csv") == 0) opts_out->csv_mode = true; else if (strcmp(fmt, "binary") == 0) opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "freeze") == 0) { @@ -864,3 +864,146 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) return attnums; } + +static const +CopyRoutine CopyRoutineText = { + .type = T_CopyRoutine, + .to_start = CopyToFormatTextStart, + .to_one_row = CopyToFormatTextOneRow, + .to_end = CopyToFormatTextEnd, + .from_start = CopyFromFormatTextStart, + .from_next = CopyFromFormatTextNext, + .from_error_callback = CopyFromFormatTextErrorCallback, +}; + +/* + * We can use the same CopyRoutine for both of "text" and "csv" because + * CopyToFormatText*() refer cstate->opts.csv_mode and change their + * behavior. We can split the implementations and stop referring + * cstate->opts.csv_mode later. 
+ */
+static const
+CopyRoutine CopyRoutineCSV = {
+	.type = T_CopyRoutine,
+	.to_start = CopyToFormatTextStart,
+	.to_one_row = CopyToFormatTextOneRow,
+	.to_end = CopyToFormatTextEnd,
+	.from_start = CopyFromFormatTextStart,
+	.from_next = CopyFromFormatTextNext,
+	.from_error_callback = CopyFromFormatTextErrorCallback,
+};
+
+static const
+CopyRoutine CopyRoutineBinary = {
+	.type = T_CopyRoutine,
+	.to_start = CopyToFormatBinaryStart,
+	.to_one_row = CopyToFormatBinaryOneRow,
+	.to_end = CopyToFormatBinaryEnd,
+	.from_start = CopyFromFormatBinaryStart,
+	.from_next = CopyFromFormatBinaryNext,
+	.from_error_callback = CopyFromFormatBinaryErrorCallback,
+};
+
+/* Handler functions for the built-in formats; each returns its CopyRoutine. */
+Datum
+text_copy_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&CopyRoutineText);
+}
+
+Datum
+csv_copy_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&CopyRoutineCSV);
+}
+
+Datum
+binary_copy_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&CopyRoutineBinary);
+}
+
+/* Lower-case a COPY format name into a NameData usable as a scan key. */
+static NameData
+fmt_to_name(char *fmt)
+{
+	size_t		len = strlen(fmt);
+	char	   *lcf;			/* lower cased fmt */
+	NameData	fmtname;
+
+	/* The name is user input, so use ereport() with a proper SQLSTATE. */
+	if (len >= NAMEDATALEN)
+		ereport(ERROR,
+				(errcode(ERRCODE_NAME_TOO_LONG),
+				 errmsg("COPY format name \"%s\" exceeds maximum length of %d bytes",
+						fmt, NAMEDATALEN - 1)));
+
+	lcf = asc_tolower(fmt, len);
+	/* namestrcpy() zero-pads, so no uninitialized bytes are returned. */
+	namestrcpy(&fmtname, lcf);
+	pfree(lcf);
+
+	return fmtname;
+}
+
+/* Call a copy handler function and return its validated CopyRoutine. */
+CopyRoutine *
+GetCopyRoutine(Oid copyhandler)
+{
+	Datum		datum;
+	CopyRoutine *routine;
+
+	datum = OidFunctionCall0(copyhandler);
+	routine = (CopyRoutine *) DatumGetPointer(datum);
+
+	if (routine == NULL || !IsA(routine, CopyRoutine))
+		elog(ERROR, "copy handler function %u did not return a CopyRoutine struct",
+			 copyhandler);
+
+	return routine;
+}
+
+/* Find the CopyRoutine registered in pg_copy_handler for format "fmt". */
+CopyRoutine *
+GetCopyRoutineByName(char *fmt)
+{
+	HeapTuple	tuple;
+	NameData	fmtname;
+	Relation	chrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	Form_pg_copy_handler chform;
+	regproc		copyhandler;
+
+	fmtname = fmt_to_name(fmt);
+	chrel = table_open(CopyHandlerRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_copy_handler_chname,
+				BTEqualStrategyNumber, F_NAMEEQ,
+				NameGetDatum(&fmtname));
+
+	scan = systable_beginscan(chrel, CopyHandlerNameIndexId, true,
+							  NULL, 1, &scankey);
+	tuple = systable_getnext(scan);
+	if (!HeapTupleIsValid(tuple))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("COPY format \"%s\" not recognized", fmt)));
+
+	chform = (Form_pg_copy_handler) GETSTRUCT(tuple);
+	copyhandler = chform->copyhandler;
+
+	/* Complain if handler OID is invalid */
+	if (!RegProcedureIsValid(copyhandler))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("COPY format \"%s\" does not have a handler",
+						NameStr(chform->chname))));
+
+	systable_endscan(scan);
+	table_close(chrel, AccessShareLock);
+
+	/* And finally, call the handler function to get the API struct. */
+	return GetCopyRoutine(copyhandler);
+}
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652a9569..c2d92108c1eda 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,83 +107,88 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
-/*
- * error context callback for COPY FROM
- *
- * The argument for the error context must be CopyFromState.
- */ void -CopyFromErrorCallback(void *arg) +CopyFromFormatBinaryErrorCallback(CopyFromState cstate) { - CopyFromState cstate = (CopyFromState) arg; + /* can't usefully display the data */ + if (cstate->cur_attname) + errcontext("COPY %s, line %llu, column %s", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname); + else + errcontext("COPY %s, line %llu", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno); +} - if (cstate->relname_only) +void +CopyFromFormatTextErrorCallback(CopyFromState cstate) +{ + if (cstate->cur_attname && cstate->cur_attval) { - errcontext("COPY %s", - cstate->cur_relname); - return; + /* error is relevant to a particular column */ + char *attval; + + attval = limit_printout_length(cstate->cur_attval); + errcontext("COPY %s, line %llu, column %s: \"%s\"", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval); + pfree(attval); } - if (cstate->opts.binary) + else if (cstate->cur_attname) { - /* can't usefully display the data */ - if (cstate->cur_attname) - errcontext("COPY %s, line %llu, column %s", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname); - else - errcontext("COPY %s, line %llu", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno); + /* error is relevant to a particular column, value is NULL */ + errcontext("COPY %s, line %llu, column %s: null input", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname); } else { - if (cstate->cur_attname && cstate->cur_attval) + /* + * Error is relevant to a particular line. + * + * If line_buf still contains the correct line, print it. 
+ */ + if (cstate->line_buf_valid) { - /* error is relevant to a particular column */ - char *attval; + char *lineval; - attval = limit_printout_length(cstate->cur_attval); - errcontext("COPY %s, line %llu, column %s: \"%s\"", + lineval = limit_printout_length(cstate->line_buf.data); + errcontext("COPY %s, line %llu: \"%s\"", cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval); - pfree(attval); + (unsigned long long) cstate->cur_lineno, lineval); + pfree(lineval); } - else if (cstate->cur_attname) + else { - /* error is relevant to a particular column, value is NULL */ - errcontext("COPY %s, line %llu, column %s: null input", + errcontext("COPY %s, line %llu", cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname); + (unsigned long long) cstate->cur_lineno); } - else - { - /* - * Error is relevant to a particular line. - * - * If line_buf still contains the correct line, print it. - */ - if (cstate->line_buf_valid) - { - char *lineval; + } +} - lineval = limit_printout_length(cstate->line_buf.data); - errcontext("COPY %s, line %llu: \"%s\"", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, lineval); - pfree(lineval); - } - else - { - errcontext("COPY %s, line %llu", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno); - } - } +/* + * error context callback for COPY FROM + * + * The argument for the error context must be CopyFromState. 
+ */ +void +CopyFromErrorCallback(void *arg) +{ + CopyFromState cstate = (CopyFromState) arg; + + if (cstate->relname_only) + { + errcontext("COPY %s", + cstate->cur_relname); + return; } + cstate->opts.handler->from_error_callback(cstate); } /* @@ -1320,6 +1325,99 @@ CopyFrom(CopyFromState cstate) return processed; } +void +CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + FmgrInfo *in_functions; + Oid *typioparams; + Oid in_func_oid; + AttrNumber num_phys_attrs; + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function). + */ + num_phys_attrs = tupDesc->natts; + in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeBinaryInputInfo(att->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + cstate->in_functions = in_functions; + cstate->typioparams = typioparams; +} + +void +CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc) +{ + FmgrInfo *in_functions; + Oid *typioparams; + Oid in_func_oid; + AttrNumber attr_count, + num_phys_attrs; + + num_phys_attrs = tupDesc->natts; + + /* + * If encoding conversion is needed, we need another buffer to hold + * the converted input data. Otherwise, we can just point input_buf + * to the same buffer as raw_buf. 
+ */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* create workspace for CopyReadAttributes results */ + attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function). + */ + in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeInputInfo(att->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + cstate->in_functions = in_functions; + cstate->typioparams = typioparams; +} + /* * Setup to read tuples from a file for COPY FROM. * @@ -1348,9 +1446,6 @@ BeginCopyFrom(ParseState *pstate, TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; - FmgrInfo *in_functions; - Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1518,25 +1613,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. 
- */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1546,17 +1622,14 @@ BeginCopyFrom(ParseState *pstate, cstate->rteperminfos = pstate->p_rteperminfos; } - num_defaults = 0; - volatile_defexprs = false; + cstate->opts.handler->from_start(cstate, tupDesc); /* - * Pick up the required catalog information for each attribute in the - * relation, including the input function, the element type (to pass to - * the input function), and info about defaults and constraints. (Which + * Pick up info about defaults and constraints. (Which * input function we use depends on text/binary format choice.) */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + num_defaults = 0; + volatile_defexprs = false; defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); @@ -1568,15 +1641,6 @@ BeginCopyFrom(ParseState *pstate, if (att->attisdropped) continue; - /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1636,8 +1700,6 @@ BeginCopyFrom(ParseState *pstate, cstate->bytes_processed = 0; /* We keep those variables in cstate. 
*/ - cstate->in_functions = in_functions; - cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; @@ -1716,15 +1778,6 @@ BeginCopyFrom(ParseState *pstate, ReceiveCopyBinaryHeader(cstate); } - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } - MemoryContextSwitchTo(oldcontext); return cstate; diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index f553734582f5e..bbe5bd1166f1c 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -839,187 +839,208 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } -/* - * Read next tuple from file for COPY FROM. Return false if no more tuples. - * - * 'econtext' is used to evaluate default expression for each column that is - * either not read from the file or is using the DEFAULT option of COPY FROM. - * It can be NULL when no default values are used, i.e. when all columns are - * read from the file, and DEFAULT option is unset. - * - * 'values' and 'nulls' arrays must be the same length as columns of the - * relation passed to BeginCopyFrom. This function fills the arrays. 
- */ bool -NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls) +CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) { TupleDesc tupDesc; - AttrNumber num_phys_attrs, - attr_count, - num_defaults = cstate->num_defaults; + AttrNumber attr_count; + int16 fld_count; + ListCell *cur; FmgrInfo *in_functions = cstate->in_functions; Oid *typioparams = cstate->typioparams; - int i; - int *defmap = cstate->defmap; - ExprState **defexprs = cstate->defexprs; - tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + cstate->cur_lineno++; - if (!cstate->opts.binary) + if (!CopyGetInt16(cstate, &fld_count)) { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and + * complain if it doesn't come immediately. In COPY FROM STDIN, + * this ensures that we correctly handle CopyFail, if client + * chooses to send that now. When copying from file, we could + * ignore the rest of the file like in text mode, but we choose to + * be consistent with the COPY FROM STDIN case. 
+ */ + char dummy; - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + errmsg("received copy data after EOF marker"))); + return false; + } - fieldno = 0; + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + tupDesc = RelationGetDescr(cstate->rel); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; + return true; +} - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } +bool +CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno = 0; + char *string; - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * 
convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; + tupDesc = RelationGetDescr(cstate->rel); + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); - if (string != NULL) - nulls[m] = false; + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; - if (cstate->defaults[m]) + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (cstate->opts.csv_mode) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) { /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. + * FORCE_NOT_NULL option is set and column is NULL - + * convert it to the NULL string. 
*/ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the + * string would already have been set to NULL. Convert it + * to NULL as specified. + */ + string = NULL; } - else - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - att->atttypmod); - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; } - Assert(fieldno == attr_count); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; - cstate->cur_lineno++; + if (string != NULL) + nulls[m] = false; - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) + if (cstate->defaults[m]) { /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. 
*/ - char dummy; + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); } + else + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + att->atttypmod); - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } + Assert(fieldno == attr_count); + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expression for each column that is + * either not read from the file or is using the DEFAULT option of COPY FROM. + * It can be NULL when no default values are used, i.e. when all columns are + * read from the file, and DEFAULT option is unset. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. 
+ */ +bool +NextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + num_defaults = cstate->num_defaults; + int i; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + + if (!cstate->opts.handler->from_next(cstate, econtext, values, nulls)) + { + return false; } /* diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index c66a047c4a79c..f906a6cf7f1fc 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -131,6 +131,205 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyHandlerOps implementation of COPY TO for "text" and "csv". + * CopyToFormatText*() refer cstate->opts.csv_mode and change their behavior. + * We can split this implementation and stop referring cstate->opts.csv_mode + * later. + */ + +static void +CopyToFormatTextSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopySendEndOfRow(cstate); +} + +void +CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. 
*/ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* + * For non-binary copy, we need to convert null_print to file + * encoding, because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false, + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToFormatTextSendEndOfRow(cstate); + } +} + +void +CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + CopySendString(cstate, cstate->opts.null_print_client); + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], value); + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1], + 
list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToFormatTextSendEndOfRow(cstate); +} + +void +CopyToFormatTextEnd(CopyToState cstate) +{ +} + +/* + * CopyHandlerOps implementation for "binary" COPY TO. + */ + +void +CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* Generate header for a binary copy */ + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + CopySendInt32(cstate, 0); + /* No header extension */ + CopySendInt32(cstate, 0); +} + +void +CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + CopySendInt32(cstate, -1); + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +void +CopyToFormatBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); 
+} /* * Send copy start/stop messages for frontend copies. These have changed @@ -198,16 +397,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -242,10 +431,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -748,8 +933,6 @@ DoCopyTo(CopyToState cstate) bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; - int num_phys_attrs; - ListCell *cur; uint64 processed; if (fe_copy) @@ -759,32 +942,11 @@ DoCopyTo(CopyToState cstate) tupDesc = RelationGetDescr(cstate->rel); else tupDesc = cstate->queryDesc->tupDesc; - num_phys_attrs = tupDesc->natts; cstate->opts.null_print_client = cstate->opts.null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); - /* Get info about the columns we need to process. 
*/ - cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (cstate->opts.binary) - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - else - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside @@ -795,57 +957,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. 
- */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false, - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->opts.handler->to_start(cstate, tupDesc); if (cstate->rel) { @@ -884,13 +996,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } + cstate->opts.handler->to_end(cstate); MemoryContextDelete(cstate->rowcontext); @@ -906,71 +1012,15 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - bool need_delim = false; - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; - ListCell *cur; - char *string; MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - /* Make sure the tuple is fully deconstructed */ slot_getallattrs(slot); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - - if (!cstate->opts.binary) - { - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - 
need_delim = true; - } - - if (isnull) - { - if (!cstate->opts.binary) - CopySendString(cstate, cstate->opts.null_print_client); - else - CopySendInt32(cstate, -1); - } - else - { - if (!cstate->opts.binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; - - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->opts.handler->to_one_row(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index ebbe9052cb766..e64e121c01be0 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -50,6 +50,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copy.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index 72c79635781ed..237ac42742470 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ sub elem access/sdir.h access/tableam.h access/tsmapi.h + commands/copy.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ sub elem access/sdir.h access/tableam.h access/tsmapi.h + commands/copy.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index 3ba8cb192ca26..8f12927a4d4a9 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -378,3 +378,4 @@ 
PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); PSEUDOTYPE_DUMMY_IO_FUNCS(anycompatible); PSEUDOTYPE_DUMMY_IO_FUNCS(anycompatiblenonarray); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build index dcb3c5f7666c5..d9aa3f14ba1d6 100644 --- a/src/include/catalog/meson.build +++ b/src/include/catalog/meson.build @@ -69,6 +69,7 @@ catalog_headers = [ 'pg_publication_rel.h', 'pg_subscription.h', 'pg_subscription_rel.h', + 'pg_copy_handler.h', ] # The .dat files we need can just be listed alphabetically. @@ -97,6 +98,7 @@ bki_data = [ 'pg_ts_parser.dat', 'pg_ts_template.dat', 'pg_type.dat', + 'pg_copy_handler.dat', ] bki_data_f = files(bki_data) diff --git a/src/include/catalog/pg_copy_handler.dat b/src/include/catalog/pg_copy_handler.dat new file mode 100644 index 0000000000000..052329cb5338d --- /dev/null +++ b/src/include/catalog/pg_copy_handler.dat @@ -0,0 +1,25 @@ +#---------------------------------------------------------------------- +# +# pg_copy_handler.dat +# Initial contents of the pg_copy_handler system catalog. 
+# +# Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group +# Portions Copyright (c) 1994, Regents of the University of California +# +# src/include/catalog/pg_copy_handler.dat +# +#---------------------------------------------------------------------- + +[ + +{ oid => '4554', oid_symbol => 'TEXT_COPY_HANDLER_OID', + descr => 'text copy handler', + chname => 'text', copyhandler => 'text_copy_handler'}, +{ oid => '4555', oid_symbol => 'CSV_COPY_HANDLER_OID', + descr => 'csv copy handler', + chname => 'csv', copyhandler => 'csv_copy_handler'}, +{ oid => '4556', oid_symbol => 'BINARY_COPY_HANDLER_OID', + descr => 'binary copy handler', + chname => 'binary', copyhandler => 'binary_copy_handler'}, + +] diff --git a/src/include/catalog/pg_copy_handler.h b/src/include/catalog/pg_copy_handler.h new file mode 100644 index 0000000000000..74ad06f4d61d3 --- /dev/null +++ b/src/include/catalog/pg_copy_handler.h @@ -0,0 +1,50 @@ +/*------------------------------------------------------------------------- + * + * pg_copy_handler.h + * definition of the "copy handler" system catalog (pg_copy_handler) + * + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_copy_handler.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_COPY_HANDLER_H +#define PG_COPY_HANDLER_H + +#include "catalog/genbki.h" +#include "catalog/pg_copy_handler_d.h" + +/* ---------------- + * pg_copy_handler definition. 
cpp turns this into + * typedef struct FormData_pg_copy_handler + * ---------------- + */ +CATALOG(pg_copy_handler,4551,CopyHandlerRelationId) +{ + Oid oid; /* oid */ + + /* copy handler name */ + NameData chname; + + /* handler function */ + regproc copyhandler BKI_LOOKUP(pg_proc); +} FormData_pg_copy_handler; + +/* ---------------- + * Form_pg_copy_handler corresponds to a pointer to a tuple with + * the format of pg_copy_handler relation. + * ---------------- + */ +typedef FormData_pg_copy_handler *Form_pg_copy_handler; + +DECLARE_UNIQUE_INDEX(pg_copy_handler_name_index, 4552, CopyHandlerNameIndexId, pg_copy_handler, btree(chname name_ops)); +DECLARE_UNIQUE_INDEX_PKEY(pg_copy_handler_oid_index, 4553, CopyHandlerOidIndexId, pg_copy_handler, btree(oid oid_ops)); + +#endif /* PG_COPY_HANDLER_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fb58dee3bcdf6..8b9b60c90fd9e 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -948,6 +948,20 @@ prorettype => 'void', proargtypes => 'regclass int8', prosrc => 'brin_desummarize_range' }, +# Copy handlers +{ oid => '4557', descr => 'text copy handler', + proname => 'text_copy_handler', provolatile => 'v', + prorettype => 'copy_handler', proargtypes => 'internal', + prosrc => 'text_copy_handler' }, +{ oid => '4558', descr => 'csv copy handler', + proname => 'csv_copy_handler', provolatile => 'v', + prorettype => 'copy_handler', proargtypes => 'internal', + prosrc => 'csv_copy_handler' }, +{ oid => '4559', descr => 'binary copy handler', + proname => 'binary_copy_handler', provolatile => 'v', + prorettype => 'copy_handler', proargtypes => 'internal', + prosrc => 'binary_copy_handler' }, + { oid => '338', descr => 'validate an operator class', proname => 'amvalidate', provolatile => 'v', prorettype => 'bool', proargtypes => 'oid', prosrc => 'amvalidate' }, @@ -7609,6 +7623,13 @@ { oid => '268', descr => 'I/O', proname => 'table_am_handler_out', prorettype 
=> 'cstring', proargtypes => 'table_am_handler', prosrc => 'table_am_handler_out' }, +{ oid => '388', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', + prorettype => 'copy_handler', proargtypes => 'cstring', + prosrc => 'copy_handler_in' }, +{ oid => '389', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '5086', descr => 'I/O', proname => 'anycompatible_in', prorettype => 'anycompatible', proargtypes => 'cstring', prosrc => 'anycompatible_in' }, diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index f6110a850d495..aa6d731cb5c23 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -639,6 +639,13 @@ typcategory => 'P', typinput => 'table_am_handler_in', typoutput => 'table_am_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '3814', + typname => 'copy_handler', + descr => 'pseudo-type for the result of a copy handler', + typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '3831', descr => 'pseudo-type representing a range over a polymorphic base type', typname => 'anyrange', typlen => '-1', typbyval => 'f', typtype => 'p', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2cca0b90b494..c2af6f2aff3cb 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -30,6 +30,33 @@ typedef enum CopyHeaderChoice COPY_HEADER_MATCH, } CopyHeaderChoice; +#define DEFAULT_COPY_HANDLER "text" + +/* These are private in commands/copy[from|to].c */ +typedef struct CopyFromStateData *CopyFromState; +typedef struct CopyToStateData *CopyToState; + +/* Routines for a COPY HANDLER implementation. 
*/ +typedef struct CopyRoutine +{ + /* this must be set to T_CopyRoutine */ + NodeTag type; + + /* Called when COPY TO is started. This will send a header. */ + void (*to_start) (CopyToState cstate, TupleDesc tupDesc); + + /* Copy one row for COPY TO. */ + void (*to_one_row) (CopyToState cstate, TupleTableSlot *slot); + + /* Called when COPY TO is ended. This will send a trailer. */ + void (*to_end) (CopyToState cstate); + + void (*from_start) (CopyFromState cstate, TupleDesc tupDesc); + bool (*from_next) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + void (*from_error_callback) (CopyFromState cstate); +} CopyRoutine; + /* * A struct to hold COPY options, in a parsed form. All of these are related * to formatting, except for 'freeze', which doesn't really belong here, but @@ -63,12 +90,9 @@ typedef struct CopyFormatOptions bool *force_null_flags; /* per-column CSV FN flags */ bool convert_selectively; /* do selective binary conversion? */ List *convert_select; /* list of column names (can be NIL) */ + CopyRoutine *handler; /* copy handler routines */ } CopyFormatOptions; -/* These are private in commands/copy[from|to].c */ -typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; - typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); typedef void (*copy_data_dest_cb) (void *data, int len); @@ -91,6 +115,9 @@ extern uint64 CopyFrom(CopyFromState cstate); extern DestReceiver *CreateCopyDestReceiver(void); +extern CopyRoutine *GetCopyRoutine(Oid copyhandler); +extern CopyRoutine *GetCopyRoutineByName(char *fmt); + /* * internal prototypes */ @@ -102,4 +129,20 @@ extern uint64 DoCopyTo(CopyToState cstate); extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); +extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc); +extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot); +extern void 
CopyToFormatTextEnd(CopyToState cstate); +extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc); +extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern void CopyFromFormatTextErrorCallback(CopyFromState cstate); + +extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc); +extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot); +extern void CopyToFormatBinaryEnd(CopyToState cstate); +extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc); +extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate); + #endif /* COPY_H */ diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb899be3e9..a1ab73dc37265 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -266,3 +266,4 @@ NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid} +NOTICE: checking pg_copy_handler {copyhandler} => pg_proc {oid}