Skip to content

Commit

Permalink
Add a mutex guard around using the prism servers artifacts cache map.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyje committed Oct 8, 2024
1 parent d84cfff commit 61a5bfd
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* (Java) Fixed custom delimiter issues in TextIO ([#32249](https://github.com/apache/beam/issues/32249), [#32251](https://github.com/apache/beam/issues/32251)).
* (Java, Python, Go) Fixed PeriodicSequence backlog bytes reporting, which was preventing Dataflow Runner autoscaling from functioning properly ([#32506](https://github.com/apache/beam/issues/32506)).
* (Java) Fix improper decoding of rows with schemas containing nullable fields when encoded with a schema with equal encoding positions but modified field order. ([#32388](https://github.com/apache/beam/issues/32388)).
* (Go) Fixed race conditions in the Prism runner's JobServices server.

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer
return err
}
}
if len(s.artifacts) == 0 {
s.artifacts = map[string][]byte{}
}
s.artifactsMu.Lock()
s.artifacts[string(dep.GetTypePayload())] = buf.Bytes()
s.artifactsMu.Unlock()
}
}
return nil
Expand All @@ -98,7 +97,9 @@ func (s *Server) ResolveArtifacts(_ context.Context, req *jobpb.ResolveArtifacts

func (s *Server) GetArtifact(req *jobpb.GetArtifactRequest, stream jobpb.ArtifactRetrievalService_GetArtifactServer) error {
info := req.GetArtifact()
s.artifactsMu.RLock()
buf, ok := s.artifacts[string(info.GetTypePayload())]
s.artifactsMu.RUnlock()
if !ok {
pt := prototext.Format(info)
slog.Warn("unable to provide artifact to worker", "artifact_info", pt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,4 @@ func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobS
}
state = newState
}
}
}
10 changes: 6 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type Server struct {
execute func(*Job)

// Artifact hack
artifacts map[string][]byte
artifacts map[string][]byte
artifactsMu sync.RWMutex
}

// NewServer acquires the indicated port.
Expand All @@ -68,9 +69,10 @@ func NewServer(port int, execute func(*Job)) *Server {
panic(fmt.Sprintf("failed to listen: %v", err))
}
s := &Server{
lis: lis,
jobs: make(map[string]*Job),
execute: execute,
lis: lis,
jobs: make(map[string]*Job),
execute: execute,
artifacts: map[string][]byte{},
}
slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
opts := []grpc.ServerOption{
Expand Down

0 comments on commit 61a5bfd

Please sign in to comment.