Class ReplicationDomain
- java.lang.Object
- 
- org.opends.server.replication.service.ReplicationDomain
 
- 
- Direct Known Subclasses:
- LDAPReplicationDomain
 
public abstract class ReplicationDomain extends Object

This class should be used as a base for Replication implementations. It is intended that developers in need of a replication mechanism subclass this class with their own implementation.

The startup phase of the ReplicationDomain subclass should read the list of replication servers from the configuration, instantiate a ServerState, then start the publish service by calling startPublishService(). At this point it can start calling the publish(ReplicationMsg) method if needed.

When the startup phase reaches the point where the subclass is ready to handle updates, the Replication Domain implementation should call the startListenService() method. At this point a Listener thread is created on the Replication Service and can start receiving updates.

When updates are received, the Replication Service calls the dispatchUpdateForReplay(UpdateMsg) method. The ReplicationDomain implementation should implement the appropriate code for replaying the update on the local repository. When fully done, the subclass must call the processUpdateAfterReplay(UpdateMsg) method. This allows the update to be processed asynchronously if necessary.

To propagate changes to other replicas, a ReplicationDomain implementation must use the publish(ReplicationMsg) method.

If the Full Initialization process is needed, then implementations of importBackend(InputStream) and exportBackend(OutputStream) must be provided.

Full Initialization of a replica can be triggered by LDAP clients by creating an InitializeTask or an InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using the methods initializeRemote(ReplicaId, Task), initializeRemote(ReplicaId, ReplicaId, Task, int) or initializeFromRemote(ReplicaId, Task).

At shutdown time, the suspendService() method should be called to cleanly stop the replication service.
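The call order described in the class description can be sketched as follows. This is only an illustration under stated assumptions: SketchDomain is a hypothetical, self-contained stand-in that records calls, it is not the real ReplicationDomain, and messages are plain strings rather than ReplicationMsg or UpdateMsg objects. Only the method names and their ordering come from the description; the state check in startListenService() and the deliver() helper are inventions of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mimicking the documented call order; NOT the real ReplicationDomain. */
abstract class SketchDomain {
    final List<String> calls = new ArrayList<>();
    private boolean publishStarted;
    private boolean listenStarted;

    protected void startPublishService() {
        publishStarted = true;
        calls.add("startPublishService");
    }

    protected void startListenService() {
        // Assumption of this sketch: enforce "once, and after startPublishService()".
        if (!publishStarted || listenStarted) {
            throw new IllegalStateException("call once, after startPublishService()");
        }
        listenStarted = true;
        calls.add("startListenService");
    }

    public void publish(String msg) {
        if (!publishStarted) {
            throw new IllegalStateException("publish service not started");
        }
        calls.add("publish:" + msg);
    }

    /** Hypothetical helper: simulates the service handing a remote update to the domain. */
    void deliver(String updateMsg) {
        if (!listenStarted) {
            throw new IllegalStateException("listen service not started");
        }
        dispatchUpdateForReplay(updateMsg);
    }

    public abstract void dispatchUpdateForReplay(String updateMsg);

    protected final void processUpdateAfterReplay(String updateMsg) {
        calls.add("processUpdateAfterReplay:" + updateMsg);
    }

    public void suspendService() {
        calls.add("suspendService");
    }
}

class LifecycleSketch extends SketchDomain {
    @Override
    public void dispatchUpdateForReplay(String updateMsg) {
        calls.add("replay:" + updateMsg);      // replay on the local repository
        processUpdateAfterReplay(updateMsg);   // must follow every replay
    }

    /** Runs startup, one local publish, one remote update, then shutdown. */
    static List<String> run() {
        LifecycleSketch domain = new LifecycleSketch();
        domain.startPublishService();
        domain.publish("local-change");
        domain.startListenService();
        domain.deliver("remote-change");
        domain.suspendService();
        return domain.calls;
    }
}
```

Calling LifecycleSketch.run() returns the recorded calls in the order the description prescribes: publish service first, listen service second, each replay followed by processUpdateAfterReplay, and suspendService() at shutdown.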
- 
- 
Nested Class Summary
- protected static class ReplicationDomain.ImportExportContext: This class contains the context related to an import or export launched on the domain.
 - 
Field Summary
- protected ReplicationBroker broker: The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.
- protected boolean canPersistROM: Decides whether to include the ReplicaOfflineMsg (ROM) sent by the local replica in the persisted state.
- protected ReplicationDomainCfg config: The configuration of the replication domain.
- protected long generationId: The generationId for this replication domain.
- protected long replicationPurgeDelayInMs: The replication purge delay is available as Long from the config object.
- protected ServerContext serverContext: The directory server context.
 - 
Constructor Summary
- ReplicationDomain(ServerContext serverContext, ReplicationDomainCfg config, long generationId): Creates a ReplicationDomain with the provided parameters.
 - 
Method Summary
- void addAdditionalMonitoring(MeterRegistryHolder registry): Subclasses should use this method to add additional monitoring information in the ReplicationDomain.
- protected boolean changeConfig(ReplicationDomainCfg config): Change some ReplicationDomain parameters.
- abstract long countEntries(): This method should return the total number of objects in the replicated domain.
- ReplicaId decodeTarget(String targetString): Verifies that the given string represents a valid source from which this server can be initialized.
- abstract void dispatchUpdateForReplay(UpdateMsg updateMsg): This method ensures this UpdateMsg received from remote replication entities will be replayed, by dispatching it to the replay threads.
- protected abstract void exportBackend(OutputStream output): This method should trigger an export of the replicated data.
- Dn getBaseDN(): Returns the base DN of this ReplicationDomain.
- protected CSNGenerator getCsnGenerator(): Returns the CSNGenerator that will be used to generate CSN for this domain.
- Set<String> getEclIncludes(): Get the attributes to include in each change for the ECL.
- protected Set<String> getEclIncludesForDeletes(): Get the attributes to include in each delete change for the ECL.
- long getGenerationID(): This method should return the generationID to use for this ReplicationDomain.
- HealthStatus getHealthStatus(long delayThresholdMs): Returns the health status based on the current replication delay.
- protected ReplicationDomain.ImportExportContext getImportExportContext(): Returns the Import/Export context associated to this ReplicationDomain.
- CSN getLastLocalChange(): Returns the CSN of the last Change that was fully processed by this ReplicationDomain.
- ReplicaId getReplicaId(): Get the replica ID which identifies this Replication Domain inside the Replication Service.
- int getReplicationServerPort(): Get the port of the Replication Server to which this domain is currently connected.
- ServerState getServerState(): Get the ServerState maintained by the concrete class.
- ServerStatus getStatus(): Gets the status for this domain.
- protected boolean hasConnectionError(): Check if the domain has a connection error.
- boolean ieRunning(): Returns a boolean indicating if an import or export is currently in progress.
- protected abstract void importBackend(InputStream input): This method should trigger an import of the replicated data.
- void initializeFromRemote(ReplicaId source, Task initTask): Initializes asynchronously this domain from a remote source server.
- void initializeRemote(ReplicaId target, Task initTask): Initializes a remote server from this server.
- protected void initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow): Processes the initialization of one or more other servers in the topology, as specified by the target argument, on behalf of the server that requested the initialization.
- boolean isConnected(): Check if the domain is connected to a ReplicationServer.
- protected boolean isListenerShuttingDown(): Returns true if the listener thread is shutting down or has shut down.
- protected boolean isSuspending(): Returns true if the domain is being suspended.
- protected void processUpdateAfterReplay(UpdateMsg msg): This method must be called after each call to dispatchUpdateForReplay(UpdateMsg) when the processing of the update is completed.
- void publish(ReplicationMsg msg): Publish an UpdateMsg to the Replication Service.
- void publishHeartbeatMsg(): Publishes a heartbeat message if all pending changes for current replica have been sent out.
- void publishReplicaOfflineMsg(): Publishes a replica offline message if all pending changes for current replica have been sent out.
- protected byte[] receiveEntryBytes(): Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).
- protected void recreateRemoteReplicasFromState(): Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.
- protected boolean replicaIsLate(): Returns whether the connected RS has notified this replica to be late with respect to changelog contents.
- void resetGenerationId(long generationIdNewValue): Reset the generationId of this domain in the whole topology.
- protected void resumeService(): Restart the Replication service after a suspendService().
- void sessionInitiated(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica): Set the initial status of the domain and perform necessary initializations.
- boolean setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes): Set the attributes configured on a server to be included in the ECL.
- protected void startListenService(): Starts the receiver side of the Replication Service.
- protected void startPublishService(): Start the publish mechanism of the Replication Service.
- void suspendService(): Temporarily suspend the Replication Service.
- protected void toBadGenerationIdStatus(): Sets the status to bad generation ID.
- String toString()
- protected void trace(String msg, Object... args): Logs at trace level a message for this replication domain.
- protected void trace(LocalizableMessageDescriptor.Arg0 msg): Logs at trace level a message for this replication domain.
- protected void updateState(): Update the server state if necessary.
 
- 
- 
- 
Field Detail- 
config
protected volatile ReplicationDomainCfg config
The configuration of the replication domain.
 - 
replicationPurgeDelayInMs
protected volatile long replicationPurgeDelayInMs
The replication purge delay is available as Long from the config object. Since the value is retrieved for every update, avoid unboxing performance issues by using a dedicated long field.
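The idea of keeping a primitive copy of a boxed configuration value can be sketched as below. The names SketchConfig, PurgeDelayCache, getPurgeDelayMs() and isOlderThanPurgeDelay() are hypothetical and exist only for this illustration; the real configuration type and the way the domain uses the delay are not shown in this page.

```java
/** Hypothetical config holder returning a boxed Long; stands in for the real config object. */
final class SketchConfig {
    private final Long purgeDelayMs;

    SketchConfig(Long purgeDelayMs) {
        this.purgeDelayMs = purgeDelayMs;
    }

    Long getPurgeDelayMs() {
        return purgeDelayMs;
    }
}

/** Keeps a primitive copy of the delay so the per-update path never unboxes. */
final class PurgeDelayCache {
    private volatile SketchConfig config;
    private volatile long replicationPurgeDelayInMs;

    PurgeDelayCache(SketchConfig config) {
        changeConfig(config);
    }

    void changeConfig(SketchConfig newConfig) {
        this.config = newConfig;
        // Unbox once, when the configuration changes, not on every update.
        this.replicationPurgeDelayInMs = newConfig.getPurgeDelayMs();
    }

    /** Hypothetical hot-path check that reads only the primitive field. */
    boolean isOlderThanPurgeDelay(long changeTimeMs, long nowMs) {
        return nowMs - changeTimeMs > replicationPurgeDelayInMs;
    }
}
```

The design choice is simply to pay the unboxing cost at configuration time, which happens rarely, instead of on each update.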
 - 
broker
protected ReplicationBroker broker
The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.
 - 
generationId
protected volatile long generationId
The generationId for this replication domain. It is made of a hash of the first 1000 entries for this domain.
 - 
serverContext
protected final ServerContext serverContext
The directory server context.
 - 
canPersistROM
protected volatile boolean canPersistROM
Decides whether to include the ReplicaOfflineMsg (ROM) sent by the local replica in the persisted state. The ROM should not be part of the persisted state while the replica is recovering updates from the connected RS. Since the RS state is sent as part of the response to the start message, we can detect the case and only persist the ROM when the replica has finished receiving its own updates or there are no updates to receive. See OPENDJ-8062 and OPENDJ-8992 for details.
 
- 
 - 
Constructor Detail- 
ReplicationDomain
public ReplicationDomain(ServerContext serverContext, ReplicationDomainCfg config, long generationId)
Creates a ReplicationDomain with the provided parameters.
- Parameters:
- serverContext- The directory server context
- config- The configuration object for this ReplicationDomain
- generationId- the generation of this ReplicationDomain
 
 
- 
 - 
Method Detail- 
getHealthStatus
public HealthStatus getHealthStatus(long delayThresholdMs)
Returns the health status based on the current replication delay. If the delay is above the provided threshold, the replication domain is considered not healthy.
- Parameters:
- delayThresholdMs- The maximum replication delay in milliseconds for considering this replication domain healthy.
- Returns:
- This replication domain health status.
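The threshold rule stated for getHealthStatus can be sketched as a simple comparison. SketchHealth and HealthCheckSketch are hypothetical names; the real HealthStatus type and the way the current delay is measured are not described in this page, so the delay is passed in explicitly here.

```java
/** Hypothetical two-state result; the real HealthStatus type is not described in this page. */
enum SketchHealth { HEALTHY, NOT_HEALTHY }

final class HealthCheckSketch {
    private HealthCheckSketch() {
    }

    /** A delay strictly above the threshold is reported as not healthy. */
    static SketchHealth healthFor(long currentDelayMs, long delayThresholdMs) {
        return currentDelayMs > delayThresholdMs
                ? SketchHealth.NOT_HEALTHY
                : SketchHealth.HEALTHY;
    }
}
```

The sketch treats a delay equal to the threshold as healthy, following the wording "above the provided threshold".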
 
 - 
getCsnGenerator
protected CSNGenerator getCsnGenerator()
Returns the CSNGenerator that will be used to generate CSN for this domain.
- Returns:
- The CSNGenerator that will be used to generate CSN for this domain.
 
 - 
sessionInitiated
public void sessionInitiated(ServerStatus initStatus, ServerState rsState, ServerState newestChangelogStateOfReplica)
Set the initial status of the domain and perform necessary initializations. This method will be called by the Broker each time the ReplicationBroker establishes a new session to a Replication Server. Implementations may override this method when they need to perform additional computing after session establishment. The default implementation should be sufficient for ReplicationDomains that don't need to perform additional computing.
- Parameters:
- initStatus- The status to enter the state machine with.
- rsState- The ServerState of the ReplicationServer with which the session was established.
- newestChangelogStateOfReplica- The newest known ServerState of the change-log for this replica across the contacted ReplicationServers
 
 - 
getStatus
public ServerStatus getStatus()
Gets the status for this domain.
- Returns:
- The status for this domain.
 
 - 
getBaseDN
public Dn getBaseDN()
Returns the base DN of this ReplicationDomain. All Replication Domains using this base DN will be connected through the Replication Service.
- Returns:
- The base DN of this ReplicationDomain
 
 - 
getReplicaId
public ReplicaId getReplicaId()
Get the replica ID which identifies this Replication Domain inside the Replication Service.
- Returns:
- The replica ID.
 
 - 
recreateRemoteReplicasFromState
protected void recreateRemoteReplicasFromState()
Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.
 - 
updateState
protected void updateState()
Update the server state if necessary.
 - 
decodeTarget
public ReplicaId decodeTarget(String targetString) throws LdapException
Verifies that the given string represents a valid source from which this server can be initialized.
- Parameters:
- targetString- The string representing the source
- Returns:
- The source replica ID
- Throws:
- LdapException- if the string is not valid
 
 - 
initializeRemote
public void initializeRemote(ReplicaId target, Task initTask) throws LdapException
Initializes a remote server from this server. The exportBackend(OutputStream) will therefore be called on this server, and the importBackend(InputStream) will be called on the remote server. The InputStream and OutputStream given as a parameter to those methods will be connected through the replication protocol.
- Parameters:
- target- The server-id of the server that should be initialized. The target can be discovered using the getReplicaInfos() method.
- initTask- The task that triggers this initialization and that should be updated with its progress.
- Throws:
- LdapException- If it was not possible to publish the Initialization message to the Topology.
 
 - 
initializeRemote
protected void initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow) throws LdapException
Processes the initialization of one or more other servers in the topology, as specified by the target argument, on behalf of the server that requested the initialization.
- Parameters:
- replicaToInitialize- The target replica that should be initialized.
- replicaRunningTheTask- The replica that initiated the export. It can be the replica id of this server, or the replica id of a remote replica.
- initTask- The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).
- initWindow- The value of the initialization window for flow control between the importer and the exporter.
- Throws:
- LdapException- When an error occurs. No exception raised means success.
 
 - 
getServerState
public ServerState getServerState()
Get the ServerState maintained by the concrete class.
- Returns:
- the ServerState maintained by the concrete class.
 
 - 
receiveEntryBytes
protected byte[] receiveEntryBytes()
Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).
- Returns:
- The bytes. Null when the Done or Err message has been received
 
 - 
initializeFromRemote
public void initializeFromRemote(ReplicaId source, Task initTask) throws LdapException
Initializes asynchronously this domain from a remote source server. Before returning from this call, for the provided task:
- the progress counters are updated during the initialization using setTotal() and setLeft();
- the end of the initialization is signalled using updateTaskCompletionState().
When this method is called, a request for initialization is sent to the remote source server.
- Parameters:
- source- The server-id of the source from which to initialize. The source can be discovered using the getReplicaInfos() method.
- initTask- The task that launched the initialization and should be updated with its progress.
- Throws:
- LdapException- If it was not possible to publish the Initialization message to the Topology. The task state is updated.
 
 - 
toBadGenerationIdStatus
protected void toBadGenerationIdStatus()
Sets the status to bad generation ID.
 - 
ieRunning
public boolean ieRunning()
Returns a boolean indicating if an import or export is currently in progress.
- Returns:
- true if an import or export is currently in progress
 
 - 
resetGenerationId
public void resetGenerationId(long generationIdNewValue) throws LdapException
Reset the generationId of this domain in the whole topology. A message is sent to the Replication Servers for them to reset their change dbs.
- Parameters:
- generationIdNewValue- The new value of the generation Id.
- Throws:
- LdapException- When an error occurs
 
 - 
isConnected
public boolean isConnected()
Check if the domain is connected to a ReplicationServer.
- Returns:
- true if the server is connected, false if not.
 
 - 
hasConnectionError
protected boolean hasConnectionError()
Check if the domain has a connection error. A connection error happens when the broker could not be created or when the broker could not find any ReplicationServer to connect to.
- Returns:
- true if the domain has a connection error.
 
 - 
replicaIsLate
protected boolean replicaIsLate()
Returns whether the connected RS has notified this replica to be late with respect to changelog contents.
- Returns:
- whether the connected RS has notified this replica to be late
 
 - 
getReplicationServerPort
public int getReplicationServerPort()
Get the port of the Replication Server to which this domain is currently connected.
- Returns:
- the replication server port to which this domain is currently connected.
 
 - 
startPublishService
protected void startPublishService()
Start the publish mechanism of the Replication Service. After this method has been called, the publish service can be used by calling the publish(ReplicationMsg) method.
 - 
startListenService
protected void startListenService()
Starts the receiver side of the Replication Service. After this method has been called, the Replication Service will start calling dispatchUpdateForReplay(UpdateMsg). This method must be called once, and only after startPublishService().
 - 
suspendService
public void suspendService()
Temporarily suspend the Replication Service. The Replication Service can be resumed again using resumeService(). It can be useful to suspend the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable and replicated updates therefore cannot be replayed for a while. This method is not thread-safe.
 - 
isListenerShuttingDown
protected final boolean isListenerShuttingDown()
Returns true if the listener thread is shutting down or has shut down.
- Returns:
- true if the listener thread is shutting down or has shut down.
 
 - 
isSuspending
protected boolean isSuspending()
Returns true if the domain is being suspended.
- Returns:
- true if the domain is being suspended
 
 - 
resumeService
protected void resumeService()
Restart the Replication service after a suspendService(). The Replication Service will restart from the point indicated by the ServerState that was given as a parameter to the startPublishService() at startup time. If some data has changed in the repository while the Replication Service was suspended, this ServerState should therefore be updated by the Replication Domain subclass before calling this method. This method is not thread-safe.
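Because neither suspendService() nor resumeService() is safe to call from several threads at once, one way a subclass might serialize them is with a lock, as sketched below. SuspendableService and GuardedSuspension are hypothetical stand-ins written for this illustration; they are not part of the documented API, and in the real class resumeService() is protected, so such a wrapper would have to live in the subclass.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the two non-thread-safe calls; NOT the real ReplicationDomain. */
class SuspendableService {
    private boolean suspended;

    void suspendService() {
        suspended = true;
    }

    void resumeService() {
        suspended = false;
    }

    boolean isSuspended() {
        return suspended;
    }
}

/** Serializes suspend/resume pairs so that only one thread runs them at a time. */
final class GuardedSuspension {
    private final ReentrantLock lock = new ReentrantLock();
    private final SuspendableService service;

    GuardedSuspension(SuspendableService service) {
        this.service = service;
    }

    /** Runs the given work while the service is suspended, then always resumes it. */
    void whileSuspended(Runnable maintenance) {
        lock.lock();
        try {
            service.suspendService();
            try {
                // In a real subclass, the server state would be brought up to date here.
                maintenance.run();
            } finally {
                service.resumeService();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

The nested try/finally means the service is resumed even if the maintenance work throws, and the lock is released in every case.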
 - 
changeConfig
protected boolean changeConfig(ReplicationDomainCfg config)
Change some ReplicationDomain parameters.
- Parameters:
- config- The new configuration that this domain should now use.
- Returns:
- whether a restart of the service is needed
 
 - 
exportBackend
protected abstract void exportBackend(OutputStream output) throws LdapException
This method should trigger an export of the replicated data to the provided OutputStream. When finished, the OutputStream should be flushed and closed.
- Parameters:
- output- The OutputStream where the export should be produced.
- Throws:
- LdapException- When needed.
 
 - 
importBackend
protected abstract void importBackend(InputStream input) throws LdapException
This method should trigger an import of the replicated data.
- Parameters:
- input- The InputStream from which the import should be reading entries.
- Throws:
- LdapException- When needed.
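The stream contract of exportBackend and importBackend can be sketched with a round trip through in-memory streams. Everything here is an assumption made for illustration: BackendStreamSketch is a hypothetical class, entries are modeled as one text line each, and the methods throw IOException rather than LdapException so that the example stays self-contained. In the real service the two streams are connected through the replication protocol, not through a byte array.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BackendStreamSketch {
    private BackendStreamSketch() {
    }

    /** Writes one entry per line, then flushes and closes the stream as the contract asks. */
    static void exportBackend(List<String> entries, OutputStream output) throws IOException {
        try (Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
            for (String entry : entries) {
                writer.write(entry);
                writer.write('\n');
            }
            writer.flush();
        } // try-with-resources closes the writer and the underlying stream
    }

    /** Reads entries until the end of the stream is reached. */
    static List<String> importBackend(InputStream input) throws IOException {
        List<String> entries = new ArrayList<>();
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                entries.add(line);
            }
        }
        return entries;
    }

    /** Exports into a buffer and imports from it, standing in for the network link. */
    static List<String> roundTrip(List<String> entries) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            exportBackend(entries, buffer);
            return importBackend(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Closing the output stream on the export side is what lets the import side see the end of the data, which is why the contract insists on flushing and closing.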
 
 - 
countEntries
public abstract long countEntries() throws LdapException
This method should return the total number of objects in the replicated domain. This count will be used for reporting.
- Returns:
- The number of objects in the replication domain.
- Throws:
- LdapException- when needed.
 
 - 
dispatchUpdateForReplay
public abstract void dispatchUpdateForReplay(UpdateMsg updateMsg)
This method ensures this UpdateMsg received from remote replication entities will be replayed, by dispatching it to the replay threads.
- Parameters:
- updateMsg- The UpdateMsg to replay.
 
 - 
processUpdateAfterReplay
protected final void processUpdateAfterReplay(UpdateMsg msg)
This method must be called after each call to dispatchUpdateForReplay(UpdateMsg) when the processing of the update is completed. It is useful for implementations that need to process the update asynchronously or using several threads, but it must also be called by implementations that process updates in a synchronous, single-threaded way.
- Parameters:
- msg- The UpdateMsg whose processing was completed.
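An asynchronous replay that still reports completion for every dispatched update might look like the sketch below. AsyncReplaySketch is a hypothetical stand-in: updates are plain strings, the replay work is passed in as a Runnable, and processUpdateAfterReplay only counts calls. Calling it from a finally block is a choice made by this sketch so that each dispatch is matched by exactly one completion call; how a failed replay should be reported is not stated in this page.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in showing replay on worker threads; NOT the real ReplicationDomain. */
final class AsyncReplaySketch {
    private final ExecutorService replayThreads = Executors.newFixedThreadPool(2);
    final AtomicInteger completed = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    /** Hands the update to a replay thread and returns immediately. */
    void dispatchUpdateForReplay(String updateMsg, Runnable replayAction) {
        replayThreads.execute(() -> {
            try {
                replayAction.run();
            } catch (RuntimeException e) {
                failed.incrementAndGet(); // a real implementation would log or handle this
            } finally {
                processUpdateAfterReplay(updateMsg); // exactly once per dispatched update
            }
        });
    }

    /** Stand-in for the completion callback; here it only counts calls. */
    void processUpdateAfterReplay(String updateMsg) {
        completed.incrementAndGet();
    }

    /** Stops accepting work and waits for queued replays to finish. */
    boolean shutdownAndWait() {
        replayThreads.shutdown();
        try {
            return replayThreads.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A synchronous, single-threaded implementation would follow the same pattern without the executor: replay, then call processUpdateAfterReplay before returning.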
 
 - 
publish
public void publish(ReplicationMsg msg)
Publish an UpdateMsg to the Replication Service. The Replication Service will handle the delivery of this UpdateMsg to all the participants of this Replication Domain. These members will receive this UpdateMsg through a call to the dispatchUpdateForReplay(UpdateMsg) method.
- Parameters:
- msg- The UpdateMsg that should be published.
 
 - 
publishHeartbeatMsg
public void publishHeartbeatMsg()
Publishes a heartbeat message if all pending changes for current replica have been sent out.
 - 
publishReplicaOfflineMsg
public void publishReplicaOfflineMsg()
Publishes a replica offline message if all pending changes for current replica have been sent out.
 - 
getGenerationID
public long getGenerationID()
This method should return the generationID to use for this ReplicationDomain. This method can be called at any time after the ReplicationDomain has been started.
- Returns:
- The GenerationID.
 
 - 
addAdditionalMonitoring
public void addAdditionalMonitoring(MeterRegistryHolder registry)
Subclasses should use this method to add additional monitoring information in the ReplicationDomain.
- Parameters:
- registry- where to add additional monitoring attributes
 
 - 
getImportExportContext
protected ReplicationDomain.ImportExportContext getImportExportContext()
Returns the Import/Export context associated to this ReplicationDomain.
- Returns:
- the Import/Export context associated to this ReplicationDomain
 
 - 
setEclIncludes
public boolean setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
Set the attributes configured on a server to be included in the ECL.
- Parameters:
- replicaId- Server where these attributes are configured.
- includeAttributes- Attributes to be included with all change records, may include wild-cards.
- includeAttributesForDeletes- Additional attributes to be included with delete change records, may include wild-cards.
- Returns:
- true if the set of attributes was modified.
 
 - 
getEclIncludes
public Set<String> getEclIncludes()
Get the attributes to include in each change for the ECL.
- Returns:
- The attributes to include in each change for the ECL.
 
 - 
getEclIncludesForDeletes
protected Set<String> getEclIncludesForDeletes()
Get the attributes to include in each delete change for the ECL.
- Returns:
- The attributes to include in each delete change for the ECL.
 
 - 
getLastLocalChange
public CSN getLastLocalChange()
Returns the CSN of the last Change that was fully processed by this ReplicationDomain.
- Returns:
- The CSN of the last Change that was fully processed by this ReplicationDomain.
 
 - 
trace
protected void trace(String msg, Object... args)
Logs at trace level a message for this replication domain.
- Parameters:
- msg- the message
- args- the arguments of the message
 
 - 
trace
protected void trace(LocalizableMessageDescriptor.Arg0 msg)
Logs at trace level a message for this replication domain.
- Parameters:
- msg- a localizable message
 
 
- 
 
-