forked from huawei/openGauss-server
发布订阅支持以二进制格式发送数据
This commit is contained in:
parent
588299a50a
commit
71f53e575c
|
@ -233,6 +233,7 @@ char* all_data_nodename_list = NULL;
|
|||
const uint32 USTORE_UPGRADE_VERSION = 92368;
|
||||
const uint32 PACKAGE_ENHANCEMENT = 92444;
|
||||
const uint32 SUBSCRIPTION_VERSION = 92580;
|
||||
const uint32 SUBSCRIPTION_BINARY_VERSION_NUM = 92607;
|
||||
|
||||
#ifdef DUMPSYSLOG
|
||||
char* syslogpath = NULL;
|
||||
|
@ -4444,7 +4445,9 @@ void getSubscriptions(Archive *fout)
|
|||
int i_subslotname;
|
||||
int i_subsynccommit;
|
||||
int i_subpublications;
|
||||
int i, ntups;
|
||||
int i_subbinary;
|
||||
int i;
|
||||
int ntups;
|
||||
|
||||
if (no_subscriptions || GetVersionNum(fout) < SUBSCRIPTION_VERSION) {
|
||||
return;
|
||||
|
@ -4469,14 +4472,20 @@ void getSubscriptions(Archive *fout)
|
|||
resetPQExpBuffer(query);
|
||||
|
||||
/* Get the subscriptions in current database. */
|
||||
appendPQExpBuffer(query,
|
||||
"SELECT s.tableoid, s.oid, s.subname,"
|
||||
"(%s s.subowner) AS rolname, "
|
||||
" s.subconninfo, s.subslotname, s.subsynccommit, s.subpublications "
|
||||
"FROM pg_catalog.pg_subscription s "
|
||||
appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname,"
|
||||
"(%s s.subowner) AS rolname, s.subconninfo, s.subslotname, "
|
||||
"s.subsynccommit, s.subpublications, \n", username_subquery);
|
||||
|
||||
if (GetVersionNum(fout) >= SUBSCRIPTION_BINARY_VERSION_NUM) {
|
||||
appendPQExpBuffer(query, " s.subbinary\n");
|
||||
} else {
|
||||
appendPQExpBuffer(query, " false AS subbinary\n");
|
||||
}
|
||||
|
||||
appendPQExpBuffer(query, "FROM pg_catalog.pg_subscription s "
|
||||
"WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
|
||||
" WHERE datname = current_database())",
|
||||
username_subquery);
|
||||
" WHERE datname = current_database())");
|
||||
|
||||
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
|
||||
|
||||
ntups = PQntuples(res);
|
||||
|
@ -4494,6 +4503,7 @@ void getSubscriptions(Archive *fout)
|
|||
i_subslotname = PQfnumber(res, "subslotname");
|
||||
i_subsynccommit = PQfnumber(res, "subsynccommit");
|
||||
i_subpublications = PQfnumber(res, "subpublications");
|
||||
i_subbinary = PQfnumber(res, "subbinary");
|
||||
|
||||
subinfo = (SubscriptionInfo *)pg_malloc(ntups * sizeof(SubscriptionInfo));
|
||||
|
||||
|
@ -4512,6 +4522,7 @@ void getSubscriptions(Archive *fout)
|
|||
}
|
||||
subinfo[i].subsynccommit = gs_strdup(PQgetvalue(res, i, i_subsynccommit));
|
||||
subinfo[i].subpublications = gs_strdup(PQgetvalue(res, i, i_subpublications));
|
||||
subinfo[i].subbinary = gs_strdup(PQgetvalue(res, i, i_subbinary));
|
||||
|
||||
if (strlen(subinfo[i].rolname) == 0) {
|
||||
write_msg(NULL, "WARNING: owner of subscription \"%s\" appears to be invalid\n", subinfo[i].dobj.name);
|
||||
|
@ -4578,6 +4589,10 @@ static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
|
|||
appendPQExpBufferStr(query, "NONE");
|
||||
}
|
||||
|
||||
if (strcmp(subinfo->subbinary, "t") == 0) {
|
||||
appendPQExpBuffer(query, ", binary = true");
|
||||
}
|
||||
|
||||
if (strcmp(subinfo->subsynccommit, "off") != 0) {
|
||||
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
|
||||
}
|
||||
|
|
|
@ -498,6 +498,7 @@ typedef struct _SubscriptionInfo {
|
|||
char *subslotname;
|
||||
char *subsynccommit;
|
||||
char *subpublications;
|
||||
char *subbinary;
|
||||
} SubscriptionInfo;
|
||||
|
||||
/* global decls */
|
||||
|
|
|
@ -91,6 +91,10 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
|
|||
}
|
||||
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
|
||||
|
||||
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subbinary, &isnull);
|
||||
Assert(!isnull);
|
||||
sub->binary = DatumGetBool(datum);
|
||||
|
||||
ReleaseSysCache(tup);
|
||||
|
||||
return sub;
|
||||
|
|
|
@ -59,7 +59,7 @@ bool open_join_children = true;
|
|||
bool will_shutdown = false;
|
||||
|
||||
/* hard-wired binary version number */
|
||||
const uint32 GRAND_VERSION_NUM = 92606;
|
||||
const uint32 GRAND_VERSION_NUM = 92607;
|
||||
|
||||
const uint32 PREDPUSH_SAME_LEVEL_VERSION_NUM = 92522;
|
||||
const uint32 UPSERT_WHERE_VERSION_NUM = 92514;
|
||||
|
@ -101,6 +101,7 @@ const uint32 PRIVS_DIRECTORY_VERSION_NUM = 92460;
|
|||
const uint32 COMMENT_RECORD_PARAM_VERSION_NUM = 92484;
|
||||
const uint32 SCAN_BATCH_MODE_VERSION_NUM = 92568;
|
||||
const uint32 PUBLICATION_VERSION_NUM = 92580;
|
||||
const uint32 SUBSCRIPTION_BINARY_VERSION_NUM = 92607;
|
||||
|
||||
/* Version number of the guc parameter backend_version added in V500R001C20 */
|
||||
const uint32 V5R1C20_BACKEND_VERSION_NUM = 92305;
|
||||
|
|
|
@ -56,7 +56,7 @@ static void ValidateReplicationSlot(char *slotname, List *publications);
|
|||
* accommodate that.
|
||||
*/
|
||||
static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given,
|
||||
bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit)
|
||||
bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary)
|
||||
{
|
||||
ListCell *lc;
|
||||
|
||||
|
@ -76,6 +76,10 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
|
|||
if (synchronous_commit) {
|
||||
*synchronous_commit = NULL;
|
||||
}
|
||||
if (binary) {
|
||||
*binary_given = false;
|
||||
*binary = false;
|
||||
}
|
||||
|
||||
/* Parse options */
|
||||
foreach (lc, options) {
|
||||
|
@ -124,6 +128,15 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
|
|||
/* Test if the given value is valid for synchronous_commit GUC. */
|
||||
(void)set_config_option("synchronous_commit", *synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
|
||||
false, 0, false);
|
||||
} else if (strcmp(defel->defname, "binary") == 0 && binary) {
|
||||
if (*binary_given) {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
}
|
||||
|
||||
*binary_given = true;
|
||||
*binary = defGetBoolean(defel);
|
||||
} else {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname)));
|
||||
|
@ -296,6 +309,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
|||
char *conninfo;
|
||||
char *slotname;
|
||||
bool slotname_given;
|
||||
bool binary;
|
||||
bool binary_given;
|
||||
char originname[NAMEDATALEN];
|
||||
List *publications;
|
||||
int rc;
|
||||
|
@ -305,7 +320,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
|||
* Connection and publication should not be specified here.
|
||||
*/
|
||||
parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname,
|
||||
&synchronous_commit);
|
||||
&synchronous_commit, &binary_given, &binary);
|
||||
|
||||
/*
|
||||
* Since creating a replication slot is not transactional, rolling back
|
||||
|
@ -349,6 +364,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
|||
values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
|
||||
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
|
||||
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
|
||||
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
|
||||
|
||||
/* encrypt conninfo */
|
||||
List *conninfoList = ConninfoToDefList(stmt->conninfo);
|
||||
|
@ -439,6 +455,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
|
|||
Oid subid;
|
||||
bool enabled_given = false;
|
||||
bool enabled;
|
||||
bool binary_given;
|
||||
bool binary;
|
||||
char *synchronous_commit;
|
||||
char *conninfo;
|
||||
char *slot_name;
|
||||
|
@ -473,7 +491,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
|
|||
|
||||
/* Parse options. */
|
||||
parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given,
|
||||
&slot_name, &synchronous_commit);
|
||||
&slot_name, &synchronous_commit, &binary_given, &binary);
|
||||
|
||||
/* Form a new tuple. */
|
||||
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
|
||||
|
@ -548,6 +566,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
|
|||
values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit);
|
||||
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
|
||||
}
|
||||
if (binary_given) {
|
||||
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
|
||||
replaces[Anum_pg_subscription_subbinary - 1] = true;
|
||||
}
|
||||
if (publications != NIL) {
|
||||
values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications);
|
||||
replaces[Anum_pg_subscription_subpublications - 1] = true;
|
||||
|
|
|
@ -255,8 +255,14 @@ void StartRemoteStreaming(const LibpqrcvConnectParam *options)
|
|||
stringlist_to_identifierstr(t_thrd.libwalreceiver_cxt.streamConn, options->publicationNames);
|
||||
appendStringInfo(&cmd, ", publication_names %s",
|
||||
PQescapeLiteral(t_thrd.libwalreceiver_cxt.streamConn, pubnames_str, strlen(pubnames_str)));
|
||||
appendStringInfoChar(&cmd, ')');
|
||||
pfree(pubnames_str);
|
||||
|
||||
if (options->binary && PQserverVersion(t_thrd.libwalreceiver_cxt.streamConn) >= 90204) {
|
||||
appendStringInfoString(&cmd, ", binary 'true'");
|
||||
ereport(DEBUG5, ( errmsg("append binary true")));
|
||||
}
|
||||
|
||||
appendStringInfoChar(&cmd, ')');
|
||||
}
|
||||
|
||||
PGresult *res = libpqrcv_PQexec(cmd.data);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "replication/logicalproto.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
|
@ -28,7 +27,7 @@
|
|||
static const int LOGICALREP_IS_REPLICA_IDENTITY = 1;
|
||||
|
||||
static void logicalrep_write_attrs(StringInfo out, Relation rel);
|
||||
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple);
|
||||
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary);
|
||||
|
||||
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
|
||||
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
|
||||
|
@ -115,7 +114,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr orig
|
|||
/*
|
||||
* Write INSERT to the output stream.
|
||||
*/
|
||||
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
|
||||
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
|
||||
{
|
||||
pq_sendbyte(out, 'I'); /* action INSERT */
|
||||
|
||||
|
@ -123,7 +122,7 @@ void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
|
|||
pq_sendint32(out, RelationGetRelid(rel));
|
||||
|
||||
pq_sendbyte(out, 'N'); /* new tuple follows */
|
||||
logicalrep_write_tuple(out, rel, newtuple);
|
||||
logicalrep_write_tuple(out, rel, newtuple, binary);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -151,7 +150,7 @@ LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtu
|
|||
/*
|
||||
* Write UPDATE to the output stream.
|
||||
*/
|
||||
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
|
||||
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
|
||||
{
|
||||
pq_sendbyte(out, 'U'); /* action UPDATE */
|
||||
|
||||
|
@ -166,11 +165,11 @@ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, H
|
|||
pq_sendbyte(out, 'O'); /* old tuple follows */
|
||||
else
|
||||
pq_sendbyte(out, 'K'); /* old key follows */
|
||||
logicalrep_write_tuple(out, rel, oldtuple);
|
||||
logicalrep_write_tuple(out, rel, oldtuple, binary);
|
||||
}
|
||||
|
||||
pq_sendbyte(out, 'N'); /* new tuple follows */
|
||||
logicalrep_write_tuple(out, rel, newtuple);
|
||||
logicalrep_write_tuple(out, rel, newtuple, binary);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -213,7 +212,7 @@ LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, Logica
|
|||
/*
|
||||
* Write DELETE to the output stream.
|
||||
*/
|
||||
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
|
||||
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
|
||||
{
|
||||
char relreplident = RelationGetRelReplident(rel);
|
||||
Assert(relreplident == REPLICA_IDENTITY_DEFAULT ||
|
||||
|
@ -229,7 +228,7 @@ void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
|
|||
else
|
||||
pq_sendbyte(out, 'K'); /* old key follows */
|
||||
|
||||
logicalrep_write_tuple(out, rel, oldtuple);
|
||||
logicalrep_write_tuple(out, rel, oldtuple, binary);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -344,7 +343,7 @@ void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
|
|||
/*
|
||||
* Write a tuple to the outputstream, in the most efficient format possible.
|
||||
*/
|
||||
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
|
||||
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
|
||||
{
|
||||
TupleDesc desc;
|
||||
Datum values[MaxTupleAttributeNumber];
|
||||
|
@ -371,7 +370,6 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple
|
|||
HeapTuple typtup;
|
||||
Form_pg_type typclass;
|
||||
Form_pg_attribute att = desc->attrs[i];
|
||||
char *outputstr;
|
||||
|
||||
/* skip dropped columns */
|
||||
if (att->attisdropped || GetGeneratedCol(desc, i)) {
|
||||
|
@ -379,7 +377,7 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple
|
|||
}
|
||||
|
||||
if (isnull[i]) {
|
||||
pq_sendbyte(out, 'n'); /* null column */
|
||||
pq_sendbyte(out, LOGICALREP_COLUMN_NULL); /* null column */
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -388,61 +386,93 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple
|
|||
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
|
||||
typclass = (Form_pg_type)GETSTRUCT(typtup);
|
||||
|
||||
pq_sendbyte(out, 't'); /* 'text' data follows */
|
||||
if (!typclass->typbyval && typclass->typlen == -1) {
|
||||
/* definitely detoasted Datum */
|
||||
Datum val = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
|
||||
outputstr = OidOutputFunctionCall(typclass->typoutput, val);
|
||||
/*
|
||||
* Send in binary if requested and type has suitable send function.
|
||||
*/
|
||||
if (binary && OidIsValid(typclass->typsend)) {
|
||||
bytea* outputbytes = NULL;
|
||||
pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
|
||||
if (!typclass->typbyval && typclass->typlen == -1) {
|
||||
/* definitely detoasted Datum */
|
||||
Datum val = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
|
||||
outputbytes = OidSendFunctionCall(typclass->typsend, val);
|
||||
} else {
|
||||
outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
|
||||
}
|
||||
int len = VARSIZE(outputbytes) - VARHDRSZ;
|
||||
pq_sendint(out, len, 4); /* length */
|
||||
pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
|
||||
if (outputbytes != NULL) {
|
||||
pfree(outputbytes);
|
||||
}
|
||||
} else {
|
||||
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
|
||||
char *outputstr;
|
||||
pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
|
||||
if (!typclass->typbyval && typclass->typlen == -1) {
|
||||
/* definitely detoasted Datum */
|
||||
Datum val = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
|
||||
outputstr = OidOutputFunctionCall(typclass->typoutput, val);
|
||||
} else {
|
||||
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
|
||||
}
|
||||
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
|
||||
if (outputstr != NULL) {
|
||||
pfree(outputstr);
|
||||
}
|
||||
}
|
||||
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
|
||||
pfree(outputstr);
|
||||
ReleaseSysCache(typtup);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read tuple in remote format from stream.
|
||||
*
|
||||
* The returned tuple points into the input stringinfo.
|
||||
* Read tuple in logical replication format from stream.
|
||||
*/
|
||||
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
|
||||
{
|
||||
uint16 i;
|
||||
uint16 natts;
|
||||
int rc;
|
||||
|
||||
/* Get number of attributes. */
|
||||
natts = pq_getmsgint(in, sizeof(uint16));
|
||||
uint16 natts = pq_getmsgint(in, sizeof(uint16));
|
||||
|
||||
rc = memset_s(tuple->changed, sizeof(tuple->changed), 0, sizeof(tuple->changed));
|
||||
securec_check(rc, "", "");
|
||||
/* Allocate space for per-column values; zero out unused StringInfoDatas */
|
||||
tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
|
||||
tuple->colstatus = (char *) palloc(natts * sizeof(char));
|
||||
tuple->ncols = natts;
|
||||
|
||||
/* Read the data */
|
||||
for (i = 0; i < natts; i++) {
|
||||
char kind;
|
||||
|
||||
kind = pq_getmsgbyte(in);
|
||||
for (uint16 i = 0; i < natts; i++) {
|
||||
char kind = pq_getmsgbyte(in);
|
||||
tuple->colstatus[i] = kind;
|
||||
uint32 len;
|
||||
StringInfo value = &tuple->colvalues[i];
|
||||
|
||||
switch (kind) {
|
||||
case 'n': /* null */
|
||||
tuple->values[i] = NULL;
|
||||
tuple->changed[i] = true;
|
||||
case LOGICALREP_COLUMN_NULL: /* null */
|
||||
/* nothing more to do */
|
||||
break;
|
||||
case 't': { /* text formatted value */
|
||||
uint32 len;
|
||||
tuple->changed[i] = true;
|
||||
|
||||
len = pq_getmsgint(in, sizeof(uint32)); /* read length */
|
||||
case LOGICALREP_COLUMN_TEXT:
|
||||
len = pq_getmsgint(in, sizeof(uint32)); /* read length */
|
||||
|
||||
/* and data */
|
||||
tuple->values[i] = (char *)palloc(len + 1);
|
||||
pq_copymsgbytes(in, tuple->values[i], len);
|
||||
tuple->values[i][len] = '\0';
|
||||
value->data = (char *) palloc((len + 1) * sizeof(char));
|
||||
pq_copymsgbytes(in, value->data, len);
|
||||
value->data[len] = '\0';
|
||||
/* make StringInfo fully valid */
|
||||
value->len = len;
|
||||
value->cursor = 0;
|
||||
value->maxlen = len;
|
||||
break;
|
||||
case LOGICALREP_COLUMN_BINARY:
|
||||
len = pq_getmsgint(in, sizeof(uint32)); /* read length */
|
||||
|
||||
/* and data */
|
||||
value->data = (char *) palloc((len + 1) * sizeof(char));
|
||||
pq_copymsgbytes(in, value->data, len);
|
||||
/* not strictly necessary but per StringInfo practice */
|
||||
value->data[len] = '\0';
|
||||
/* make StringInfo fully valid */
|
||||
value->len = len;
|
||||
value->cursor = 0;
|
||||
value->maxlen = len;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
elog(ERROR, "unrecognized data representation type '%c'", kind);
|
||||
break;
|
||||
|
@ -451,7 +481,7 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
|
|||
}
|
||||
|
||||
/*
|
||||
* Write relation attributes to the stream.
|
||||
* Write relation attributes metadata to the stream.
|
||||
*/
|
||||
static void logicalrep_write_attrs(StringInfo out, Relation rel)
|
||||
{
|
||||
|
@ -504,7 +534,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel)
|
|||
}
|
||||
|
||||
/*
|
||||
* Read relation attribute names from the stream.
|
||||
* Read relation attribute metadata from the stream.
|
||||
*/
|
||||
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
|
||||
{
|
||||
|
|
|
@ -265,11 +265,11 @@ static void slot_store_error_callback(void *arg)
|
|||
}
|
||||
|
||||
/*
|
||||
* Store data in C string form into slot.
|
||||
* This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
|
||||
* use better.
|
||||
* Store tuple data into slot.
|
||||
*
|
||||
* Incoming data can be either text or binary format.
|
||||
*/
|
||||
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
|
||||
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
|
||||
{
|
||||
int natts = slot->tts_tupleDescriptor->natts;
|
||||
int i;
|
||||
|
@ -286,19 +286,52 @@ static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel
|
|||
errcallback.previous = t_thrd.log_cxt.error_context_stack;
|
||||
t_thrd.log_cxt.error_context_stack = &errcallback;
|
||||
|
||||
/* Call the "in" function for each non-dropped attribute */
|
||||
/* Call the "in" function for each non-dropped, non-null attribute */
|
||||
for (i = 0; i < natts; i++) {
|
||||
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
|
||||
int remoteattnum = rel->attrmap[i];
|
||||
Oid typinput;
|
||||
Oid typioparam;
|
||||
|
||||
if (!att->attisdropped && remoteattnum >= 0 && values[remoteattnum] != NULL) {
|
||||
if (!att->attisdropped && remoteattnum >= 0) {
|
||||
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
|
||||
errarg.remote_attnum = remoteattnum;
|
||||
|
||||
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
|
||||
slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum], typioparam, att->atttypmod);
|
||||
slot->tts_isnull[i] = false;
|
||||
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) {
|
||||
Oid typinput;
|
||||
Oid typioparam;
|
||||
|
||||
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
|
||||
slot->tts_values[i] = OidInputFunctionCall(typinput, colvalue->data, typioparam, att->atttypmod);
|
||||
slot->tts_isnull[i] = false;
|
||||
} else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) {
|
||||
Oid typreceive;
|
||||
Oid typioparam;
|
||||
|
||||
/*
|
||||
* In some code paths we may be asked to re-parse the same
|
||||
* tuple data. Reset the StringInfo's cursor so that works.
|
||||
*/
|
||||
colvalue->cursor = 0;
|
||||
|
||||
getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
|
||||
slot->tts_values[i] = OidReceiveFunctionCall(typreceive, colvalue, typioparam, att->atttypmod);
|
||||
|
||||
/* Trouble if it didn't eat the whole buffer */
|
||||
if (colvalue->cursor != colvalue->len) {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
|
||||
errmsg("incorrect binary data format in logical replication column %d",
|
||||
remoteattnum + 1)));
|
||||
}
|
||||
slot->tts_isnull[i] = false;
|
||||
} else {
|
||||
/*
|
||||
* NULL value from remote. (We don't expect to see
|
||||
* LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
|
||||
* NULL.)
|
||||
*/
|
||||
slot->tts_values[i] = (Datum) 0;
|
||||
slot->tts_isnull[i] = true;
|
||||
}
|
||||
/* Reset attnum for error callback */
|
||||
errarg.remote_attnum = -1;
|
||||
} else {
|
||||
/*
|
||||
|
@ -318,18 +351,19 @@ static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel
|
|||
}
|
||||
|
||||
/*
|
||||
* Replace selected columns with user data provided as C strings.
|
||||
* Replace updated columns with data from the LogicalRepTupleData struct.
|
||||
* This is somewhat similar to heap_modify_tuple but also calls the type
|
||||
* input functions on the user data.
|
||||
* "slot" is filled with a copy of the tuple in "srcslot", with
|
||||
* columns selected by the "replaces" array replaced with data values
|
||||
* from "values".
|
||||
*
|
||||
* "slot" is filled with a copy of the tuple in "srcslot", replacing
|
||||
* columns provided in "tupleData" and leaving others as-is.
|
||||
*
|
||||
* Caution: unreplaced pass-by-ref columns in "slot" will point into the
|
||||
* storage for "srcslot". This is OK for current usage, but someday we may
|
||||
* need to materialize "slot" at the end to make it independent of "srcslot".
|
||||
*/
|
||||
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel,
|
||||
char **values, const bool *replaces)
|
||||
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel,
|
||||
LogicalRepTupleData *tupleData)
|
||||
{
|
||||
int natts = slot->tts_tupleDescriptor->natts;
|
||||
int i;
|
||||
|
@ -364,23 +398,47 @@ static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
|
|||
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
|
||||
int remoteattnum = rel->attrmap[i];
|
||||
|
||||
if (remoteattnum < 0 || !replaces[remoteattnum]) {
|
||||
if (remoteattnum < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (values[remoteattnum] != NULL) {
|
||||
Oid typinput;
|
||||
Oid typioparam;
|
||||
|
||||
if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) {
|
||||
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
|
||||
errarg.remote_attnum = remoteattnum;
|
||||
|
||||
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
|
||||
slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum], typioparam, att->atttypmod);
|
||||
slot->tts_isnull[i] = false;
|
||||
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) {
|
||||
Oid typinput;
|
||||
Oid typioparam;
|
||||
|
||||
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
|
||||
slot->tts_values[i] = OidInputFunctionCall(typinput, colvalue->data, typioparam, att->atttypmod);
|
||||
slot->tts_isnull[i] = false;
|
||||
} else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) {
|
||||
Oid typreceive;
|
||||
Oid typioparam;
|
||||
|
||||
/*
|
||||
* In some code paths we may be asked to re-parse the same
|
||||
* tuple data. Reset the StringInfo's cursor so that works.
|
||||
*/
|
||||
colvalue->cursor = 0;
|
||||
|
||||
getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
|
||||
slot->tts_values[i] = OidReceiveFunctionCall(typreceive, colvalue, typioparam, att->atttypmod);
|
||||
|
||||
/* Trouble if it didn't eat the whole buffer */
|
||||
if (colvalue->cursor != colvalue->len) {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
|
||||
errmsg("incorrect binary data format in logical replication column %d", remoteattnum + 1)));
|
||||
}
|
||||
slot->tts_isnull[i] = false;
|
||||
} else {
|
||||
/* must be LOGICALREP_COLUMN_NULL */
|
||||
slot->tts_values[i] = (Datum) 0;
|
||||
slot->tts_isnull[i] = true;
|
||||
}
|
||||
|
||||
errarg.remote_attnum = -1;
|
||||
} else {
|
||||
slot->tts_values[i] = (Datum)0;
|
||||
slot->tts_isnull[i] = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -516,7 +574,7 @@ static void apply_handle_insert(StringInfo s)
|
|||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
/* Process and store remote tuple in the slot */
|
||||
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
slot_store_cstrings(remoteslot, rel, newtup.values);
|
||||
slot_store_data(remoteslot, rel, &newtup);
|
||||
slot_fill_defaults(rel, estate, remoteslot);
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
|
@ -646,7 +704,7 @@ static void apply_handle_update(StringInfo s)
|
|||
int remoteattnum = rel->attrmap[i];
|
||||
if (!att->attisdropped && remoteattnum >= 0) {
|
||||
Assert(remoteattnum < newtup.ncols);
|
||||
if (newtup.changed[i]) {
|
||||
if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) {
|
||||
target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
|
||||
i + 1 - FirstLowInvalidHeapAttributeNumber);
|
||||
}
|
||||
|
@ -660,7 +718,7 @@ static void apply_handle_update(StringInfo s)
|
|||
|
||||
/* Build the search tuple. */
|
||||
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
slot_store_cstrings(remoteslot, rel, has_oldtup ? oldtup.values : newtup.values);
|
||||
slot_store_data(remoteslot, rel, has_oldtup ? &oldtup : &newtup);
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
/*
|
||||
|
@ -683,7 +741,7 @@ static void apply_handle_update(StringInfo s)
|
|||
if (found) {
|
||||
/* Process and store remote tuple in the slot */
|
||||
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
slot_modify_cstrings(remoteslot, localslot, rel, newtup.values, newtup.changed);
|
||||
slot_modify_data(remoteslot, localslot, rel, &newtup);
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
EvalPlanQualSetSlot(&epqstate, remoteslot);
|
||||
|
@ -748,7 +806,7 @@ static void apply_handle_delete(StringInfo s)
|
|||
|
||||
/* Find the tuple using the replica identity index. */
|
||||
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
slot_store_cstrings(remoteslot, rel, oldtup.values);
|
||||
slot_store_data(remoteslot, rel, &oldtup);
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
/*
|
||||
|
@ -1224,6 +1282,17 @@ static void reread_subscription(void)
|
|||
proc_exit(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Exit if any parameter that affects the remote connection was changed.
|
||||
* The launcher will start a new worker.
|
||||
*/
|
||||
if (strcmp(newsub->name, t_thrd.applyworker_cxt.mySubscription->name) != 0 ||
|
||||
newsub->binary != t_thrd.applyworker_cxt.mySubscription->binary) {
|
||||
ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" "
|
||||
"will restart because of a parameter change", t_thrd.applyworker_cxt.mySubscription->name)));
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
/* !slotname should never happen when enabled is true. */
|
||||
Assert(newsub->slotname);
|
||||
|
||||
|
@ -1447,6 +1516,7 @@ void ApplyWorkerMain()
|
|||
options.slotname = t_thrd.applyworker_cxt.mySubscription->slotname;
|
||||
options.protoVersion = LOGICALREP_PROTO_VERSION_NUM;
|
||||
options.publicationNames = t_thrd.applyworker_cxt.mySubscription->publications;
|
||||
options.binary = t_thrd.applyworker_cxt.mySubscription->binary;
|
||||
|
||||
/* Start streaming from the slot. */
|
||||
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_startstreaming(&options);
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
#include "catalog/pg_publication.h"
|
||||
|
||||
#include "commands/defrem.h"
|
||||
|
||||
#include "replication/logical.h"
|
||||
#include "replication/logicalproto.h"
|
||||
#include "replication/origin.h"
|
||||
|
@ -74,11 +76,14 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb)
|
|||
cb->shutdown_cb = pgoutput_shutdown;
|
||||
}
|
||||
|
||||
static void parse_output_parameters(List *options, PGOutputData *data)
|
||||
static void parse_output_parameters(List* options, PGOutputData* data)
|
||||
{
|
||||
ListCell *lc;
|
||||
bool protocol_version_given = false;
|
||||
bool publication_names_given = false;
|
||||
bool binary_option_given = false;
|
||||
|
||||
data->binary = false;
|
||||
|
||||
foreach (lc, options) {
|
||||
DefElem *defel = (DefElem *)lfirst(lc);
|
||||
|
@ -108,6 +113,12 @@ static void parse_output_parameters(List *options, PGOutputData *data)
|
|||
|
||||
if (!SplitIdentifierString(strVal(defel->arg), ',', &(data->publication_names)))
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax")));
|
||||
} else if (strcmp(defel->defname, "binary") == 0) {
|
||||
if (binary_option_given)
|
||||
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
|
||||
binary_option_given = true;
|
||||
|
||||
data->binary = defGetBoolean(defel);
|
||||
} else
|
||||
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
|
||||
}
|
||||
|
@ -300,19 +311,19 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
switch (change->action) {
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple);
|
||||
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UINSERT:
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple));
|
||||
logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UPDATE: {
|
||||
HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
|
||||
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
break;
|
||||
}
|
||||
|
@ -320,14 +331,14 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
HeapTuple oldtuple = change->data.utp.oldtuple ? ((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL;
|
||||
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple));
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
if (change->data.tp.oldtuple) {
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_delete(ctx->out, relation, &change->data.tp.oldtuple->tuple);
|
||||
logicalrep_write_delete(ctx->out, relation, &change->data.tp.oldtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
} else
|
||||
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
|
||||
|
@ -335,7 +346,7 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
case REORDER_BUFFER_CHANGE_UDELETE:
|
||||
if (change->data.utp.oldtuple) {
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_delete(ctx->out, relation, (HeapTuple)(&change->data.utp.oldtuple->tuple));
|
||||
logicalrep_write_delete(ctx->out, relation, (HeapTuple)(&change->data.utp.oldtuple->tuple), data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
} else
|
||||
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
|
||||
|
|
|
@ -50,13 +50,15 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE
|
|||
NameData subslotname; /* Slot name on publisher */
|
||||
text subsynccommit; /* Synchronous commit setting for worker */
|
||||
text subpublications[1]; /* List of publications subscribed to */
|
||||
bool subbinary; /* True if the subscription wants the
|
||||
* publisher to send data in binary */
|
||||
#endif
|
||||
}
|
||||
FormData_pg_subscription;
|
||||
|
||||
typedef FormData_pg_subscription *Form_pg_subscription;
|
||||
|
||||
#define Natts_pg_subscription 8
|
||||
#define Natts_pg_subscription 9
|
||||
#define Anum_pg_subscription_subdbid 1
|
||||
#define Anum_pg_subscription_subname 2
|
||||
#define Anum_pg_subscription_subowner 3
|
||||
|
@ -65,6 +67,7 @@ typedef FormData_pg_subscription *Form_pg_subscription;
|
|||
#define Anum_pg_subscription_subslotname 6
|
||||
#define Anum_pg_subscription_subsynccommit 7
|
||||
#define Anum_pg_subscription_subpublications 8
|
||||
#define Anum_pg_subscription_subbinary 9
|
||||
|
||||
|
||||
typedef struct Subscription {
|
||||
|
@ -77,6 +80,7 @@ typedef struct Subscription {
|
|||
char *slotname; /* Name of the replication slot */
|
||||
char *synccommit; /* Synchronous commit setting for worker */
|
||||
List *publications; /* List of publication names to subscribe to */
|
||||
bool binary; /* Indicates if the subscription wants data in binary format */
|
||||
} Subscription;
|
||||
|
||||
|
||||
|
|
|
@ -90,6 +90,7 @@ extern const uint32 SUPPORT_DATA_REPAIR;
|
|||
extern const uint32 SCAN_BATCH_MODE_VERSION_NUM;
|
||||
extern const uint32 RELMAP_4K_VERSION_NUM;
|
||||
extern const uint32 PUBLICATION_VERSION_NUM;
|
||||
extern const uint32 SUBSCRIPTION_BINARY_VERSION_NUM;
|
||||
extern const uint32 ANALYZER_HOOK_VERSION_NUM;
|
||||
extern const uint32 SUPPORT_HASH_XLOG_VERSION_NUM;
|
||||
extern const uint32 PITR_INIT_VERSION_NUM;
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct LibpqrcvConnectParam {
|
|||
bool logical;
|
||||
uint32 protoVersion; /* Logical protocol version */
|
||||
List *publicationNames; /* String list of publications */
|
||||
bool binary; /* Ask publisher to use binary */
|
||||
}LibpqrcvConnectParam;
|
||||
|
||||
extern int32 pg_atoi(char* s, int size, int c);
|
||||
|
|
|
@ -35,12 +35,21 @@
|
|||
* Keep in mind that the columns correspond to the *remote* table.
|
||||
*/
|
||||
typedef struct LogicalRepTupleData {
|
||||
char *values[MaxTupleAttributeNumber]; /* value in out function format or NULL if values is NULL */
|
||||
bool changed[MaxTupleAttributeNumber]; /* marker for changed/unchanged values */
|
||||
/* Array of StringInfos, one per column; some may be unused */
|
||||
StringInfoData *colvalues;
|
||||
/* Array of markers for null/unchanged/text/binary, one per column */
|
||||
char *colstatus;
|
||||
/* Length of above arrays */
|
||||
int ncols;
|
||||
} LogicalRepTupleData;
|
||||
|
||||
/* Possible values for LogicalRepTupleData.colstatus[colnum] */
|
||||
/* These values are also used in the on-the-wire protocol */
|
||||
#define LOGICALREP_COLUMN_NULL 'n'
|
||||
#define LOGICALREP_COLUMN_UNCHANGED 'u'
|
||||
#define LOGICALREP_COLUMN_TEXT 't'
|
||||
#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */
|
||||
|
||||
typedef uint32 LogicalRepRelId;
|
||||
|
||||
/* Relation information */
|
||||
|
@ -81,12 +90,12 @@ extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data
|
|||
extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
|
||||
extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data);
|
||||
extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn);
|
||||
extern void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple);
|
||||
extern void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary);
|
||||
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
|
||||
extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple);
|
||||
extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary);
|
||||
extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup,
|
||||
LogicalRepTupleData *newtup);
|
||||
extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple);
|
||||
extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary);
|
||||
extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup);
|
||||
extern void logicalrep_write_rel(StringInfo out, Relation rel);
|
||||
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
|
||||
|
|
|
@ -24,6 +24,7 @@ typedef struct PGOutputData {
|
|||
|
||||
List *publication_names;
|
||||
List *publications;
|
||||
bool binary;
|
||||
} PGOutputData;
|
||||
|
||||
#endif /* PGOUTPUT_H */
|
||||
|
|
|
@ -37,7 +37,7 @@ CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''123
|
|||
ALTER SUBSCRIPTION testsub CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''';
|
||||
ALTER SUBSCRIPTION testsub CONNECTION 'dbname=does_not_exist';
|
||||
reset client_min_messages;
|
||||
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications from pg_subscription where subname='testsub';
|
||||
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications, subbinary from pg_subscription where subname='testsub';
|
||||
--- alter subscription
|
||||
------ set publication
|
||||
ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3;
|
||||
|
@ -57,6 +57,9 @@ select subname, subenabled, subsynccommit from pg_subscription where subname='t
|
|||
ALTER SUBSCRIPTION testsub SET (slot_name='testsub');
|
||||
-- alter owner
|
||||
ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
|
||||
-- alter subbinary to true
|
||||
ALTER SUBSCRIPTION testsub SET (binary=true);
|
||||
select subname, subbinary from pg_subscription where subname='testsub';
|
||||
--rename
|
||||
ALTER SUBSCRIPTION testsub rename to testsub_rename;
|
||||
--- inside a transaction block
|
||||
|
|
|
@ -76,10 +76,10 @@ CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''123
|
|||
ALTER SUBSCRIPTION testsub CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''';
|
||||
ALTER SUBSCRIPTION testsub CONNECTION 'dbname=does_not_exist';
|
||||
reset client_min_messages;
|
||||
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications from pg_subscription where subname='testsub';
|
||||
subname | owner | subenabled | subconninfo | subpublications
|
||||
---------+---------------------------+------------+------------------------+-----------------
|
||||
testsub | regress_subscription_user | f | dbname=does_not_exist | {testpub}
|
||||
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications, subbinary from pg_subscription where subname='testsub';
|
||||
subname | owner | subenabled | subconninfo | subpublications | subbinary
|
||||
---------+---------------------------+------------+----------------------+-----------------+-----------
|
||||
testsub | regress_subscription_user | f | dbname=doesnotexist | {testpub} | f
|
||||
(1 row)
|
||||
|
||||
--- alter subscription
|
||||
|
@ -122,6 +122,14 @@ ALTER SUBSCRIPTION testsub SET (slot_name='testsub');
|
|||
ERROR: Currently enabled=false, cannot change slot_name to a non-null value.
|
||||
-- alter owner
|
||||
ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
|
||||
-- alter subbinary to true
|
||||
ALTER SUBSCRIPTION testsub SET (binary=true);
|
||||
select subname, subbinary from pg_subscription where subname='testsub';
|
||||
subname | subbinary
|
||||
---------+-----------
|
||||
testsub | t
|
||||
(1 row)
|
||||
|
||||
--rename
|
||||
ALTER SUBSCRIPTION testsub rename to testsub_rename;
|
||||
--- inside a transaction block
|
||||
|
@ -281,10 +289,11 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1
|
|||
testsub_maskconninfo | ALTER SUBSCRIPTION testsub_maskconninfo SET (conninfo='*************************************************************************************************************************;
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (synchronous_commit=on);
|
||||
testsub | ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (binary=true);
|
||||
testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename;
|
||||
testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename;
|
||||
testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo;
|
||||
(14 rows)
|
||||
(15 rows)
|
||||
|
||||
--clear audit log
|
||||
SELECT pg_delete_audit('1012-11-10', '3012-11-11');
|
||||
|
|
Loading…
Reference in New Issue