[Executor] rm Executor._run_parallel #51616

Merged
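This PR strips the data-parallel execution path out of the fluid Executor: the `Executor._run_parallel` method is deleted, `CompiledProgram` loses its `_is_data_parallel` and `_loss_name` state, `_compile` no longer expands to multiple places, and the standalone-executor checks and tests that existed only for that path go with it. A `CompiledProgram` is now compiled for exactly one place, and only the inference path keeps a dedicated executor. A minimal static-graph run that exercises the surviving `_run_program` path might look like the sketch below; the program and the `a + 1` computation are illustrative, echoing the `np.ones([2, 2])` feed from the deleted tests:

import numpy as np
import paddle

paddle.enable_static()

main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
    # "a" matches the feed name used by the removed test_compiled_program test.
    a = paddle.static.data(name="a", shape=[2, 2], dtype="float32")
    out = a + 1.0

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup)
# With _run_parallel gone, a plain Program should go through Executor._run_program.
(res,) = exe.run(main, feed={"a": np.ones([2, 2], dtype="float32")}, fetch_list=[out])
print(res)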
17 changes: 2 additions & 15 deletions python/paddle/fluid/compiler.py

@@ -162,9 +162,7 @@ def __init__(self, program_or_graph, build_strategy=None):
         self._place = None
         self._executor = None
         self._compiled = False
-        self._is_data_parallel = False
         self._is_inference = False
-        self._loss_name = None
         self._share_vars_from = None
         self._places = None
         self._build_strategy = build_strategy
@@ -178,9 +176,6 @@ def _with_inference_optimize(self, config):
         Returns:
             self
         """
-        assert (
-            not self._is_data_parallel
-        ), "Cannot compile with both data parallel and inference"
         assert (
             not self._is_inference
         ), "Already compiled with inference, cannot be recompiled."
@@ -204,11 +199,6 @@ def _compile_data_parallel(self, places, use_device, scope=None):
         if self._share_vars_from:
             if scope:
                 sys.stderr.write("share_vars_from is set, scope is ignored.\n")
-            if not self._share_vars_from._is_data_parallel:
-                raise ValueError(
-                    "The shared Program is not data parallel, cannot "
-                    "share variables from it."
-                )
             if self._share_vars_from._executor is None:
                 raise ValueError(
                     "The shared Program is not compiled and executed, so there is no "
@@ -328,7 +318,7 @@ def _compile_data_parallel(self, places, use_device, scope=None):
         return core.ParallelExecutor(
             places,
             self._persistable_vars,
-            self._loss_name if self._loss_name else '',
+            '',
             self._scope,
             self._local_scopes,
             self._exec_strategy,
@@ -364,10 +354,7 @@ def _compile(self, scope, place):
         if self._is_inference:
             self._executor = self._compile_inference()
         else:
-            if self._is_data_parallel:
-                self._places = self._get_places(self._place, self._places)
-            else:
-                self._places = [self._place]
+            self._places = [self._place]

             if isinstance(self._place, core.CUDAPlace):
                 use_device = DeviceType.CUDA
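With the data-parallel branch gone, the else arm of `CompiledProgram._compile` no longer calls `_get_places`; the compiled place list is always the single place the program was compiled for. A rough self-contained sketch of that control flow, using stub classes in place of `core.CUDAPlace` and `DeviceType` (both stand-ins assumed by this sketch, not imports from Paddle):

from enum import Enum


class DeviceType(Enum):  # stand-in for Paddle's DeviceType
    CPU = "cpu"
    CUDA = "cuda"


class CPUPlace:  # stand-in for core.CPUPlace
    pass


class CUDAPlace:  # stand-in for core.CUDAPlace
    pass


def choose_places(place):
    """Mirror the simplified _compile flow: one place, its device type."""
    places = [place]  # previously expanded via _get_places when data parallel
    use_device = DeviceType.CUDA if isinstance(place, CUDAPlace) else DeviceType.CPU
    return places, use_device


print(choose_places(CPUPlace()))  # a one-element place list plus DeviceType.CPU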
130 changes: 7 additions & 123 deletions python/paddle/fluid/executor.py

@@ -1295,81 +1295,6 @@ def close(self):
                 del trainer_instance
             self._default_executor.close()

-    def _run_parallel(
-        self, program, scope, feed, fetch_list, fetch_var_name, return_numpy
-    ):
-        from paddle.optimizer.lr import LRScheduler
-
-        exe = program._executor
-        # TODO(zhenghuihuang): quantization uses Graph in CompiledProgram
-        # instead of program. We will add support for checking Vars in Graph
-        need_check_feed = program._program is not None
-        if need_check_feed:
-            global_block = program._program.global_block()
-        if isinstance(feed, dict):
-            feed_tensor_dict = dict()
-            for feed_name in feed:
-                feed_tensor = feed[feed_name]
-                var = global_block.var(feed_name) if need_check_feed else None
-                if not isinstance(feed_tensor, core.LoDTensor):
-                    # always set to CPU place, since the tensor need to be split
-                    # it is fast in CPU
-                    feed_tensor = _as_lodtensor(
-                        feed[feed_name],
-                        core.CPUPlace(),
-                        var.dtype if var else None,
-                    )
-                if need_check_feed:
-                    check_feed_shape_type(var, feed_tensor, exe.device_count())
-                feed_tensor_dict[feed_name] = feed_tensor
-            exe.feed_and_split_tensor_into_local_scopes(feed_tensor_dict)
-
-        elif isinstance(feed, list) or isinstance(feed, tuple):
-            res = list()
-            for i, each in enumerate(feed):
-                if not isinstance(each, dict):
-                    raise TypeError(
-                        "Each element of feed list should be a dict"
-                    )
-                res_dict = dict()
-                for feed_name in each:
-                    tensor = each[feed_name]
-                    var = (
-                        global_block.var(feed_name) if need_check_feed else None
-                    )
-                    if not isinstance(tensor, core.LoDTensor):
-                        tensor = _as_lodtensor(
-                            each[feed_name],
-                            program._places[i],
-                            var.dtype if var else None,
-                        )
-                    if need_check_feed:
-                        check_feed_shape_type(var, tensor)
-                    res_dict[feed_name] = tensor
-                res.append(res_dict)
-
-            exe.feed_tensors_into_local_scopes(res)
-
-        if hasattr(program._program, 'lr_sheduler'):
-            lr_sheduler = program._program.lr_sheduler
-            assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
-            lr_value = lr_sheduler()
-            lr_var = program._program.global_block().vars[lr_sheduler._var_name]
-            lr_tensor = _as_lodtensor(lr_value, core.CPUPlace(), lr_var.dtype)
-            if core.is_cuda_graph_capturing():
-                warnings.warn(
-                    "Caution!!! When capturing CUDA Graph, the learning rate scheduler would not "
-                    "take any effect! Please set the learning rate manually before each batch!"
-                )
-            else:
-                exe.feed_and_split_tensor_into_local_scopes(
-                    {lr_sheduler._var_name: lr_tensor}
-                )
-
-        fetch_var_names = list(map(_to_name_str, fetch_list))
-        tensors = exe.run(fetch_var_names, True)._move_to_list()
-        return as_numpy(tensors) if return_numpy else tensors
-
     def run(
         self,
         program=None,
@@ -1673,31 +1598,15 @@ def _can_use_interpreter_core(program, place):
                     else program._graph
                 )

-                # Unsupported case 1: data parallel
-                if (
-                    compiled_program._is_data_parallel
-                    and len(
-                        compiled_program._get_places(
-                            place, compiled_program._places
-                        )
-                    )
-                    != 1
-                ):
-                    warnings.warn(
-                        "Standalone executor is not used for data parallel",
-                        UserWarning,
-                    )
-                    return False
-
-                # Unsupported case 2: inference
+                # Unsupported case 1: inference
                 if compiled_program._is_inference:
                     warnings.warn(
                         "Standalone executor is not used for inference",
                         UserWarning,
                     )
                     return False

-                # Unsupported case 3: async mode
+                # Unsupported case 2: async mode
                 if (
                     compiled_program._build_strategy is not None
                     and compiled_program._build_strategy.async_mode
@@ -1708,7 +1617,7 @@ def _can_use_interpreter_core(program, place):
                     )
                     return False

-                # Unsupported case 4: CUDA Graph
+                # Unsupported case 3: CUDA Graph
                 if (
                     compiled_program._build_strategy is not None
                     and compiled_program._build_strategy.allow_cuda_graph_capture
@@ -1803,24 +1712,6 @@ def _can_use_interpreter_core(program, place):

         # For backward compatibility, run directly.
         if not compiled:
-            # In distributed training, the compiled program is saved in Program._graph
-            has_compiled_graph = isinstance(
-                program._graph, compiler.CompiledProgram
-            )
-
-            if has_compiled_graph:
-                program._graph._compile(scope, self.place)
-                # _graph in program does not support inference since the _graph is optimized
-                # through optimizer.minimize function and should not be used as inference graph
-                # assert not program._graph._is_inference
-                return self._run_parallel(
-                    program._graph,
-                    scope=scope,
-                    feed=feed,
-                    fetch_list=fetch_list,
-                    fetch_var_name=fetch_var_name,
-                    return_numpy=return_numpy,
-                )
-
             return self._run_program(
                 program,
@@ -1834,17 +1725,10 @@ def _can_use_interpreter_core(program, place):
             )

         program._compile(scope, self.place)
-        if program._is_inference:
-            return self._run_inference(program._executor, feed)
-        else:
-            return self._run_parallel(
-                program,
-                scope=scope,
-                feed=feed,
-                fetch_list=fetch_list,
-                fetch_var_name=fetch_var_name,
-                return_numpy=return_numpy,
-            )
+        assert (
+            program._is_inference
+        ), f"Program must have _is_inference = True, but get {program._is_inference}"
+        return self._run_inference(program._executor, feed)

     def _run_program(
         self,
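The net effect on Executor.run dispatch: a plain Program goes through `_run_program`, and a `CompiledProgram` must now be an inference program, otherwise the new assertion fires. Below is a self-contained sketch of that reduced control flow using stub classes (not Paddle's real signatures):

class CompiledProgram:  # stub, assumed shape only
    def __init__(self, is_inference):
        self._is_inference = is_inference
        self._executor = object()

    def _compile(self, scope, place):
        pass  # compilation elided in this sketch


class Executor:  # stub, assumed shape only
    place = "cpu"

    def _run_program(self, program, feed):
        return f"ran plain program with {feed}"

    def _run_inference(self, inner_exe, feed):
        return f"ran inference with {feed}"

    def run(self, program, feed):
        compiled = isinstance(program, CompiledProgram)
        if not compiled:
            # The old has_compiled_graph / _run_parallel fallback is gone.
            return self._run_program(program, feed)
        program._compile(scope=None, place=self.place)
        assert (
            program._is_inference
        ), f"Program must have _is_inference = True, but get {program._is_inference}"
        return self._run_inference(program._executor, feed)


exe = Executor()
print(exe.run("plain-program", feed={"a": 1}))        # plain path
print(exe.run(CompiledProgram(True), feed={"a": 1}))  # inference path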
15 changes: 2 additions & 13 deletions python/paddle/fluid/tests/unittests/feed_data_reader.py

@@ -57,19 +57,8 @@ def get_next(self, exe, program):
         assert isinstance(exe, fluid.Executor), "exe must be Executor"
         use_cuda = isinstance(exe.place, fluid.CUDAPlace)
         if isinstance(program, fluid.CompiledProgram):
-            if program._is_data_parallel:
-                use_executor = False
-                if program._places is None:
-                    device_num = (
-                        len(fluid.cuda_places())
-                        if use_cuda
-                        else len(fluid.cpu_places())
-                    )
-                else:
-                    device_num = len(program._places)
-            else:
-                use_executor = True
-                device_num = 1
+            use_executor = True
+            device_num = 1
         else:
             use_executor = True
             device_num = 1
@@ -283,24 +283,6 @@ def test_with_error(self):
         with framework._enable_standalone_executor():
             self._run(feed[0], add_wrong_fetch=True)

-    def test_compiled_program(self):
-        data = np.ones([2, 2], dtype="float32")
-        feed = {"a": data}
-
-        res = self.run_new_executor(feed, use_compiled=True)
-        gt = self.run_raw_executor(feed, use_compiled=True)
-        for x, y in zip(gt, res):
-            np.testing.assert_array_equal(x, y)
-
-    def test_compiled_program_convert_graph_to_program(self):
-        data = np.ones([2, 2], dtype="float32")
-        feed = {"a": data}
-
-        res = self.run_new_executor(feed, use_compiled=True)
-        gt = self.run_raw_executor(feed, use_compiled=True)
-        for x, y in zip(gt, res):
-            np.testing.assert_array_equal(x, y)
-
     def test_empty_program(self):
         program = paddle.static.Program()
         exe = paddle.static.Executor(self.place)