Branch '389-ds-base-1.2.11' - ldap/servers
by Mark Reynolds
ldap/servers/slapd/back-ldbm/ldbm_delete.c | 7 +++++++
1 file changed, 7 insertions(+)
New commits:
commit 5ef3183a434e694c120536c073a6f9f6e4cfa898
Author: Mark Reynolds <mreynolds(a)redhat.com>
Date: Tue Dec 16 10:37:02 2014 -0500
Ticket 47750 - Need to refresh cache entry after called betxn postop plugins
Bug Description: If deleting an entry triggers multiple betxn plugins, the
entry might not be removed from the cache - which prevents that
same entry(same dn) from being re-added(error 68). For
example, the RI and memberOf plugins are enabled. Then we
add a user to a group. This adds the memberOf attribute
to the entry. We then delete that user, which triggers the
RI plugin, which then triggers the memberOf plugin. So the
entry that is deleted gets modified in betxn postop, and
has its cache entry replaced. This then confuses the
cache logic at the end of the delete operation, and the entry
is not removed from the cache.
Fix Description: Refresh the cache entry after calling the betxn postop plugins.
If the entry has changed, return the old entry and proceed
with the new one.
https://fedorahosted.org/389/ticket/47750
Reviewed by: nhosoi & rmeggins (Thanks!!)
(cherry picked from commit 4a5eee63f45ed290375440827c92af9b2347a177)
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_delete.c b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
index a9530ec..87cc57e 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_delete.c
+++ b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
@@ -1116,6 +1116,13 @@ ldbm_back_delete( Slapi_PBlock *pb )
/* delete from cache and clean up */
if (e) {
+ struct backentry *old_e = e;
+
+ e = cache_find_id(&inst->inst_cache,e->ep_id);
+ if(e != old_e){
+ /* return the old entry, and proceed with the new one */
+ CACHE_RETURN(&inst->inst_cache, &old_e);
+ }
if (cache_is_in_cache(&inst->inst_cache, e)) {
ep_id = e->ep_id;
CACHE_REMOVE(&inst->inst_cache, e);
9 years, 3 months
Branch '389-ds-base-1.3.1' - ldap/servers
by Mark Reynolds
ldap/servers/slapd/back-ldbm/ldbm_delete.c | 7 +++++++
1 file changed, 7 insertions(+)
New commits:
commit 153e574ceeefec5af05a529b550c69441994ba2a
Author: Mark Reynolds <mreynolds(a)redhat.com>
Date: Tue Dec 16 10:37:02 2014 -0500
Ticket 47750 - Need to refresh cache entry after called betxn postop plugins
Bug Description: If deleting an entry triggers multiple betxn plugins, the
entry might not be removed from the cache - which prevents that
same entry(same dn) from being re-added(error 68). For
example, the RI and memberOf plugins are enabled. Then we
add a user to a group. This adds the memberOf attribute
to the entry. We then delete that user, which triggers the
RI plugin, which then triggers the memberOf plugin. So the
entry that is deleted gets modified in betxn postop, and
has its cache entry replaced. This then confuses the
cache logic at the end of the delete operation, and the entry
is not removed from the cache.
Fix Description: Refresh the cache entry after calling the betxn postop plugins.
If the entry has changed, return the old entry and proceed
with the new one.
https://fedorahosted.org/389/ticket/47750
Reviewed by: nhosoi & rmeggins (Thanks!!)
(cherry picked from commit 4a5eee63f45ed290375440827c92af9b2347a177)
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_delete.c b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
index c06ca7d..52f84de 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_delete.c
+++ b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
@@ -1209,6 +1209,13 @@ ldbm_back_delete( Slapi_PBlock *pb )
/* delete from cache and clean up */
if (e) {
+ struct backentry *old_e = e;
+
+ e = cache_find_id(&inst->inst_cache,e->ep_id);
+ if(e != old_e){
+ /* return the old entry, and proceed with the new one */
+ CACHE_RETURN(&inst->inst_cache, &old_e);
+ }
if (cache_is_in_cache(&inst->inst_cache, e)) {
ep_id = e->ep_id; /* Otherwise, e might have been freed. */
CACHE_REMOVE(&inst->inst_cache, e);
9 years, 3 months
Branch '389-ds-base-1.3.2' - ldap/servers
by Mark Reynolds
ldap/servers/slapd/back-ldbm/ldbm_delete.c | 7 +++++++
1 file changed, 7 insertions(+)
New commits:
commit 9295966f1f7e7235f13215b756398915df54fa6a
Author: Mark Reynolds <mreynolds(a)redhat.com>
Date: Tue Dec 16 10:37:02 2014 -0500
Ticket 47750 - Need to refresh cache entry after called betxn postop plugins
Bug Description: If deleting an entry triggers multiple betxn plugins, the
entry might not be removed from the cache - which prevents that
same entry(same dn) from being re-added(error 68). For
example, the RI and memberOf plugins are enabled. Then we
add a user to a group. This adds the memberOf attribute
to the entry. We then delete that user, which triggers the
RI plugin, which then triggers the memberOf plugin. So the
entry that is deleted gets modified in betxn postop, and
has its cache entry replaced. This then confuses the
cache logic at the end of the delete operation, and the entry
is not removed from the cache.
Fix Description: Refresh the cache entry after calling the betxn postop plugins.
If the entry has changed, return the old entry and proceed
with the new one.
https://fedorahosted.org/389/ticket/47750
Reviewed by: nhosoi & rmeggins (Thanks!!)
(cherry picked from commit 4a5eee63f45ed290375440827c92af9b2347a177)
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_delete.c b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
index ba2af26..89710d4 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_delete.c
+++ b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
@@ -1199,6 +1199,13 @@ ldbm_back_delete( Slapi_PBlock *pb )
/* delete from cache and clean up */
if (e) {
+ struct backentry *old_e = e;
+
+ e = cache_find_id(&inst->inst_cache,e->ep_id);
+ if(e != old_e){
+ /* return the old entry, and proceed with the new one */
+ CACHE_RETURN(&inst->inst_cache, &old_e);
+ }
if (cache_is_in_cache(&inst->inst_cache, e)) {
ep_id = e->ep_id; /* Otherwise, e might have been freed. */
CACHE_REMOVE(&inst->inst_cache, e);
9 years, 3 months
Branch '389-ds-base-1.3.3' - ldap/servers
by Mark Reynolds
ldap/servers/slapd/back-ldbm/ldbm_delete.c | 7 +++++++
1 file changed, 7 insertions(+)
New commits:
commit 6164553b681692f35e9a62c243eed7bfa352a6c4
Author: Mark Reynolds <mreynolds(a)redhat.com>
Date: Tue Dec 16 10:37:02 2014 -0500
Ticket 47750 - Need to refresh cache entry after called betxn postop plugins
Bug Description: If deleting an entry triggers multiple betxn plugins, the
entry might not be removed from the cache - which prevents that
same entry(same dn) from being re-added(error 68). For
example, the RI and memberOf plugins are enabled. Then we
add a user to a group. This adds the memberOf attribute
to the entry. We then delete that user, which triggers the
RI plugin, which then triggers the memberOf plugin. So the
entry that is deleted gets modified in betxn postop, and
has its cache entry replaced. This then confuses the
cache logic at the end of the delete operation, and the entry
is not removed from the cache.
Fix Description: Refresh the cache entry after calling the betxn postop plugins.
If the entry has changed, return the old entry and proceed
with the new one.
https://fedorahosted.org/389/ticket/47750
Reviewed by: nhosoi & rmeggins (Thanks!!)
(cherry picked from commit 4a5eee63f45ed290375440827c92af9b2347a177)
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_delete.c b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
index 9c19f08..eaac39d 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_delete.c
+++ b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
@@ -1261,6 +1261,13 @@ ldbm_back_delete( Slapi_PBlock *pb )
/* delete from cache and clean up */
if (e) {
+ struct backentry *old_e = e;
+
+ e = cache_find_id(&inst->inst_cache,e->ep_id);
+ if(e != old_e){
+ /* return the old entry, and proceed with the new one */
+ CACHE_RETURN(&inst->inst_cache, &old_e);
+ }
if (cache_is_in_cache(&inst->inst_cache, e)) {
ep_id = e->ep_id; /* Otherwise, e might have been freed. */
CACHE_REMOVE(&inst->inst_cache, e);
9 years, 3 months
ldap/servers
by Mark Reynolds
ldap/servers/slapd/back-ldbm/ldbm_delete.c | 7 +++++++
1 file changed, 7 insertions(+)
New commits:
commit 4a5eee63f45ed290375440827c92af9b2347a177
Author: Mark Reynolds <mreynolds(a)redhat.com>
Date: Tue Dec 16 10:37:02 2014 -0500
Ticket 47750 - Need to refresh cache entry after called betxn postop plugins
Bug Description: If deleting an entry triggers multiple betxn plugins, the
entry might not be removed from the cache - which prevents that
same entry(same dn) from being re-added(error 68). For
example, the RI and memberOf plugins are enabled. Then we
add a user to a group. This adds the memberOf attribute
to the entry. We then delete that user, which triggers the
RI plugin, which then triggers the memberOf plugin. So the
entry that is deleted gets modified in betxn postop, and
has its cache entry replaced. This then confuses the
cache logic at the end of the delete operation, and the entry
is not removed from the cache.
Fix Description: Refresh the cache entry after calling the betxn postop plugins.
If the entry has changed, return the old entry and proceed
with the new one.
https://fedorahosted.org/389/ticket/47750
Reviewed by: nhosoi & rmeggins (Thanks!!)
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_delete.c b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
index 9c19f08..eaac39d 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_delete.c
+++ b/ldap/servers/slapd/back-ldbm/ldbm_delete.c
@@ -1261,6 +1261,13 @@ ldbm_back_delete( Slapi_PBlock *pb )
/* delete from cache and clean up */
if (e) {
+ struct backentry *old_e = e;
+
+ e = cache_find_id(&inst->inst_cache,e->ep_id);
+ if(e != old_e){
+ /* return the old entry, and proceed with the new one */
+ CACHE_RETURN(&inst->inst_cache, &old_e);
+ }
if (cache_is_in_cache(&inst->inst_cache, e)) {
ep_id = e->ep_id; /* Otherwise, e might have been freed. */
CACHE_REMOVE(&inst->inst_cache, e);
9 years, 3 months
Branch '389-ds-base-1.3.2' - ldap/schema ldap/servers
by thierry bordaz
ldap/schema/01core389.ldif | 4
ldap/servers/plugins/replication/repl5.h | 10 +
ldap/servers/plugins/replication/repl5_agmt.c | 160 +++++++++++++++++
ldap/servers/plugins/replication/repl5_agmtlist.c | 26 ++
ldap/servers/plugins/replication/repl5_connection.c | 163 +++++++++++++++++-
ldap/servers/plugins/replication/repl5_inc_protocol.c | 32 +++
ldap/servers/plugins/replication/repl5_prot_private.h | 2
ldap/servers/plugins/replication/repl5_tot_protocol.c | 53 +++++
ldap/servers/plugins/replication/repl_globals.c | 2
9 files changed, 446 insertions(+), 6 deletions(-)
New commits:
commit 9851929baa628a87ff701f3e7b457c99f51ff9f4
Author: Thierry bordaz (tbordaz) <tbordaz(a)redhat.com>
Date: Mon Dec 15 15:12:35 2014 +0100
Ticket 47942: DS hangs during online total update
Bug Description:
During incremental or total update of a consumer the replica agreement thread may hang.
For total update:
The replica agreement thread that sends the entries floods the consumer, which is not
able to process the entries fast enough. So the TCP connection gets full and
the RA.sender sleeps on the connection until it is able to write the next entries.
While sleeping on the poll or the write, the RA.sender holds the connection lock.
This prevents the replica agreement result thread from reading the results from the
network. So the consumer is also halted because it can no longer send the results.
For incremental update:
During incremental update, all updates are sent by the RA.sender.
If many updates need to be sent, the supplier may overflow a consumer
that is very late. This flow of updates can fill the TCP connection
so that the RA.sender hangs when writing the next update.
While hung, it holds the connection lock, preventing the RA.reader
from receiving the acks. And so the consumer can also hang trying to send the
acks.
Fix Description:
For total update there are two parts of the fix:
To prevent the RA.sender from sleeping too long on the poll, the fix (conn_is_available)
splits the RA.timeout into 1s periods.
If unable to write for 1s, it releases the connection for a short period of time (100ms).
To prevent the RA.sender from sleeping on the write, the fix (check_flow_control_tot_init)
checks how late the consumer is and, if it is too late, pauses (releasing the connection
during that time). This second part of the fix is configurable and may need to be
tuned according to the observed failures.
For incremental update:
The fix is to implement flow control on the RA.sender.
After each sent update, if the window (update.sent - update.acked) crosses the limit,
the RA.sender pauses for a configured delay.
When the RA.sender pauses, it does not hold the connection lock.
Tuning can be done with nsds5ReplicaFlowControlWindow (how late the consumer is in terms of
number of entries/updates acknowledged) and nsds5ReplicaFlowControlPause (how long the RA.sender will
pause if the consumer is too late)
Logging:
For total update, the first time the flow control pauses, it logs a message (FATAL level).
If flow control happened, then at the end of the total update, it also logs the number
of flow control pauses (FATAL level).
For incremental update, if flow control happened it logs the number of pauses (REPL level).
https://fedorahosted.org/389/ticket/47942
Reviewed by: Mark Reynolds, Rich Megginson, Andrey Ivanov, Noriko Hosoi (many many thanks to all of you !)
Platforms tested: RHEL 7.0, Centos
Flag Day: no
Doc impact: no
diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
index 07a673b..a91efd8 100644
--- a/ldap/schema/01core389.ldif
+++ b/ldap/schema/01core389.ldif
@@ -159,6 +159,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2166 NAME 'schemaUpdateObjectclassReject
attributeTypes: ( 2.16.840.1.113730.3.1.2167 NAME 'schemaUpdateAttributeAccept' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 X-ORIGIN 'Netscape Directory Server' )
attributeTypes: ( 2.16.840.1.113730.3.1.2168 NAME 'schemaUpdateAttributeReject' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 X-ORIGIN 'Netscape Directory Server' )
attributeTypes: ( 2.16.840.1.113730.3.1.2307 NAME 'nsslapd-allow-hashed-passwords' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
#
# objectclasses
#
@@ -170,7 +172,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d
objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout ) X-ORIGIN 'Netscape Directory Server' )
+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause )
X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) MAY ( nsSaslMapPriority ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' )
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index d91dbeb..2fc4e47 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -168,6 +168,8 @@ extern const char *type_nsds5ReplicaBusyWaitTime;
extern const char *type_nsds5ReplicaSessionPauseTime;
extern const char *type_nsds5ReplicaEnabled;
extern const char *type_nsds5ReplicaStripAttrs;
+extern const char *type_nsds5ReplicaFlowControlWindow;
+extern const char *type_nsds5ReplicaFlowControlPause;
extern const char *type_replicaProtocolTimeout;
extern const char *type_replicaBackoffMin;
extern const char *type_replicaBackoffMax;
@@ -324,6 +326,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra);
long agmt_get_timeout(const Repl_Agmt *ra);
long agmt_get_busywaittime(const Repl_Agmt *ra);
long agmt_get_pausetime(const Repl_Agmt *ra);
+long agmt_get_flowcontrolwindow(const Repl_Agmt *ra);
+long agmt_get_flowcontrolpause(const Repl_Agmt *ra);
int agmt_start(Repl_Agmt *ra);
int windows_agmt_start(Repl_Agmt *ra);
int agmt_stop(Repl_Agmt *ra);
@@ -344,6 +348,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name);
int agmt_schedule_in_window_now(const Repl_Agmt *ra);
int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
+int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
+int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
@@ -482,6 +488,10 @@ void conn_lock(Repl_Connection *conn);
void conn_unlock(Repl_Connection *conn);
void conn_delete_internal_ext(Repl_Connection *conn);
const char* conn_get_bindmethod(Repl_Connection *conn);
+void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data);
+void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data);
+void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data);
+void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data);
/* In repl5_protocol.c */
typedef struct repl_protocol Repl_Protocol;
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
index d5c6439..fb01c8e 100644
--- a/ldap/servers/plugins/replication/repl5_agmt.c
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -87,6 +87,8 @@
#include "slapi-plugin.h"
#define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
+#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */
+#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */
#define STATUS_LEN 1024
struct changecounter {
@@ -144,6 +146,12 @@ typedef struct repl5agmt {
int agreement_type;
Slapi_Counter *protocol_timeout;
char *maxcsn; /* agmt max csn */
+ long flowControlWindow; /* This is the maximum number of entries
+ * sent without acknowledgment
+ */
+ long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow
+ * This is the duration (in msec) that the RA will pause before sending the next entry
+ */
Slapi_RWLock *attr_lock; /* RW lock for all the stripped attrs */
} repl5agmt;
@@ -344,6 +352,28 @@ agmt_new_from_entry(Slapi_Entry *e)
}
}
+ /* flow control update window. */
+ ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->flowControlWindow = slapi_value_get_long(sval);
+ }
+ }
+
+ /* flow control update pause. */
+ ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->flowControlPause = slapi_value_get_long(sval);
+ }
+ }
+
/* DN of entry at root of replicated area */
tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
if (NULL != tmpstr)
@@ -991,6 +1021,26 @@ agmt_get_pausetime(const Repl_Agmt *ra)
return return_value;
}
+long
+agmt_get_flowcontrolwindow(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->flowControlWindow;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+long
+agmt_get_flowcontrolpause(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->flowControlPause;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
/*
* Warning - reference to the long name of the agreement is returned.
* The long name of an agreement is the DN of the agreement entry,
@@ -1722,6 +1772,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
return return_value;
}
+/*
+ * Set or reset the windows of entries sent without acknowledgment.
+ * The window is used during update to determine the number of
+ * entries will be send by the replica agreement without acknowledgment from the consumer
+ *
+ * Returns 0 if window set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->flowControlWindow = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
+/*
+ * Set or reset the pause duration when #entries sent without acknowledgment overpass flow control window
+ *
+ * Returns 0 if pause set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->flowControlPause = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
int
agmt_set_timeout(Repl_Agmt *ra, long timeout)
{
@@ -1735,6 +1869,32 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout)
return 0;
}
+int
+agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window)
+{
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress){
+ PR_Unlock(ra->lock);
+ return -1;
+ }
+ ra->flowControlWindow = window;
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
+int
+agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause)
+{
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress){
+ PR_Unlock(ra->lock);
+ return -1;
+ }
+ ra->flowControlPause = pause;
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
/*
* Set or reset the busywaittime
diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c
index dfaef94..69be459 100644
--- a/ldap/servers/plugins/replication/repl5_agmtlist.c
+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c
@@ -330,6 +330,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry
}
}
else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+ type_nsds5ReplicaFlowControlWindow))
+ {
+ /* New replica timeout */
+ if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
+ "failed to update the flow control window for agreement %s\n",
+ agmt_get_long_name(agmt));
+ *returncode = LDAP_OPERATIONS_ERROR;
+ rc = SLAPI_DSE_CALLBACK_ERROR;
+ }
+ }
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+ type_nsds5ReplicaFlowControlPause))
+ {
+ /* New replica timeout */
+ if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
+ "failed to update the flow control pause for agreement %s\n",
+ agmt_get_long_name(agmt));
+ *returncode = LDAP_OPERATIONS_ERROR;
+ rc = SLAPI_DSE_CALLBACK_ERROR;
+ }
+ }
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
type_nsds5ReplicaBusyWaitTime))
{
/* New replica busywaittime */
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
index c004bfb..2971025 100644
--- a/ldap/servers/plugins/replication/repl5_connection.c
+++ b/ldap/servers/plugins/replication/repl5_connection.c
@@ -52,6 +52,7 @@ replica locked. Seems like right thing to do.
*/
#include "repl5.h"
+#include "repl5_prot_private.h"
#include "slapi-private.h"
#if defined(USE_OPENLDAP)
#include "ldap.h"
@@ -91,6 +92,7 @@ typedef struct repl_connection
struct timeval timeout;
int flag_agmt_changed;
char *plain;
+ void *tot_init_callback; /* Used during total update to do flow control */
} repl_connection;
/* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */
@@ -274,6 +276,32 @@ conn_delete(Repl_Connection *conn)
PR_Unlock(conn->lock);
}
+void
+conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data)
+{
+ conn->tot_init_callback = (void *) cb_data;
+}
+void
+conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data)
+{
+ PR_Lock(conn->lock);
+ conn_set_tot_update_cb_nolock(conn, cb_data);
+ PR_Unlock(conn->lock);
+}
+
+void
+conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data)
+{
+ *cb_data = (void *) conn->tot_init_callback;
+}
+void
+conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data)
+{
+ PR_Lock(conn->lock);
+ conn_get_tot_update_cb_nolock(conn, cb_data);
+ PR_Unlock(conn->lock);
+}
+
/*
* Return the last operation type processed by the connection
* object, and the LDAP error encountered.
@@ -640,6 +668,131 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout)
}
#endif /* ! USE_OPENLDAP */
+/*
+ * During a total update, this function checks how much entries
+ * have been sent to the consumer without having received their acknowledgment.
+ * Basically it checks how late is the consumer.
+ *
+ * If the consumer is too late, it pause the RA.sender (releasing the lock) to
+ * let the consumer to catch up and RA.reader to receive the acknowledgments.
+ *
+ * Caller must hold conn->lock
+ */
+static void
+check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid)
+{
+ int rcv_msgid;
+ int once;
+
+ if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) {
+ /* We are sending entries part of the total update of a consumer
+ * Wait a bit if the consumer needs to catchup from the current sent entries
+ */
+ rcv_msgid = repl5_tot_last_rcv_msgid(conn);
+ if (rcv_msgid == -1) {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n",
+ agmt_get_long_name(conn->agmt),
+ sent_msgid);
+ } else if (sent_msgid < rcv_msgid) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n",
+ agmt_get_long_name(conn->agmt),
+ sent_msgid,
+ rcv_msgid);
+ } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) {
+ int totalUpdatePause;
+
+ totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt);
+ if (totalUpdatePause) {
+ /* The consumer is late. Last sent entry compare to last acknowledged entry
+ * overpass the allowed limit (flowcontrolwindow)
+ * Give some time to the consumer to catch up
+ */
+ once = repl5_tot_flowcontrol_detection(conn, 1);
+ PR_Unlock(conn->lock);
+ if (once == 1) {
+ /* This is the first time we hit total update flow control.
+ * Log it at least once to inform administrator there is
+ * a potential configuration issue here
+ */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n"
+ "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(conn->agmt),
+ totalUpdatePause,
+ sent_msgid,
+ rcv_msgid,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause));
+ PR_Lock(conn->lock);
+ }
+ }
+ }
+
+}
+/*
+ * Test if the connection is available to do a write.
+ * This function is doing a periodic polling of the connection.
+ * If the polling times out:
+ * - it releases the connection lock (to let other thread ,i.e.
+ * replication result thread, the opportunity to use the connection)
+ * - Sleeps for a short period (100ms)
+ * - acquires the connection lock
+ *
+ * It loops until
+ * - it is available
+ * - exceeds RA complete timeout
+ * - server is shutdown
+ * - connection is disconnected (Disable, stop, delete the RA
+ * 'terminate' the replication protocol and disconnect the connection)
+ *
+ * Return:
+ * - CONN_OPERATION_SUCCESS if the connection is available
+ * - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout
+ * - CONN_NOT_CONNECTED if the replication connection state is disconnected
+ * - other ConnResult
+ *
+ * Caller must hold conn->Lock. At the exit, conn->lock is held
+ */
+static ConnResult
+conn_is_available(Repl_Connection *conn)
+{
+ time_t poll_timeout_sec = 1; /* Polling for 1sec */
+ time_t yield_delay_msec = 100; /* Delay to wait */
+ time_t start_time = time( NULL );
+ time_t time_now;
+ ConnResult return_value = CONN_OPERATION_SUCCESS;
+
+ while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) {
+ return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec));
+ if (return_value == CONN_TIMEOUT) {
+ /* in case of timeout we return CONN_TIMEOUT only
+ * if the RA.timeout is exceeded
+ */
+ time_now = time(NULL);
+ if (conn->timeout.tv_sec <= (time_now - start_time)) {
+ break;
+ } else {
+ /* Else give connection to others threads */
+ PR_Unlock(conn->lock);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: perform_operation transient timeout. retry)\n",
+ agmt_get_long_name(conn->agmt));
+ DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec));
+ PR_Lock(conn->lock);
+ }
+ } else {
+ break;
+ }
+ }
+ if (conn->state == STATE_DISCONNECTED) {
+ return_value = CONN_NOT_CONNECTED;
+ }
+ return return_value;
+}
/*
* Common code to send an LDAPv3 operation and collect the result.
* Return values:
@@ -683,10 +836,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel);
- return_value = see_if_write_available(
- conn, PR_SecondsToInterval(conn->timeout.tv_sec));
+ return_value = conn_is_available(conn);
if (return_value != CONN_OPERATION_SUCCESS) {
PR_Unlock(conn->lock);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: perform_operation connection is not available (%d)\n",
+ agmt_get_long_name(conn->agmt),
+ return_value);
return return_value;
}
conn->last_operation = optype;
@@ -758,6 +914,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
*/
return_value = CONN_NOT_CONNECTED;
}
+
+ check_flow_control_tot_init(conn, optype, extop_oid, msgid);
+
PR_Unlock(conn->lock); /* release the lock */
if (message_id)
{
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 9f81c04..5cf170c 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -108,6 +108,7 @@ typedef struct result_data
int stop_result_thread; /* Flag used to tell the result thread to exit */
int last_message_id_sent;
int last_message_id_received;
+ int flowcontrol_detection;
int result; /* The UPDATE_TRANSIENT_ERROR etc */
} result_data;
@@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd)
return retval;
}
+/* The purpose of this routine is to give the consumer time
+ * to apply the sent updates and return the acks.
+ * So the caller should not hold the replication connection lock,
+ * so that the RA.reader can receive the acks.
+ */
+static void
+repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
+{
+ PR_Lock(rd->lock);
+ if ((rd->last_message_id_received <= rd->last_message_id_sent) &&
+ ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) {
+ rd->flowcontrol_detection++;
+ DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt)));
+ }
+ PR_Unlock(rd->lock);
+}
+
static void
repl5_inc_waitfor_async_results(result_data *rd)
{
@@ -1682,7 +1700,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
{
int finished = 0;
ConnResult replay_crc;
- char csn_str[CSN_STRSIZE];
+ char csn_str[CSN_STRSIZE];
/* Start the results reading thread */
rd = repl5_inc_rd_new(prp);
@@ -1817,6 +1835,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
sop->replica_id = replica_id;
PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid));
repl5_int_push_operation(rd,sop);
+ repl5_inc_flow_control_results(prp->agmt, rd);
} else {
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
"%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n",
@@ -1905,6 +1924,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
}
*num_changes_sent = rd->num_changes_sent;
}
+ PR_Lock(rd->lock);
+ if (rd->flowcontrol_detection) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: Incremental update flow control triggered %d times\n"
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(prp->agmt),
+ rd->flowcontrol_detection,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ PR_Unlock(rd->lock);
repl5_inc_rd_destroy(&rd);
cl5_operation_parameters_done ( entry.op );
diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h
index 586e1eb..1b1c00b 100644
--- a/ldap/servers/plugins/replication/repl5_prot_private.h
+++ b/ldap/servers/plugins/replication/repl5_prot_private.h
@@ -79,6 +79,8 @@ typedef struct private_repl_protocol
extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new();
extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new();
+extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn);
+extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment);
extern Private_Repl_Protocol *Windows_Inc_Protocol_new();
extern Private_Repl_Protocol *Windows_Tot_Protocol_new();
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
index 2db5178..8ed46e8 100644
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
@@ -82,6 +82,7 @@ typedef struct callback_data
int stop_result_thread; /* Flag used to tell the result thread to exit */
int last_message_id_sent;
int last_message_id_received;
+ int flowcontrol_detection;
} callback_data;
/*
@@ -419,13 +420,19 @@ repl5_tot_run(Private_Repl_Protocol *prp)
LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
- cb_data.prp = prp;
- cb_data.rc = 0;
+ cb_data.prp = prp;
+ cb_data.rc = 0;
cb_data.num_entries = 0UL;
cb_data.sleep_on_busy = 0UL;
cb_data.last_busy = current_time ();
+ cb_data.flowcontrol_detection = 0;
cb_data.lock = PR_NewLock();
+ /* This allows perform_operation to check the callback data,
+ * especially to do flow control on the delta between sent msgid and received msgid
+ */
+ conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
+
/* Before we get started on sending entries to the replica, we need to
* setup things for async propagation:
* 1. Create a thread that will read the LDAP results from the connection.
@@ -495,6 +502,17 @@ repl5_tot_run(Private_Repl_Protocol *prp)
done:
slapi_sdn_free(&area_sdn);
slapi_ch_free_string(&hostname);
+ if (cb_data.flowcontrol_detection > 1)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Total update flow control triggered %d times\n"
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(prp->agmt),
+ cb_data.flowcontrol_detection,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ conn_set_tot_update_cb(prp->conn, NULL);
if (cb_data.lock)
{
PR_DestroyLock(cb_data.lock);
@@ -634,6 +652,37 @@ void get_result (int rc, void *cb_data)
((callback_data*)cb_data)->rc = rc;
}
+/* Caller must hold the connection lock */
+int
+repl5_tot_last_rcv_msgid(Repl_Connection *conn)
+{
+ struct callback_data *cb_data;
+
+ conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
+ if (cb_data == NULL) {
+ return -1;
+ } else {
+ return cb_data->last_message_id_received;
+ }
+}
+
+/* Increase the flow control counter.
+ * Caller must hold the connection lock
+ */
+int
+repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment)
+{
+ struct callback_data *cb_data;
+
+ conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
+ if (cb_data == NULL) {
+ return -1;
+ } else {
+ cb_data->flowcontrol_detection += increment;
+ return cb_data->flowcontrol_detection;
+ }
+}
+
static
int send_entry (Slapi_Entry *e, void *cb_data)
{
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
index 007dd28..70301ff 100644
--- a/ldap/servers/plugins/replication/repl_globals.c
+++ b/ldap/servers/plugins/replication/repl_globals.c
@@ -133,6 +133,8 @@ const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime";
const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime";
const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled";
const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs";
+const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow";
+const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause";
/* windows sync specific attributes */
const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree";
9 years, 3 months
Branch '389-ds-base-1.3.3' - ldap/schema ldap/servers
by thierry bordaz
ldap/schema/01core389.ldif | 4
ldap/servers/plugins/replication/repl5.h | 10 +
ldap/servers/plugins/replication/repl5_agmt.c | 160 +++++++++++++++++
ldap/servers/plugins/replication/repl5_agmtlist.c | 26 ++
ldap/servers/plugins/replication/repl5_connection.c | 163 +++++++++++++++++-
ldap/servers/plugins/replication/repl5_inc_protocol.c | 32 +++
ldap/servers/plugins/replication/repl5_prot_private.h | 2
ldap/servers/plugins/replication/repl5_tot_protocol.c | 53 +++++
ldap/servers/plugins/replication/repl_globals.c | 2
9 files changed, 446 insertions(+), 6 deletions(-)
New commits:
commit 69df018ceb6f735479d95dfd8a5a6bcfee886486
Author: Thierry bordaz (tbordaz) <tbordaz(a)redhat.com>
Date: Mon Dec 15 15:12:35 2014 +0100
Ticket 47942: DS hangs during online total update
Bug Description:
During incremental or total update of a consumer the replica agreement thread may hang.
For total update:
The replica agreement thread that sends the entries floods the consumer, which is not
able to process the entries fast enough. So the TCP connection gets full and
the RA.sender sleeps on the connection until it is able to write the next entries.
While sleeping on the poll or write, the RA.sender holds the connection lock.
This prevents the replica agreement result thread from reading the results from the
network. So the consumer is also halted because it can no longer send the results.
For incremental update:
During incremental update, all updates are sent by the RA.sender.
If many updates need to be sent, the supplier may overflow a consumer
that is very late. This flow of updates can fill the TCP connection
so that the RA.sender hangs when writing the next update.
While it hangs, it holds the connection lock, preventing the RA.reader
from receiving the acks. And so the consumer can also hang trying to send the
acks.
Fix Description:
For total update there are two parts of the fix:
To prevent the RA.sender from sleeping too long on the poll, the fix (conn_is_available)
splits the RA.timeout into 1s periods.
If unable to write for 1s, it releases the connection for a short period of time (100ms).
To prevent the RA.sender from sleeping on the write, the fix (check_flow_control_tot_init)
checks how late the consumer is and, if it is too late, it pauses (releasing the connection
during that time). This second part of the fix is configurable and it may need to be
tuned according to the observed failures.
For incremental update:
The fix is to implement flow control on the RA.sender.
After each sent update, if the window (update.sent - update.acked) crosses the limit,
the RA.sender pauses for a configured delay.
When the RA.sender pauses, it does not hold the connection lock.
Tuning can be done with nsds5ReplicaFlowControlWindow (how late the consumer may be in terms of
number of entries/updates acknowledged) and nsds5ReplicaFlowControlPause (how long the RA.sender will
pause if the consumer is too late).
Logging:
For total update, the first time the flow control pauses, it logs a message (FATAL level).
If flow control happened, then at the end of the total update, it also logs the number
of flow control pauses (FATAL level).
For incremental update, if flow control happened, it logs the number of pauses (REPL level).
https://fedorahosted.org/389/ticket/47942
Reviewed by: Mark Reynolds, Rich Megginson, Andrey Ivanov, Noriko Hosoi (many many thanks to all of you !)
Platforms tested: RHEL 7.0, Centos
Flag Day: no
Doc impact: no
diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
index c7aec70..c59d762 100644
--- a/ldap/schema/01core389.ldif
+++ b/ldap/schema/01core389.ldif
@@ -302,6 +302,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2306 NAME 'nsslapd-return-default-opattr
attributeTypes: ( 2.16.840.1.113730.3.1.2307 NAME 'nsslapd-allow-hashed-passwords' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
attributeTypes: ( 2.16.840.1.113730.3.1.2308 NAME 'nstombstonecsn' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
attributeTypes: ( 2.16.840.1.113730.3.1.2309 NAME 'nsds5ReplicaPreciseTombstonePurging' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
#
# objectclasses
#
@@ -313,7 +315,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d
objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout ) X-ORIGIN 'Netscape Directory Server' )
+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause )
X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) MAY ( nsSaslMapPriority ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' )
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 86c77ce..e2b6209 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -170,6 +170,8 @@ extern const char *type_nsds5ReplicaBusyWaitTime;
extern const char *type_nsds5ReplicaSessionPauseTime;
extern const char *type_nsds5ReplicaEnabled;
extern const char *type_nsds5ReplicaStripAttrs;
+extern const char *type_nsds5ReplicaFlowControlWindow;
+extern const char *type_nsds5ReplicaFlowControlPause;
extern const char *type_replicaProtocolTimeout;
extern const char *type_replicaBackoffMin;
extern const char *type_replicaBackoffMax;
@@ -332,6 +334,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra);
long agmt_get_timeout(const Repl_Agmt *ra);
long agmt_get_busywaittime(const Repl_Agmt *ra);
long agmt_get_pausetime(const Repl_Agmt *ra);
+long agmt_get_flowcontrolwindow(const Repl_Agmt *ra);
+long agmt_get_flowcontrolpause(const Repl_Agmt *ra);
int agmt_start(Repl_Agmt *ra);
int windows_agmt_start(Repl_Agmt *ra);
int agmt_stop(Repl_Agmt *ra);
@@ -352,6 +356,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name);
int agmt_schedule_in_window_now(const Repl_Agmt *ra);
int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
+int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
+int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
@@ -490,6 +496,10 @@ void conn_lock(Repl_Connection *conn);
void conn_unlock(Repl_Connection *conn);
void conn_delete_internal_ext(Repl_Connection *conn);
const char* conn_get_bindmethod(Repl_Connection *conn);
+void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data);
+void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data);
+void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data);
+void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data);
/* In repl5_protocol.c */
typedef struct repl_protocol Repl_Protocol;
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
index 7c5c37c..91be757 100644
--- a/ldap/servers/plugins/replication/repl5_agmt.c
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -87,6 +87,8 @@
#include "slapi-plugin.h"
#define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
+#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */
+#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */
#define STATUS_LEN 1024
struct changecounter {
@@ -145,6 +147,12 @@ typedef struct repl5agmt {
int agreement_type;
Slapi_Counter *protocol_timeout;
char *maxcsn; /* agmt max csn */
+ long flowControlWindow; /* This is the maximum number of entries
+ * sent without acknowledgment
+ */
+ long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow
+ * This is the duration (in msec) that the RA will pause before sending the next entry
+ */
Slapi_RWLock *attr_lock; /* RW lock for all the stripped attrs */
} repl5agmt;
@@ -345,6 +353,28 @@ agmt_new_from_entry(Slapi_Entry *e)
}
}
+ /* flow control update window. */
+ ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->flowControlWindow = slapi_value_get_long(sval);
+ }
+ }
+
+ /* flow control update pause. */
+ ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->flowControlPause = slapi_value_get_long(sval);
+ }
+ }
+
/* DN of entry at root of replicated area */
tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
if (NULL != tmpstr)
@@ -1014,6 +1044,26 @@ agmt_get_pausetime(const Repl_Agmt *ra)
return return_value;
}
+long
+agmt_get_flowcontrolwindow(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->flowControlWindow;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+long
+agmt_get_flowcontrolpause(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->flowControlPause;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
/*
* Warning - reference to the long name of the agreement is returned.
* The long name of an agreement is the DN of the agreement entry,
@@ -1775,6 +1825,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
return return_value;
}
+/*
+ * Set or reset the window of entries sent without acknowledgment.
+ * The window is used during an update to determine the number of
+ * entries that will be sent by the replica agreement without acknowledgment from the consumer
+ *
+ * Returns 0 if window set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->flowControlWindow = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
+/*
+ * Set or reset the pause duration used when the number of entries sent without acknowledgment exceeds the flow control window
+ *
+ * Returns 0 if pause set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->flowControlPause = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
int
agmt_set_timeout(Repl_Agmt *ra, long timeout)
{
@@ -1788,6 +1922,32 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout)
return 0;
}
+int
+agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window)
+{
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress){
+ PR_Unlock(ra->lock);
+ return -1;
+ }
+ ra->flowControlWindow = window;
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
+int
+agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause)
+{
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress){
+ PR_Unlock(ra->lock);
+ return -1;
+ }
+ ra->flowControlPause = pause;
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
/*
* Set or reset the busywaittime
diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c
index 90bf7e9..4a1ff5d 100644
--- a/ldap/servers/plugins/replication/repl5_agmtlist.c
+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c
@@ -330,6 +330,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry
}
}
else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+ type_nsds5ReplicaFlowControlWindow))
+ {
+ /* New flow control window */
+ if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
+ "failed to update the flow control window for agreement %s\n",
+ agmt_get_long_name(agmt));
+ *returncode = LDAP_OPERATIONS_ERROR;
+ rc = SLAPI_DSE_CALLBACK_ERROR;
+ }
+ }
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+ type_nsds5ReplicaFlowControlPause))
+ {
+ /* New flow control pause */
+ if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
+ "failed to update the flow control pause for agreement %s\n",
+ agmt_get_long_name(agmt));
+ *returncode = LDAP_OPERATIONS_ERROR;
+ rc = SLAPI_DSE_CALLBACK_ERROR;
+ }
+ }
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
type_nsds5ReplicaBusyWaitTime))
{
/* New replica busywaittime */
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
index c004bfb..2971025 100644
--- a/ldap/servers/plugins/replication/repl5_connection.c
+++ b/ldap/servers/plugins/replication/repl5_connection.c
@@ -52,6 +52,7 @@ replica locked. Seems like right thing to do.
*/
#include "repl5.h"
+#include "repl5_prot_private.h"
#include "slapi-private.h"
#if defined(USE_OPENLDAP)
#include "ldap.h"
@@ -91,6 +92,7 @@ typedef struct repl_connection
struct timeval timeout;
int flag_agmt_changed;
char *plain;
+ void *tot_init_callback; /* Used during total update to do flow control */
} repl_connection;
/* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */
@@ -274,6 +276,32 @@ conn_delete(Repl_Connection *conn)
PR_Unlock(conn->lock);
}
+void
+conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data)
+{
+ conn->tot_init_callback = (void *) cb_data;
+}
+void
+conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data)
+{
+ PR_Lock(conn->lock);
+ conn_set_tot_update_cb_nolock(conn, cb_data);
+ PR_Unlock(conn->lock);
+}
+
+void
+conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data)
+{
+ *cb_data = (void *) conn->tot_init_callback;
+}
+void
+conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data)
+{
+ PR_Lock(conn->lock);
+ conn_get_tot_update_cb_nolock(conn, cb_data);
+ PR_Unlock(conn->lock);
+}
+
/*
* Return the last operation type processed by the connection
* object, and the LDAP error encountered.
@@ -640,6 +668,131 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout)
}
#endif /* ! USE_OPENLDAP */
+/*
+ * During a total update, this function checks how many entries
+ * have been sent to the consumer without having received their acknowledgment.
+ * Basically it checks how late the consumer is.
+ *
+ * If the consumer is too late, it pauses the RA.sender (releasing the lock) to
+ * let the consumer catch up and the RA.reader receive the acknowledgments.
+ *
+ * Caller must hold conn->lock
+ */
+static void
+check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid)
+{
+ int rcv_msgid;
+ int once;
+
+ if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) {
+ /* We are sending entries part of the total update of a consumer
+ * Wait a bit if the consumer needs to catchup from the current sent entries
+ */
+ rcv_msgid = repl5_tot_last_rcv_msgid(conn);
+ if (rcv_msgid == -1) {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n",
+ agmt_get_long_name(conn->agmt),
+ sent_msgid);
+ } else if (sent_msgid < rcv_msgid) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n",
+ agmt_get_long_name(conn->agmt),
+ sent_msgid,
+ rcv_msgid);
+ } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) {
+ int totalUpdatePause;
+
+ totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt);
+ if (totalUpdatePause) {
+ /* The consumer is late. Last sent entry compare to last acknowledged entry
+ * overpass the allowed limit (flowcontrolwindow)
+ * Give some time to the consumer to catch up
+ */
+ once = repl5_tot_flowcontrol_detection(conn, 1);
+ PR_Unlock(conn->lock);
+ if (once == 1) {
+ /* This is the first time we hit total update flow control.
+ * Log it at least once to inform administrator there is
+ * a potential configuration issue here
+ */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n"
+ "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(conn->agmt),
+ totalUpdatePause,
+ sent_msgid,
+ rcv_msgid,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause));
+ PR_Lock(conn->lock);
+ }
+ }
+ }
+
+}
+/*
+ * Test if the connection is available to do a write.
+ * This function is doing a periodic polling of the connection.
+ * If the polling times out:
+ * - it releases the connection lock (to give other threads, i.e. the
+ * replication result thread, the opportunity to use the connection)
+ * - Sleeps for a short period (100ms)
+ * - acquires the connection lock
+ *
+ * It loops until
+ * - it is available
+ * - exceeds RA complete timeout
+ * - server is shutdown
+ * - connection is disconnected (Disable, stop, delete the RA
+ * 'terminate' the replication protocol and disconnect the connection)
+ *
+ * Return:
+ * - CONN_OPERATION_SUCCESS if the connection is available
+ * - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout
+ * - CONN_NOT_CONNECTED if the replication connection state is disconnected
+ * - other ConnResult
+ *
+ * Caller must hold conn->Lock. At the exit, conn->lock is held
+ */
+static ConnResult
+conn_is_available(Repl_Connection *conn)
+{
+ time_t poll_timeout_sec = 1; /* Polling for 1sec */
+ time_t yield_delay_msec = 100; /* Delay to wait */
+ time_t start_time = time( NULL );
+ time_t time_now;
+ ConnResult return_value = CONN_OPERATION_SUCCESS;
+
+ while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) {
+ return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec));
+ if (return_value == CONN_TIMEOUT) {
+ /* in case of timeout we return CONN_TIMEOUT only
+ * if the RA.timeout is exceeded
+ */
+ time_now = time(NULL);
+ if (conn->timeout.tv_sec <= (time_now - start_time)) {
+ break;
+ } else {
+ /* Else give connection to others threads */
+ PR_Unlock(conn->lock);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: perform_operation transient timeout. retry)\n",
+ agmt_get_long_name(conn->agmt));
+ DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec));
+ PR_Lock(conn->lock);
+ }
+ } else {
+ break;
+ }
+ }
+ if (conn->state == STATE_DISCONNECTED) {
+ return_value = CONN_NOT_CONNECTED;
+ }
+ return return_value;
+}
/*
* Common code to send an LDAPv3 operation and collect the result.
* Return values:
@@ -683,10 +836,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel);
- return_value = see_if_write_available(
- conn, PR_SecondsToInterval(conn->timeout.tv_sec));
+ return_value = conn_is_available(conn);
if (return_value != CONN_OPERATION_SUCCESS) {
PR_Unlock(conn->lock);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: perform_operation connection is not available (%d)\n",
+ agmt_get_long_name(conn->agmt),
+ return_value);
return return_value;
}
conn->last_operation = optype;
@@ -758,6 +914,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
*/
return_value = CONN_NOT_CONNECTED;
}
+
+ check_flow_control_tot_init(conn, optype, extop_oid, msgid);
+
PR_Unlock(conn->lock); /* release the lock */
if (message_id)
{
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 9f81c04..5cf170c 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -108,6 +108,7 @@ typedef struct result_data
int stop_result_thread; /* Flag used to tell the result thread to exit */
int last_message_id_sent;
int last_message_id_received;
+ int flowcontrol_detection;
int result; /* The UPDATE_TRANSIENT_ERROR etc */
} result_data;
@@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd)
return retval;
}
+/* The interest of this routine is to give time to the consumer
+ * to apply the sent updates and return the acks.
+ * So the caller should not hold the replication connection lock
+ * to let the RA.reader receives the acks.
+ */
+static void
+repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
+{
+ PR_Lock(rd->lock);
+ if ((rd->last_message_id_received <= rd->last_message_id_sent) &&
+ ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) {
+ rd->flowcontrol_detection++;
+ DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt)));
+ }
+ PR_Unlock(rd->lock);
+}
+
static void
repl5_inc_waitfor_async_results(result_data *rd)
{
@@ -1682,7 +1700,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
{
int finished = 0;
ConnResult replay_crc;
- char csn_str[CSN_STRSIZE];
+ char csn_str[CSN_STRSIZE];
/* Start the results reading thread */
rd = repl5_inc_rd_new(prp);
@@ -1817,6 +1835,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
sop->replica_id = replica_id;
PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid));
repl5_int_push_operation(rd,sop);
+ repl5_inc_flow_control_results(prp->agmt, rd);
} else {
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
"%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n",
@@ -1905,6 +1924,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
}
*num_changes_sent = rd->num_changes_sent;
}
+ PR_Lock(rd->lock);
+ if (rd->flowcontrol_detection) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: Incremental update flow control triggered %d times\n"
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(prp->agmt),
+ rd->flowcontrol_detection,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ PR_Unlock(rd->lock);
repl5_inc_rd_destroy(&rd);
cl5_operation_parameters_done ( entry.op );
diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h
index 586e1eb..1b1c00b 100644
--- a/ldap/servers/plugins/replication/repl5_prot_private.h
+++ b/ldap/servers/plugins/replication/repl5_prot_private.h
@@ -79,6 +79,8 @@ typedef struct private_repl_protocol
extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new();
extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new();
+extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn);
+extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment);
extern Private_Repl_Protocol *Windows_Inc_Protocol_new();
extern Private_Repl_Protocol *Windows_Tot_Protocol_new();
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
index d4f0fcc..adadd44 100644
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
@@ -82,6 +82,7 @@ typedef struct callback_data
int stop_result_thread; /* Flag used to tell the result thread to exit */
int last_message_id_sent;
int last_message_id_received;
+ int flowcontrol_detection;
} callback_data;
/*
@@ -428,13 +429,19 @@ repl5_tot_run(Private_Repl_Protocol *prp)
LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
- cb_data.prp = prp;
- cb_data.rc = 0;
+ cb_data.prp = prp;
+ cb_data.rc = 0;
cb_data.num_entries = 0UL;
cb_data.sleep_on_busy = 0UL;
cb_data.last_busy = current_time ();
+ cb_data.flowcontrol_detection = 0;
cb_data.lock = PR_NewLock();
+ /* This allows during perform_operation to check the callback data
+ * especially to do flow contol on delta send msgid / recv msgid
+ */
+ conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
+
/* Before we get started on sending entries to the replica, we need to
* setup things for async propagation:
* 1. Create a thread that will read the LDAP results from the connection.
@@ -506,6 +513,17 @@ repl5_tot_run(Private_Repl_Protocol *prp)
done:
slapi_sdn_free(&area_sdn);
slapi_ch_free_string(&hostname);
+ if (cb_data.flowcontrol_detection > 1)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Total update flow control triggered %d times\n"
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(prp->agmt),
+ cb_data.flowcontrol_detection,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ conn_set_tot_update_cb(prp->conn, NULL);
if (cb_data.lock)
{
PR_DestroyLock(cb_data.lock);
@@ -645,6 +663,37 @@ void get_result (int rc, void *cb_data)
((callback_data*)cb_data)->rc = rc;
}
+/* Call must hold the connection lock */
+int
+repl5_tot_last_rcv_msgid(Repl_Connection *conn)
+{
+ struct callback_data *cb_data;
+
+ conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
+ if (cb_data == NULL) {
+ return -1;
+ } else {
+ return cb_data->last_message_id_received;
+ }
+}
+
+/* Increase the flowcontrol counter
+ * Call must hold the connection lock
+ */
+int
+repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment)
+{
+ struct callback_data *cb_data;
+
+ conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
+ if (cb_data == NULL) {
+ return -1;
+ } else {
+ cb_data->flowcontrol_detection += increment;
+ return cb_data->flowcontrol_detection;
+ }
+}
+
static
int send_entry (Slapi_Entry *e, void *cb_data)
{
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
index 5609def..e2157fa 100644
--- a/ldap/servers/plugins/replication/repl_globals.c
+++ b/ldap/servers/plugins/replication/repl_globals.c
@@ -139,6 +139,8 @@ const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime";
const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime";
const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled";
const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs";
+const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow";
+const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause";
/* windows sync specific attributes */
const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree";
9 years, 3 months
ldap/schema ldap/servers
by thierry bordaz
ldap/schema/01core389.ldif | 4
ldap/servers/plugins/replication/repl5.h | 10 +
ldap/servers/plugins/replication/repl5_agmt.c | 160 +++++++++++++++++
ldap/servers/plugins/replication/repl5_agmtlist.c | 26 ++
ldap/servers/plugins/replication/repl5_connection.c | 163 +++++++++++++++++-
ldap/servers/plugins/replication/repl5_inc_protocol.c | 32 +++
ldap/servers/plugins/replication/repl5_prot_private.h | 2
ldap/servers/plugins/replication/repl5_tot_protocol.c | 53 +++++
ldap/servers/plugins/replication/repl_globals.c | 2
9 files changed, 446 insertions(+), 6 deletions(-)
New commits:
commit fbafee54dc17e0673004d6d26d739ea1b19dd578
Author: Thierry bordaz (tbordaz) <tbordaz(a)redhat.com>
Date: Mon Dec 15 15:12:35 2014 +0100
Ticket 47942: DS hangs during online total update
Bug Description:
During incremental or total update of a consumer the replica agreement thread may hang.
For total update:
The replica agreement thread that sends the entries floods the consumer, which is not
able to process the entries fast enough. So the TCP connection fills up and
the RA.sender sleeps on the connection until it is able to write the next entries.
While sleeping on the poll or the write, the RA.sender holds the connection lock.
This prevents the replica agreement result thread from reading the results from the
network. So the consumer is also halted because it can no longer send the results.
For incremental update:
During incremental update, all updates are sent by the RA.sender.
If many updates need to be sent, the supplier may overflow a consumer
that is lagging far behind. This flow of updates can fill the TCP connection
so that the RA.sender hangs when writing the next update.
While it hangs, it holds the connection lock, preventing the RA.reader
from receiving the acks. And so the consumer can also hang trying to send the
acks.
Fix Description:
For total update there are two parts of the fix:
To prevent the RA.sender from sleeping too long on the poll, the fix (conn_is_available)
splits the RA.timeout into 1s periods.
If unable to write for 1s, it releases the connection for a short period of time (100ms).
To prevent the RA.sender from sleeping on the write, the fix (check_flow_control_tot_init)
checks how far behind the consumer is and, if it is too far behind, pauses (releasing the connection
during that time). This second part of the fix is configurable and it may need to be
tuned according to the observed failures.
For incremental update:
The fix is to implement a flow control on the RA.sender.
After each sent update, if the window (update.sent - update.acked) crosses the limit,
the RA.sender pauses for a configured delay.
When the RA.sender pauses, it does not hold the connection lock.
Tuning can be done with nsds5ReplicaFlowControlWindow (how far behind the consumer is in terms of
number of entries/updates acknowledged) and nsds5ReplicaFlowControlPause (how long the RA.sender will
pause if the consumer is too far behind)
Logging:
For total update, the first time the flow control pauses, it logs a message (FATAL level).
If flow control happened, then at the end of the total update, it also logs the number
of flow control pauses (FATAL level).
For incremental update, if flow control happened, it logs the number of pauses (REPL level).
https://fedorahosted.org/389/ticket/47942
Reviewed by: Mark Reynolds, Rich Megginson, Andrey Ivanov, Noriko Hosoi (many many thanks to all of you !)
Platforms tested: RHEL 7.0, Centos
Flag Day: no
Doc impact: no
diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
index c7aec70..c59d762 100644
--- a/ldap/schema/01core389.ldif
+++ b/ldap/schema/01core389.ldif
@@ -302,6 +302,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2306 NAME 'nsslapd-return-default-opattr
attributeTypes: ( 2.16.840.1.113730.3.1.2307 NAME 'nsslapd-allow-hashed-passwords' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
attributeTypes: ( 2.16.840.1.113730.3.1.2308 NAME 'nstombstonecsn' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
attributeTypes: ( 2.16.840.1.113730.3.1.2309 NAME 'nsds5ReplicaPreciseTombstonePurging' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
#
# objectclasses
#
@@ -313,7 +315,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d
objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout ) X-ORIGIN 'Netscape Directory Server' )
+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause )
X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) MAY ( nsSaslMapPriority ) X-ORIGIN 'Netscape Directory Server' )
objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' )
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 86c77ce..e2b6209 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -170,6 +170,8 @@ extern const char *type_nsds5ReplicaBusyWaitTime;
extern const char *type_nsds5ReplicaSessionPauseTime;
extern const char *type_nsds5ReplicaEnabled;
extern const char *type_nsds5ReplicaStripAttrs;
+extern const char *type_nsds5ReplicaFlowControlWindow;
+extern const char *type_nsds5ReplicaFlowControlPause;
extern const char *type_replicaProtocolTimeout;
extern const char *type_replicaBackoffMin;
extern const char *type_replicaBackoffMax;
@@ -332,6 +334,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra);
long agmt_get_timeout(const Repl_Agmt *ra);
long agmt_get_busywaittime(const Repl_Agmt *ra);
long agmt_get_pausetime(const Repl_Agmt *ra);
+long agmt_get_flowcontrolwindow(const Repl_Agmt *ra);
+long agmt_get_flowcontrolpause(const Repl_Agmt *ra);
int agmt_start(Repl_Agmt *ra);
int windows_agmt_start(Repl_Agmt *ra);
int agmt_stop(Repl_Agmt *ra);
@@ -352,6 +356,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name);
int agmt_schedule_in_window_now(const Repl_Agmt *ra);
int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
+int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
+int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
@@ -490,6 +496,10 @@ void conn_lock(Repl_Connection *conn);
void conn_unlock(Repl_Connection *conn);
void conn_delete_internal_ext(Repl_Connection *conn);
const char* conn_get_bindmethod(Repl_Connection *conn);
+void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data);
+void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data);
+void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data);
+void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data);
/* In repl5_protocol.c */
typedef struct repl_protocol Repl_Protocol;
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
index 7c5c37c..91be757 100644
--- a/ldap/servers/plugins/replication/repl5_agmt.c
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -87,6 +87,8 @@
#include "slapi-plugin.h"
#define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
+#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */
+#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */
#define STATUS_LEN 1024
struct changecounter {
@@ -145,6 +147,12 @@ typedef struct repl5agmt {
int agreement_type;
Slapi_Counter *protocol_timeout;
char *maxcsn; /* agmt max csn */
+ long flowControlWindow; /* This is the maximum number of entries
+ * sent without acknowledgment
+ */
+ long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow
+ * This is the duration (in msec) that the RA will pause before sending the next entry
+ */
Slapi_RWLock *attr_lock; /* RW lock for all the stripped attrs */
} repl5agmt;
@@ -345,6 +353,28 @@ agmt_new_from_entry(Slapi_Entry *e)
}
}
+ /* flow control update window. */
+ ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->flowControlWindow = slapi_value_get_long(sval);
+ }
+ }
+
+ /* flow control update pause. */
+ ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->flowControlPause = slapi_value_get_long(sval);
+ }
+ }
+
/* DN of entry at root of replicated area */
tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
if (NULL != tmpstr)
@@ -1014,6 +1044,26 @@ agmt_get_pausetime(const Repl_Agmt *ra)
return return_value;
}
+long
+agmt_get_flowcontrolwindow(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->flowControlWindow;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+long
+agmt_get_flowcontrolpause(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->flowControlPause;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
/*
* Warning - reference to the long name of the agreement is returned.
* The long name of an agreement is the DN of the agreement entry,
@@ -1775,6 +1825,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
return return_value;
}
+/*
+ * Set or reset the windows of entries sent without acknowledgment.
+ * The window is used during update to determine the number of
+ * entries will be send by the replica agreement without acknowledgment from the consumer
+ *
+ * Returns 0 if window set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->flowControlWindow = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
+/*
+ * Set or reset the pause duration when #entries sent without acknowledgment overpass flow control window
+ *
+ * Returns 0 if pause set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->flowControlPause = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
int
agmt_set_timeout(Repl_Agmt *ra, long timeout)
{
@@ -1788,6 +1922,32 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout)
return 0;
}
+int
+agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window)
+{
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress){
+ PR_Unlock(ra->lock);
+ return -1;
+ }
+ ra->flowControlWindow = window;
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
+int
+agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause)
+{
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress){
+ PR_Unlock(ra->lock);
+ return -1;
+ }
+ ra->flowControlPause = pause;
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
/*
* Set or reset the busywaittime
diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c
index 90bf7e9..4a1ff5d 100644
--- a/ldap/servers/plugins/replication/repl5_agmtlist.c
+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c
@@ -330,6 +330,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry
}
}
else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+ type_nsds5ReplicaFlowControlWindow))
+ {
+ /* New replica timeout */
+ if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
+ "failed to update the flow control window for agreement %s\n",
+ agmt_get_long_name(agmt));
+ *returncode = LDAP_OPERATIONS_ERROR;
+ rc = SLAPI_DSE_CALLBACK_ERROR;
+ }
+ }
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+ type_nsds5ReplicaFlowControlPause))
+ {
+ /* New replica timeout */
+ if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: "
+ "failed to update the flow control pause for agreement %s\n",
+ agmt_get_long_name(agmt));
+ *returncode = LDAP_OPERATIONS_ERROR;
+ rc = SLAPI_DSE_CALLBACK_ERROR;
+ }
+ }
+ else if (slapi_attr_types_equivalent(mods[i]->mod_type,
type_nsds5ReplicaBusyWaitTime))
{
/* New replica busywaittime */
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
index c004bfb..2971025 100644
--- a/ldap/servers/plugins/replication/repl5_connection.c
+++ b/ldap/servers/plugins/replication/repl5_connection.c
@@ -52,6 +52,7 @@ replica locked. Seems like right thing to do.
*/
#include "repl5.h"
+#include "repl5_prot_private.h"
#include "slapi-private.h"
#if defined(USE_OPENLDAP)
#include "ldap.h"
@@ -91,6 +92,7 @@ typedef struct repl_connection
struct timeval timeout;
int flag_agmt_changed;
char *plain;
+ void *tot_init_callback; /* Used during total update to do flow control */
} repl_connection;
/* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */
@@ -274,6 +276,32 @@ conn_delete(Repl_Connection *conn)
PR_Unlock(conn->lock);
}
+void
+conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data)
+{
+ conn->tot_init_callback = (void *) cb_data;
+}
+void
+conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data)
+{
+ PR_Lock(conn->lock);
+ conn_set_tot_update_cb_nolock(conn, cb_data);
+ PR_Unlock(conn->lock);
+}
+
+void
+conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data)
+{
+ *cb_data = (void *) conn->tot_init_callback;
+}
+void
+conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data)
+{
+ PR_Lock(conn->lock);
+ conn_get_tot_update_cb_nolock(conn, cb_data);
+ PR_Unlock(conn->lock);
+}
+
/*
* Return the last operation type processed by the connection
* object, and the LDAP error encountered.
@@ -640,6 +668,131 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout)
}
#endif /* ! USE_OPENLDAP */
+/*
+ * During a total update, this function checks how much entries
+ * have been sent to the consumer without having received their acknowledgment.
+ * Basically it checks how late is the consumer.
+ *
+ * If the consumer is too late, it pause the RA.sender (releasing the lock) to
+ * let the consumer to catch up and RA.reader to receive the acknowledgments.
+ *
+ * Caller must hold conn->lock
+ */
+static void
+check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid)
+{
+ int rcv_msgid;
+ int once;
+
+ if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) {
+ /* We are sending entries part of the total update of a consumer
+ * Wait a bit if the consumer needs to catchup from the current sent entries
+ */
+ rcv_msgid = repl5_tot_last_rcv_msgid(conn);
+ if (rcv_msgid == -1) {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n",
+ agmt_get_long_name(conn->agmt),
+ sent_msgid);
+ } else if (sent_msgid < rcv_msgid) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n",
+ agmt_get_long_name(conn->agmt),
+ sent_msgid,
+ rcv_msgid);
+ } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) {
+ int totalUpdatePause;
+
+ totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt);
+ if (totalUpdatePause) {
+ /* The consumer is late. Last sent entry compare to last acknowledged entry
+ * overpass the allowed limit (flowcontrolwindow)
+ * Give some time to the consumer to catch up
+ */
+ once = repl5_tot_flowcontrol_detection(conn, 1);
+ PR_Unlock(conn->lock);
+ if (once == 1) {
+ /* This is the first time we hit total update flow control.
+ * Log it at least once to inform administrator there is
+ * a potential configuration issue here
+ */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n"
+ "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(conn->agmt),
+ totalUpdatePause,
+ sent_msgid,
+ rcv_msgid,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause));
+ PR_Lock(conn->lock);
+ }
+ }
+ }
+
+}
+/*
+ * Test if the connection is available to do a write.
+ * This function is doing a periodic polling of the connection.
+ * If the polling times out:
+ * - it releases the connection lock (to let other thread ,i.e.
+ * replication result thread, the opportunity to use the connection)
+ * - Sleeps for a short period (100ms)
+ * - acquires the connection lock
+ *
+ * It loops until
+ * - it is available
+ * - exceeds RA complete timeout
+ * - server is shutdown
+ * - connection is disconnected (Disable, stop, delete the RA
+ * 'terminate' the replication protocol and disconnect the connection)
+ *
+ * Return:
+ * - CONN_OPERATION_SUCCESS if the connection is available
+ * - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout
+ * - CONN_NOT_CONNECTED if the replication connection state is disconnected
+ * - other ConnResult
+ *
+ * Caller must hold conn->Lock. At the exit, conn->lock is held
+ */
+static ConnResult
+conn_is_available(Repl_Connection *conn)
+{
+ time_t poll_timeout_sec = 1; /* Polling for 1sec */
+ time_t yield_delay_msec = 100; /* Delay to wait */
+ time_t start_time = time( NULL );
+ time_t time_now;
+ ConnResult return_value = CONN_OPERATION_SUCCESS;
+
+ while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) {
+ return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec));
+ if (return_value == CONN_TIMEOUT) {
+ /* in case of timeout we return CONN_TIMEOUT only
+ * if the RA.timeout is exceeded
+ */
+ time_now = time(NULL);
+ if (conn->timeout.tv_sec <= (time_now - start_time)) {
+ break;
+ } else {
+ /* Else give connection to others threads */
+ PR_Unlock(conn->lock);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: perform_operation transient timeout. retry)\n",
+ agmt_get_long_name(conn->agmt));
+ DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec));
+ PR_Lock(conn->lock);
+ }
+ } else {
+ break;
+ }
+ }
+ if (conn->state == STATE_DISCONNECTED) {
+ return_value = CONN_NOT_CONNECTED;
+ }
+ return return_value;
+}
/*
* Common code to send an LDAPv3 operation and collect the result.
* Return values:
@@ -683,10 +836,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel);
- return_value = see_if_write_available(
- conn, PR_SecondsToInterval(conn->timeout.tv_sec));
+ return_value = conn_is_available(conn);
if (return_value != CONN_OPERATION_SUCCESS) {
PR_Unlock(conn->lock);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: perform_operation connection is not available (%d)\n",
+ agmt_get_long_name(conn->agmt),
+ return_value);
return return_value;
}
conn->last_operation = optype;
@@ -758,6 +914,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
*/
return_value = CONN_NOT_CONNECTED;
}
+
+ check_flow_control_tot_init(conn, optype, extop_oid, msgid);
+
PR_Unlock(conn->lock); /* release the lock */
if (message_id)
{
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 9f81c04..5cf170c 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -108,6 +108,7 @@ typedef struct result_data
int stop_result_thread; /* Flag used to tell the result thread to exit */
int last_message_id_sent;
int last_message_id_received;
+ int flowcontrol_detection;
int result; /* The UPDATE_TRANSIENT_ERROR etc */
} result_data;
@@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd)
return retval;
}
+/*
+ * Flow control for the incremental update sender.
+ *
+ * When the number of sent-but-not-yet-acknowledged updates
+ * (last_message_id_sent - last_message_id_received) reaches the agreement's
+ * flow control window, count the event in rd->flowcontrol_detection and
+ * pause for the agreement's flow control pause (in milliseconds). The pause
+ * gives the consumer time to apply the sent updates and return the acks.
+ *
+ * The caller should not hold the replication connection lock, so that the
+ * RA reader can receive the acks during the pause. For the same reason
+ * rd->lock only protects the check and the counter update and is released
+ * before sleeping: a thread that needs rd->lock is not stalled for the
+ * whole pause.
+ */
+static void
+repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
+{
+    int pause_needed = 0;
+
+    PR_Lock(rd->lock);
+    if ((rd->last_message_id_received <= rd->last_message_id_sent) &&
+        ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) {
+        rd->flowcontrol_detection++;
+        pause_needed = 1;
+    }
+    PR_Unlock(rd->lock);
+
+    /* Sleep only after the lock has been dropped */
+    if (pause_needed) {
+        DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt)));
+    }
+}
+
static void
repl5_inc_waitfor_async_results(result_data *rd)
{
@@ -1682,7 +1700,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
{
int finished = 0;
ConnResult replay_crc;
- char csn_str[CSN_STRSIZE];
+ char csn_str[CSN_STRSIZE];
/* Start the results reading thread */
rd = repl5_inc_rd_new(prp);
@@ -1817,6 +1835,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
sop->replica_id = replica_id;
PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid));
repl5_int_push_operation(rd,sop);
+ repl5_inc_flow_control_results(prp->agmt, rd);
} else {
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
"%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n",
@@ -1905,6 +1924,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
}
*num_changes_sent = rd->num_changes_sent;
}
+ PR_Lock(rd->lock);
+ if (rd->flowcontrol_detection) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+ "%s: Incremental update flow control triggered %d times\n"
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(prp->agmt),
+ rd->flowcontrol_detection,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ PR_Unlock(rd->lock);
repl5_inc_rd_destroy(&rd);
cl5_operation_parameters_done ( entry.op );
diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h
index 586e1eb..1b1c00b 100644
--- a/ldap/servers/plugins/replication/repl5_prot_private.h
+++ b/ldap/servers/plugins/replication/repl5_prot_private.h
@@ -79,6 +79,8 @@ typedef struct private_repl_protocol
extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new();
extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new();
+extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn);
+extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment);
extern Private_Repl_Protocol *Windows_Inc_Protocol_new();
extern Private_Repl_Protocol *Windows_Tot_Protocol_new();
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
index d4f0fcc..adadd44 100644
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
@@ -82,6 +82,7 @@ typedef struct callback_data
int stop_result_thread; /* Flag used to tell the result thread to exit */
int last_message_id_sent;
int last_message_id_received;
+ int flowcontrol_detection;
} callback_data;
/*
@@ -428,13 +429,19 @@ repl5_tot_run(Private_Repl_Protocol *prp)
LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL,
repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
- cb_data.prp = prp;
- cb_data.rc = 0;
+ cb_data.prp = prp;
+ cb_data.rc = 0;
cb_data.num_entries = 0UL;
cb_data.sleep_on_busy = 0UL;
cb_data.last_busy = current_time ();
+ cb_data.flowcontrol_detection = 0;
cb_data.lock = PR_NewLock();
+ /* This allows during perform_operation to check the callback data
+ * especially to do flow control on the delta between sent msgid / received msgid
+ */
+ conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
+
/* Before we get started on sending entries to the replica, we need to
* setup things for async propagation:
* 1. Create a thread that will read the LDAP results from the connection.
@@ -506,6 +513,17 @@ repl5_tot_run(Private_Repl_Protocol *prp)
done:
slapi_sdn_free(&area_sdn);
slapi_ch_free_string(&hostname);
+ if (cb_data.flowcontrol_detection > 1)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Total update flow control triggered %d times\n"
+ "You may increase %s and/or decrease %s in the replica agreement configuration\n",
+ agmt_get_long_name(prp->agmt),
+ cb_data.flowcontrol_detection,
+ type_nsds5ReplicaFlowControlPause,
+ type_nsds5ReplicaFlowControlWindow);
+ }
+ conn_set_tot_update_cb(prp->conn, NULL);
if (cb_data.lock)
{
PR_DestroyLock(cb_data.lock);
@@ -645,6 +663,37 @@ void get_result (int rc, void *cb_data)
((callback_data*)cb_data)->rc = rc;
}
+/*
+ * Return last_message_id_received from the total update callback data
+ * attached to the connection, or -1 when no callback data is attached.
+ * The caller must hold the connection lock.
+ */
+int
+repl5_tot_last_rcv_msgid(Repl_Connection *conn)
+{
+    struct callback_data *tot_cb;
+
+    conn_get_tot_update_cb_nolock(conn, (void **) &tot_cb);
+    return (tot_cb != NULL) ? tot_cb->last_message_id_received : -1;
+}
+
+/*
+ * Add 'increment' to the flow control counter kept in the total update
+ * callback data attached to the connection and return the updated counter.
+ * Returns -1, changing nothing, when no callback data is attached.
+ * The caller must hold the connection lock.
+ */
+int
+repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment)
+{
+    struct callback_data *tot_cb;
+
+    conn_get_tot_update_cb_nolock(conn, (void **) &tot_cb);
+    if (tot_cb == NULL) {
+        return -1;
+    }
+    tot_cb->flowcontrol_detection += increment;
+    return tot_cb->flowcontrol_detection;
+}
+
static
int send_entry (Slapi_Entry *e, void *cb_data)
{
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
index 5609def..e2157fa 100644
--- a/ldap/servers/plugins/replication/repl_globals.c
+++ b/ldap/servers/plugins/replication/repl_globals.c
@@ -139,6 +139,8 @@ const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime";
const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime";
const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled";
const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs";
+const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow";
+const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause";
/* windows sync specific attributes */
const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree";
9 years, 3 months
Branch '389-ds-base-1.3.3' - 2 commits - ldap/servers
by Noriko Hosoi
ldap/servers/plugins/sync/sync.h | 11 +++--
ldap/servers/plugins/sync/sync_refresh.c | 32 ++++++++++----
ldap/servers/plugins/sync/sync_util.c | 68 ++++++++++++++++++++-----------
3 files changed, 76 insertions(+), 35 deletions(-)
New commits:
commit 61a4e7035742612a1a8bf42b16e93cc55776dc31
Author: Noriko Hosoi <nhosoi(a)redhat.com>
Date: Fri Dec 12 15:41:36 2014 -0800
Ticket #47960 - cookie_change_info returns random negative number if there was no change in a tree
Description: An additional fix for the type mismatch for the change
numbers. Change Number is declared as "unsigned long" in the Retro
Changelog plugin, while cookie_change_info in the Content Sync plugin
is "int", which could end up with a negative number when the change
number passes (2^31 - 1).
Changing the type of cookie_change_info to "unsigned long".
https://fedorahosted.org/389/ticket/47960
Reviewed by rmeggins(a)redhat.com and tbordaz(a)redhat.com (Thank you,
Rich and Thierry!!)
(cherry picked from commit 96c130b432ce0b15028e325c0e337679291aef9f)
diff --git a/ldap/servers/plugins/sync/sync.h b/ldap/servers/plugins/sync/sync.h
index 0bcec7a..803c656 100644
--- a/ldap/servers/plugins/sync/sync.h
+++ b/ldap/servers/plugins/sync/sync.h
@@ -64,10 +64,12 @@
#define CL_ATTR_NEWSUPERIOR "newsuperior"
#define CL_SRCH_BASE "cn=changelog"
+#define SYNC_INVALID_CHANGENUM ((unsigned long)-1)
+
typedef struct sync_cookie {
char *cookie_client_signature;
char *cookie_server_signature;
- int cookie_change_info;
+ unsigned long cookie_change_info;
} Sync_Cookie;
typedef struct sync_update {
@@ -80,8 +82,8 @@ typedef struct sync_update {
typedef struct sync_callback {
Slapi_PBlock *orig_pb;
- int changenr;
- int change_start;
+ unsigned long changenr;
+ unsigned long change_start;
int cb_err;
Sync_UpdateNode *cb_updates;
} Sync_CallBackData;
@@ -112,6 +114,7 @@ int sync_cookie_isvalid (Sync_Cookie *testcookie, Sync_Cookie *refcookie);
void sync_cookie_free (Sync_Cookie **freecookie);
char * sync_cookie2str(Sync_Cookie *cookie);
int sync_number2int(char *nrstr);
+unsigned long sync_number2ulong(char *nrstr);
char *sync_nsuniqueid2uuid(const char *nsuniqueid);
int sync_is_active (Slapi_Entry *e, Slapi_PBlock *pb);
diff --git a/ldap/servers/plugins/sync/sync_refresh.c b/ldap/servers/plugins/sync/sync_refresh.c
index 4e256e6..bfff77b 100644
--- a/ldap/servers/plugins/sync/sync_refresh.c
+++ b/ldap/servers/plugins/sync/sync_refresh.c
@@ -293,9 +293,9 @@ sync_refresh_update_content(Slapi_PBlock *pb, Sync_Cookie *client_cookie, Sync_C
cb_data.orig_pb = pb;
cb_data.change_start = client_cookie->cookie_change_info;
- filter = slapi_ch_smprintf("(&(changenumber>=%d)(changenumber<=%d))",
- client_cookie->cookie_change_info,
- server_cookie->cookie_change_info);
+ filter = slapi_ch_smprintf("(&(changenumber>=%lu)(changenumber<=%lu))",
+ client_cookie->cookie_change_info,
+ server_cookie->cookie_change_info);
slapi_search_internal_set_pb(
seq_pb,
CL_SRCH_BASE,
@@ -305,7 +305,7 @@ sync_refresh_update_content(Slapi_PBlock *pb, Sync_Cookie *client_cookie, Sync_C
0,
NULL, NULL,
plugin_get_default_component_id(),
- 0);
+ 0);
rc = slapi_search_internal_callback_pb (
seq_pb, &cb_data, NULL, sync_read_entry_from_changelog, NULL);
@@ -460,6 +460,7 @@ sync_read_entry_from_changelog( Slapi_Entry *cl_entry, void *cb_data)
int chg_req;
int prev = 0;
int index = 0;
+ unsigned long chgnum = 0;
Sync_CallBackData *cb = (Sync_CallBackData *) cb_data;
if (cb == NULL) {
@@ -470,13 +471,28 @@ sync_read_entry_from_changelog( Slapi_Entry *cl_entry, void *cb_data)
if (uniqueid == NULL) {
slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM,
"Retro Changelog does not provied nsuniquedid."
- "Check RCL plugin configuration." );
+ "Check RCL plugin configuration.\n" );
return(1);
}
- chgtype = sync_get_attr_value_from_entry (cl_entry, CL_ATTR_CHGTYPE);
chgnr = sync_get_attr_value_from_entry (cl_entry, CL_ATTR_CHANGENUMBER);
-
- index = sync_number2int(chgnr) - cb->change_start;
+ chgnum = sync_number2ulong(chgnr);
+ if (SYNC_INVALID_CHANGENUM == chgnum) {
+ slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM,
+ "Change number provided by Retro Changelog is invalid: %s\n", chgnr);
+ slapi_ch_free_string(&chgnr);
+ slapi_ch_free_string(&uniqueid);
+ return(1);
+ }
+ if (chgnum < cb->change_start) {
+ slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM,
+ "Change number provided by Retro Changelog %s is less than the initial number %lu\n",
+ chgnr, cb->change_start);
+ slapi_ch_free_string(&chgnr);
+ slapi_ch_free_string(&uniqueid);
+ return(1);
+ }
+ index = chgnum - cb->change_start;
+ chgtype = sync_get_attr_value_from_entry (cl_entry, CL_ATTR_CHGTYPE);
chg_req = sync_str2chgreq(chgtype);
switch (chg_req){
case LDAP_REQ_ADD:
diff --git a/ldap/servers/plugins/sync/sync_util.c b/ldap/servers/plugins/sync/sync_util.c
index af22bcb..67cb453 100644
--- a/ldap/servers/plugins/sync/sync_util.c
+++ b/ldap/servers/plugins/sync/sync_util.c
@@ -266,10 +266,10 @@ sync_cookie2str(Sync_Cookie *cookie)
char *cookiestr = NULL;
if (cookie) {
- cookiestr = slapi_ch_smprintf("%s#%s#%d",
- cookie->cookie_server_signature,
- cookie->cookie_client_signature,
- cookie->cookie_change_info);
+ cookiestr = slapi_ch_smprintf("%s#%s#%lu",
+ cookie->cookie_server_signature,
+ cookie->cookie_client_signature,
+ cookie->cookie_change_info);
}
return(cookiestr);
}
@@ -370,10 +370,11 @@ sync_handle_cnum_entry(Slapi_Entry *e, void *cb_data)
slapi_attr_first_value( chattr,&sval );
if ( NULL != sval ) {
value = slapi_value_get_berval ( sval );
- if( NULL != value && NULL != value->bv_val &&
- '\0' != value->bv_val[0]) {
- cb->changenr = sync_number2int(value->bv_val);
- cb->cb_err = 0; /* changenr successfully set */
+ if (value && value->bv_val && ('\0' != value->bv_val[0])) {
+ cb->changenr = sync_number2ulong(value->bv_val);
+ if (SYNC_INVALID_CHANGENUM != cb->changenr) {
+ cb->cb_err = 0; /* changenr successfully set */
+ }
}
}
}
@@ -452,31 +453,30 @@ sync_cookie_get_client_info(Slapi_PBlock *pb)
clientinfo = slapi_ch_smprintf("%s:%s:%s",clientdn,targetdn,strfilter);
return (clientinfo);
}
-static int
+static unsigned long
sync_cookie_get_change_number(int lastnr, const char *uniqueid)
{
Slapi_PBlock *srch_pb;
Slapi_Entry **entries;
Slapi_Entry *cl_entry;
int rv;
- int newnr = -1;
+ unsigned long newnr = SYNC_INVALID_CHANGENUM;
char *filter = slapi_ch_smprintf("(&(changenumber>=%d)(targetuniqueid=%s))",lastnr,uniqueid);
srch_pb = slapi_pblock_new();
- slapi_search_internal_set_pb(srch_pb, CL_SRCH_BASE,
- LDAP_SCOPE_SUBTREE, filter,
- NULL, 0, NULL, NULL, plugin_get_default_component_id(), 0);
- slapi_search_internal_pb(srch_pb);
+ slapi_search_internal_set_pb(srch_pb, CL_SRCH_BASE, LDAP_SCOPE_SUBTREE, filter,
+ NULL, 0, NULL, NULL, plugin_get_default_component_id(), 0);
+ slapi_search_internal_pb(srch_pb);
slapi_pblock_get(srch_pb, SLAPI_PLUGIN_INTOP_RESULT, &rv);
- if ( rv == LDAP_SUCCESS) {
+ if (rv == LDAP_SUCCESS) {
slapi_pblock_get(srch_pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
- if (entries && *entries) {
+ if (entries && *entries) {
Slapi_Attr *attr;
Slapi_Value *val;
 cl_entry = *entries; /* only use the first one */
slapi_entry_attr_find(cl_entry, CL_ATTR_CHANGENUMBER, &attr);
slapi_attr_first_value(attr, &val);
- newnr = sync_number2int((char *)slapi_value_get_string(val));
+ newnr = sync_number2ulong((char *)slapi_value_get_string(val));
}
}
@@ -579,8 +579,8 @@ sync_cookie_parse (char *cookie)
if (p) {
*p = '\0';
sc->cookie_client_signature = slapi_ch_strdup(q);
- sc->cookie_change_info = sync_number2int(p+1);
- if (sc->cookie_change_info < 0) {
+ sc->cookie_change_info = sync_number2ulong(p+1);
+ if (SYNC_INVALID_CHANGENUM == sc->cookie_change_info) {
goto error_return;
}
} else {
@@ -716,14 +716,28 @@ sync_pblock_copy(Slapi_PBlock *src)
return dest;
}
-int sync_number2int(char *chgnrstr)
+int
+sync_number2int(char *chgnrstr)
{
char *end;
int nr;
- nr = strtoul(chgnrstr, &end, 10);
+ nr = (int)strtoul(chgnrstr, &end, 10);
if ( *end == '\0') {
return (nr);
} else {
return (-1);
}
}
+
+/*
+ * Convert a decimal change-number string to an unsigned long.
+ *
+ * Returns SYNC_INVALID_CHANGENUM when chgnrstr is NULL, contains no digits
+ * (e.g. an empty string), or has trailing characters after the number.
+ * An out-of-range value makes strtoul() return ULONG_MAX, which has the same
+ * value as SYNC_INVALID_CHANGENUM ((unsigned long)-1), so it is reported as
+ * invalid as well.
+ */
+unsigned long
+sync_number2ulong(char *chgnrstr)
+{
+    char *end = NULL;
+    unsigned long nr;
+
+    if (NULL == chgnrstr) {
+        /* e.g. a change number value the caller could not read */
+        return SYNC_INVALID_CHANGENUM;
+    }
+    nr = strtoul(chgnrstr, &end, 10);
+    if ((end == chgnrstr) || (*end != '\0')) {
+        /* nothing was converted, or garbage follows the number */
+        return SYNC_INVALID_CHANGENUM;
+    }
+    return (nr);
+}
commit b7a472c71db4671c127584fcf7bfd8c75930bc8c
Author: Noriko Hosoi <nhosoi(a)redhat.com>
Date: Wed Dec 10 17:12:00 2014 -0800
Ticket #47960 - cookie_change_info returns random negative number if there was no change in a tree
Description: When no changes had been made, the Retro Changelog db
was empty and the search callback sync_handle_cnum_entry in the
Content Synchronization had no chance to be called. If it was not
called, an uninitialized garbage value in Sync_CallBackData was set
to cookie_change_info.
This patch checks if the search callback sync_handle_cnum_entry is
called or not. If it is not called, set 0 to cookie_change_info.
https://fedorahosted.org/389/ticket/47960
Reviewed by rmeggins(a)redhat.com and tbordaz(a)redhat.com (Thank you,
Rich and Thierry!!)
(cherry picked from commit a908c6b57cd77ff2f6e2fe0fe1fa2e0eccba77e0)
diff --git a/ldap/servers/plugins/sync/sync.h b/ldap/servers/plugins/sync/sync.h
index 9c2d8be..0bcec7a 100644
--- a/ldap/servers/plugins/sync/sync.h
+++ b/ldap/servers/plugins/sync/sync.h
@@ -76,6 +76,8 @@ typedef struct sync_update {
Slapi_Entry *upd_e;
} Sync_UpdateNode;
+#define SYNC_CALLBACK_PREINIT (-1)
+
typedef struct sync_callback {
Slapi_PBlock *orig_pb;
int changenr;
diff --git a/ldap/servers/plugins/sync/sync_util.c b/ldap/servers/plugins/sync/sync_util.c
index de65b99..af22bcb 100644
--- a/ldap/servers/plugins/sync/sync_util.c
+++ b/ldap/servers/plugins/sync/sync_util.c
@@ -373,6 +373,7 @@ sync_handle_cnum_entry(Slapi_Entry *e, void *cb_data)
if( NULL != value && NULL != value->bv_val &&
'\0' != value->bv_val[0]) {
cb->changenr = sync_number2int(value->bv_val);
+ cb->cb_err = 0; /* changenr successfully set */
}
}
}
@@ -500,7 +501,7 @@ sync_cookie_get_change_info(Sync_CallBackData *scbd)
slapi_pblock_init(seq_pb);
slapi_seq_internal_set_pb(seq_pb, base, SLAPI_SEQ_LAST, attrname, NULL, NULL, 0, 0,
- plugin_get_default_component_id(), 0);
+ plugin_get_default_component_id(), 0);
rc = slapi_seq_internal_callback_pb (seq_pb, scbd, NULL, sync_handle_cnum_entry, NULL);
slapi_pblock_destroy(seq_pb);
@@ -518,15 +519,20 @@ sync_cookie_create (Slapi_PBlock *pb)
Sync_CallBackData scbd;
int rc;
- Sync_Cookie *sc = (Sync_Cookie *)slapi_ch_malloc(sizeof(Sync_Cookie));
-
+ Sync_Cookie *sc = (Sync_Cookie *)slapi_ch_calloc(1, sizeof(Sync_Cookie));
+ scbd.cb_err = SYNC_CALLBACK_PREINIT;
rc = sync_cookie_get_change_info (&scbd);
if (rc == 0) {
sc->cookie_server_signature = sync_cookie_get_server_info(pb);
sc->cookie_client_signature = sync_cookie_get_client_info(pb);
- sc->cookie_change_info = scbd.changenr;
+ if (scbd.cb_err == SYNC_CALLBACK_PREINIT) {
+ /* changenr is not initialized. */
+ sc->cookie_change_info = 0;
+ } else {
+ sc->cookie_change_info = scbd.changenr;
+ }
} else {
slapi_ch_free ((void **)&sc);
sc = NULL;
9 years, 3 months
2 commits - ldap/servers
by Noriko Hosoi
ldap/servers/plugins/sync/sync.h | 11 +++--
ldap/servers/plugins/sync/sync_refresh.c | 32 ++++++++++----
ldap/servers/plugins/sync/sync_util.c | 68 ++++++++++++++++++++-----------
3 files changed, 76 insertions(+), 35 deletions(-)
New commits:
commit 96c130b432ce0b15028e325c0e337679291aef9f
Author: Noriko Hosoi <nhosoi(a)redhat.com>
Date: Fri Dec 12 15:41:36 2014 -0800
Ticket #47960 - cookie_change_info returns random negative number if there was no change in a tree
Description: An additional fix for the type mismatch for the change
numbers. Change Number is declared as "unsigned long" in the Retro
Changelog plugin, while cookie_change_info in the Content Sync plugin
is "int", which could end up with a negative number when the change
number passes (2^31 - 1).
Changing the type of cookie_change_info to "unsigned long".
https://fedorahosted.org/389/ticket/47960
Reviewed by rmeggins(a)redhat.com and tbordaz(a)redhat.com (Thank you,
Rich and Thierry!!)
diff --git a/ldap/servers/plugins/sync/sync.h b/ldap/servers/plugins/sync/sync.h
index 0bcec7a..803c656 100644
--- a/ldap/servers/plugins/sync/sync.h
+++ b/ldap/servers/plugins/sync/sync.h
@@ -64,10 +64,12 @@
#define CL_ATTR_NEWSUPERIOR "newsuperior"
#define CL_SRCH_BASE "cn=changelog"
+#define SYNC_INVALID_CHANGENUM ((unsigned long)-1)
+
typedef struct sync_cookie {
char *cookie_client_signature;
char *cookie_server_signature;
- int cookie_change_info;
+ unsigned long cookie_change_info;
} Sync_Cookie;
typedef struct sync_update {
@@ -80,8 +82,8 @@ typedef struct sync_update {
typedef struct sync_callback {
Slapi_PBlock *orig_pb;
- int changenr;
- int change_start;
+ unsigned long changenr;
+ unsigned long change_start;
int cb_err;
Sync_UpdateNode *cb_updates;
} Sync_CallBackData;
@@ -112,6 +114,7 @@ int sync_cookie_isvalid (Sync_Cookie *testcookie, Sync_Cookie *refcookie);
void sync_cookie_free (Sync_Cookie **freecookie);
char * sync_cookie2str(Sync_Cookie *cookie);
int sync_number2int(char *nrstr);
+unsigned long sync_number2ulong(char *nrstr);
char *sync_nsuniqueid2uuid(const char *nsuniqueid);
int sync_is_active (Slapi_Entry *e, Slapi_PBlock *pb);
diff --git a/ldap/servers/plugins/sync/sync_refresh.c b/ldap/servers/plugins/sync/sync_refresh.c
index 4e256e6..bfff77b 100644
--- a/ldap/servers/plugins/sync/sync_refresh.c
+++ b/ldap/servers/plugins/sync/sync_refresh.c
@@ -293,9 +293,9 @@ sync_refresh_update_content(Slapi_PBlock *pb, Sync_Cookie *client_cookie, Sync_C
cb_data.orig_pb = pb;
cb_data.change_start = client_cookie->cookie_change_info;
- filter = slapi_ch_smprintf("(&(changenumber>=%d)(changenumber<=%d))",
- client_cookie->cookie_change_info,
- server_cookie->cookie_change_info);
+ filter = slapi_ch_smprintf("(&(changenumber>=%lu)(changenumber<=%lu))",
+ client_cookie->cookie_change_info,
+ server_cookie->cookie_change_info);
slapi_search_internal_set_pb(
seq_pb,
CL_SRCH_BASE,
@@ -305,7 +305,7 @@ sync_refresh_update_content(Slapi_PBlock *pb, Sync_Cookie *client_cookie, Sync_C
0,
NULL, NULL,
plugin_get_default_component_id(),
- 0);
+ 0);
rc = slapi_search_internal_callback_pb (
seq_pb, &cb_data, NULL, sync_read_entry_from_changelog, NULL);
@@ -460,6 +460,7 @@ sync_read_entry_from_changelog( Slapi_Entry *cl_entry, void *cb_data)
int chg_req;
int prev = 0;
int index = 0;
+ unsigned long chgnum = 0;
Sync_CallBackData *cb = (Sync_CallBackData *) cb_data;
if (cb == NULL) {
@@ -470,13 +471,28 @@ sync_read_entry_from_changelog( Slapi_Entry *cl_entry, void *cb_data)
if (uniqueid == NULL) {
slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM,
"Retro Changelog does not provied nsuniquedid."
- "Check RCL plugin configuration." );
+ "Check RCL plugin configuration.\n" );
return(1);
}
- chgtype = sync_get_attr_value_from_entry (cl_entry, CL_ATTR_CHGTYPE);
chgnr = sync_get_attr_value_from_entry (cl_entry, CL_ATTR_CHANGENUMBER);
-
- index = sync_number2int(chgnr) - cb->change_start;
+ chgnum = sync_number2ulong(chgnr);
+ if (SYNC_INVALID_CHANGENUM == chgnum) {
+ slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM,
+ "Change number provided by Retro Changelog is invalid: %s\n", chgnr);
+ slapi_ch_free_string(&chgnr);
+ slapi_ch_free_string(&uniqueid);
+ return(1);
+ }
+ if (chgnum < cb->change_start) {
+ slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM,
+ "Change number provided by Retro Changelog %s is less than the initial number %lu\n",
+ chgnr, cb->change_start);
+ slapi_ch_free_string(&chgnr);
+ slapi_ch_free_string(&uniqueid);
+ return(1);
+ }
+ index = chgnum - cb->change_start;
+ chgtype = sync_get_attr_value_from_entry (cl_entry, CL_ATTR_CHGTYPE);
chg_req = sync_str2chgreq(chgtype);
switch (chg_req){
case LDAP_REQ_ADD:
diff --git a/ldap/servers/plugins/sync/sync_util.c b/ldap/servers/plugins/sync/sync_util.c
index af22bcb..67cb453 100644
--- a/ldap/servers/plugins/sync/sync_util.c
+++ b/ldap/servers/plugins/sync/sync_util.c
@@ -266,10 +266,10 @@ sync_cookie2str(Sync_Cookie *cookie)
char *cookiestr = NULL;
if (cookie) {
- cookiestr = slapi_ch_smprintf("%s#%s#%d",
- cookie->cookie_server_signature,
- cookie->cookie_client_signature,
- cookie->cookie_change_info);
+ cookiestr = slapi_ch_smprintf("%s#%s#%lu",
+ cookie->cookie_server_signature,
+ cookie->cookie_client_signature,
+ cookie->cookie_change_info);
}
return(cookiestr);
}
@@ -370,10 +370,11 @@ sync_handle_cnum_entry(Slapi_Entry *e, void *cb_data)
slapi_attr_first_value( chattr,&sval );
if ( NULL != sval ) {
value = slapi_value_get_berval ( sval );
- if( NULL != value && NULL != value->bv_val &&
- '\0' != value->bv_val[0]) {
- cb->changenr = sync_number2int(value->bv_val);
- cb->cb_err = 0; /* changenr successfully set */
+ if (value && value->bv_val && ('\0' != value->bv_val[0])) {
+ cb->changenr = sync_number2ulong(value->bv_val);
+ if (SYNC_INVALID_CHANGENUM != cb->changenr) {
+ cb->cb_err = 0; /* changenr successfully set */
+ }
}
}
}
@@ -452,31 +453,30 @@ sync_cookie_get_client_info(Slapi_PBlock *pb)
clientinfo = slapi_ch_smprintf("%s:%s:%s",clientdn,targetdn,strfilter);
return (clientinfo);
}
-static int
+static unsigned long
sync_cookie_get_change_number(int lastnr, const char *uniqueid)
{
Slapi_PBlock *srch_pb;
Slapi_Entry **entries;
Slapi_Entry *cl_entry;
int rv;
- int newnr = -1;
+ unsigned long newnr = SYNC_INVALID_CHANGENUM;
char *filter = slapi_ch_smprintf("(&(changenumber>=%d)(targetuniqueid=%s))",lastnr,uniqueid);
srch_pb = slapi_pblock_new();
- slapi_search_internal_set_pb(srch_pb, CL_SRCH_BASE,
- LDAP_SCOPE_SUBTREE, filter,
- NULL, 0, NULL, NULL, plugin_get_default_component_id(), 0);
- slapi_search_internal_pb(srch_pb);
+ slapi_search_internal_set_pb(srch_pb, CL_SRCH_BASE, LDAP_SCOPE_SUBTREE, filter,
+ NULL, 0, NULL, NULL, plugin_get_default_component_id(), 0);
+ slapi_search_internal_pb(srch_pb);
slapi_pblock_get(srch_pb, SLAPI_PLUGIN_INTOP_RESULT, &rv);
- if ( rv == LDAP_SUCCESS) {
+ if (rv == LDAP_SUCCESS) {
slapi_pblock_get(srch_pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
- if (entries && *entries) {
+ if (entries && *entries) {
Slapi_Attr *attr;
Slapi_Value *val;
 cl_entry = *entries; /* only use the first one */
slapi_entry_attr_find(cl_entry, CL_ATTR_CHANGENUMBER, &attr);
slapi_attr_first_value(attr, &val);
- newnr = sync_number2int((char *)slapi_value_get_string(val));
+ newnr = sync_number2ulong((char *)slapi_value_get_string(val));
}
}
@@ -579,8 +579,8 @@ sync_cookie_parse (char *cookie)
if (p) {
*p = '\0';
sc->cookie_client_signature = slapi_ch_strdup(q);
- sc->cookie_change_info = sync_number2int(p+1);
- if (sc->cookie_change_info < 0) {
+ sc->cookie_change_info = sync_number2ulong(p+1);
+ if (SYNC_INVALID_CHANGENUM == sc->cookie_change_info) {
goto error_return;
}
} else {
@@ -716,14 +716,28 @@ sync_pblock_copy(Slapi_PBlock *src)
return dest;
}
-int sync_number2int(char *chgnrstr)
+int
+sync_number2int(char *chgnrstr)
{
char *end;
int nr;
- nr = strtoul(chgnrstr, &end, 10);
+ nr = (int)strtoul(chgnrstr, &end, 10);
if ( *end == '\0') {
return (nr);
} else {
return (-1);
}
}
+
+/*
+ * Convert a decimal change-number string to an unsigned long.
+ *
+ * Returns SYNC_INVALID_CHANGENUM when chgnrstr is NULL, contains no digits
+ * (e.g. an empty string), or has trailing characters after the number.
+ * An out-of-range value makes strtoul() return ULONG_MAX, which has the same
+ * value as SYNC_INVALID_CHANGENUM ((unsigned long)-1), so it is reported as
+ * invalid as well.
+ */
+unsigned long
+sync_number2ulong(char *chgnrstr)
+{
+    char *end = NULL;
+    unsigned long nr;
+
+    if (NULL == chgnrstr) {
+        /* e.g. a change number value the caller could not read */
+        return SYNC_INVALID_CHANGENUM;
+    }
+    nr = strtoul(chgnrstr, &end, 10);
+    if ((end == chgnrstr) || (*end != '\0')) {
+        /* nothing was converted, or garbage follows the number */
+        return SYNC_INVALID_CHANGENUM;
+    }
+    return (nr);
+}
commit a908c6b57cd77ff2f6e2fe0fe1fa2e0eccba77e0
Author: Noriko Hosoi <nhosoi(a)redhat.com>
Date: Wed Dec 10 17:12:00 2014 -0800
Ticket #47960 - cookie_change_info returns random negative number if there was no change in a tree
Description: When no changes had been made, the Retro Changelog db
was empty and the search callback sync_handle_cnum_entry in the
Content Synchronization had no chance to be called. If it was not
called, an uninitialized garbage value in Sync_CallBackData was set
to cookie_change_info.
This patch checks if the search callback sync_handle_cnum_entry is
called or not. If it is not called, set 0 to cookie_change_info.
https://fedorahosted.org/389/ticket/47960
Reviewed by rmeggins(a)redhat.com and tbordaz(a)redhat.com (Thank you,
Rich and Thierry!!)
diff --git a/ldap/servers/plugins/sync/sync.h b/ldap/servers/plugins/sync/sync.h
index 9c2d8be..0bcec7a 100644
--- a/ldap/servers/plugins/sync/sync.h
+++ b/ldap/servers/plugins/sync/sync.h
@@ -76,6 +76,8 @@ typedef struct sync_update {
Slapi_Entry *upd_e;
} Sync_UpdateNode;
+#define SYNC_CALLBACK_PREINIT (-1)
+
typedef struct sync_callback {
Slapi_PBlock *orig_pb;
int changenr;
diff --git a/ldap/servers/plugins/sync/sync_util.c b/ldap/servers/plugins/sync/sync_util.c
index de65b99..af22bcb 100644
--- a/ldap/servers/plugins/sync/sync_util.c
+++ b/ldap/servers/plugins/sync/sync_util.c
@@ -373,6 +373,7 @@ sync_handle_cnum_entry(Slapi_Entry *e, void *cb_data)
if( NULL != value && NULL != value->bv_val &&
'\0' != value->bv_val[0]) {
cb->changenr = sync_number2int(value->bv_val);
+ cb->cb_err = 0; /* changenr successfully set */
}
}
}
@@ -500,7 +501,7 @@ sync_cookie_get_change_info(Sync_CallBackData *scbd)
slapi_pblock_init(seq_pb);
slapi_seq_internal_set_pb(seq_pb, base, SLAPI_SEQ_LAST, attrname, NULL, NULL, 0, 0,
- plugin_get_default_component_id(), 0);
+ plugin_get_default_component_id(), 0);
rc = slapi_seq_internal_callback_pb (seq_pb, scbd, NULL, sync_handle_cnum_entry, NULL);
slapi_pblock_destroy(seq_pb);
@@ -518,15 +519,20 @@ sync_cookie_create (Slapi_PBlock *pb)
Sync_CallBackData scbd;
int rc;
- Sync_Cookie *sc = (Sync_Cookie *)slapi_ch_malloc(sizeof(Sync_Cookie));
-
+ Sync_Cookie *sc = (Sync_Cookie *)slapi_ch_calloc(1, sizeof(Sync_Cookie));
+ scbd.cb_err = SYNC_CALLBACK_PREINIT;
rc = sync_cookie_get_change_info (&scbd);
if (rc == 0) {
sc->cookie_server_signature = sync_cookie_get_server_info(pb);
sc->cookie_client_signature = sync_cookie_get_client_info(pb);
- sc->cookie_change_info = scbd.changenr;
+ if (scbd.cb_err == SYNC_CALLBACK_PREINIT) {
+ /* changenr is not initialized. */
+ sc->cookie_change_info = 0;
+ } else {
+ sc->cookie_change_info = scbd.changenr;
+ }
} else {
slapi_ch_free ((void **)&sc);
sc = NULL;
9 years, 3 months