Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

begin and end stream transitions #148

Merged
merged 3 commits into from
Jul 21, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ namespace edm {

void beginJob(ProductRegistry const&);
void endJob(ExceptionCollector & collector);

void beginStream();
void endStream();

// Write the luminosity block
void writeLumi(LuminosityBlockPrincipal const& lbp);
Expand Down Expand Up @@ -169,6 +172,8 @@ namespace edm {
void preForkReleaseResources();
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);

StreamID streamID() const { return streamID_; }

std::pair<double, double> timeCpuReal() const {
return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
}
Expand Down
12 changes: 12 additions & 0 deletions FWCore/Framework/interface/SubProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ namespace edm {

void doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);


void doBeginStream(StreamID);
void doEndStream(StreamID);
void doStreamBeginRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts);

void doStreamEndRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);

void doStreamBeginLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts);

void doStreamEndLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException);


// Write the luminosity block
void writeLumi(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber);

Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/WorkerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ namespace edm {
void endJob();
void endJob(ExceptionCollector& collector);

void beginStream(StreamID iID);
void endStream(StreamID iID);

AllWorkers const& allWorkers() const {return allWorkers_;}

void addToAllWorkers(Worker* w, bool useStopwatch);
Expand Down
25 changes: 17 additions & 8 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ namespace edm {
// toerror.succeeded(); // should we add this?
if(hasSubProcess()) subProcess_->doBeginJob();
actReg_->postBeginJobSignal_();

schedule_->beginStream();
if(hasSubProcess()) subProcess_->doBeginStream(schedule_->streamID());
}

void
Expand All @@ -751,6 +754,12 @@ namespace edm {
//make the services available
ServiceRegistry::Operate operate(serviceToken_);

//NOTE: this really should go elsewhere in the future
c.call([this](){this->schedule_->endStream();});
if(hasSubProcess()) {
c.call([this](){ this->subProcess_->doEndStream(this->schedule_->streamID()); } );
}

schedule_->endJob(c);
if(hasSubProcess()) {
c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
Expand Down Expand Up @@ -1987,12 +1996,12 @@ namespace edm {
ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
schedule_->processOneOccurrence<Traits>(runPrincipal, es);
if(hasSubProcess()) {
//subProcess_->doStreamBeginRun(StreamID{0}, runPrincipal, ts);
subProcess_->doStreamBeginRun(schedule_->streamID(), runPrincipal, ts);
}
}
FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
if(looper_) {
//looper_->doStreamBeginRun(StreamID{0},runPrincipal, es);
//looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
}
}

Expand All @@ -2008,12 +2017,12 @@ namespace edm {
ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
if(hasSubProcess()) {
//subProcess_->doStreamEndRun(runPrincipal, ts, cleaningUpAfterException);
subProcess_->doStreamEndRun(schedule_->streamID(),runPrincipal, ts, cleaningUpAfterException);
}
}
FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
if(looper_) {
//looper_->doStreamEndRun(runPrincipal, es);
//looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
}
{
typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
Expand Down Expand Up @@ -2061,12 +2070,12 @@ namespace edm {
ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
if(hasSubProcess()) {
//subProcess_->doStreamBeginLuminosityBlock(lumiPrincipal, ts);
subProcess_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, ts);
}
}
FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
if(looper_) {
//looper_->doStreamBeginLuminosityBlock(lumiPrincipal, es);
//looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
}
}

Expand All @@ -2084,12 +2093,12 @@ namespace edm {
ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
if(hasSubProcess()) {
//subProcess_->doStreamEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
subProcess_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, ts, cleaningUpAfterException);
}
}
FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
if(looper_) {
//looper_->doStreamEndLuminosityBlock(lumiPrincipal, es);
//looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
}
{
typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
Expand Down
8 changes: 8 additions & 0 deletions FWCore/Framework/src/Schedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,14 @@ namespace edm {
void Schedule::beginJob(ProductRegistry const& iRegistry) {
workerManager_.beginJob(iRegistry);
}

void Schedule::beginStream() {
workerManager_.beginStream(streamID_);
}

void Schedule::endStream() {
workerManager_.endStream(streamID_);
}

void Schedule::preForkReleaseResources() {
for_all(allWorkers(), boost::bind(&Worker::preForkReleaseResources, _1));
Expand Down
32 changes: 31 additions & 1 deletion FWCore/Framework/src/SubProcess.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ namespace edm {
ServiceRegistry::Operate operate(serviceToken_);
ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
schedule_->endJob(c);
if(subProcess_.get()) c.call([this](){ this->subProcess_->doEndJob();});
if(c.hasThrown()) {
c.rethrow();
}
if(subProcess_.get()) subProcess_->doEndJob();
}

void
Expand Down Expand Up @@ -334,6 +334,36 @@ namespace edm {
principalCache_.deleteLumi(it->second, runNumber, lumiNumber);
if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber);
}

void
SubProcess::doBeginStream(StreamID iID) {
ServiceRegistry::Operate operate(serviceToken_);
assert(iID == schedule_->streamID());
schedule_->beginStream();
if(subProcess_.get()) subProcess_->doBeginStream(iID);
}

void
SubProcess::doEndStream(StreamID iID) {
ServiceRegistry::Operate operate(serviceToken_);
assert(iID == schedule_->streamID());
schedule_->endStream();
if(subProcess_.get()) subProcess_->doEndStream(iID);
}

//Dummies until SubProcess inherits from new interface
void
SubProcess::doStreamBeginRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts) {}

void
SubProcess::doStreamEndRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {}

void
SubProcess::doStreamBeginLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {}

void
SubProcess::doStreamEndLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {}


void
SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
Expand Down
44 changes: 44 additions & 0 deletions FWCore/Framework/src/Worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,51 @@ namespace edm {
throw;
}
}

