diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 54ff4a33cddcd..ea798872730a7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -103,12 +103,6 @@ protected Boolean initialValue() { return false; } }; - private static AlignmentContext alignmentContext; - - /** Set alignment context to use to fetch state alignment info from RPC. */ - public static void setAlignmentContext(AlignmentContext ac) { - alignmentContext = ac; - } @SuppressWarnings("unchecked") @Unstable @@ -345,6 +339,7 @@ static class Call { final RPC.RpcKind rpcKind; // Rpc EngineKind boolean done; // true when call is done private final Object externalHandler; + private AlignmentContext alignmentContext; private Call(RPC.RpcKind rpcKind, Writable param) { this.rpcKind = rpcKind; @@ -386,6 +381,15 @@ protected synchronized void callComplete() { } } + /** + * Set an AlignmentContext for the call to update when call is done. + * + * @param ac alignment context to update. + */ + public synchronized void setAlignmentContext(AlignmentContext ac) { + this.alignmentContext = ac; + } + /** Set the exception when there is an error. * Notify the caller the call is done. * @@ -1114,7 +1118,7 @@ public void sendRpcRequest(final Call call) // Items '1' and '2' are prepared here. RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, - clientId, alignmentContext); + clientId, call.alignmentContext); final ResponseBuffer buf = new ResponseBuffer(); header.writeDelimitedTo(buf); @@ -1191,9 +1195,9 @@ private void receiveRpcResponse() { Writable value = packet.newInstance(valueClass, conf); final Call call = calls.remove(callId); call.setRpcResponse(value); - } - if (alignmentContext != null) { - alignmentContext.receiveResponseState(header); + if (call.alignmentContext != null) { + call.alignmentContext.receiveResponseState(header); + } } // verify that packet length was correct if (packet.remaining() > 0) { @@ -1374,7 +1378,15 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth) throws IOException { return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT, - fallbackToSimpleAuth); + fallbackToSimpleAuth, null); + } + + public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, + ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) + throws IOException { + return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT, + fallbackToSimpleAuth, alignmentContext); } private void checkAsyncCall() throws IOException { @@ -1391,6 +1403,14 @@ private void checkAsyncCall() throws IOException { } } + Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, + ConnectionId remoteId, int serviceClass, + AtomicBoolean fallbackToSimpleAuth) + throws IOException { + return call(rpcKind, rpcRequest, remoteId, serviceClass, + fallbackToSimpleAuth, null); + } + /** * Make a call, passing rpcRequest, to the IPC server defined by * remoteId, returning the rpc response. @@ -1401,14 +1421,17 @@ private void checkAsyncCall() throws IOException { * @param serviceClass - service class for RPC * @param fallbackToSimpleAuth - set to true or false during this method to * indicate if a secure client falls back to simple auth + * @param alignmentContext - state alignment context * @return the rpc response * Throws exceptions if there are network problems or if the remote code * threw an exception. */ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, - AtomicBoolean fallbackToSimpleAuth) throws IOException { + AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) + throws IOException { final Call call = createCall(rpcKind, rpcRequest); + call.setAlignmentContext(alignmentContext); final Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 2734a95375ab5..554856625238a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -86,7 +86,7 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy ) throws IOException { return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null); + rpcTimeout, connectionRetryPolicy, null, null); } @Override @@ -94,10 +94,12 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) throws IOException { + AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) + throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); + rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth, + alignmentContext); return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker), false); } @@ -122,15 +124,18 @@ private static class Invoker implements RpcInvocationHandler { private final long clientProtocolVersion; private final String protocolName; private AtomicBoolean fallbackToSimpleAuth; + private AlignmentContext alignmentContext; private Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) throws IOException { + AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) + throws IOException { this(protocol, Client.ConnectionId.getConnectionId( addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), conf, factory); this.fallbackToSimpleAuth = fallbackToSimpleAuth; + this.alignmentContext = alignmentContext; } /** @@ -227,7 +232,7 @@ public Message invoke(Object proxy, final Method method, Object[] args) try { val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId, - fallbackToSimpleAuth); + fallbackToSimpleAuth, alignmentContext); } catch (Throwable e) { if (LOG.isTraceEnabled()) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 36d54002320a3..5440780c4663d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -586,7 +586,44 @@ public static ProtocolProxy getProtocolProxy(Class protocol, } return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, - fallbackToSimpleAuth); + fallbackToSimpleAuth, null); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol + * @param clientVersion client's version + * @param addr server address + * @param ticket security ticket + * @param conf configuration + * @param factory socket factory + * @param rpcTimeout max time for each rpc; 0 means no timeout + * @param connectionRetryPolicy retry policy + * @param fallbackToSimpleAuth set to true or false during calls to indicate + * if a secure client falls back to simple auth + * @param alignmentContext state alignment context + * @return the proxy + * @throws IOException if any error occurs + */ + public static ProtocolProxy getProtocolProxy(Class protocol, + long clientVersion, + InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory, + int rpcTimeout, + RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) + throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + SaslRpcServer.init(conf); + } + return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, + addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, + fallbackToSimpleAuth, alignmentContext); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 8a43172693847..0f5769e705028 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -50,7 +50,8 @@ ProtocolProxy getProxy(Class protocol, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) throws IOException; + AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) throws IOException; /** * Construct a server for a protocol implementation instance. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7a9959abddb69..3cbd670a30060 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -856,10 +856,15 @@ private class RpcCall extends Call { final Writable rpcRequest; // Serialized Rpc request from client ByteBuffer rpcResponse; // the response for this call + private RpcResponseHeaderProto bufferedHeader; // the response header + private Writable bufferedRv; // the byte response + RpcCall(RpcCall call) { super(call); this.connection = call.connection; this.rpcRequest = call.rpcRequest; + this.bufferedRv = call.bufferedRv; + this.bufferedHeader = call.bufferedHeader; } RpcCall(Connection connection, int id) { @@ -880,6 +885,14 @@ private class RpcCall extends Call { this.rpcRequest = param; } + public void setBufferedHeader(RpcResponseHeaderProto header) { + this.bufferedHeader = header; + } + + public void setBufferedRv(Writable rv) { + this.bufferedRv = rv; + } + @Override public String getProtocol() { return "rpc"; @@ -968,6 +981,13 @@ void doResponse(Throwable t) throws IOException { setupResponse(call, RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER, null, t.getClass().getName(), StringUtils.stringifyException(t)); + } else if (alignmentContext != null) { + // rebuild response with state context in header + RpcResponseHeaderProto.Builder responseHeader = + call.bufferedHeader.toBuilder(); + alignmentContext.updateResponseState(responseHeader); + RpcResponseHeaderProto builtHeader = responseHeader.build(); + setupResponse(call, builtHeader, call.bufferedRv); } connection.sendResponse(call); } @@ -2992,9 +3012,6 @@ private void setupResponse( headerBuilder.setRetryCount(call.retryCount); headerBuilder.setStatus(status); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); - if(alignmentContext != null) { - alignmentContext.updateResponseState(headerBuilder); - } if (status == RpcStatusProto.SUCCESS) { RpcResponseHeaderProto header = headerBuilder.build(); @@ -3021,6 +3038,12 @@ private void setupResponse( private void setupResponse(RpcCall call, RpcResponseHeaderProto header, Writable rv) throws IOException { + if (alignmentContext != null && call.bufferedHeader == null + && call.bufferedRv == null) { + call.setBufferedHeader(header); + call.setBufferedRv(rv); + } + final byte[] response; if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) { response = setupResponseForProtobuf(header, rv); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 507517b293344..2e3b5594e5cf3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -214,16 +214,19 @@ private static class Invoker implements RpcInvocationHandler { private Client client; private boolean isClosed = false; private final AtomicBoolean fallbackToSimpleAuth; + private final AlignmentContext alignmentContext; public Invoker(Class protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) + int rpcTimeout, AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, null, conf); this.client = CLIENTS.getClient(conf, factory); this.fallbackToSimpleAuth = fallbackToSimpleAuth; + this.alignmentContext = alignmentContext; } @Override @@ -246,7 +249,7 @@ public Object invoke(Object proxy, Method method, Object[] args) try { value = (ObjectWritable) client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), - remoteId, fallbackToSimpleAuth); + remoteId, fallbackToSimpleAuth, alignmentContext); } finally { if (traceScope != null) traceScope.close(); } @@ -289,7 +292,7 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null); + rpcTimeout, connectionRetryPolicy, null, null); } /** Construct a client-side proxy object that implements the named protocol, @@ -301,7 +304,8 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) + AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) throws IOException { if (connectionRetryPolicy != null) { @@ -311,7 +315,7 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, - factory, rpcTimeout, fallbackToSimpleAuth)); + factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext)); return new ProtocolProxy(protocol, proxy, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index f8f41ba1de573..d4fa60dd53632 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -278,7 +278,7 @@ public ProtocolProxy getProxy( SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null); + rpcTimeout, connectionRetryPolicy, null, null); } @SuppressWarnings("unchecked") @@ -287,7 +287,8 @@ public ProtocolProxy getProxy( Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) throws IOException { + AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) + throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new StoppedInvocationHandler()); return new ProtocolProxy(protocol, proxy, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 7276e265af9a0..2badbb14b9367 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -166,7 +166,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; -import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; @@ -242,7 +241,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private final int smallBufferSize; private final long serverDefaultsValidityPeriod; - private final ClientGCIContext alignmentContext; public DfsClientConf getConf() { return dfsClientConf; @@ -398,8 +396,6 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); - this.alignmentContext = new ClientGCIContext(); - Client.setAlignmentContext(alignmentContext); } /** @@ -548,11 +544,6 @@ public boolean isClientRunning() { return clientRunning; } - @VisibleForTesting - ClientGCIContext getAlignmentContext() { - return alignmentContext; - } - long getLastLeaseRenewal() { return lastLeaseRenewal; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 897ecc8c163d7..65c79df332265 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; +import org.apache.hadoop.ipc.AlignmentContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -337,6 +338,15 @@ public static ClientProtocol createNonHAProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException { + return createProxyWithAlignmentContext(address, conf, ugi, withRetries, + fallbackToSimpleAuth, null); + } + + public static ClientProtocol createProxyWithAlignmentContext( + InetSocketAddress address, Configuration conf, UserGroupInformation ugi, + boolean withRetries, AtomicBoolean fallbackToSimpleAuth, + AlignmentContext alignmentContext) + throws IOException { RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); @@ -354,7 +364,7 @@ public static ClientProtocol createNonHAProxyWithClientProtocol( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, - fallbackToSimpleAuth).getProxy(); + fallbackToSimpleAuth, alignmentContext).getProxy(); if (withRetries) { // create the proxy with retries Map methodNameToPolicyMap = new HashMap<>(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index 252b70dde44ae..1cf00cfe118e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +107,11 @@ public synchronized AtomicBoolean getFallbackToSimpleAuth() { return fallbackToSimpleAuth; } - /** + public synchronized AlignmentContext getAlignmentContext() { + return null; // by default the context is null + } + + /** * ProxyInfo to a NameNode. Includes its address. */ public static class NNProxyInfo extends ProxyInfo { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java index b887d87100e45..7b251d8a7b815 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxiesClient; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; @@ -26,11 +27,22 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ClientHAProxyFactory implements HAProxyFactory { + + private AlignmentContext alignmentContext; + + public void setAlignmentContext(AlignmentContext alignmentContext) { + this.alignmentContext = alignmentContext; + } + @Override @SuppressWarnings("unchecked") public T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException { + if (alignmentContext != null) { + return (T) NameNodeProxiesClient.createProxyWithAlignmentContext( + nnAddr, conf, ugi, false, fallbackToSimpleAuth, alignmentContext); + } return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol( nnAddr, conf, ugi, false, fallbackToSimpleAuth); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java deleted file mode 100644 index ce4639f8efa99..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertThat; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.ipc.AlignmentContext; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Class is used to test server sending state alignment information to clients - * via RPC and likewise clients receiving and updating their last known - * state alignment info. - * These tests check that after a single RPC call a client will have caught up - * to the most recent alignment state of the server. - */ -public class TestStateAlignmentContext { - - static final long BLOCK_SIZE = 64 * 1024; - private static final int NUMDATANODES = 3; - private static final Configuration CONF = new HdfsConfiguration(); - - private static MiniDFSCluster cluster; - private static DistributedFileSystem dfs; - - @BeforeClass - public static void startUpCluster() throws IOException { - // disable block scanner - CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); - // Set short retry timeouts so this test runs faster - CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); - CONF.setBoolean("fs.hdfs.impl.disable.cache", true); - cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES) - .build(); - cluster.waitActive(); - } - - @Before - public void before() throws IOException { - dfs = cluster.getFileSystem(); - } - - @AfterClass - public static void shutDownCluster() throws IOException { - if (dfs != null) { - dfs.close(); - dfs = null; - } - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - @After - public void after() throws IOException { - dfs.close(); - } - - /** - * This test checks if after a client writes we can see the state id in - * updated via the response. - */ - @Test - public void testStateTransferOnWrite() throws Exception { - long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); - DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc"); - long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId(); - long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId(); - // Write(s) should have increased state. Check for greater than. - assertThat(clientState > preWriteState, is(true)); - // Client and server state should be equal. - assertThat(clientState, is(postWriteState)); - } - - /** - * This test checks if after a client reads we can see the state id in - * updated via the response. - */ - @Test - public void testStateTransferOnRead() throws Exception { - DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123"); - long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); - DFSTestUtil.readFile(dfs, new Path("/testFile2")); - // Read should catch client up to last written state. - long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId(); - assertThat(clientState, is(lastWrittenId)); - } - - /** - * This test checks that a fresh client starts with no state and becomes - * updated of state from RPC call. - */ - @Test - public void testStateTransferOnFreshClient() throws Exception { - DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz"); - long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId(); - try (DistributedFileSystem clearDfs = - (DistributedFileSystem) FileSystem.get(CONF)) { - ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext(); - assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); - DFSTestUtil.readFile(clearDfs, new Path("/testFile3")); - assertThat(clientState.getLastSeenStateId(), is(lastWrittenId)); - } - } - - /** - * This test mocks an AlignmentContext and ensures that DFSClient - * writes its lastSeenStateId into RPC requests. - */ - @Test - public void testClientSendsState() throws Exception { - AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext(); - AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext); - Client.setAlignmentContext(spiedAlignContext); - - // Collect RpcRequestHeaders for verification later. - final List collectedHeaders = - new ArrayList<>(); - Mockito.doAnswer(a -> { - Object[] arguments = a.getArguments(); - RpcHeaderProtos.RpcRequestHeaderProto.Builder header = - (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; - collectedHeaders.add(header); - return a.callRealMethod(); - }).when(spiedAlignContext).updateRequestState(Mockito.any()); - - DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv"); - - // Ensure first header and last header have different state. - assertThat(collectedHeaders.size() > 1, is(true)); - assertThat(collectedHeaders.get(0).getStateId(), - is(not(collectedHeaders.get(collectedHeaders.size() - 1)))); - - // Ensure collected RpcRequestHeaders are in increasing order. - long lastHeader = collectedHeaders.get(0).getStateId(); - for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header : - collectedHeaders.subList(1, collectedHeaders.size())) { - long currentHeader = header.getStateId(); - assertThat(currentHeader >= lastHeader, is(true)); - lastHeader = header.getStateId(); - } - } - - /** - * This test mocks an AlignmentContext to send stateIds greater than - * server's stateId in RPC requests. - */ - @Test - public void testClientSendsGreaterState() throws Exception { - AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext(); - AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext); - Client.setAlignmentContext(spiedAlignContext); - - // Make every client call have a stateId > server's stateId. - Mockito.doAnswer(a -> { - Object[] arguments = a.getArguments(); - RpcHeaderProtos.RpcRequestHeaderProto.Builder header = - (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; - try { - return a.callRealMethod(); - } finally { - header.setStateId(Long.MAX_VALUE); - } - }).when(spiedAlignContext).updateRequestState(Mockito.any()); - - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG); - DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv"); - logCapturer.stopCapturing(); - - String output = logCapturer.getOutput(); - assertThat(output, containsString("A client sent stateId: ")); - } - -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java new file mode 100644 index 0000000000000..3437bb0a10fd0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; +import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Class is used to test server sending state alignment information to clients + * via RPC and likewise clients receiving and updating their last known + * state alignment info. + * These tests check that after a single RPC call a client will have caught up + * to the most recent alignment state of the server. + */ +public class TestStateAlignmentContextWithHA { + + private static final int NUMDATANODES = 1; + private static final int NUMCLIENTS = 10; + private static final int NUMFILES = 300; + private static final Configuration CONF = new HdfsConfiguration(); + private static final String NAMESERVICE = "nameservice"; + private static final List AC_LIST = new ArrayList<>(); + + private static MiniDFSCluster cluster; + private static List clients; + private static ClientGCIContext spy; + + private DistributedFileSystem dfs; + private int active = 0; + private int standby = 1; + + static class AlignmentContextProxyProvider + extends ConfiguredFailoverProxyProvider { + + private ClientGCIContext alignmentContext; + + public AlignmentContextProxyProvider( + Configuration conf, URI uri, Class xface, + HAProxyFactory factory) throws IOException { + super(conf, uri, xface, factory); + + // Create and set AlignmentContext in HAProxyFactory. + // All proxies by factory will now have AlignmentContext assigned. + this.alignmentContext = (spy != null ? spy : new ClientGCIContext()); + ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); + + AC_LIST.add(alignmentContext); + } + + @Override // AbstractNNFailoverProxyProvider + public synchronized ClientGCIContext getAlignmentContext() { + return this.alignmentContext; + } + } + + static class SpyConfiguredContextProxyProvider + extends ConfiguredFailoverProxyProvider { + + private ClientGCIContext alignmentContext; + + public SpyConfiguredContextProxyProvider( + Configuration conf, URI uri, Class xface, + HAProxyFactory factory) throws IOException { + super(conf, uri, xface, factory); + + // Create but DON'T set in HAProxyFactory. + this.alignmentContext = (spy != null ? spy : new ClientGCIContext()); + + AC_LIST.add(alignmentContext); + } + } + + @BeforeClass + public static void startUpCluster() throws IOException { + // disable block scanner + CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + // Set short retry timeouts so this test runs faster + CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); + CONF.setBoolean("fs.hdfs.impl.disable.cache", true); + + MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE); + nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1")); + nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2")); + + cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES) + .nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf)) + .build(); + cluster.waitActive(); + cluster.transitionToActive(0); + } + + @Before + public void before() throws IOException, URISyntaxException { + killWorkers(); + HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0); + CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + NAMESERVICE, AlignmentContextProxyProvider.class.getName()); + dfs = (DistributedFileSystem) FileSystem.get(CONF); + } + + @AfterClass + public static void shutDownCluster() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @After + public void after() throws IOException { + cluster.transitionToStandby(1); + cluster.transitionToActive(0); + active = 0; + standby = 1; + if (dfs != null) { + dfs.close(); + dfs = null; + } + AC_LIST.clear(); + spy = null; + } + + /** + * This test checks if after a client writes we can see the state id in + * updated via the response. + */ + @Test + public void testNoStateOnConfiguredProxyProvider() throws Exception { + Configuration confCopy = new Configuration(CONF); + confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName()); + + try (DistributedFileSystem clearDfs = + (DistributedFileSystem) FileSystem.get(confCopy)) { + ClientGCIContext clientState = getContext(1); + assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); + DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state"); + assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); + } + } + + /** + * This test checks if after a client writes we can see the state id in + * updated via the response. + */ + @Test + public void testStateTransferOnWrite() throws Exception { + long preWriteState = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc"); + long clientState = getContext(0).getLastSeenStateId(); + long postWriteState = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + // Write(s) should have increased state. Check for greater than. + assertThat(clientState > preWriteState, is(true)); + // Client and server state should be equal. + assertThat(clientState, is(postWriteState)); + } + + /** + * This test checks if after a client reads we can see the state id in + * updated via the response. + */ + @Test + public void testStateTransferOnRead() throws Exception { + DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123"); + long lastWrittenId = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + DFSTestUtil.readFile(dfs, new Path("/testFile2")); + // Read should catch client up to last written state. + long clientState = getContext(0).getLastSeenStateId(); + assertThat(clientState, is(lastWrittenId)); + } + + /** + * This test checks that a fresh client starts with no state and becomes + * updated of state from RPC call. + */ + @Test + public void testStateTransferOnFreshClient() throws Exception { + DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz"); + long lastWrittenId = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + try (DistributedFileSystem clearDfs = + (DistributedFileSystem) FileSystem.get(CONF)) { + ClientGCIContext clientState = getContext(1); + assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); + DFSTestUtil.readFile(clearDfs, new Path("/testFile3")); + assertThat(clientState.getLastSeenStateId(), is(lastWrittenId)); + } + } + + /** + * This test mocks an AlignmentContext and ensures that DFSClient + * writes its lastSeenStateId into RPC requests. + */ + @Test + public void testClientSendsState() throws Exception { + ClientGCIContext alignmentContext = new ClientGCIContext(); + ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext); + spy = spiedAlignContext; + + try (DistributedFileSystem clearDfs = + (DistributedFileSystem) FileSystem.get(CONF)) { + + // Collect RpcRequestHeaders for verification later. + final List headers = + new ArrayList<>(); + Mockito.doAnswer(a -> { + Object[] arguments = a.getArguments(); + RpcHeaderProtos.RpcRequestHeaderProto.Builder header = + (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; + headers.add(header); + return a.callRealMethod(); + }).when(spiedAlignContext).updateRequestState(Mockito.any()); + + DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv"); + + // Ensure first header and last header have different state. + assertThat(headers.size() > 1, is(true)); + assertThat(headers.get(0).getStateId(), + is(not(headers.get(headers.size() - 1)))); + + // Ensure collected RpcRequestHeaders are in increasing order. + long lastHeader = headers.get(0).getStateId(); + for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header : + headers.subList(1, headers.size())) { + long currentHeader = header.getStateId(); + assertThat(currentHeader >= lastHeader, is(true)); + lastHeader = header.getStateId(); + } + } + } + + /** + * This test mocks an AlignmentContext to send stateIds greater than + * server's stateId in RPC requests. + */ + @Test + public void testClientSendsGreaterState() throws Exception { + ClientGCIContext alignmentContext = new ClientGCIContext(); + ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext); + spy = spiedAlignContext; + + try (DistributedFileSystem clearDfs = + (DistributedFileSystem) FileSystem.get(CONF)) { + + // Make every client call have a stateId > server's stateId. + Mockito.doAnswer(a -> { + Object[] arguments = a.getArguments(); + RpcHeaderProtos.RpcRequestHeaderProto.Builder header = + (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; + try { + return a.callRealMethod(); + } finally { + header.setStateId(Long.MAX_VALUE); + } + }).when(spiedAlignContext).updateRequestState(Mockito.any()); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG); + + DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv"); + logCapturer.stopCapturing(); + + String output = logCapturer.getOutput(); + assertThat(output, containsString("A client sent stateId: ")); + } + } + + /** + * This test checks if after a client writes we can see the state id in + * updated via the response. + */ + @Test + public void testStateTransferOnWriteWithFailover() throws Exception { + long preWriteState = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + // Write using HA client. + DFSTestUtil.writeFile(dfs, new Path("/testFile1FO"), "123"); + long clientState = getContext(0).getLastSeenStateId(); + long postWriteState = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + // Write(s) should have increased state. Check for greater than. + assertThat(clientState > preWriteState, is(true)); + // Client and server state should be equal. + assertThat(clientState, is(postWriteState)); + + // Failover NameNode. + failOver(); + + // Write using HA client. + DFSTestUtil.writeFile(dfs, new Path("/testFile2FO"), "456"); + long clientStateFO = getContext(0).getLastSeenStateId(); + long writeStateFO = + cluster.getNamesystem(active).getLastWrittenTransactionId(); + + // Write(s) should have increased state. Check for greater than. + assertThat(clientStateFO > postWriteState, is(true)); + // Client and server state should be equal. + assertThat(clientStateFO, is(writeStateFO)); + } + + @Test(timeout=300000) + public void testMultiClientStatesWithRandomFailovers() throws Exception { + // We want threads to run during failovers; assuming at minimum 4 cores, + // would like to see 2 clients competing against 2 NameNodes. + ExecutorService execService = Executors.newFixedThreadPool(2); + clients = new ArrayList<>(NUMCLIENTS); + for (int i = 1; i <= NUMCLIENTS; i++) { + DistributedFileSystem haClient = + (DistributedFileSystem) FileSystem.get(CONF); + clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i)); + } + + // Execute workers in threadpool with random failovers. + List> futures = submitAll(execService, clients); + execService.shutdown(); + + boolean finished = false; + while (!finished) { + failOver(); + finished = execService.awaitTermination(1L, TimeUnit.SECONDS); + } + + // Validation. + for (Future future : futures) { + assertThat(future.get(), is(STATE.SUCCESS)); + } + } + + private ClientGCIContext getContext(int clientCreationIndex) { + return AC_LIST.get(clientCreationIndex); + } + + private void failOver() throws IOException { + cluster.transitionToStandby(active); + cluster.transitionToActive(standby); + int tempActive = active; + active = standby; + standby = tempActive; + } + + /* Executor.invokeAll() is blocking so utilizing submit instead. */ + private static List> submitAll(ExecutorService executor, + Collection calls) { + List> futures = new ArrayList<>(calls.size()); + for (Worker call : calls) { + Future future = executor.submit(call); + futures.add(future); + } + return futures; + } + + private void killWorkers() throws IOException { + if (clients != null) { + for(Worker worker : clients) { + worker.kill(); + } + clients = null; + } + } + + private enum STATE { SUCCESS, FAIL, ERROR } + + private class Worker implements Callable { + private final DistributedFileSystem client; + private final int filesToMake; + private String filePath; + private final int nonce; + + Worker(DistributedFileSystem client, + int filesToMake, + String filePath, + int nonce) { + this.client = client; + this.filesToMake = filesToMake; + this.filePath = filePath; + this.nonce = nonce; + } + + @Override + public STATE call() { + try { + for (int i = 0; i < filesToMake; i++) { + long preClientStateFO = + getContext(nonce).getLastSeenStateId(); + + // Write using HA client. + Path path = new Path(filePath + nonce + i); + DFSTestUtil.writeFile(client, path, "erk"); + + long postClientStateFO = + getContext(nonce).getLastSeenStateId(); + + // Write(s) should have increased state. Check for greater than. + if (postClientStateFO <= preClientStateFO) { + System.out.println("FAIL: Worker started with: " + + preClientStateFO + ", but finished with: " + postClientStateFO); + return STATE.FAIL; + } + } + client.close(); + return STATE.SUCCESS; + } catch (IOException e) { + System.out.println("ERROR: Worker failed with: " + e); + return STATE.ERROR; + } + } + + public void kill() throws IOException { + client.dfs.closeAllFilesBeingWritten(true); + client.dfs.closeOutputStreams(true); + client.dfs.closeConnectionToNamenode(); + client.dfs.close(); + client.close(); + } + } +}