diff --git a/FWCore/Framework/interface/Schedule.h b/FWCore/Framework/interface/Schedule.h index 0ed76f06c7d2f..067e279ce493b 100644 --- a/FWCore/Framework/interface/Schedule.h +++ b/FWCore/Framework/interface/Schedule.h @@ -135,6 +135,9 @@ namespace edm { void beginJob(ProductRegistry const&); void endJob(ExceptionCollector & collector); + + void beginStream(); + void endStream(); // Write the luminosity block void writeLumi(LuminosityBlockPrincipal const& lbp); @@ -169,6 +172,8 @@ namespace edm { void preForkReleaseResources(); void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren); + StreamID streamID() const { return streamID_; } + std::pair timeCpuReal() const { return std::pair(stopwatch_->cpuTime(), stopwatch_->realTime()); } diff --git a/FWCore/Framework/interface/SubProcess.h b/FWCore/Framework/interface/SubProcess.h index 65999403591d8..d11eb265b0865 100644 --- a/FWCore/Framework/interface/SubProcess.h +++ b/FWCore/Framework/interface/SubProcess.h @@ -56,6 +56,18 @@ namespace edm { void doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException); + + void doBeginStream(StreamID); + void doEndStream(StreamID); + void doStreamBeginRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts); + + void doStreamEndRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException); + + void doStreamBeginLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts); + + void doStreamEndLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException); + + // Write the luminosity block void writeLumi(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber); diff --git a/FWCore/Framework/interface/WorkerManager.h b/FWCore/Framework/interface/WorkerManager.h index 021237fe76478..9844880c7b667 100644 --- a/FWCore/Framework/interface/WorkerManager.h +++ b/FWCore/Framework/interface/WorkerManager.h @@ -49,6 +49,9 @@ namespace edm { void endJob(); void endJob(ExceptionCollector& collector); + void beginStream(StreamID iID); + void endStream(StreamID iID); + AllWorkers const& allWorkers() const {return allWorkers_;} void addToAllWorkers(Worker* w, bool useStopwatch); diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index b31013a72cfa0..c82deb5880a81 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -738,6 +738,9 @@ namespace edm { // toerror.succeeded(); // should we add this? if(hasSubProcess()) subProcess_->doBeginJob(); actReg_->postBeginJobSignal_(); + + schedule_->beginStream(); + if(hasSubProcess()) subProcess_->doBeginStream(schedule_->streamID()); } void @@ -751,6 +754,12 @@ namespace edm { //make the services available ServiceRegistry::Operate operate(serviceToken_); + //NOTE: this really should go elsewhere in the future + c.call([this](){this->schedule_->endStream();}); + if(hasSubProcess()) { + c.call([this](){ this->subProcess_->doEndStream(this->schedule_->streamID()); } ); + } + schedule_->endJob(c); if(hasSubProcess()) { c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get())); @@ -1987,12 +1996,12 @@ namespace edm { ScheduleSignalSentry sentry(actReg_.get(), &runPrincipal, &es); schedule_->processOneOccurrence(runPrincipal, es); if(hasSubProcess()) { - //subProcess_->doStreamBeginRun(StreamID{0}, runPrincipal, ts); + subProcess_->doStreamBeginRun(schedule_->streamID(), runPrincipal, ts); } } FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n"; if(looper_) { - //looper_->doStreamBeginRun(StreamID{0},runPrincipal, es); + //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es); } } @@ -2008,12 +2017,12 @@ namespace edm { ScheduleSignalSentry sentry(actReg_.get(), &runPrincipal, &es); schedule_->processOneOccurrence(runPrincipal, es, cleaningUpAfterException); if(hasSubProcess()) { - //subProcess_->doStreamEndRun(runPrincipal, ts, cleaningUpAfterException); + subProcess_->doStreamEndRun(schedule_->streamID(),runPrincipal, ts, cleaningUpAfterException); } } FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n"; if(looper_) { - //looper_->doStreamEndRun(runPrincipal, es); + //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es); } { typedef OccurrenceTraits Traits; @@ -2061,12 +2070,12 @@ namespace edm { ScheduleSignalSentry sentry(actReg_.get(), &lumiPrincipal, &es); schedule_->processOneOccurrence(lumiPrincipal, es); if(hasSubProcess()) { - //subProcess_->doStreamBeginLuminosityBlock(lumiPrincipal, ts); + subProcess_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, ts); } } FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n"; if(looper_) { - //looper_->doStreamBeginLuminosityBlock(lumiPrincipal, es); + //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es); } } @@ -2084,12 +2093,12 @@ namespace edm { ScheduleSignalSentry sentry(actReg_.get(), &lumiPrincipal, &es); schedule_->processOneOccurrence(lumiPrincipal, es, cleaningUpAfterException); if(hasSubProcess()) { - //subProcess_->doStreamEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); + subProcess_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, ts, cleaningUpAfterException); } } FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n"; if(looper_) { - //looper_->doStreamEndLuminosityBlock(lumiPrincipal, es); + //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es); } { typedef OccurrenceTraits Traits; diff --git a/FWCore/Framework/src/Schedule.cc b/FWCore/Framework/src/Schedule.cc index 7418077ec7bd3..77954651f4ed4 100644 --- a/FWCore/Framework/src/Schedule.cc +++ b/FWCore/Framework/src/Schedule.cc @@ -1258,6 +1258,14 @@ namespace edm { void Schedule::beginJob(ProductRegistry const& iRegistry) { workerManager_.beginJob(iRegistry); } + + void Schedule::beginStream() { + workerManager_.beginStream(streamID_); + } + + void Schedule::endStream() { + workerManager_.endStream(streamID_); + } void Schedule::preForkReleaseResources() { for_all(allWorkers(), boost::bind(&Worker::preForkReleaseResources, _1)); diff --git a/FWCore/Framework/src/SubProcess.cc b/FWCore/Framework/src/SubProcess.cc index a93f79f2ec407..8a5bcf8aafa16 100644 --- a/FWCore/Framework/src/SubProcess.cc +++ b/FWCore/Framework/src/SubProcess.cc @@ -148,10 +148,10 @@ namespace edm { ServiceRegistry::Operate operate(serviceToken_); ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each."); schedule_->endJob(c); + if(subProcess_.get()) c.call([this](){ this->subProcess_->doEndJob();}); if(c.hasThrown()) { c.rethrow(); } - if(subProcess_.get()) subProcess_->doEndJob(); } void @@ -334,6 +334,36 @@ namespace edm { principalCache_.deleteLumi(it->second, runNumber, lumiNumber); if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber); } + + void + SubProcess::doBeginStream(StreamID iID) { + ServiceRegistry::Operate operate(serviceToken_); + assert(iID == schedule_->streamID()); + schedule_->beginStream(); + if(subProcess_.get()) subProcess_->doBeginStream(iID); + } + + void + SubProcess::doEndStream(StreamID iID) { + ServiceRegistry::Operate operate(serviceToken_); + assert(iID == schedule_->streamID()); + schedule_->endStream(); + if(subProcess_.get()) subProcess_->doEndStream(iID); + } + + //Dummies until SubProcess inherits from new interface + void + SubProcess::doStreamBeginRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts) {} + + void + SubProcess::doStreamEndRun(StreamID, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {} + + void + SubProcess::doStreamBeginLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {} + + void + SubProcess::doStreamEndLuminosityBlock(StreamID, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {} + void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const { diff --git a/FWCore/Framework/src/Worker.cc b/FWCore/Framework/src/Worker.cc index 8702bc7cecb4d..3a2df21dd8d90 100644 --- a/FWCore/Framework/src/Worker.cc +++ b/FWCore/Framework/src/Worker.cc @@ -118,7 +118,51 @@ namespace edm { throw; } } + + void Worker::beginStream(StreamID id) { + try { + try { + //ModuleBeginStreamSignalSentry cpp(actReg_.get(), md_); + implBeginStream(id); + } + catch (cms::Exception& e) { throw; } + catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); } + catch (std::exception& e) { convertException::stdToEDM(e); } + catch(std::string& s) { convertException::stringToEDM(s); } + catch(char const* c) { convertException::charPtrToEDM(c); } + catch (...) { convertException::unknownToEDM(); } + } + catch(cms::Exception& ex) { + state_ = Exception; + std::ostringstream ost; + ost << "Calling beginStream for module " << md_.moduleName() << "/'" << md_.moduleLabel() << "'"; + ex.addContext(ost.str()); + throw; + } + } + void Worker::endStream(StreamID id) { + try { + try { + //ModuleEndStreamSignalSentry cpp(actReg_.get(), md_); + implEndStream(id); + } + catch (cms::Exception& e) { throw; } + catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); } + catch (std::exception& e) { convertException::stdToEDM(e); } + catch(std::string& s) { convertException::stringToEDM(s); } + catch(char const* c) { convertException::charPtrToEDM(c); } + catch (...) { convertException::unknownToEDM(); } + } + catch(cms::Exception& ex) { + state_ = Exception; + std::ostringstream ost; + ost << "Calling endStream for module " << md_.moduleName() << "/'" << md_.moduleLabel() << "'"; + ex.addContext(ost.str()); + throw; + } + } + void Worker::useStopwatch(){ stopwatch_.reset(new RunStopwatch::StopwatchPointer::element_type); } diff --git a/FWCore/Framework/src/Worker.h b/FWCore/Framework/src/Worker.h index 9418869988bdd..808caa6cb32b0 100644 --- a/FWCore/Framework/src/Worker.h +++ b/FWCore/Framework/src/Worker.h @@ -70,6 +70,8 @@ namespace edm { StreamID stream); void beginJob() ; void endJob(); + void beginStream(StreamID id); + void endStream(StreamID id); void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);} void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);} void respondToOpenOutputFiles(FileBlock const& fb) {implRespondToOpenOutputFiles(fb);} @@ -143,7 +145,9 @@ namespace edm { CurrentProcessingContext const* cpc) = 0; virtual void implBeginJob() = 0; virtual void implEndJob() = 0; - + virtual void implBeginStream(StreamID) = 0; + virtual void implEndStream(StreamID) = 0; + private: virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0; virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0; diff --git a/FWCore/Framework/src/WorkerManager.cc b/FWCore/Framework/src/WorkerManager.cc index d8018cb98c6fa..cccf787ee9ad3 100644 --- a/FWCore/Framework/src/WorkerManager.cc +++ b/FWCore/Framework/src/WorkerManager.cc @@ -95,6 +95,20 @@ namespace edm { loadMissingDictionaries(); } + void + WorkerManager::beginStream(StreamID iID) { + for(auto& worker: allWorkers_) { + worker->beginStream(iID); + } + } + + void + WorkerManager::endStream(StreamID iID) { + for(auto& worker: allWorkers_) { + worker->endStream(iID); + } + } + void WorkerManager::resetAll() { for_all(allWorkers_, boost::bind(&Worker::reset, _1)); diff --git a/FWCore/Framework/src/WorkerT.cc b/FWCore/Framework/src/WorkerT.cc index eb6ded99adec3..c164101adba33 100644 --- a/FWCore/Framework/src/WorkerT.cc +++ b/FWCore/Framework/src/WorkerT.cc @@ -122,6 +122,20 @@ namespace edm{ module_->doEndJob(); } + template + inline + void + WorkerT::implBeginStream(StreamID id) { + //module_->doBeginStream(id); + } + + template + inline + void + WorkerT::implEndStream(StreamID id) { + //module_->doEndStream(id); + } + template inline void diff --git a/FWCore/Framework/src/WorkerT.h b/FWCore/Framework/src/WorkerT.h index bb6de77d30c1b..0b879febde92a 100644 --- a/FWCore/Framework/src/WorkerT.h +++ b/FWCore/Framework/src/WorkerT.h @@ -72,6 +72,8 @@ namespace edm { CurrentProcessingContext const* cpc) override; virtual void implBeginJob() override; virtual void implEndJob() override; + virtual void implBeginStream(StreamID) override; + virtual void implEndStream(StreamID) override; virtual void implRespondToOpenInputFile(FileBlock const& fb) override; virtual void implRespondToCloseInputFile(FileBlock const& fb) override; virtual void implRespondToOpenOutputFiles(FileBlock const& fb) override;