void Worker::beginStream(StreamID id) {
try {
try {
//ModuleBeginStreamSignalSentry cpp(actReg_.get(), md_);
implBeginStream(id);
}
catch (cms::Exception& e) { throw; }
catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
catch (std::exception& e) { convertException::stdToEDM(e); }
catch(std::string& s) { convertException::stringToEDM(s); }
catch(char const* c) { convertException::charPtrToEDM(c); }
catch (...) { convertException::unknownToEDM(); }
}
catch(cms::Exception& ex) {
state_ = Exception;
std::ostringstream ost;
ost << "Calling beginStream for module " << md_.moduleName() << "/'" << md_.moduleLabel() << "'";
ex.addContext(ost.str());
throw;
}
}

void Worker::endStream(StreamID id) {
try {
try {
//ModuleEndStreamSignalSentry cpp(actReg_.get(), md_);
implEndStream(id);
}
catch (cms::Exception& e) { throw; }
catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
catch (std::exception& e) { convertException::stdToEDM(e); }
catch(std::string& s) { convertException::stringToEDM(s); }
catch(char const* c) { convertException::charPtrToEDM(c); }
catch (...) { convertException::unknownToEDM(); }
}
catch(cms::Exception& ex) {
state_ = Exception;
std::ostringstream ost;
ost << "Calling endStream for module " << md_.moduleName() << "/'" << md_.moduleLabel() << "'";
ex.addContext(ost.str());
throw;
}
}

void Worker::useStopwatch(){
stopwatch_.reset(new RunStopwatch::StopwatchPointer::element_type);
}
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/src/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ namespace edm {
StreamID stream);
void beginJob() ;
void endJob();
void beginStream(StreamID id);
void endStream(StreamID id);
void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
void respondToOpenOutputFiles(FileBlock const& fb) {implRespondToOpenOutputFiles(fb);}
Expand Down Expand Up @@ -143,7 +145,9 @@ namespace edm {
CurrentProcessingContext const* cpc) = 0;
virtual void implBeginJob() = 0;
virtual void implEndJob() = 0;

virtual void implBeginStream(StreamID) = 0;
virtual void implEndStream(StreamID) = 0;

private:
virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
Expand Down
14 changes: 14 additions & 0 deletions FWCore/Framework/src/WorkerManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ namespace edm {
loadMissingDictionaries();
}

void
WorkerManager::beginStream(StreamID iID) {
for(auto& worker: allWorkers_) {
worker->beginStream(iID);
}
}

void
WorkerManager::endStream(StreamID iID) {
for(auto& worker: allWorkers_) {
worker->endStream(iID);
}
}

void
WorkerManager::resetAll() {
for_all(allWorkers_, boost::bind(&Worker::reset, _1));
Expand Down
14 changes: 14 additions & 0 deletions FWCore/Framework/src/WorkerT.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ namespace edm{
module_->doEndJob();
}

template<typename T>
inline
void
WorkerT<T>::implBeginStream(StreamID id) {
//module_->doBeginStream(id);
}

template<typename T>
inline
void
WorkerT<T>::implEndStream(StreamID id) {
//module_->doEndStream(id);
}

template<typename T>
inline
void
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/src/WorkerT.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ namespace edm {
CurrentProcessingContext const* cpc) override;
virtual void implBeginJob() override;
virtual void implEndJob() override;
virtual void implBeginStream(StreamID) override;
virtual void implEndStream(StreamID) override;
virtual void implRespondToOpenInputFile(FileBlock const& fb) override;
virtual void implRespondToCloseInputFile(FileBlock const& fb) override;
virtual void implRespondToOpenOutputFiles(FileBlock const& fb) override;
Expand Down