diff --git a/Framework/Core/include/Framework/DeviceSpec.h b/Framework/Core/include/Framework/DeviceSpec.h index bcfcf2ae61368..84b6cc4e8eb1a 100644 --- a/Framework/Core/include/Framework/DeviceSpec.h +++ b/Framework/Core/include/Framework/DeviceSpec.h @@ -51,7 +51,10 @@ struct DeviceSpec { std::vector forwards; size_t rank; // Id of a parallel processing I am part of size_t nSlots; // Total number of parallel units I am part of + /// The time pipelining id of this particular device. size_t inputTimesliceId; + /// The maximum number of time pipelining for this device. + size_t maxInputTimeslices; /// The completion policy to use for this device. CompletionPolicy completionPolicy; DispatchPolicy dispatchPolicy; diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index be73d0022cebf..b5471ee4db096 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -110,6 +110,7 @@ int main(int argc, char** argv) // the default one. // The default policy is a catch all pub/sub setup to be consistent with the past. std::vector workflowOptions; + workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}}); UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0); std::vector channelPolicies; UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0); diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx index 9e4842179ba3a..8583e29f48500 100644 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ b/Framework/Core/src/AODReaderHelpers.cxx @@ -282,14 +282,19 @@ AlgorithmSpec AODReaderHelpers::rootFileReaderCallback() auto counter = std::make_shared(0); return adaptStateless([readMask, counter, - filenames](DataAllocator& outputs, ControlService& control) { - if (*counter >= filenames.size()) { + filenames](DataAllocator& outputs, ControlService& control, DeviceSpec const& device) { + // Each parallel reader reads the files whose index is associated to + // their inputTimesliceId + assert(device.inputTimesliceId < device.maxInputTimeslices); + size_t fi = (*counter * device.maxInputTimeslices) + device.inputTimesliceId; + if (fi >= filenames.size()) { LOG(info) << "All input files processed"; control.endOfStream(); control.readyToQuit(QuitRequest::Me); return; } - auto f = filenames[*counter]; + auto f = filenames[fi]; + LOG(INFO) << "Processing " << f; auto infile = std::make_unique(f.c_str()); *counter += 1; if (infile.get() == nullptr || infile->IsOpen() == false) { diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index b8446e28963bb..bd883da10499d 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -248,6 +248,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, device.rank = processor.rank; device.nSlots = processor.nSlots; device.inputTimesliceId = edge.timeIndex; + device.maxInputTimeslices = processor.maxInputTimeslices; device.resource = {acceptedOffer}; devices.push_back(device); return devices.size() - 1; @@ -451,6 +452,7 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, device.rank = processor.rank; device.nSlots = processor.nSlots; device.inputTimesliceId = edge.timeIndex; + device.maxInputTimeslices = processor.maxInputTimeslices; device.resource = {acceptedOffer}; // FIXME: maybe I should use an std::map in the end diff --git a/Framework/Core/src/FrameworkGUIDeviceInspector.cxx b/Framework/Core/src/FrameworkGUIDeviceInspector.cxx index 6a27b835a2392..09326bc3f4d44 100644 --- a/Framework/Core/src/FrameworkGUIDeviceInspector.cxx +++ b/Framework/Core/src/FrameworkGUIDeviceInspector.cxx @@ -119,7 +119,7 @@ void displayDeviceInspector(DeviceSpec const& spec, ImGui::Text("Name: %s", spec.name.c_str()); ImGui::Text("Executable: %s", metadata.executable.c_str()); ImGui::Text("Pid: %d", info.pid); - ImGui::Text("Rank: %zu/%zu%%%zu", spec.rank, spec.nSlots, spec.inputTimesliceId); + ImGui::Text("Rank: %zu/%zu%%%zu/%zu", spec.rank, spec.nSlots, spec.inputTimesliceId, spec.maxInputTimeslices); if (ImGui::Button("Attach debugger")) { std::string pid = std::to_string(info.pid); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 9bdff2ed17939..a9b71e18b3836 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -12,6 +12,7 @@ #include "Framework/AODReaderHelpers.h" #include "Framework/ChannelMatching.h" #include "Framework/CommonDataProcessors.h" +#include "Framework/ConfigContext.h" #include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" #include "Framework/ControlService.h" @@ -142,7 +143,7 @@ void addMissingOutputsToReader(std::vector const& providedOutputs, } } -void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow) +void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx) { auto fakeCallback = AlgorithmSpec{[](InitContext& ic) { LOG(INFO) << "This is not a real device, merely a placeholder for external inputs"; @@ -279,7 +280,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow) extraSpecs.push_back(qaStore); } if (aodReader.outputs.empty() == false) { - extraSpecs.push_back(aodReader); + extraSpecs.push_back(timePipeline(aodReader, ctx.options().get("readers"))); auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]); timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration}); } diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index 4ec478bb6fd54..671519585aa78 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -7,8 +7,8 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_WORKFLOWHELPERS_H -#define FRAMEWORK_WORKFLOWHELPERS_H +#ifndef O2_FRAMEWORK_WORKFLOWHELPERS_H_ +#define O2_FRAMEWORK_WORKFLOWHELPERS_H_ #include "Framework/InputSpec.h" #include "Framework/OutputSpec.h" @@ -20,11 +20,10 @@ #include #include -namespace o2 -{ -namespace framework +namespace o2::framework { +struct ConfigContext; // Structure to hold information which was derived // for output channels. struct LogicalOutputInfo { @@ -146,7 +145,9 @@ struct WorkflowHelpers { // Depending on the workflow and the dangling inputs inside it, inject "fake" // devices to mark the fact we might need some extra action to make sure // dangling inputs are satisfied. - static void injectServiceDevices(WorkflowSpec& workflow); + // @a workflow the workflow to decorate + // @a ctx the context for the configuration phase + static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx); static void constructGraph(const WorkflowSpec& workflow, std::vector& logicalEdges, @@ -173,7 +174,6 @@ struct WorkflowHelpers { static std::vector computeDanglingOutputs(WorkflowSpec const& workflow); }; -} // namespace framework -} // namespace o2 +} // namespace o2::framework -#endif // FRAMEWORK_WORKFLOWHELPERS_H +#endif // O2_FRAMEWORK_WORKFLOWHELPERS_H_ diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index ca9564852fbe7..973a05468f6e1 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1309,10 +1309,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, } } - WorkflowHelpers::injectServiceDevices(physicalWorkflow); - std::stable_sort(physicalWorkflow.begin(), physicalWorkflow.end(), [&rankIndex](DataProcessorSpec const& a, DataProcessorSpec const& b) { - return rankIndex[a.name] < rankIndex[b.name]; - }); + WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext); // Use the hidden options as veto, all config specs matching a definition // in the hidden options are skipped in order to avoid duplicate definitions diff --git a/Framework/Core/test/benchmark_WorkflowHelpers.cxx b/Framework/Core/test/benchmark_WorkflowHelpers.cxx index 0510898d742a3..6c46cd881033a 100644 --- a/Framework/Core/test/benchmark_WorkflowHelpers.cxx +++ b/Framework/Core/test/benchmark_WorkflowHelpers.cxx @@ -8,14 +8,27 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/ConfigContext.h" #include "Framework/DataSpecUtils.h" #include "Framework/OutputSpec.h" +#include "Framework/SimpleOptionsRetriever.h" #include "../src/WorkflowHelpers.h" #include #include using namespace o2::framework; +std::unique_ptr makeEmptyConfigContext() +{ + // FIXME: Ugly... We need to fix ownership and make sure the ConfigContext + // either owns or shares ownership of the registry. + static std::unique_ptr retriever(new SimpleOptionsRetriever); + static ConfigParamRegistry registry{std::move(retriever)}; + auto context = std::make_unique(registry); + return context; +} + static void BM_CreateGraphOverhead(benchmark::State& state) { @@ -40,7 +53,8 @@ static void BM_CreateGraphOverhead(benchmark::State& state) std::vector availableForwardsInfo; WorkflowHelpers::verifyWorkflow(workflow); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, @@ -75,7 +89,8 @@ static void BM_CreateGraphReverseOverhead(benchmark::State& state) std::vector availableForwardsInfo; WorkflowHelpers::verifyWorkflow(workflow); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, availableForwardsInfo); diff --git a/Framework/Core/test/test_WorkflowHelpers.cxx b/Framework/Core/test/test_WorkflowHelpers.cxx index 08007d9995f40..52367513678c3 100644 --- a/Framework/Core/test/test_WorkflowHelpers.cxx +++ b/Framework/Core/test/test_WorkflowHelpers.cxx @@ -12,15 +12,28 @@ #define BOOST_TEST_DYN_LINK #include "test_HelperMacros.h" +#include "Framework/ConfigContext.h" #include "Framework/WorkflowSpec.h" #include "Framework/DataSpecUtils.h" +#include "Framework/SimpleOptionsRetriever.h" #include "../src/WorkflowHelpers.h" #include #include #include +#include using namespace o2::framework; +std::unique_ptr makeEmptyConfigContext() +{ + // FIXME: Ugly... We need to fix ownership and make sure the ConfigContext + // either owns or shares ownership of the registry. + static std::unique_ptr retriever(new SimpleOptionsRetriever); + static ConfigParamRegistry registry{std::move(retriever)}; + auto context = std::make_unique(registry); + return context; +} + BOOST_AUTO_TEST_CASE(TestVerifyWorkflow) { using namespace o2::framework; @@ -206,7 +219,8 @@ BOOST_AUTO_TEST_CASE(TestSimpleConnection) std::vector availableForwardsInfo; WorkflowHelpers::verifyWorkflow(workflow); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); BOOST_CHECK_EQUAL(workflow.size(), 3); WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, @@ -245,9 +259,9 @@ BOOST_AUTO_TEST_CASE(TestSimpleForward) std::vector logicalEdges; std::vector outputs; std::vector availableForwardsInfo; - WorkflowHelpers::verifyWorkflow(workflow); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, availableForwardsInfo); @@ -303,7 +317,8 @@ BOOST_AUTO_TEST_CASE(TestGraphConstruction) std::vector outputs; WorkflowHelpers::verifyWorkflow(workflow); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); WorkflowHelpers::constructGraph(workflow, logicalEdges, outputs, availableForwardsInfo); @@ -427,7 +442,8 @@ BOOST_AUTO_TEST_CASE(TestExternalInput) BOOST_CHECK_EQUAL(workflow.size(), 1); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); // The added devices are the one which should connect to // the condition DB and the sink for the dangling outputs. BOOST_CHECK_EQUAL(workflow.size(), 3); @@ -507,7 +523,8 @@ BOOST_AUTO_TEST_CASE(TestOriginWildcard) std::vector availableForwardsInfo; WorkflowHelpers::verifyWorkflow(workflow); - WorkflowHelpers::injectServiceDevices(workflow); + auto context = makeEmptyConfigContext(); + WorkflowHelpers::injectServiceDevices(workflow, *context); BOOST_CHECK_EQUAL(workflow.size(), 3); BOOST_REQUIRE(workflow.size() >= 3); BOOST_CHECK_EQUAL(workflow[0].name, "A");