diff options
author | John Clizbe <John.Clizbe@gmail.com> | 2010-05-12 23:30:06 +0200 |
---|---|---|
committer | Robby Workman <rworkman@slackbuilds.org> | 2010-05-12 23:30:06 +0200 |
commit | 4f180c271a6bca58ae51d67a5d39c930b42a8dbb (patch) | |
tree | f1db19e320724285904e6d1b855fa402d27e27bb /libraries/db46/patches | |
parent | 4fc75e8f9a9f2a81716505a4d4582b06f2f995d2 (diff) | |
download | slackbuilds-4f180c271a6bca58ae51d67a5d39c930b42a8dbb.tar.gz |
libraries/db46: Added to 12.2 repository
Diffstat (limited to 'libraries/db46/patches')
-rw-r--r-- | libraries/db46/patches/patch.4.6.21.1 | 90 | ||||
-rw-r--r-- | libraries/db46/patches/patch.4.6.21.2 | 27 | ||||
-rw-r--r-- | libraries/db46/patches/patch.4.6.21.3 | 53 | ||||
-rw-r--r-- | libraries/db46/patches/patch.4.6.21.4 | 1414 |
4 files changed, 1584 insertions, 0 deletions
diff --git a/libraries/db46/patches/patch.4.6.21.1 b/libraries/db46/patches/patch.4.6.21.1 new file mode 100644 index 0000000000..636bc51c7d --- /dev/null +++ b/libraries/db46/patches/patch.4.6.21.1 @@ -0,0 +1,90 @@ +*** dbinc/mp.h 2007-09-28 01:28:25.000000000 +1000 +--- dbinc/mp.h 2008-02-14 01:22:09.000000000 +1100 +*************** +*** 639,644 **** +--- 639,647 ---- + */ + #define MP_TRUNC_RECOVER 0x01 + ++ /* Private flags to DB_MPOOLFILE->close. */ ++ #define DB_MPOOL_NOLOCK 0x002 /* Already have mpf locked. */ ++ + #if defined(__cplusplus) + } + #endif +*** mp/mp_fopen.c 2007-05-18 03:18:01.000000000 +1000 +--- mp/mp_fopen.c 2008-02-12 16:09:42.000000000 +1100 +*************** +*** 888,894 **** + * when we try to flush them. + */ + deleted = 0; +! MUTEX_LOCK(dbenv, mfp->mutex); + if (F_ISSET(dbmfp, MP_MULTIVERSION)) + --mfp->multiversion; + if (--mfp->mpf_cnt == 0 || LF_ISSET(DB_MPOOL_DISCARD)) { +--- 888,895 ---- + * when we try to flush them. + */ + deleted = 0; +! if (!LF_ISSET(DB_MPOOL_NOLOCK)) +! MUTEX_LOCK(dbenv, mfp->mutex); + if (F_ISSET(dbmfp, MP_MULTIVERSION)) + --mfp->multiversion; + if (--mfp->mpf_cnt == 0 || LF_ISSET(DB_MPOOL_DISCARD)) { +*************** +*** 909,921 **** + } + } + if (mfp->block_cnt == 0) { + if ((t_ret = + __memp_mf_discard(dbmp, mfp)) != 0 && ret == 0) + ret = t_ret; + deleted = 1; + } + } +! if (!deleted) + MUTEX_UNLOCK(dbenv, mfp->mutex); + + done: /* Discard the DB_MPOOLFILE structure. */ +--- 910,928 ---- + } + } + if (mfp->block_cnt == 0) { ++ /* ++ * We should never discard this mp file if our caller ++ * is holding the lock on it. See comment in ++ * __memp_sync_file. ++ */ ++ DB_ASSERT(dbenv, !LF_ISSET(DB_MPOOL_NOLOCK)); + if ((t_ret = + __memp_mf_discard(dbmp, mfp)) != 0 && ret == 0) + ret = t_ret; + deleted = 1; + } + } +! if (!deleted && !LF_ISSET(DB_MPOOL_NOLOCK)) + MUTEX_UNLOCK(dbenv, mfp->mutex); + + done: /* Discard the DB_MPOOLFILE structure. */ +*** mp/mp_sync.c 2007-06-02 04:32:44.000000000 +1000 +--- mp/mp_sync.c 2008-02-12 16:09:42.000000000 +1100 +*************** +*** 755,761 **** + * This is important since we are called with the hash bucket + * locked. The mfp will get freed via the cleanup pass. + */ +! if (dbmfp != NULL && (t_ret = __memp_fclose(dbmfp, 0)) != 0 && ret == 0) + ret = t_ret; + + --mfp->mpf_cnt; +--- 755,762 ---- + * This is important since we are called with the hash bucket + * locked. The mfp will get freed via the cleanup pass. + */ +! if (dbmfp != NULL && +! (t_ret = __memp_fclose(dbmfp, DB_MPOOL_NOLOCK)) != 0 && ret == 0) + ret = t_ret; + + --mfp->mpf_cnt; +
diff --git a/libraries/db46/patches/patch.4.6.21.2 b/libraries/db46/patches/patch.4.6.21.2 new file mode 100644 index 0000000000..4e59f18afe --- /dev/null +++ b/libraries/db46/patches/patch.4.6.21.2 @@ -0,0 +1,27 @@ +*** mp/mp_region.c 2007-05-18 03:18:01.000000000 +1000
+--- mp/mp_region.c 2008-06-24 13:15:56.000000000 +1000
+***************
+*** 249,256 ****
+ mtx_base = htab[0].mtx_hash;
+ }
+
+ if (mtx_base != MUTEX_INVALID)
+! mtx_base += reginfo_off * htab_buckets;
+
+ /* Allocate hash table space and initialize it. */
+ if ((ret = __env_alloc(infop,
+--- 249,262 ----
+ mtx_base = htab[0].mtx_hash;
+ }
+
++ /*
++ * We preallocated all of the mutexes in a block, so for regions after
++ * the first, we skip mutexes in use in earlier regions. Each region
++ * has the same number of buckets and there are two mutexes per hash
++ * bucket (the bucket mutex and the I/O mutex).
++ */
+ if (mtx_base != MUTEX_INVALID)
+! mtx_base += reginfo_off * htab_buckets * 2;
+
+ /* Allocate hash table space and initialize it. */
+ if ((ret = __env_alloc(infop,
diff --git a/libraries/db46/patches/patch.4.6.21.3 b/libraries/db46/patches/patch.4.6.21.3 new file mode 100644 index 0000000000..e23a27a02f --- /dev/null +++ b/libraries/db46/patches/patch.4.6.21.3 @@ -0,0 +1,53 @@ +*** sequence/sequence.c.orig 2007-05-17 10:18:04.000000000 -0700
+--- sequence/sequence.c 2008-08-14 12:24:23.000000000 -0700
+***************
+*** 252,257 ****
+--- 252,262 ----
+ if ((ret != DB_NOTFOUND && ret != DB_KEYEMPTY) ||
+ !LF_ISSET(DB_CREATE))
+ goto err;
++ if (IS_REP_CLIENT(dbenv) &&
++ !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
++ ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
++ goto err;
++ }
+ ret = 0;
+
+ rp = &seq->seq_record;
+***************
+*** 304,310 ****
+ */
+ rp = seq->seq_data.data;
+ if (rp->seq_version == DB_SEQUENCE_OLDVER) {
+! oldver: rp->seq_version = DB_SEQUENCE_VERSION;
+ if (__db_isbigendian()) {
+ if (IS_DB_AUTO_COMMIT(dbp, txn)) {
+ if ((ret =
+--- 309,320 ----
+ */
+ rp = seq->seq_data.data;
+ if (rp->seq_version == DB_SEQUENCE_OLDVER) {
+! oldver: if (IS_REP_CLIENT(dbenv) &&
+! !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
+! ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
+! goto err;
+! }
+! rp->seq_version = DB_SEQUENCE_VERSION;
+ if (__db_isbigendian()) {
+ if (IS_DB_AUTO_COMMIT(dbp, txn)) {
+ if ((ret =
+***************
+*** 713,718 ****
+--- 723,734 ----
+
+ MUTEX_LOCK(dbenv, seq->mtx_seq);
+
++ if (handle_check && IS_REP_CLIENT(dbenv) &&
++ !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
++ ret = __db_rdonly(dbenv, "DB_SEQUENCE->get");
++ goto err;
++ }
++
+ if (rp->seq_min + delta > rp->seq_max) {
+ __db_errx(dbenv, "Sequence overflow");
+ ret = EINVAL;
diff --git a/libraries/db46/patches/patch.4.6.21.4 b/libraries/db46/patches/patch.4.6.21.4 new file mode 100644 index 0000000000..7c1f7e2a12 --- /dev/null +++ b/libraries/db46/patches/patch.4.6.21.4 @@ -0,0 +1,1414 @@ +*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700 +--- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 36,41 **** +--- 36,55 ---- + #endif + + /* ++ * The (arbitrary) maximum number of outgoing messages we're willing to hold, on ++ * a queue per connection, waiting for TCP buffer space to become available in ++ * the kernel. Rather than exceeding this limit, we simply discard additional ++ * messages (since this is always allowed by the replication protocol). ++ * As a special dispensation, if a message is destined for a specific remote ++ * site (i.e., it's not a broadcast), then we first try blocking the sending ++ * thread, waiting for space to become available (though we only wait a limited ++ * time). This is so as to be able to handle the immediate flood of (a ++ * potentially large number of) outgoing messages that replication generates, in ++ * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests. ++ */ ++ #define OUT_QUEUE_LIMIT 10 ++ ++ /* + * The system value is available from sysconf(_SC_HOST_NAME_MAX). + * Historically, the maximum host name was 256. + */ +*************** +*** 47,52 **** +--- 61,71 ---- + #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20) + typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1]; + ++ /* Default timeout values, in seconds. */ ++ #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC) ++ #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC) ++ #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC) ++ + struct __repmgr_connection; + typedef struct __repmgr_connection REPMGR_CONNECTION; + struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE; +*************** +*** 171,178 **** + #ifdef DB_WIN32 + WSAEVENT event_object; + #endif +! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */ +! #define CONN_DEFUNCT 0x02 /* socket close pending */ + u_int32_t flags; + + /* +--- 190,198 ---- + #ifdef DB_WIN32 + WSAEVENT event_object; + #endif +! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */ +! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */ +! #define CONN_DEFUNCT 0x04 /* socket close pending */ + u_int32_t flags; + + /* +*************** +*** 180,189 **** + * send() function's thread. But if TCP doesn't have enough network + * buffer space for us when we first try it, we instead allocate some + * memory, and copy the message, and then send it as space becomes +! * available in our main select() thread. + */ + OUT_Q_HEADER outbound_queue; + int out_queue_length; + + /* + * Input: while we're reading a message, we keep track of what phase +--- 200,215 ---- + * send() function's thread. But if TCP doesn't have enough network + * buffer space for us when we first try it, we instead allocate some + * memory, and copy the message, and then send it as space becomes +! * available in our main select() thread. In some cases, if the queue +! * gets too long we wait until it's drained, and then append to it. +! * This condition variable's associated mutex is the normal per-repmgr +! * db_rep->mutex, because that mutex is always held anyway whenever the +! * output queue is consulted. + */ + OUT_Q_HEADER outbound_queue; + int out_queue_length; ++ cond_var_t drained; ++ int blockers; /* ref count of msg threads waiting on us */ + + /* + * Input: while we're reading a message, we keep track of what phase +*** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 +--- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 +*************** +*** 1420,1425 **** +--- 1420,1428 ---- + #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@ ++ #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@ ++ #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@ ++ #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@ + #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@ +*** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 +--- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 +*************** +*** 21,30 **** + int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); + void __repmgr_stash_generation __P((DB_ENV *)); + int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); +! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *)); + int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); +! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int)); +! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); + int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); + int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); + int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); +--- 21,30 ---- + int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); + void __repmgr_stash_generation __P((DB_ENV *)); + int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); +! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int)); + int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); +! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *)); +! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); + int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); + int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); + int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); +*************** +*** 39,44 **** +--- 39,47 ---- + int __repmgr_wake_waiting_senders __P((DB_ENV *)); + int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *)); + void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t)); ++ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t)); ++ int __repmgr_alloc_cond __P((cond_var_t *)); ++ int __repmgr_free_cond __P((cond_var_t *)); + int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); + int __repmgr_close_sync __P((DB_ENV *)); + int __repmgr_net_init __P((DB_ENV *, DB_REP *)); +*** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 196,204 **** + int ret; + + /* Set some default values. */ +! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */ +! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */ +! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */ + db_rep->config_nsites = 0; + db_rep->peer = DB_EID_INVALID; + db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; +--- 196,204 ---- + int ret; + + /* Set some default values. */ +! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; +! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; +! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; + db_rep->config_nsites = 0; + db_rep->peer = DB_EID_INVALID; + db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; +*************** +*** 238,243 **** +--- 238,244 ---- + DB_ENV *dbenv; + { + DB_REP *db_rep; ++ REPMGR_CONNECTION *conn; + int ret; + + db_rep = dbenv->rep_handle; +*************** +*** 254,259 **** +--- 255,266 ---- + + if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) + goto unlock; ++ ++ TAILQ_FOREACH(conn, &db_rep->connections, entries) { ++ if (conn->blockers > 0 && ++ ((ret = __repmgr_signal(&conn->drained)) != 0)) ++ goto unlock; ++ } + UNLOCK_MUTEX(db_rep->mutex); + + return (__repmgr_wake_main_thread(dbenv)); +*** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 183,192 **** + + /* + * Acknowledges a message. +- * +- * !!! +- * Note that this cannot be called from the select() thread, in case we call +- * __repmgr_bust_connection(..., FALSE). + */ + static int + ack_message(dbenv, generation, lsn) +--- 183,188 ---- +*************** +*** 227,235 **** + rec2.size = 0; + + conn = site->ref.conn; + if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, +! &control2, &rec2)) == DB_REP_UNAVAIL) +! ret = __repmgr_bust_connection(dbenv, conn, FALSE); + } + + UNLOCK_MUTEX(db_rep->mutex); +--- 223,236 ---- + rec2.size = 0; + + conn = site->ref.conn; ++ /* ++ * It's hard to imagine anyone would care about a lost ack if ++ * the path to the master is so congested as to need blocking; ++ * so pass "blockable" argument as FALSE. ++ */ + if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, +! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) +! ret = __repmgr_bust_connection(dbenv, conn); + } + + UNLOCK_MUTEX(db_rep->mutex); +*** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 63,69 **** + static void setup_sending_msg + __P((struct sending_msg *, u_int, const DBT *, const DBT *)); + static int __repmgr_send_internal +! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *)); + static int enqueue_msg + __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); + static int flatten __P((DB_ENV *, struct sending_msg *)); +--- 63,69 ---- + static void setup_sending_msg + __P((struct sending_msg *, u_int, const DBT *, const DBT *)); + static int __repmgr_send_internal +! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int)); + static int enqueue_msg + __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); + static int flatten __P((DB_ENV *, struct sending_msg *)); +*************** +*** 73,85 **** + * __repmgr_send -- + * The send function for DB_ENV->rep_set_transport. + * +- * !!! +- * This is only ever called as the replication transport call-back, which means +- * it's either on one of our message processing threads or an application +- * thread. It mustn't be called from the select() thread, because we might call +- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the +- * select() thread. +- * + * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, + * PUBLIC: const DB_LSN *, int, u_int32_t)); + */ +--- 73,78 ---- +*************** +*** 126,134 **** + } + + conn = site->ref.conn; + if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, +! control, rec)) == DB_REP_UNAVAIL && +! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0) + ret = t_ret; + if (ret != 0) + goto out; +--- 119,128 ---- + } + + conn = site->ref.conn; ++ /* Pass the "blockable" argument as TRUE. */ + if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, +! control, rec, TRUE)) == DB_REP_UNAVAIL && +! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0) + ret = t_ret; + if (ret != 0) + goto out; +*************** +*** 222,228 **** + if (site->state != SITE_CONNECTED) + return (NULL); + +! if (F_ISSET(site->ref.conn, CONN_CONNECTING)) + return (NULL); + return (site); + } +--- 216,222 ---- + if (site->state != SITE_CONNECTED) + return (NULL); + +! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT)) + return (NULL); + return (site); + } +*************** +*** 235,244 **** + * + * !!! + * Caller must hold dbenv->mutex. +- * +- * !!! +- * Note that this cannot be called from the select() thread, in case we call +- * __repmgr_bust_connection(..., FALSE). + */ + static int + __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp) +--- 229,234 ---- +*************** +*** 268,281 **** + !IS_VALID_EID(conn->eid)) + continue; + +! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) { + site = SITE_FROM_EID(conn->eid); + nsites++; + if (site->priority > 0) + npeers++; + } else if (ret == DB_REP_UNAVAIL) { +! if ((ret = __repmgr_bust_connection( +! dbenv, conn, FALSE)) != 0) + return (ret); + } else + return (ret); +--- 258,277 ---- + !IS_VALID_EID(conn->eid)) + continue; + +! /* +! * Broadcast messages are either application threads committing +! * transactions, or replication status message that we can +! * afford to lose. So don't allow blocking for them (pass +! * "blockable" argument as FALSE). +! */ +! if ((ret = __repmgr_send_internal(dbenv, +! conn, &msg, FALSE)) == 0) { + site = SITE_FROM_EID(conn->eid); + nsites++; + if (site->priority > 0) + npeers++; + } else if (ret == DB_REP_UNAVAIL) { +! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) + return (ret); + } else + return (ret); +*************** +*** 301,339 **** + * intersperse writes that are part of two single messages. + * + * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, +! * PUBLIC: u_int, const DBT *, const DBT *)); + */ + int +! __repmgr_send_one(dbenv, conn, msg_type, control, rec) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + u_int msg_type; + const DBT *control, *rec; + { + struct sending_msg msg; + + setup_sending_msg(&msg, msg_type, control, rec); +! return (__repmgr_send_internal(dbenv, conn, &msg)); + } + + /* + * Attempts a "best effort" to send a message on the given site. If there is an +! * excessive backlog of message already queued on the connection, we simply drop +! * this message, and still return 0 even in this case. + */ + static int +! __repmgr_send_internal(dbenv, conn, msg) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + struct sending_msg *msg; + { +! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */ + REPMGR_IOVECS iovecs; + SITE_STRING_BUFFER buffer; + int ret; + size_t nw; + size_t total_written; + + DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); + if (!STAILQ_EMPTY(&conn->outbound_queue)) { + /* +--- 297,355 ---- + * intersperse writes that are part of two single messages. + * + * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, +! * PUBLIC: u_int, const DBT *, const DBT *, int)); + */ + int +! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + u_int msg_type; + const DBT *control, *rec; ++ int blockable; + { + struct sending_msg msg; + + setup_sending_msg(&msg, msg_type, control, rec); +! return (__repmgr_send_internal(dbenv, conn, &msg, blockable)); + } + + /* + * Attempts a "best effort" to send a message on the given site. If there is an +! * excessive backlog of message already queued on the connection, what shall we +! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of +! * time) for the queue to drain. Otherwise we'll simply drop the message. This +! * is always allowed by the replication protocol. But in the case of a +! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we +! * almost always get a flood of messages that instantly fills our queue, so +! * blocking improves performance (by avoiding the need for the client to +! * re-request). +! * +! * How long shall we wait? We could of course create a new timeout +! * configuration type, so that the application could set it directly. But that +! * would start to overwhelm the user with too many choices to think about. We +! * already have an ACK timeout, which is the user's estimate of how long it +! * should take to send a message to the client, have it be processed, and return +! * a message back to us. We multiply that by the queue size, because that's how +! * many messages have to be swallowed up by the client before we're able to +! * start sending again (at least to a rough approximation). + */ + static int +! __repmgr_send_internal(dbenv, conn, msg, blockable) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + struct sending_msg *msg; ++ int blockable; + { +! DB_REP *db_rep; + REPMGR_IOVECS iovecs; + SITE_STRING_BUFFER buffer; ++ db_timeout_t drain_to; + int ret; + size_t nw; + size_t total_written; + ++ db_rep = dbenv->rep_handle; ++ + DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); + if (!STAILQ_EMPTY(&conn->outbound_queue)) { + /* +*************** +*** 344,358 **** + RPRINT(dbenv, (dbenv, "msg to %s to be queued", + __repmgr_format_eid_loc(dbenv->rep_handle, + conn->eid, buffer))); + if (conn->out_queue_length < OUT_QUEUE_LIMIT) + return (enqueue_msg(dbenv, conn, msg, 0)); + else { + RPRINT(dbenv, (dbenv, "queue limit exceeded")); + STAT(dbenv->rep_handle-> + region->mstat.st_msgs_dropped++); +! return (0); + } + } + + /* + * Send as much data to the site as we can, without blocking. Keep +--- 360,393 ---- + RPRINT(dbenv, (dbenv, "msg to %s to be queued", + __repmgr_format_eid_loc(dbenv->rep_handle, + conn->eid, buffer))); ++ if (conn->out_queue_length >= OUT_QUEUE_LIMIT && ++ blockable && !F_ISSET(conn, CONN_CONGESTED)) { ++ RPRINT(dbenv, (dbenv, ++ "block msg thread, await queue space")); ++ ++ if ((drain_to = db_rep->ack_timeout) == 0) ++ drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT; ++ conn->blockers++; ++ ret = __repmgr_await_drain(dbenv, ++ conn, drain_to * OUT_QUEUE_LIMIT); ++ conn->blockers--; ++ if (db_rep->finished) ++ return (DB_TIMEOUT); ++ if (ret != 0) ++ return (ret); ++ if (STAILQ_EMPTY(&conn->outbound_queue)) ++ goto empty; ++ } + if (conn->out_queue_length < OUT_QUEUE_LIMIT) + return (enqueue_msg(dbenv, conn, msg, 0)); + else { + RPRINT(dbenv, (dbenv, "queue limit exceeded")); + STAT(dbenv->rep_handle-> + region->mstat.st_msgs_dropped++); +! return (blockable ? DB_TIMEOUT : 0); + } + } ++ empty: + + /* + * Send as much data to the site as we can, without blocking. Keep +*************** +*** 498,521 **** + + /* + * Abandons a connection, to recover from an error. Upon entry the conn struct +! * must be on the connections list. +! * +! * If the 'do_close' flag is true, we do the whole job; the clean-up includes +! * removing the struct from the list and freeing all its memory, so upon return +! * the caller must not refer to it any further. Otherwise, we merely mark the +! * connection for clean-up later by the main thread. + * + * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, +! * PUBLIC: REPMGR_CONNECTION *, int)); + * + * !!! + * Caller holds mutex. + */ + int +! __repmgr_bust_connection(dbenv, conn, do_close) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +- int do_close; + { + DB_REP *db_rep; + int connecting, ret, eid; +--- 533,553 ---- + + /* + * Abandons a connection, to recover from an error. Upon entry the conn struct +! * must be on the connections list. For now, just mark it as unusable; it will +! * be fully cleaned up in the top-level select thread, as soon as possible. + * + * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, +! * PUBLIC: REPMGR_CONNECTION *)); + * + * !!! + * Caller holds mutex. ++ * ++ * Must be idempotent + */ + int +! __repmgr_bust_connection(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; + { + DB_REP *db_rep; + int connecting, ret, eid; +*************** +*** 526,537 **** + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); + eid = conn->eid; + connecting = F_ISSET(conn, CONN_CONNECTING); +! if (do_close) +! __repmgr_cleanup_connection(dbenv, conn); +! else { +! F_SET(conn, CONN_DEFUNCT); +! conn->eid = -1; +! } + + /* + * When we first accepted the incoming connection, we set conn->eid to +--- 558,566 ---- + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); + eid = conn->eid; + connecting = F_ISSET(conn, CONN_CONNECTING); +! +! F_SET(conn, CONN_DEFUNCT); +! conn->eid = -1; + + /* + * When we first accepted the incoming connection, we set conn->eid to +*************** +*** 557,563 **** + dbenv, ELECT_FAILURE_ELECTION)) != 0) + return (ret); + } +! } else if (!do_close) { + /* + * One way or another, make sure the main thread is poked, so + * that we do the deferred clean-up. +--- 586,592 ---- + dbenv, ELECT_FAILURE_ELECTION)) != 0) + return (ret); + } +! } else { + /* + * One way or another, make sure the main thread is poked, so + * that we do the deferred clean-up. +*************** +*** 568,577 **** + } + + /* +! * PUBLIC: void __repmgr_cleanup_connection + * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); + */ +! void + __repmgr_cleanup_connection(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +--- 597,610 ---- + } + + /* +! * PUBLIC: int __repmgr_cleanup_connection + * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); ++ * ++ * !!! ++ * Idempotent. This can be called repeatedly as blocking message threads (of ++ * which there could be multiples) wake up in case of error on the connection. + */ +! int + __repmgr_cleanup_connection(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +*************** +*** 580,596 **** + QUEUED_OUTPUT *out; + REPMGR_FLAT *msg; + DBT *dbt; + + db_rep = dbenv->rep_handle; + +! TAILQ_REMOVE(&db_rep->connections, conn, entries); + if (conn->fd != INVALID_SOCKET) { +! (void)closesocket(conn->fd); + #ifdef DB_WIN32 +! (void)WSACloseEvent(conn->event_object); + #endif + } + + /* + * Deallocate any input and output buffers we may have. + */ +--- 613,643 ---- + QUEUED_OUTPUT *out; + REPMGR_FLAT *msg; + DBT *dbt; ++ int ret; + + db_rep = dbenv->rep_handle; + +! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished); +! + if (conn->fd != INVALID_SOCKET) { +! ret = closesocket(conn->fd); +! conn->fd = INVALID_SOCKET; +! if (ret == SOCKET_ERROR) { +! ret = net_errno; +! __db_err(dbenv, ret, "closing socket"); +! } + #ifdef DB_WIN32 +! if (!WSACloseEvent(conn->event_object) && ret != 0) +! ret = net_errno; + #endif ++ if (ret != 0) ++ return (ret); + } + ++ if (conn->blockers > 0) ++ return (__repmgr_signal(&conn->drained)); ++ ++ TAILQ_REMOVE(&db_rep->connections, conn, entries); + /* + * Deallocate any input and output buffers we may have. + */ +*************** +*** 614,620 **** +--- 661,669 ---- + __os_free(dbenv, out); + } + ++ ret = __repmgr_free_cond(&conn->drained); + __os_free(dbenv, conn); ++ return (ret); + } + + static int +*************** +*** 1063,1069 **** + + while (!TAILQ_EMPTY(&db_rep->connections)) { + conn = TAILQ_FIRST(&db_rep->connections); +! __repmgr_cleanup_connection(dbenv, conn); + } + + for (i = 0; i < db_rep->site_cnt; i++) { +--- 1112,1118 ---- + + while (!TAILQ_EMPTY(&db_rep->connections)) { + conn = TAILQ_FIRST(&db_rep->connections); +! (void)__repmgr_cleanup_connection(dbenv, conn); + } + + for (i = 0; i < db_rep->site_cnt; i++) { +*** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 21,26 **** +--- 21,28 ---- + size_t __repmgr_guesstimated_max = (128 * 1024); + #endif + ++ static int __repmgr_conn_work __P((DB_ENV *, ++ REPMGR_CONNECTION *, fd_set *, fd_set *, int)); + static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *)); + + /* +*************** +*** 189,194 **** +--- 191,284 ---- + } + + /* ++ * PUBLIC: int __repmgr_await_drain __P((DB_ENV *, ++ * PUBLIC: REPMGR_CONNECTION *, db_timeout_t)); ++ * ++ * Waits for space to become available on the connection's output queue. ++ * Various ways we can exit: ++ * ++ * 1. queue becomes non-full ++ * 2. exceed time limit ++ * 3. connection becomes defunct (due to error in another thread) ++ * 4. repmgr is shutting down ++ * 5. any unexpected system resource failure ++ * ++ * In cases #3 and #5 we return an error code. Caller is responsible for ++ * distinguishing the remaining cases if desired. ++ * ++ * !!! ++ * Caller must hold repmgr->mutex. ++ */ ++ int ++ __repmgr_await_drain(dbenv, conn, timeout) ++ DB_ENV *dbenv; ++ REPMGR_CONNECTION *conn; ++ db_timeout_t timeout; ++ { ++ DB_REP *db_rep; ++ struct timespec deadline; ++ int ret; ++ ++ db_rep = dbenv->rep_handle; ++ ++ __repmgr_compute_wait_deadline(dbenv, &deadline, timeout); ++ ++ ret = 0; ++ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { ++ ret = pthread_cond_timedwait(&conn->drained, ++ &db_rep->mutex, &deadline); ++ switch (ret) { ++ case 0: ++ if (db_rep->finished) ++ goto out; /* #4. */ ++ /* ++ * Another thread could have stumbled into an error on ++ * the socket while we were waiting. ++ */ ++ if (F_ISSET(conn, CONN_DEFUNCT)) { ++ ret = DB_REP_UNAVAIL; /* #3. */ ++ goto out; ++ } ++ break; ++ case ETIMEDOUT: ++ F_SET(conn, CONN_CONGESTED); ++ ret = 0; ++ goto out; /* #2. */ ++ default: ++ goto out; /* #5. */ ++ } ++ } ++ /* #1. */ ++ ++ out: ++ return (ret); ++ } ++ ++ /* ++ * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *)); ++ * ++ * Initialize a condition variable (in allocated space). ++ */ ++ int ++ __repmgr_alloc_cond(c) ++ cond_var_t *c; ++ { ++ return (pthread_cond_init(c, NULL)); ++ } ++ ++ /* ++ * PUBLIC: int __repmgr_free_cond __P((cond_var_t *)); ++ * ++ * Clean up a previously initialized condition variable. ++ */ ++ int ++ __repmgr_free_cond(c) ++ cond_var_t *c; ++ { ++ return (pthread_cond_destroy(c)); ++ } ++ ++ /* + * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); + * + * Allocate/initialize all data necessary for thread synchronization. This +*************** +*** 443,449 **** + REPMGR_RETRY *retry; + db_timespec timeout; + fd_set reads, writes; +! int ret, flow_control, maxfd, nready; + u_int8_t buf[10]; /* arbitrary size */ + + flow_control = FALSE; +--- 533,539 ---- + REPMGR_RETRY *retry; + db_timespec timeout; + fd_set reads, writes; +! int ret, flow_control, maxfd; + u_int8_t buf[10]; /* arbitrary size */ + + flow_control = FALSE; +*************** +*** 477,482 **** +--- 567,575 ---- + * each one. + */ + TAILQ_FOREACH(conn, &db_rep->connections, entries) { ++ if (F_ISSET(conn, CONN_DEFUNCT)) ++ continue; ++ + if (F_ISSET(conn, CONN_CONNECTING)) { + FD_SET((u_int)conn->fd, &reads); + FD_SET((u_int)conn->fd, &writes); +*************** +*** 533,616 **** + return (ret); + } + } +- nready = ret; +- + LOCK_MUTEX(db_rep->mutex); + +- /* +- * The first priority thing we must do is to clean up any +- * pending defunct connections. Otherwise, if they have any +- * lingering pending input, we get very confused if we try to +- * process it. +- * +- * The TAILQ_FOREACH macro would be suitable here, except that +- * it doesn't allow unlinking the current element, which is +- * needed for cleanup_connection. +- */ +- for (conn = TAILQ_FIRST(&db_rep->connections); +- conn != NULL; +- conn = next) { +- next = TAILQ_NEXT(conn, entries); +- if (F_ISSET(conn, CONN_DEFUNCT)) +- __repmgr_cleanup_connection(dbenv, conn); +- } +- + if ((ret = __repmgr_retry_connections(dbenv)) != 0) + goto out; +- if (nready == 0) +- continue; + + /* +! * Traverse the linked list. (Again, like TAILQ_FOREACH, except +! * that we need the ability to unlink an element along the way.) + */ + for (conn = TAILQ_FIRST(&db_rep->connections); + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); +! if (F_ISSET(conn, CONN_CONNECTING)) { +! if (FD_ISSET((u_int)conn->fd, &reads) || +! FD_ISSET((u_int)conn->fd, &writes)) { +! if ((ret = finish_connecting(dbenv, +! conn)) == DB_REP_UNAVAIL) { +! if ((ret = +! __repmgr_bust_connection( +! dbenv, conn, TRUE)) != 0) +! goto out; +! } else if (ret != 0) +! goto out; +! } +! continue; +! } +! +! /* +! * Here, the site is connected, and the FD_SET's are +! * valid. +! */ +! if (FD_ISSET((u_int)conn->fd, &writes)) { +! if ((ret = __repmgr_write_some( +! dbenv, conn)) == DB_REP_UNAVAIL) { +! if ((ret = +! __repmgr_bust_connection(dbenv, +! conn, TRUE)) != 0) +! goto out; +! continue; +! } else if (ret != 0) +! goto out; +! } +! +! if (!flow_control && +! FD_ISSET((u_int)conn->fd, &reads)) { +! if ((ret = __repmgr_read_from_site(dbenv, conn)) +! == DB_REP_UNAVAIL) { +! if ((ret = +! __repmgr_bust_connection(dbenv, +! conn, TRUE)) != 0) +! goto out; +! continue; +! } else if (ret != 0) +! goto out; +! } + } + + /* +--- 626,650 ---- + return (ret); + } + } + LOCK_MUTEX(db_rep->mutex); + + if ((ret = __repmgr_retry_connections(dbenv)) != 0) + goto out; + + /* +! * Examine each connection, to see what work needs to be done. +! * +! * The TAILQ_FOREACH macro would be suitable here, except that +! * it doesn't allow unlinking the current element, which is +! * needed for cleanup_connection. + */ + for (conn = TAILQ_FIRST(&db_rep->connections); + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); +! if ((ret = __repmgr_conn_work(dbenv, +! conn, &reads, &writes, flow_control)) != 0) +! goto out; + } + + /* +*************** +*** 637,642 **** +--- 671,719 ---- + } + + static int ++ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control) ++ DB_ENV *dbenv; ++ REPMGR_CONNECTION *conn; ++ fd_set *reads, *writes; ++ int flow_control; ++ { ++ int ret; ++ u_int fd; ++ ++ if (F_ISSET(conn, CONN_DEFUNCT)) { ++ /* ++ * Deferred clean-up, from an error that happened in another ++ * thread, while we were sleeping in select(). ++ */ ++ return (__repmgr_cleanup_connection(dbenv, conn)); ++ } ++ ++ ret = 0; ++ fd = (u_int)conn->fd; ++ ++ if (F_ISSET(conn, CONN_CONNECTING)) { ++ if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes)) ++ ret = finish_connecting(dbenv, conn); ++ } else { ++ /* ++ * Here, the site is connected, and the FD_SET's are valid. ++ */ ++ if (FD_ISSET(fd, writes)) ++ ret = __repmgr_write_some(dbenv, conn); ++ ++ if (ret == 0 && !flow_control && FD_ISSET(fd, reads)) ++ ret = __repmgr_read_from_site(dbenv, conn); ++ } ++ ++ if (ret == DB_REP_UNAVAIL) { ++ if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) ++ return (ret); ++ ret = __repmgr_cleanup_connection(dbenv, conn); ++ } ++ return (ret); ++ } ++ ++ static int + finish_connecting(dbenv, conn) + DB_ENV *dbenv; + REPMGR_CONNECTION *conn; +*************** +*** 657,662 **** +--- 734,740 ---- + goto err_rpt; + } + ++ DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING)); + F_CLR(conn, CONN_CONNECTING); + return (__repmgr_send_handshake(dbenv, conn)); + +*************** +*** 671,690 **** + "connecting to %s", __repmgr_format_site_loc(site, buffer)); + + /* If we've exhausted the list of possible addresses, give up. */ +! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) + return (DB_REP_UNAVAIL); + + /* + * This is just like a little mini-"bust_connection", except that we + * don't reschedule for later, 'cuz we're just about to try again right +! * now. + * + * !!! + * Which means this must only be called on the select() thread, since + * only there are we allowed to actually close a connection. + */ + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); +! __repmgr_cleanup_connection(dbenv, conn); + ret = __repmgr_connect_site(dbenv, eid); + DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); + return (ret); +--- 749,773 ---- + "connecting to %s", __repmgr_format_site_loc(site, buffer)); + + /* If we've exhausted the list of possible addresses, give up. */ +! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) { +! STAT(db_rep->region->mstat.st_connect_fail++); + return (DB_REP_UNAVAIL); ++ } + + /* + * This is just like a little mini-"bust_connection", except that we + * don't reschedule for later, 'cuz we're just about to try again right +! * now. (Note that we don't have to worry about message threads +! * blocking on a full output queue: that can't happen when we're only +! * just connecting.) + * + * !!! + * Which means this must only be called on the select() thread, since + * only there are we allowed to actually close a connection. + */ + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); +! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) +! return (ret); + ret = __repmgr_connect_site(dbenv, eid); + DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); + return (ret); +*** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 36,45 **** + + /* + * PUBLIC: int __repmgr_accept __P((DB_ENV *)); +- * +- * !!! +- * Only ever called in the select() thread, since we may call +- * __repmgr_bust_connection(..., TRUE). + */ + int + __repmgr_accept(dbenv) +--- 36,41 ---- +*************** +*** 133,139 **** + case 0: + return (0); + case DB_REP_UNAVAIL: +! return (__repmgr_bust_connection(dbenv, conn, TRUE)); + default: + return (ret); + } +--- 129,135 ---- + case 0: + return (0); + case DB_REP_UNAVAIL: +! return (__repmgr_bust_connection(dbenv, conn)); + default: + return (ret); + } +*************** +*** 254,263 **** + * starting with the "current" element of its address list and trying as many + * addresses as necessary until the list is exhausted. + * +- * !!! +- * Only ever called in the select() thread, since we may call +- * __repmgr_bust_connection(..., TRUE). +- * + * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid)); + */ + int +--- 250,255 ---- +*************** +*** 332,338 **** + case 0: + break; + case DB_REP_UNAVAIL: +! return (__repmgr_bust_connection(dbenv, con, TRUE)); + default: + return (ret); + } +--- 324,330 ---- + case 0: + break; + case DB_REP_UNAVAIL: +! return (__repmgr_bust_connection(dbenv, con)); + default: + return (ret); + } +*************** +*** 437,443 **** + + DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); + +! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec)); + } + + /* +--- 429,443 ---- + + DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); + +! /* +! * It would of course be disastrous to block the select() thread, so +! * pass the "blockable" argument as FALSE. Fortunately blocking should +! * never be necessary here, because the hand-shake is always the first +! * thing we send. Which is a good thing, because it would be almost as +! * disastrous if we allowed ourselves to drop a handshake. +! */ +! return (__repmgr_send_one(dbenv, +! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE)); + } + + /* +*************** +*** 854,859 **** +--- 854,872 ---- + conn->out_queue_length--; + if (--msg->ref_count <= 0) + __os_free(dbenv, msg); ++ ++ /* ++ * We've achieved enough movement to free up at least ++ * one space in the outgoing queue. Wake any message ++ * threads that may be waiting for space. Clear the ++ * CONGESTED status so that when the queue reaches the ++ * high-water mark again, the filling thread will be ++ * allowed to try waiting again. ++ */ ++ F_CLR(conn, CONN_CONGESTED); ++ if (conn->blockers > 0 && ++ (ret = __repmgr_signal(&conn->drained)) != 0) ++ return (ret); + } + } + +*** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 103,108 **** +--- 103,113 ---- + db_rep = dbenv->rep_handle; + if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0) + return (ret); ++ if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) { ++ __os_free(dbenv, c); ++ return (ret); ++ } ++ c->blockers = 0; + + c->fd = s; + c->flags = flags; +*** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700 +--- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700 +*************** +*** 11,16 **** +--- 11,19 ---- + #define __INCLUDE_NETWORKING 1 + #include "db_int.h" + ++ /* Convert time-out from microseconds to milliseconds, rounding up. */ ++ #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS) ++ + typedef struct __ack_waiter { + HANDLE event; + const DB_LSN *lsnp; +*************** +*** 120,136 **** + { + DB_REP *db_rep; + ACK_WAITER *me; +! DWORD ret; +! DWORD timeout; + + db_rep = dbenv->rep_handle; + + if ((ret = allocate_wait_slot(dbenv, &me)) != 0) + goto err; + +- /* convert time-out from microseconds to milliseconds, rounding up */ + timeout = db_rep->ack_timeout > 0 ? +! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE; + me->lsnp = lsnp; + if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, + FALSE)) == WAIT_FAILED) { +--- 123,137 ---- + { + DB_REP *db_rep; + ACK_WAITER *me; +! DWORD ret, timeout; + + db_rep = dbenv->rep_handle; + + if ((ret = allocate_wait_slot(dbenv, &me)) != 0) + goto err; + + timeout = db_rep->ack_timeout > 0 ? +! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE; + me->lsnp = lsnp; + if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, + FALSE)) == WAIT_FAILED) { +*************** +*** 211,216 **** +--- 212,296 ---- + db_rep->waiters->first_free = slot; + } + ++ /* (See requirements described in repmgr_posix.c.) */ ++ int ++ __repmgr_await_drain(dbenv, conn, timeout) ++ DB_ENV *dbenv; ++ REPMGR_CONNECTION *conn; ++ db_timeout_t timeout; ++ { ++ DB_REP *db_rep; ++ db_timespec deadline, delta, now; ++ db_timeout_t t; ++ DWORD duration, ret; ++ int round_up; ++ ++ db_rep = dbenv->rep_handle; ++ ++ __os_gettime(dbenv, &deadline); ++ DB_TIMEOUT_TO_TIMESPEC(timeout, &delta); ++ timespecadd(&deadline, &delta); ++ ++ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { ++ if (!ResetEvent(conn->drained)) ++ return (GetLastError()); ++ ++ /* How long until the deadline? */ ++ __os_gettime(dbenv, &now); ++ if (timespeccmp(&now, &deadline, >=)) { ++ F_SET(conn, CONN_CONGESTED); ++ return (0); ++ } ++ delta = deadline; ++ timespecsub(&delta, &now); ++ round_up = TRUE; ++ DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up); ++ duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t); ++ ++ ret = SignalObjectAndWait(db_rep->mutex, ++ conn->drained, duration, FALSE); ++ LOCK_MUTEX(db_rep->mutex); ++ if (ret == WAIT_FAILED) ++ return (GetLastError()); ++ else if (ret == WAIT_TIMEOUT) { ++ F_SET(conn, CONN_CONGESTED); ++ return (0); ++ } else ++ DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); ++ ++ if (db_rep->finished) ++ return (0); ++ if (F_ISSET(conn, CONN_DEFUNCT)) ++ return (DB_REP_UNAVAIL); ++ } ++ return (0); ++ } ++ ++ /* ++ * Creates a manual reset event, which is usually our best choice when we may ++ * have multiple threads waiting on a single event. ++ */ ++ int ++ __repmgr_alloc_cond(c) ++ cond_var_t *c; ++ { ++ HANDLE event; ++ ++ if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) ++ return (GetLastError()); ++ *c = event; ++ return (0); ++ } ++ ++ int ++ __repmgr_free_cond(c) ++ cond_var_t *c; ++ { ++ if (CloseHandle(*c)) ++ return (0); ++ return (GetLastError()); ++ } ++ + /* + * Make resource allocation an all-or-nothing affair, outside of this and the + * close_sync function. db_rep->waiters should be non-NULL iff all of these +*************** +*** 488,493 **** +--- 568,576 ---- + * don't hurt anything flow-control-wise. + */ + TAILQ_FOREACH(conn, &db_rep->connections, entries) { ++ if (F_ISSET(conn, CONN_DEFUNCT)) ++ continue; ++ + if (F_ISSET(conn, CONN_CONNECTING) || + !STAILQ_EMPTY(&conn->outbound_queue) || + (!flow_control || !IS_VALID_EID(conn->eid))) { +*************** +*** 534,541 **** + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); +! if (F_ISSET(conn, CONN_DEFUNCT)) +! __repmgr_cleanup_connection(dbenv, conn); + } + + /* +--- 617,626 ---- + conn != NULL; + conn = next) { + next = TAILQ_NEXT(conn, entries); +! if (F_ISSET(conn, CONN_DEFUNCT) && +! (ret = __repmgr_cleanup_connection(dbenv, +! conn)) != 0) +! goto unlock; + } + + /* +*************** +*** 587,597 **** + return (ret); + } + +- /* +- * !!! +- * Only ever called on the select() thread, since we may call +- * __repmgr_bust_connection(..., TRUE). +- */ + static int + handle_completion(dbenv, conn) + DB_ENV *dbenv; +--- 672,677 ---- +*************** +*** 651,660 **** + } + } + +! return (0); +! +! err: if (ret == DB_REP_UNAVAIL) +! return (__repmgr_bust_connection(dbenv, conn, TRUE)); + return (ret); + } + +--- 731,742 ---- + } + } + +! err: +! if (ret == DB_REP_UNAVAIL) { +! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) +! return (ret); +! ret = __repmgr_cleanup_connection(dbenv, conn); +! } + return (ret); + } + +*************** +*** 708,714 **** + } + + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); +! __repmgr_cleanup_connection(dbenv, conn); + ret = __repmgr_connect_site(dbenv, eid); + DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); + return (ret); +--- 790,797 ---- + } + + DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); +! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) +! return (ret); + ret = __repmgr_connect_site(dbenv, eid); + DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); + return (ret); |