Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use nodeName instead of replicaId #772

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,13 @@ message StateResponse {
// Request a replica sends to the primary to register itself for an index.
message AddReplicaRequest {
int32 magicNumber = 1; //magic number sent on all requests since these are meant for internal communication only
string indexName = 2; //index name
int32 replicaId = 3; //replica Id
int32 replicaId = 3 [deprecated = true]; //replica Id
string hostName = 4; // replica host name
int32 port = 5; // replica port number
// index id
string indexId = 6;
// node name of the replica; used to identify it in place of the deprecated replicaId
string nodeName = 7;
}

message AddReplicaResponse {
Expand Down Expand Up @@ -1046,6 +1048,7 @@ message GetNodesResponse {
// Connection details for a single node, as listed in GetNodesResponse.
message NodeInfo {
string hostname = 1; //name or ip address of the remote host that this node is connected to for binary replication
int32 port = 2; //port number of the remote host that this node is connected to for binary replication
string nodeName = 3; //name of the remote node
}

message DeleteByQueryRequest {
Expand Down
1,203 changes: 613 additions & 590 deletions grpc-gateway/luceneserver.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions grpc-gateway/luceneserver.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4142,6 +4142,9 @@
"port": {
"type": "integer",
"format": "int32"
},
"nodeName": {
"type": "string"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public NrtsearchConfig(InputStream yamlStream) {
configReader.getLong("maxConnectionAgeForReplication", AS_LARGE_AS_INFINITE);
maxConnectionAgeGraceForReplication =
configReader.getLong("maxConnectionAgeGraceForReplication", AS_LARGE_AS_INFINITE);
nodeName = configReader.getString("nodeName", DEFAULT_NODE_NAME);
nodeName = substituteEnvVariables(configReader.getString("nodeName", DEFAULT_NODE_NAME));
hostName = substituteEnvVariables(configReader.getString("hostName", DEFAULT_HOSTNAME));
stateDir = configReader.getString("stateDir", DEFAULT_STATE_DIR.toString());
indexDir = configReader.getString("indexDir", DEFAULT_INDEX_DIR.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ private void shutdown() throws InterruptedException {
}

public void addReplicas(
String indexName, String indexId, int replicaId, String hostName, int port) {
String indexName, String indexId, String nodeName, String hostName, int port) {
AddReplicaRequest addReplicaRequest =
AddReplicaRequest.newBuilder()
.setMagicNumber(BINARY_MAGIC)
.setIndexName(indexName)
.setIndexId(indexId)
.setReplicaId(replicaId)
.setNodeName(nodeName)
.setHostName(hostName)
.setPort(port)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addRe
}
try {
shardState.nrtPrimaryNode.addReplica(
addReplicaRequest.getReplicaId(),
addReplicaRequest.getNodeName(),
// channel for primary to talk to replica
new ReplicationServerClient(
addReplicaRequest.getHostName(), addReplicaRequest.getPort(), useKeepAlive));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private GetNodesResponse handle(IndexState indexState) {
NodeInfo.newBuilder()
.setHostname(hostPort.getHostName())
.setPort(hostPort.getPort())
.setNodeName(replica.getNodeName())
.build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private SearcherVersion handle(IndexState indexState, String indexId) {
Iterator<NRTPrimaryNode.ReplicaDetails> it = replicasInfos.iterator();
while (it.hasNext()) {
NRTPrimaryNode.ReplicaDetails replicaDetails = it.next();
int replicaID = replicaDetails.getReplicaId();
String nodeName = replicaDetails.getNodeName();
ReplicationServerClient currentReplicaServerClient =
replicaDetails.getReplicationServerClient();
try {
Expand All @@ -89,8 +89,8 @@ private SearcherVersion handle(IndexState indexState, String indexId) {
Status status = e.getStatus();
if (status.getCode().equals(Status.UNAVAILABLE.getCode())) {
logger.info(
"NRTPRimaryNode: sendNRTPoint, lost connection to replicaId: "
+ replicaDetails.getReplicaId()
"NRTPRimaryNode: sendNRTPoint, lost connection to nodeName: "
+ nodeName
+ " host: "
+ replicaDetails.getHostPort().getHostName()
+ " port: "
Expand All @@ -99,13 +99,13 @@ private SearcherVersion handle(IndexState indexState, String indexId) {
}
} catch (Exception e) {
shardState.nrtPrimaryNode.message(
"top: failed to connect R"
+ replicaID
"top: failed to connect "
+ nodeName
+ " for newNRTPoint; skipping: "
+ e.getMessage());
logger.info(
"top: failed to connect R"
+ replicaID
"top: failed to connect "
+ nodeName
+ " for newNRTPoint; skipping: "
+ e.getMessage());
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/yelp/nrtsearch/server/index/ShardState.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@

public class ShardState implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(ShardState.class);
public static final int REPLICA_ID = 0;
public static final String INDEX_DATA_DIR_NAME = "index";
public static final String TAXONOMY_DATA_DIR_NAME = "taxonomy";
final ExecutorService searchExecutor;
Expand Down Expand Up @@ -980,7 +979,7 @@ public synchronized void startReplica(
indexStateManager.getIndexId(),
primaryAddress,
hostPort,
REPLICA_ID,
configuration.getNodeName(),
indexDir,
new ShardSearcherFactory(true, false),
verbose ? System.out : new PrintStream(OutputStream.nullOutputStream()),
Expand Down Expand Up @@ -1115,7 +1114,7 @@ public void run() {
.addReplicas(
shardState.indexStateManager.getCurrent().getName(),
shardState.indexStateManager.getIndexId(),
REPLICA_ID,
nrtReplicaNode.getNodeName(),
nrtReplicaNode.getHostPort().getHostName(),
nrtReplicaNode.getHostPort().getPort());
}
Expand Down
69 changes: 48 additions & 21 deletions src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ public NrtDataManager getNrtDataManager() {
}

public static class ReplicaDetails {
private final int replicaId;
private final String nodeName;
private final HostPort hostPort;
private final ReplicationServerClient replicationServerClient;

public int getReplicaId() {
return replicaId;
public String getNodeName() {
return nodeName;
}

public ReplicationServerClient getReplicationServerClient() {
Expand All @@ -108,9 +108,9 @@ public HostPort getHostPort() {
return hostPort;
}

ReplicaDetails(int replicaId, ReplicationServerClient replicationServerClient) {
this.replicaId = replicaId;
ReplicaDetails(String nodeName, ReplicationServerClient replicationServerClient) {
this.replicationServerClient = replicationServerClient;
this.nodeName = nodeName;
this.hostPort =
new HostPort(replicationServerClient.getHost(), replicationServerClient.getPort());
}
Expand All @@ -124,7 +124,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReplicaDetails that = (ReplicaDetails) o;
return replicaId == that.replicaId && Objects.equals(hostPort, that.hostPort);
return Objects.equals(nodeName, that.nodeName) && Objects.equals(hostPort, that.hostPort);
}

/*
Expand All @@ -133,7 +133,7 @@ public boolean equals(Object o) {
* */
@Override
public int hashCode() {
return Objects.hash(replicaId, hostPort);
return Objects.hash(nodeName, hostPort);
}
}

Expand Down Expand Up @@ -207,7 +207,7 @@ void sendNewNRTPointToReplicas() {
Iterator<ReplicaDetails> it = replicasInfos.iterator();
while (it.hasNext()) {
ReplicaDetails replicaDetails = it.next();
int replicaID = replicaDetails.replicaId;
String nodeName = replicaDetails.getNodeName();
ReplicationServerClient currentReplicaServerClient = replicaDetails.replicationServerClient;
try {
currentReplicaServerClient.newNRTPoint(
Expand All @@ -216,17 +216,17 @@ void sendNewNRTPointToReplicas() {
Status status = e.getStatus();
if (status.getCode().equals(Status.UNAVAILABLE.getCode())) {
logger.warn(
"NRTPrimaryNode: sendNRTPoint, lost connection to replicaId: {} host: {} port: {}",
replicaDetails.replicaId,
"NRTPrimaryNode: sendNRTPoint, lost connection to nodeName: {} host: {} port: {}",
nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort());
currentReplicaServerClient.close();
it.remove();
} else if (status.getCode().equals(Status.FAILED_PRECONDITION.getCode())
|| status.getCode().equals(Status.NOT_FOUND.getCode())) {
logger.warn(
"NRTPrimaryNode: sendNRTPoint, replicaId: {} host: {} port: {} cannot process nrt point, closing connection: {}",
replicaDetails.replicaId,
"NRTPrimaryNode: sendNRTPoint, nodeName: {} host: {} port: {} cannot process nrt point, closing connection: {}",
nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort(),
status);
Expand All @@ -236,8 +236,8 @@ void sendNewNRTPointToReplicas() {
} catch (Exception e) {
String msg =
String.format(
"top: failed to connect R%d for newNRTPoint; skipping: %s",
replicaID, e.getMessage());
"top: failed to connect %s for newNRTPoint; skipping: %s",
nodeName, e.getMessage());
message(msg);
logger.warn(msg);
}
Expand Down Expand Up @@ -476,11 +476,34 @@ public void setRAMBufferSizeMB(double mb) {
writer.getConfig().setRAMBufferSizeMB(mb);
}

public void addReplica(int replicaID, ReplicationServerClient replicationServerClient)
public void addReplica(String nodeName, ReplicationServerClient replicationServerClient)
throws IOException {
logMessage("add replica: " + warmingSegments.size() + " current warming merges ");
ReplicaDetails replicaDetails = new ReplicaDetails(replicaID, replicationServerClient);
ReplicaDetails replicaDetails = new ReplicaDetails(nodeName, replicationServerClient);
logMessage(
String.format(
"Add replica %s (%s:%d) : %d current warming merges ",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort(),
warmingSegments.size()));
if (!replicasInfos.contains(replicaDetails)) {
Iterator<ReplicaDetails> it = replicasInfos.iterator();
while (it.hasNext()) {
ReplicaDetails existingReplicaDetails = it.next();
// This replica may have reused the address of a previous one that has not been cleaned up
// yet. We should remove any existing replica with the same address to avoid duplicate
// requests.
if (existingReplicaDetails.hostPort.equals(replicaDetails.hostPort)) {
logMessage(
String.format(
"Removing existing replica with same address %s (%s:%d)",
existingReplicaDetails.nodeName,
existingReplicaDetails.hostPort.getHostName(),
existingReplicaDetails.hostPort.getPort()));
existingReplicaDetails.replicationServerClient.close();
it.remove();
}
}
replicasInfos.add(replicaDetails);
}
// Step through all currently warming segments and try to add this replica if it isn't there
Expand All @@ -492,7 +515,8 @@ public void addReplica(int replicaID, ReplicationServerClient replicationServerC
if (preCopy.connections.contains(replicationServerClient)) {
logMessage(
String.format(
"Replica %s:%d is already warming this segment",
"Replica %s (%s:%d) is already warming this segment",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort()));
// It's possible (maybe) that the replica started up, then a merge kicked off, and it
Expand All @@ -513,7 +537,8 @@ public void addReplica(int replicaID, ReplicationServerClient replicationServerC
filesMetadata)) {
logMessage(
String.format(
"Start precopying merged segments for new replica %s:%d",
"Start precopying merged segments for new replica %s (%s:%d)",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort()));
} else {
Expand All @@ -522,7 +547,8 @@ public void addReplica(int replicaID, ReplicationServerClient replicationServerC
// nrt point sent to this replica
logMessage(
String.format(
"Merge precopy already completed, unable to add new replica %s:%d",
"Merge precopy already completed, unable to add new replica %s (%s:%d)",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort()));
}
Expand All @@ -543,7 +569,8 @@ public void close() throws IOException {
ReplicationServerClient replicationServerClient = replicaDetails.getReplicationServerClient();
HostPort replicaHostPort = replicaDetails.getHostPort();
logger.info(
"CLOSE NRT PRIMARY, closing replica channel host:{}, port:{}",
"CLOSE NRT PRIMARY, closing replica channel nodeName: {} host:{}, port:{}",
replicaDetails.getNodeName(),
replicaHostPort.getHostName(),
replicaHostPort.getPort());
replicationServerClient.close();
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class NRTReplicaNode extends ReplicaNode {
private final ReplicaDeleterManager replicaDeleterManager;
private final String indexName;
private final String indexId;
private final String nodeName;
private final boolean ackedCopy;
private final boolean filterIncompatibleSegmentReaders;
final NrtCopyThread nrtCopyThread;
Expand All @@ -62,7 +63,7 @@ public NRTReplicaNode(
String indexId,
ReplicationServerClient primaryAddress,
HostPort hostPort,
int replicaId,
String nodeName,
Directory indexDir,
SearcherFactory searcherFactory,
PrintStream printStream,
Expand All @@ -71,10 +72,12 @@ public NRTReplicaNode(
boolean filterIncompatibleSegmentReaders,
int lowPriorityCopyPercentage)
throws IOException {
super(replicaId, indexDir, searcherFactory, printStream);
// the id is always 0, the nodeName is the identifier
super(0, indexDir, searcherFactory, printStream);
this.primaryAddress = primaryAddress;
this.indexName = indexName;
this.indexId = indexId;
this.nodeName = nodeName;
this.ackedCopy = ackedCopy;
this.hostPort = hostPort;
replicaDeleterManager = decInitialCommit ? new ReplicaDeleterManager(this) : null;
Expand Down Expand Up @@ -232,7 +235,7 @@ protected void launch(CopyJob job) {
protected void sendNewReplica() throws IOException {
logger.info(String.format("send new_replica to primary: %s", primaryAddress));
primaryAddress.addReplicas(
indexName, this.indexId, this.id, hostPort.getHostName(), hostPort.getPort());
indexName, this.indexId, this.nodeName, hostPort.getHostName(), hostPort.getPort());
}

public CopyJob launchPreCopyFiles(
Expand Down Expand Up @@ -281,6 +284,10 @@ public ReplicationServerClient getPrimaryAddress() {
return primaryAddress;
}

/**
 * Returns the node name this replica was constructed with. The same name is sent to the primary
 * when this replica registers itself.
 *
 * @return the node name of this replica
 */
public String getNodeName() {
return nodeName;
}

/**
 * Returns the host and port this replica was constructed with. The same address is sent to the
 * primary when this replica registers itself.
 *
 * @return the host/port pair of this replica
 */
public HostPort getHostPort() {
return hostPort;
}
Expand All @@ -290,7 +297,8 @@ public HostPort getHostPort() {
public boolean isKnownToPrimary() {
GetNodesResponse getNodesResponse = primaryAddress.getConnectedNodes(indexName);
for (NodeInfo nodeInfo : getNodesResponse.getNodesList()) {
if (hostPort.equals(new HostPort(nodeInfo.getHostname(), nodeInfo.getPort()))) {
if (nodeName.equals(nodeInfo.getNodeName())
&& hostPort.equals(new HostPort(nodeInfo.getHostname(), nodeInfo.getPort()))) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public void registerWithPrimary(String indexName, long timeoutMs) throws IOExcep
.setMagicNumber(BINARY_MAGIC)
.setIndexName(indexName)
.setIndexId(indexStateManager.getIndexId())
.setReplicaId(ShardState.REPLICA_ID)
.setNodeName(getGlobalState().getNodeName())
.setHostName("localhost")
.setPort(getGlobalState().getReplicationPort())
.build();
Expand Down
Loading