[Cherry-pick] Change use_calc_stream to sync_op #46493

Merged
merged 1 commit on Sep 27, 2022
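
This cherry-pick renames the use_calc_stream argument of the Python collective APIs to sync_op; the call sites in the files below change mechanically. As a rough before/after sketch (not taken from this PR; it assumes a script launched on two ranks with paddle.distributed.launch):

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
data = paddle.to_tensor([1, 2, 3], dtype="int32")

# Old spelling, removed by this PR:
#   dist.all_reduce(data, use_calc_stream=True)

# New spelling used throughout this PR:
dist.all_reduce(data, sync_op=True)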
2 changes: 1 addition & 1 deletion python/paddle/distributed/auto_parallel/process_group.py
@@ -151,7 +151,7 @@ def instantiate(self):
tmp = paddle.to_tensor(
[1], dtype="int32") if _non_static_mode() else fill_constant(
[0], dtype="int32", value="1")
-paddle.distributed.all_reduce(tmp, use_calc_stream=True, group=self)
+paddle.distributed.all_reduce(tmp, sync_op=True, group=self)
paddle.distributed.wait(tmp, group=self)
paddle.enable_static()

123 changes: 63 additions & 60 deletions python/paddle/distributed/collective.py

Large diffs are not rendered by default.
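
The bulk of the rename lives in collective.py, whose diff is not rendered on this page. Judging from the updated tests further down, sync_op=False takes over the asynchronous role of use_calc_stream=False: the call returns a task that can be waited on. A hedged sketch of that usage (two ranks assumed; the task-returning behaviour of all_reduce is inferred from the tests, not from the hidden diff):

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
data = paddle.to_tensor([10, 20], dtype="int64")

# Asynchronous form: sync_op=False is expected to return a task.
task = dist.all_reduce(data, sync_op=False)
task.wait()  # block until the collective has finished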

@@ -146,7 +146,7 @@ def _sharding_sync_parameters(self):
# instead of the relative logic rank id within group
src=self._hcg.get_sharding_parallel_group().ranks[rank],
group=self._hcg.get_sharding_parallel_group(),
-use_calc_stream=True)
+sync_op=True)

def _update_trainable(self):
"""
@@ -150,7 +150,7 @@ def _sync_params_and_buffers(self):
broadcast(p,
src=self._global_root_rank,
group=self.group,
-use_calc_stream=True)
+sync_op=True)

# Multi stream operation will be supported later
wait(tensor=p, group=self.group, use_calc_stream=True)
@@ -415,7 +415,7 @@ def _broadcast_params(self):
broadcast(tensor=internal_storage.buffer,
src=self.group.ranks[dst_rank],
group=self.group,
-use_calc_stream=True)
+sync_op=True)

# Multi stream operation will be supported later
wait(tensor=internal_storage.buffer,
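
Note that in the hunks above (and in the sharding files below) only the collective calls themselves are renamed; the companion paddle.distributed.wait calls keep their use_calc_stream argument. A minimal sketch of the resulting broadcast-then-wait pattern, assuming two ranks and using a plain tensor as a stand-in for the internal storage buffers in the real code:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()

# Stand-in for an internal parameter/gradient storage buffer.
buffer = paddle.ones([8], dtype="float32") * dist.get_rank()

# Renamed in this PR: the collective now takes sync_op.
dist.broadcast(buffer, src=0, sync_op=True)

# Unchanged by this PR: wait() still takes use_calc_stream.
dist.wait(tensor=buffer, use_calc_stream=True)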
@@ -377,18 +377,18 @@ def _broadcast_final_loss(self):
1) if loss.dtype == paddle.float32 else paddle.to_tensor(0)
paddle.distributed.broadcast(is_fp32,
src=self.global_rank,
-use_calc_stream=True,
+sync_op=True,
group=self.pp_group)
paddle.distributed.broadcast(loss,
src=self.global_rank,
-use_calc_stream=True,
+sync_op=True,
group=self.pp_group)
else:
is_fp32 = paddle.to_tensor(1)
paddle.distributed.broadcast(
is_fp32,
src=self._hcg.get_rank_from_stage(self.num_stages - 1),
-use_calc_stream=True,
+sync_op=True,
group=self.pp_group)
loss = paddle.zeros(shape=[
1
@@ -397,7 +397,7 @@ def _broadcast_final_loss(self):
paddle.distributed.broadcast(
loss,
src=self._hcg.get_rank_from_stage(self.num_stages - 1),
-use_calc_stream=True,
+sync_op=True,
group=self.pp_group)
return loss

@@ -155,7 +155,7 @@ def _sync_params_and_buffers(self):
broadcast(p,
src=self._global_root_rank,
group=self._group,
-use_calc_stream=True)
+sync_op=True)

def _generate_master_params(self, trainable_params):
if self.offload:
@@ -413,4 +413,4 @@ def _broadcast_params(self):
broadcast(tensor=internal_storage.buffer,
src=self._group.ranks[dst_rank],
group=self._group,
-use_calc_stream=True)
+sync_op=True)
@@ -287,7 +287,7 @@ def __sync_buffers(self):
collective.broadcast(buffer,
self._global_root_rank,
self._group,
-use_calc_stream=True)
+sync_op=True)

def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
@@ -181,7 +181,7 @@ def _sync_params_and_buffers(self):
collective.broadcast(p,
src=self._global_root_rank,
group=self._group,
-use_calc_stream=True)
+sync_op=True)

def _clear_gradients(self):
assert len(self._trainable_params.keys()) > 0
@@ -446,7 +446,7 @@ def _sync_buffers(self):
collective.broadcast(buffer,
self._global_root_rank,
self._group,
-use_calc_stream=True)
+sync_op=True)

def __getattr__(self, name):
"""Forward missing attributes to wrapped layer."""
@@ -285,7 +285,7 @@ def __sync_buffers(self):
dist.broadcast(buffer,
self._global_root_rank,
self._group,
-use_calc_stream=True)
+sync_op=True)
# Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)

@@ -340,7 +340,7 @@ def cleanup():
tensor=param.grad,
dst=self._group.ranks[dst_rank],
group=self._group,
-use_calc_stream=True),
+sync_op=True),
callback=cleanup))

# Multi stream operation will be supported later
@@ -396,7 +396,7 @@ def cleanup():
tensor=grad_storage.buffer,
dst=self._group.ranks[grad_storage.destination],
group=self._group,
-use_calc_stream=True),
+sync_op=True),
callback=cleanup))

# Multi stream operation will be supported later
@@ -170,7 +170,7 @@ def _sync_params_and_buffers(self):
dist.broadcast(p,
src=self._global_root_rank,
group=self._group,
-use_calc_stream=True)
+sync_op=True)

# Multi stream operation will be supported later
dist.wait(tensor=p, group=self._group, use_calc_stream=True)
@@ -435,7 +435,7 @@ def _sync_buffers(self):
dist.broadcast(buffer,
self._global_root_rank,
self._group,
-use_calc_stream=True)
+sync_op=True)
# Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)

@@ -478,7 +478,7 @@ def _update_params(self):
grad_storage.buffer.scale_(scale=self._world_size_scaling)
dist.all_reduce(tensor=grad_storage.buffer,
group=self._group,
-use_calc_stream=True)
+sync_op=True)
dist.wait(tensor=grad_storage.buffer,
group=self._group,
use_calc_stream=True)
@@ -541,7 +541,7 @@ def allreduce_(*_):
# Only support sync allreduce current rank's layer now
dist.all_reduce(tensor=full_grad,
group=self._group,
-use_calc_stream=True)
+sync_op=True)
dist.wait(tensor=full_grad,
group=self._group,
use_calc_stream=True)
6 changes: 3 additions & 3 deletions python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
@@ -94,7 +94,7 @@ def _broadcast_data_help(data, shape, dtype, hcg):
paddle.distributed.broadcast(shape_gpu,
src=src_rank,
group=model_parallel_group,
-use_calc_stream=True)
+sync_op=True)

if mp_rank != 0:
input_data = paddle.zeros(shape_gpu, dtype=dtype)
@@ -104,7 +104,7 @@ def _broadcast_data_help(data, shape, dtype, hcg):
paddle.distributed.broadcast(input_data,
src=src_rank,
group=model_parallel_group,
-use_calc_stream=True)
+sync_op=True)

if mp_rank != 0:
if in_dygraph_mode():
@@ -186,7 +186,7 @@ def sharding_reduce_gradients(parameter_list, hcg):
paddle.distributed.all_reduce(
param.grad,
group=hcg.get_sharding_parallel_group(),
-use_calc_stream=True)
+sync_op=True)

elif _in_legacy_dygraph():
g_var = param._grad_ivar()
2 changes: 1 addition & 1 deletion python/paddle/fluid/dygraph/parallel.py
@@ -420,7 +420,7 @@ def sync_params_buffers(model,
paddle.distributed.broadcast(coalesced_var,
src=src_rank,
group=comm_group,
-use_calc_stream=True)
+sync_op=True)

for coalesced_var, origin_vars, var_shapes in coalesced_vars:
var_len = [np.prod(v_shape) for v_shape in var_shapes]
@@ -49,9 +49,7 @@ def get_model(self, main_prog, startup_program, rank):
shape=[10, 1000],
dtype='float32')
gp = paddle.distributed.new_group([0, 1])
-paddle.distributed.all_reduce(tindata,
-group=gp,
-use_calc_stream=True)
+paddle.distributed.all_reduce(tindata, group=gp, sync_op=True)
return [tindata]


@@ -69,7 +69,7 @@ def test_collective_alltoall_single(self):
output,
in_split_sizes,
out_split_sizes,
-use_calc_stream=False,
+sync_op=False,
group=group)
task.wait()

@@ -83,8 +83,9 @@ def test_collective_reduce_scatter_base(self):
# [1, 2, 3, 4] # Rank-1

output = paddle.empty(shape=[2], dtype=input.dtype)
-task = paddle.distributed.collective._reduce_scatter_base(
-output, input, use_calc_stream=False)
+task = paddle.distributed.collective._reduce_scatter_base(output,
+input,
+sync_op=False)

task.wait()

@@ -53,32 +53,29 @@ def test_all(self):
paddle.distributed.scatter(result, [self.tensor2, self.tensor1],
src=dp_src_rank,
group=dp_gp,
-use_calc_stream=True)
+sync_op=True)
if dp_rank == 0:
assert np.array_equal(result, self.tensor2)
elif dp_rank == 1:
assert np.array_equal(result, self.tensor1)
print("test scatter api ok")

-paddle.distributed.broadcast(result,
-src=1,
-group=dp_gp,
-use_calc_stream=True)
+paddle.distributed.broadcast(result, src=1, group=dp_gp, sync_op=True)
assert np.array_equal(result, self.tensor1)
print("test broadcast api ok")

paddle.distributed.reduce(result,
dst=dp_src_rank,
group=dp_gp,
-use_calc_stream=True)
+sync_op=True)
if dp_rank == 0:
assert np.array_equal(result, paddle.add(self.tensor1,
self.tensor1))
elif dp_rank == 1:
assert np.array_equal(result, self.tensor1)
print("test reduce api ok")

-paddle.distributed.all_reduce(result, use_calc_stream=True)
+paddle.distributed.all_reduce(result, sync_op=True)
assert np.array_equal(
result,
paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
@@ -92,7 +89,7 @@ def test_all(self):
paddle.distributed.all_gather(result,
self.tensor1,
group=dp_gp,
-use_calc_stream=True)
+sync_op=True)
assert np.array_equal(result[0], self.tensor1)
assert np.array_equal(result[1], self.tensor1)
print("test all_gather api ok")
@@ -36,29 +36,26 @@ def test_all(self):
paddle.distributed.scatter(result, [self.tensor2, self.tensor1],
src=0,
group=gp,
-use_calc_stream=True)
+sync_op=True)
if gp.rank == 0:
assert np.array_equal(result, self.tensor2)
elif gp.rank == 1:
assert np.array_equal(result, self.tensor1)
print("test scatter api ok")

-paddle.distributed.broadcast(result,
-src=1,
-group=gp,
-use_calc_stream=True)
+paddle.distributed.broadcast(result, src=1, group=gp, sync_op=True)
assert np.array_equal(result, self.tensor1)
print("test broadcast api ok")

-paddle.distributed.reduce(result, dst=0, group=gp, use_calc_stream=True)
+paddle.distributed.reduce(result, dst=0, group=gp, sync_op=True)
if gp.rank == 0:
assert np.array_equal(result, paddle.add(self.tensor1,
self.tensor1))
elif gp.rank == 1:
assert np.array_equal(result, self.tensor1)
print("test reduce api ok")

-paddle.distributed.all_reduce(result, use_calc_stream=True)
+paddle.distributed.all_reduce(result, sync_op=True)
assert np.array_equal(
result,
paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
@@ -72,7 +69,7 @@ def test_all(self):
paddle.distributed.all_gather(result,
self.tensor1,
group=gp,
-use_calc_stream=True)
+sync_op=True)
assert np.array_equal(result[0], self.tensor1)
assert np.array_equal(result[1], self.tensor1)
print("test all_gather api ok")