diff --git a/CHANGES.md b/CHANGES.md index 93c0e19dcba6..a905aa64928f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -95,6 +95,7 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * (Java) Fixed custom delimiter issues in TextIO ([#32249](https://github.com/apache/beam/issues/32249), [#32251](https://github.com/apache/beam/issues/32251)). * (Java, Python, Go) Fixed PeriodicSequence backlog bytes reporting, which was preventing Dataflow Runner autoscaling from functioning properly ([#32506](https://github.com/apache/beam/issues/32506)). * (Java) Fix improper decoding of rows with schemas containing nullable fields when encoded with a schema with equal encoding positions but modified field order. ([#32388](https://github.com/apache/beam/issues/32388)). +* (Go) Fixed race conditions in the Prism runner's JobServices server. ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go index 99b786d45980..185b7e16ad4e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go @@ -81,10 +81,9 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer return err } } - if len(s.artifacts) == 0 { - s.artifacts = map[string][]byte{} - } + s.artifactsMu.Lock() s.artifacts[string(dep.GetTypePayload())] = buf.Bytes() + s.artifactsMu.Unlock() } } return nil @@ -98,7 +97,9 @@ func (s *Server) ResolveArtifacts(_ context.Context, req *jobpb.ResolveArtifacts func (s *Server) GetArtifact(req *jobpb.GetArtifactRequest, stream jobpb.ArtifactRetrievalService_GetArtifactServer) error { info := req.GetArtifact() + s.artifactsMu.RLock() buf, ok := s.artifacts[string(info.GetTypePayload())] + s.artifactsMu.RUnlock() if !ok { pt := prototext.Format(info) slog.Warn("unable to provide artifact to worker", "artifact_info", pt) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index b957b99ca63d..20acef062fde 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -491,4 +491,4 @@ func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobS } state = newState } -} +} \ No newline at end of file diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go index 320159f54c06..b834bf786784 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go @@ -58,7 +58,8 @@ type Server struct { execute func(*Job) // Artifact hack - artifacts map[string][]byte + artifacts map[string][]byte + artifactsMu sync.RWMutex } // NewServer acquires the indicated port. @@ -68,9 +69,10 @@ func NewServer(port int, execute func(*Job)) *Server { panic(fmt.Sprintf("failed to listen: %v", err)) } s := &Server{ - lis: lis, - jobs: make(map[string]*Job), - execute: execute, + lis: lis, + jobs: make(map[string]*Job), + execute: execute, + artifacts: map[string][]byte{}, } slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint())) opts := []grpc.ServerOption{