Skip to content

Commit

Permalink
Merge pull request #21949 from Dr15Jones/concurrenRunsAndLumis
Browse files Browse the repository at this point in the history
Concurrent LuminosityBlock processing
  • Loading branch information
cmsbuild authored Jan 31, 2018
2 parents 4c2e463 + a8f345d commit 654a8ca
Show file tree
Hide file tree
Showing 170 changed files with 1,824 additions and 1,237 deletions.
4 changes: 4 additions & 0 deletions DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ namespace edm {
}
void mergeAuxiliary(LuminosityBlockAuxiliary const& newAux);

bool sameIdentity(LuminosityBlockAuxiliary const& iRHS) const {
return iRHS.processHistoryID_ == processHistoryID_ &&
iRHS.id_ == id_;
}
private:
// This is the ID of the full process history (not the reduced process history).
// In cases where LuminosityBlock's are merged, the ID of the first process history encountered
Expand Down
5 changes: 0 additions & 5 deletions EventFilter/Utilities/plugins/FRDStreamSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ class FRDStreamSource : public edm::ProducerSourceFromFiles {
bool setRunAndEventInfo(edm::EventID& id, edm::TimeValue_t& theTime, edm::EventAuxiliary::ExperimentType& eType) override;
void produce(edm::Event& e) override;

void beginRun(edm::Run&) override {}
void endRun(edm::Run&) override {}
void beginLuminosityBlock(edm::LuminosityBlock&) override {}
void endLuminosityBlock(edm::LuminosityBlock&) override {}

bool openFile(const std::string& fileName);


Expand Down
1 change: 1 addition & 0 deletions FWCore/Concurrency/interface/WaitingTaskHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace edm {
}

// ---------- const member functions ---------------------
bool taskHasFailed() const { return m_task->exceptionPtr() != nullptr; }

// ---------- static member functions --------------------

Expand Down
6 changes: 6 additions & 0 deletions FWCore/Framework/interface/EDAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"

#include <string>

Expand Down Expand Up @@ -49,6 +50,8 @@ namespace edm {

void callWhenNewProductsRegistered(std::function<void(BranchDescription const&)> const& func);

SerialTaskQueue* globalRunsQueue() { return &runQueue_;}
SerialTaskQueue* globalLuminosityBlocksQueue() { return &luminosityBlockQueue_;}
private:
bool doEvent(EventPrincipal const& ep, EventSetup const& c,
ActivityRegistry* act,
Expand Down Expand Up @@ -97,6 +100,9 @@ namespace edm {
ModuleDescription moduleDescription_;
SharedResourcesAcquirer resourceAcquirer_;

SerialTaskQueue runQueue_;
SerialTaskQueue luminosityBlockQueue_;

std::function<void(BranchDescription const&)> callWhenNewProductsRegistered_;
};
}
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/EDFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace edm {
static bool wantsStreamRuns() {return false;}
static bool wantsStreamLuminosityBlocks() {return false;};

SerialTaskQueue* globalRunsQueue() { return &runQueue_;}
SerialTaskQueue* globalLuminosityBlocksQueue() { return &luminosityBlockQueue_;}
private:
bool doEvent(EventPrincipal const& ep, EventSetup const& c,
ActivityRegistry* act,
Expand Down Expand Up @@ -110,6 +112,8 @@ namespace edm {
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
SharedResourcesAcquirer resourceAcquirer_;
SerialTaskQueue runQueue_;
SerialTaskQueue luminosityBlockQueue_;
ParentageID previousParentageId_;
};
}
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/EDProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace edm {
static bool wantsStreamRuns() {return false;}
static bool wantsStreamLuminosityBlocks() {return false;};

SerialTaskQueue* globalRunsQueue() { return &runQueue_;}
SerialTaskQueue* globalLuminosityBlocksQueue() { return &luminosityBlockQueue_;}
private:
bool doEvent(EventPrincipal const& ep, EventSetup const& c,
ActivityRegistry* act,
Expand Down Expand Up @@ -105,6 +107,8 @@ namespace edm {
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
SharedResourcesAcquirer resourceAcquirer_;
SerialTaskQueue runQueue_;
SerialTaskQueue luminosityBlockQueue_;
ParentageID previousParentageId_;
};
}
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/interface/EventPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ namespace edm {
}

bool luminosityBlockPrincipalPtrValid() const {
return (luminosityBlockPrincipal_) ? true : false;
return luminosityBlockPrincipal_ != nullptr;
}

void setLuminosityBlockPrincipal(std::shared_ptr<LuminosityBlockPrincipal> const& lbp);
Expand Down Expand Up @@ -191,7 +191,7 @@ namespace edm {

EventAuxiliary aux_;

edm::propagate_const<std::shared_ptr<LuminosityBlockPrincipal>> luminosityBlockPrincipal_;
edm::propagate_const<LuminosityBlockPrincipal*> luminosityBlockPrincipal_;

// Pointer to the 'retriever' that will get provenance information from the persistent store.
edm::propagate_const<std::shared_ptr<ProductProvenanceRetriever>> provRetrieverPtr_;
Expand Down
44 changes: 31 additions & 13 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ configured in the user's main() function, and is set running.
#include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"

#include "FWCore/Utilities/interface/get_underlying_safe.h"

#include <map>
Expand All @@ -47,6 +50,8 @@ namespace edm {
class ProcessDesc;
class SubProcess;
class WaitingTaskHolder;
class LuminosityBlockProcessingStatus;
class IOVSyncValue;

namespace eventsetup {
class EventSetupProvider;
Expand Down Expand Up @@ -181,6 +186,8 @@ namespace edm {
// transition handling.

InputSource::ItemType nextTransitionType();
InputSource::ItemType lastTransitionType() const { if(deferredExceptionPtrIsSet_) {return InputSource::IsStop;}
return lastSourceTransition_;}
std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
edm::LuminosityBlockNumber_t nextLuminosityBlockID();

Expand All @@ -203,17 +210,26 @@ namespace edm {
void beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded);
void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);

void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool& globalBeginSucceeded);
void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool globalBeginSucceeded, bool cleaningUpAfterException);

InputSource::ItemType processLumis(std::shared_ptr<void> const& iRunResource);
void endUnfinishedLumi();

void beginLumiAsync(edm::IOVSyncValue const& iSyncValue,
std::shared_ptr<void> const& iRunResource,
edm::WaitingTaskHolder iHolder);
void continueLumiAsync(edm::WaitingTaskHolder iHolder);

void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
void streamEndLumiAsync(edm::WaitingTaskHolder iTask,
unsigned int iStreamIndex,
std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
std::pair<ProcessHistoryID,RunNumber_t> readRun();
std::pair<ProcessHistoryID,RunNumber_t> readAndMergeRun();
int readLuminosityBlock();
int readAndMergeLumi();
void readLuminosityBlock(LuminosityBlockProcessingStatus&);
int readAndMergeLumi(LuminosityBlockProcessingStatus&);
void writeRun(ProcessHistoryID const& phid, RunNumber_t run);
void deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run);
void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
void deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
void writeLumi(LuminosityBlockProcessingStatus& );
void deleteLumiFromCache(LuminosityBlockProcessingStatus&);

bool shouldWeStop() const;

Expand All @@ -223,8 +239,6 @@ namespace edm {

bool setDeferredException(std::exception_ptr);

InputSource::ItemType readAndProcessEvents();

private:
//------------------------------------------------------------------
//
Expand All @@ -235,11 +249,10 @@ namespace edm {
serviceregistry::ServiceLegacy);

bool readNextEventForStream(unsigned int iStreamIndex,
std::atomic<bool>* finishedProcessingEvents);
LuminosityBlockProcessingStatus& iLumiStatus);

void handleNextEventForStreamAsync(WaitingTaskHolder iTask,
unsigned int iStreamIndex,
std::atomic<bool>* finishedProcessingEvents);
unsigned int iStreamIndex);


//read the next event using Stream iStreamIndex
Expand Down Expand Up @@ -278,13 +291,19 @@ namespace edm {
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
ServiceToken serviceToken_;
edm::propagate_const<std::unique_ptr<InputSource>> input_;
InputSource::ItemType lastSourceTransition_;
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
edm::SerialTaskQueue iovQueue_;
std::unique_ptr<ExceptionToActionTable const> act_table_;
std::shared_ptr<ProcessConfiguration const> processConfiguration_;
ProcessContext processContext_;
PathsAndConsumesOfModules pathsAndConsumesOfModules_;
edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
std::vector<edm::SerialTaskQueue> streamQueues_;
std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;

std::vector<SubProcess> subProcesses_;
edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;

Expand All @@ -311,7 +330,6 @@ namespace edm {
PreallocationConfiguration preallocations_;

bool asyncStopRequestedWhileProcessingEvents_;
std::atomic<InputSource::ItemType> nextItemTypeFromProcessingEvents_;
StatusCode asyncStopStatusCodeFromProcessingEvents_;
bool firstEventInBlock_=true;

Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/EventSetupProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class EventSetupProvider {

// ---------- const member functions ---------------------
std::set<ComponentDescription> proxyProviderDescriptions() const;

bool isWithinValidityInterval(IOVSyncValue const& ) const;

// ---------- static member functions --------------------

// ---------- member functions ---------------------------
Expand Down
9 changes: 2 additions & 7 deletions FWCore/Framework/interface/InputSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Some examples of InputSource subclasses may be:
#include "FWCore/Utilities/interface/RunIndex.h"
#include "FWCore/Utilities/interface/Signal.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <memory>
#include <string>
Expand Down Expand Up @@ -154,7 +155,7 @@ namespace edm {
void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb) {setLumi(lb);}

/// issue an event report
void issueReports(EventID const& eventID);
void issueReports(EventID const& eventID, StreamID streamID);

/// Register any produced products
virtual void registerProducts();
Expand Down Expand Up @@ -218,15 +219,9 @@ namespace edm {
/// Called by framework at beginning of lumi block
virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);

/// Called by framework at end of lumi block
virtual void doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException, ProcessContext const*);

/// Called by framework at beginning of run
virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);

/// Called by framework at end of run
virtual void doEndRun(RunPrincipal& rp, bool cleaningUpAfterException, ProcessContext const*);

/// Accessor for the current time, as seen by the input source
Timestamp const& timestamp() const {return time_;}

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/LuminosityBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace edm {
class LuminosityBlock : public LuminosityBlockBase {
public:
LuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleDescription const& md,
ModuleCallingContext const*);
ModuleCallingContext const*, bool isAtEnd);
~LuminosityBlock() override;

// AUX functions are defined in LuminosityBlockBase
Expand Down
7 changes: 6 additions & 1 deletion FWCore/Framework/interface/LuminosityBlockForOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ For its usage, see "FWCore/Framework/interface/PrincipalGetAdapter.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/OccurrenceForOutput.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/LuminosityBlockIndex.h"

#include <memory>
#include <string>
Expand All @@ -39,7 +40,7 @@ namespace edm {
class LuminosityBlockForOutput : public OccurrenceForOutput {
public:
LuminosityBlockForOutput(LuminosityBlockPrincipal const& lbp, ModuleDescription const& md,
ModuleCallingContext const*);
ModuleCallingContext const*, bool isAtEnd);
~LuminosityBlockForOutput() override;

LuminosityBlockAuxiliary const& luminosityBlockAuxiliary() const {return aux_;}
Expand All @@ -49,6 +50,10 @@ namespace edm {
Timestamp const& beginTime() const {return aux_.beginTime();}
Timestamp const& endTime() const {return aux_.endTime();}

/**\return Reusable index which can be used to separate data for different simultaneous LuminosityBlocks.
*/
LuminosityBlockIndex index() const;

RunForOutput const&
getRun() const {
return *run_;
Expand Down
18 changes: 5 additions & 13 deletions FWCore/Framework/interface/LuminosityBlockPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace edm {
typedef LuminosityBlockAuxiliary Auxiliary;
typedef Principal Base;
LuminosityBlockPrincipal(
std::shared_ptr<LuminosityBlockAuxiliary> aux,
std::shared_ptr<ProductRegistry const> reg,
ProcessConfiguration const& pc,
HistoryAppender* historyAppender,
Expand Down Expand Up @@ -75,23 +74,24 @@ namespace edm {
}

void setEndTime(Timestamp const& time) {
aux_->setEndTime(time);
aux_.setEndTime(time);
}

LuminosityBlockNumber_t luminosityBlock() const {
return aux().luminosityBlock();
}

void setAux( LuminosityBlockAuxiliary iAux) { aux_ = std::move(iAux);}
LuminosityBlockAuxiliary const& aux() const {
return *aux_;
return aux_;
}

RunNumber_t run() const {
return aux().run();
}

void mergeAuxiliary(LuminosityBlockAuxiliary const& aux) {
return aux_->mergeAuxiliary(aux);
return aux_.mergeAuxiliary(aux);
}

void put(
Expand All @@ -101,23 +101,15 @@ namespace edm {
void put(ProductResolverIndex index,
std::unique_ptr<WrapperBase> edp) const;

void setComplete() {
complete_ = true;
}

private:

bool isComplete_() const override {return complete_;}

unsigned int transitionIndex_() const override;

edm::propagate_const<std::shared_ptr<RunPrincipal>> runPrincipal_;

edm::propagate_const<std::shared_ptr<LuminosityBlockAuxiliary>> aux_;
LuminosityBlockAuxiliary aux_;

LuminosityBlockIndex index_;

bool complete_;
};
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/OccurrenceForOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace edm {
class OccurrenceForOutput {
public:
OccurrenceForOutput(Principal const& ep, ModuleDescription const& md,
ModuleCallingContext const*);
ModuleCallingContext const*, bool isAtEnd);
virtual ~OccurrenceForOutput();

//Used in conjunction with EDGetToken
Expand Down
5 changes: 5 additions & 0 deletions FWCore/Framework/interface/OutputModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ namespace edm {
static bool wantsStreamRuns() {return false;}
static bool wantsStreamLuminosityBlocks() {return false;};

SerialTaskQueue* globalRunsQueue() { return &runQueue_;}
SerialTaskQueue* globalLuminosityBlocksQueue() { return &luminosityBlockQueue_;}

bool wantAllEvents() const {return wantAllEvents_;}

BranchIDLists const* branchIDLists();
Expand Down Expand Up @@ -179,6 +182,8 @@ namespace edm {
std::map<BranchID, bool> keepAssociation_;

SharedResourcesAcquirer resourceAcquirer_;
SerialTaskQueue runQueue_;
SerialTaskQueue luminosityBlockQueue_;

//------------------------------------------------------------------
// private member functions
Expand Down
Loading

0 comments on commit 654a8ca

Please sign in to comment.