Commit 1c11467

Revert "Merge remote-tracking branch 'origin/trunk' into refine-blocks-to-reconstruct-logic"

This reverts commit 0582ce1, reversing
changes made to 5de85a0.
VicoWu committed Jul 26, 2024
1 parent 0582ce1 commit 1c11467
Showing 78 changed files with 492 additions and 4,658 deletions.
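A merge revert with the message above is what git generates when undoing a merge commit. A plausible invocation — illustrative only, since the actual command is not recorded in the commit — is:

    git revert -m 1 0582ce1

where -m 1 keeps the first (mainline) parent and applies the inverse of the merge's changes.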

This file was deleted.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -1022,7 +1022,6 @@ public class CommonConfigurationKeysPublic {
"fs.s3a.*.server-side-encryption.key",
"fs.s3a.encryption.algorithm",
"fs.s3a.encryption.key",
- "fs.s3a.encryption.context",
"fs.azure\\.account.key.*",
"credential$",
"oauth.*secret",
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -742,7 +742,6 @@
fs.s3a.*.server-side-encryption.key
fs.s3a.encryption.algorithm
fs.s3a.encryption.key
- fs.s3a.encryption.context
fs.s3a.secret.key
fs.s3a.*.secret.key
fs.s3a.session.key
@@ -1761,15 +1760,6 @@
</description>
</property>

- <property>
- <name>fs.s3a.encryption.context</name>
- <description>Specific encryption context to use if fs.s3a.encryption.algorithm
- has been set to 'SSE-KMS' or 'DSSE-KMS'. The value of this property is a set
- of non-secret comma-separated key-value pairs of additional contextual
- information about the data that are separated by equal operator (=).
- </description>
- </property>
-
<property>
<name>fs.s3a.signing-algorithm</name>
<description>Override the default signing algorithm so legacy
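For context on what this hunk removes: per the deleted description, fs.s3a.encryption.context held non-secret, comma-separated key=value pairs of contextual information. A minimal sketch of how a client would have set it before this revert — the context values below are hypothetical, and the property only took effect with SSE-KMS or DSSE-KMS:

    import org.apache.hadoop.conf.Configuration;

    public class S3AEncryptionContextExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The context applies only when the algorithm is SSE-KMS or DSSE-KMS,
        // per the deleted core-default.xml description above.
        conf.set("fs.s3a.encryption.algorithm", "SSE-KMS");
        // Non-secret contextual metadata: comma-separated key=value pairs.
        conf.set("fs.s3a.encryption.context", "project=alpha,classification=internal");
      }
    }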
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -1471,9 +1471,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
"dfs.journalnode.sync.interval";
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
- public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
- "dfs.journalnode.enable.sync.format";
- public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
"dfs.journalnode.edit-cache-size.bytes";

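The removed keys gated an opt-in behavior (default false). A minimal sketch of how the flag would have been enabled — assuming, as the constants above state, that the key resolves to "dfs.journalnode.enable.sync.format":

    import org.apache.hadoop.conf.Configuration;

    public class JournalNodeSyncFormatExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Opt in to letting the JournalNodeSyncer format an unformatted journal
        // from a peer JournalNode's StorageInfo (the reverted default was false).
        conf.setBoolean("dfs.journalnode.enable.sync.format", true);
      }
    }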
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java

@@ -20,7 +20,6 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.security.KerberosInfo;
@@ -52,13 +51,4 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException;

- /**
- * Get the storage info for the specified journal.
- * @param jid the journal identifier
- * @param nameServiceId the name service id
- * @return the storage info object
- */
- StorageInfoProto getStorageInfo(String jid, String nameServiceId)
- throws IOException;

}
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java

@@ -24,8 +24,6 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
- import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;

@@ -62,18 +60,4 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
throw new ServiceException(e);
}
}

- @Override
- public StorageInfoProto getStorageInfo(
- RpcController controller, GetStorageInfoRequestProto request)
- throws ServiceException {
- try {
- return impl.getStorageInfo(
- request.getJid().getIdentifier(),
- request.hasNameServiceId() ? request.getNameServiceId() : null
- );
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
}
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java

@@ -19,8 +19,6 @@

package org.apache.hadoop.hdfs.qjournal.protocolPB;

- import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
- import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -77,18 +75,6 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
req.build()));
}

- @Override
- public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
- throws IOException {
- InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
- InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
- .setJid(convertJournalId(jid));
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
- return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
- }

private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(jid)
Expand Down
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java

@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.slf4j.Logger;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -72,14 +71,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,

JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
this.jn = jn;

Configuration confCopy = new Configuration(conf);

// Ensure that nagling doesn't kick in, which could cause latency issues.
confCopy.setBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
true);

InetSocketAddress addr = getAddress(confCopy);
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
if (bindHost == null) {
@@ -105,7 +104,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
this.handlerCount = confHandlerCount;
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
this.handlerCount);

this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
@@ -150,15 +149,15 @@ void start() {
public InetSocketAddress getAddress() {
return server.getListenerAddress();
}

void join() throws InterruptedException {
this.server.join();
}

void stop() {
this.server.stop();
}

static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
@@ -212,7 +211,7 @@ public void journal(RequestInfo reqInfo,
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}

@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
@@ -246,24 +245,17 @@ public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {

RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);

return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build();
}

- @Override
- public StorageInfoProto getStorageInfo(String jid,
- String nameServiceId) throws IOException {
- StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
- return PBHelper.convert(storage);
- }

@Override
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java

@@ -18,9 +18,6 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
- import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
- import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
- import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -82,7 +79,6 @@ public class JournalNodeSyncer {
private int numOtherJNs;
private int journalNodeIndexForSync = 0;
private final long journalSyncInterval;
- private final boolean tryFormatting;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
private final JournalMetrics metrics;
@@ -102,9 +98,6 @@ public class JournalNodeSyncer {
logSegmentTransferTimeout = conf.getInt(
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
- tryFormatting = conf.getBoolean(
- DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
- DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
throttler = getThrottler(conf);
metrics = journal.getMetrics();
journalSyncerStarted = false;
@@ -178,8 +171,6 @@ private void startSyncJournalsDaemon() {
// Wait for journal to be formatted to create edits.sync directory
while(!journal.isFormatted()) {
try {
- // Format the journal with namespace info from the other JNs if it is not formatted
- formatWithSyncer();
Thread.sleep(journalSyncInterval);
} catch (InterruptedException e) {
LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
@@ -196,15 +187,7 @@
while(shouldSync) {
try {
if (!journal.isFormatted()) {
- LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
- formatWithSyncer();
- if (journal.isFormatted() && !createEditsSyncDir()) {
- LOG.error("Failed to create directory for downloading log " +
- "segments: {}. Stopping Journal Node Sync.",
- journal.getStorage().getEditsSyncDir());
- return;
- }
- continue;
+ LOG.warn("Journal cannot sync. Not formatted.");
} else {
syncJournals();
}
@@ -250,68 +233,6 @@ private void syncJournals() {
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
}

- private void formatWithSyncer() {
- if (!tryFormatting) {
- return;
- }
- LOG.info("Trying to format the journal with the syncer");
- try {
- StorageInfo storage = null;
- for (JournalNodeProxy jnProxy : otherJNProxies) {
- if (!hasEditLogs(jnProxy)) {
- // This avoids a race condition between `hdfs namenode -format` and
- // JN syncer by checking if the other JN is not newly formatted.
- continue;
- }
- try {
- HdfsServerProtos.StorageInfoProto storageInfoResponse =
- jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
- storage = PBHelper.convert(
- storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE
- );
- if (storage.getNamespaceID() == 0) {
- LOG.error("Got invalid StorageInfo from " + jnProxy);
- storage = null;
- continue;
- }
- LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
- break;
- } catch (IOException e) {
- LOG.error("Could not get StorageInfo from " + jnProxy, e);
- }
- }
- if (storage == null) {
- LOG.error("Could not get StorageInfo from any JournalNode. " +
- "JournalNodeSyncer cannot format the journal.");
- return;
- }
- NamespaceInfo nsInfo = new NamespaceInfo(storage);
- journal.format(nsInfo, true);
- } catch (IOException e) {
- LOG.error("Exception in formatting the journal with the syncer", e);
- }
- }
-
- private boolean hasEditLogs(JournalNodeProxy journalProxy) {
- GetEditLogManifestResponseProto editLogManifest;
- try {
- editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal(
- jid, nameServiceId, 0, false);
- } catch (IOException e) {
- LOG.error("Could not get edit log manifest from " + journalProxy, e);
- return false;
- }
-
- List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
- editLogManifest.getManifest()).getLogs();
- if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
- LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
- return false;
- }
-
- return true;
- }
-
private void syncWithJournalAtIndex(int index) {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
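Taken together, the removed syncer logic worked as follows: an unformatted JournalNode polled its peers, skipped any peer that had no edit logs (avoiding a race with a concurrent hdfs namenode -format), fetched StorageInfo over the inter-JN RPC, rejected responses whose namespace ID was 0, and formatted its local journal from the first valid NamespaceInfo.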
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto

@@ -31,15 +31,8 @@ package hadoop.hdfs.qjournal;
import "HdfsServer.proto";
import "QJournalProtocol.proto";

- message GetStorageInfoRequestProto {
- required JournalIdProto jid = 1;
- optional string nameServiceId = 2;
- }

service InterQJournalProtocolService {
rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);

- rpc getStorageInfo(GetStorageInfoRequestProto)
- returns (StorageInfoProto);
}