Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions 3 Framework/Core/include/Framework/DeviceSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ struct DeviceSpec {
std::vector<ForwardRoute> forwards;
size_t rank; // Id of a parallel processing I am part of
size_t nSlots; // Total number of parallel units I am part of
/// The time pipelining id of this particular device.
size_t inputTimesliceId;
/// The maximum number of time pipelining for this device.
size_t maxInputTimeslices;
/// The completion policy to use for this device.
CompletionPolicy completionPolicy;
DispatchPolicy dispatchPolicy;
Expand Down
1 change: 1 addition & 0 deletions 1 Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ int main(int argc, char** argv)
// the default one.
// The default policy is a catch all pub/sub setup to be consistent with the past.
std::vector<o2::framework::ConfigParamSpec> workflowOptions;
workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}});
UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0);
std::vector<ChannelConfigurationPolicy> channelPolicies;
UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0);
Expand Down
11 changes: 8 additions & 3 deletions 11 Framework/Core/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,19 @@ AlgorithmSpec AODReaderHelpers::rootFileReaderCallback()
auto counter = std::make_shared<int>(0);
return adaptStateless([readMask,
counter,
filenames](DataAllocator& outputs, ControlService& control) {
if (*counter >= filenames.size()) {
filenames](DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
// Each parallel reader reads the files whose index is associated to
// their inputTimesliceId
assert(device.inputTimesliceId < device.maxInputTimeslices);
size_t fi = (*counter * device.maxInputTimeslices) + device.inputTimesliceId;
if (fi >= filenames.size()) {
LOG(info) << "All input files processed";
control.endOfStream();
control.readyToQuit(QuitRequest::Me);
return;
}
auto f = filenames[*counter];
auto f = filenames[fi];
LOG(INFO) << "Processing " << f;
auto infile = std::make_unique<TFile>(f.c_str());
*counter += 1;
if (infile.get() == nullptr || infile->IsOpen() == false) {
Expand Down
2 changes: 2 additions & 0 deletions 2 Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
device.rank = processor.rank;
device.nSlots = processor.nSlots;
device.inputTimesliceId = edge.timeIndex;
device.maxInputTimeslices = processor.maxInputTimeslices;
device.resource = {acceptedOffer};
devices.push_back(device);
return devices.size() - 1;
Expand Down Expand Up @@ -451,6 +452,7 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
device.rank = processor.rank;
device.nSlots = processor.nSlots;
device.inputTimesliceId = edge.timeIndex;
device.maxInputTimeslices = processor.maxInputTimeslices;
device.resource = {acceptedOffer};

// FIXME: maybe I should use an std::map in the end
Expand Down
2 changes: 1 addition & 1 deletion 2 Framework/Core/src/FrameworkGUIDeviceInspector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void displayDeviceInspector(DeviceSpec const& spec,
ImGui::Text("Name: %s", spec.name.c_str());
ImGui::Text("Executable: %s", metadata.executable.c_str());
ImGui::Text("Pid: %d", info.pid);
ImGui::Text("Rank: %zu/%zu%%%zu", spec.rank, spec.nSlots, spec.inputTimesliceId);
ImGui::Text("Rank: %zu/%zu%%%zu/%zu", spec.rank, spec.nSlots, spec.inputTimesliceId, spec.maxInputTimeslices);

if (ImGui::Button("Attach debugger")) {
std::string pid = std::to_string(info.pid);
Expand Down
5 changes: 3 additions & 2 deletions 5 Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "Framework/AODReaderHelpers.h"
#include "Framework/ChannelMatching.h"
#include "Framework/CommonDataProcessors.h"
#include "Framework/ConfigContext.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -142,7 +143,7 @@ void addMissingOutputsToReader(std::vector<OutputSpec> const& providedOutputs,
}
}

void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow)
void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx)
{
auto fakeCallback = AlgorithmSpec{[](InitContext& ic) {
LOG(INFO) << "This is not a real device, merely a placeholder for external inputs";
Expand Down Expand Up @@ -279,7 +280,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow)
extraSpecs.push_back(qaStore);
}
if (aodReader.outputs.empty() == false) {
extraSpecs.push_back(aodReader);
extraSpecs.push_back(timePipeline(aodReader, ctx.options().get<int64_t>("readers")));
auto concrete = DataSpecUtils::asConcreteDataMatcher(aodReader.inputs[0]);
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
}
Expand Down
18 changes: 9 additions & 9 deletions 18 Framework/Core/src/WorkflowHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_WORKFLOWHELPERS_H
#define FRAMEWORK_WORKFLOWHELPERS_H
#ifndef O2_FRAMEWORK_WORKFLOWHELPERS_H_
#define O2_FRAMEWORK_WORKFLOWHELPERS_H_

#include "Framework/InputSpec.h"
#include "Framework/OutputSpec.h"
Expand All @@ -20,11 +20,10 @@
#include <vector>
#include <iosfwd>

namespace o2
{
namespace framework
namespace o2::framework
{

struct ConfigContext;
// Structure to hold information which was derived
// for output channels.
struct LogicalOutputInfo {
Expand Down Expand Up @@ -146,7 +145,9 @@ struct WorkflowHelpers {
// Depending on the workflow and the dangling inputs inside it, inject "fake"
// devices to mark the fact we might need some extra action to make sure
// dangling inputs are satisfied.
static void injectServiceDevices(WorkflowSpec& workflow);
// @a workflow the workflow to decorate
// @a ctx the context for the configuration phase
static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx);

static void constructGraph(const WorkflowSpec& workflow,
std::vector<DeviceConnectionEdge>& logicalEdges,
Expand All @@ -173,7 +174,6 @@ struct WorkflowHelpers {
static std::vector<InputSpec> computeDanglingOutputs(WorkflowSpec const& workflow);
};

} // namespace framework
} // namespace o2
} // namespace o2::framework

#endif // FRAMEWORK_WORKFLOWHELPERS_H
#endif // O2_FRAMEWORK_WORKFLOWHELPERS_H_
5 changes: 1 addition & 4 deletions 5 Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1309,10 +1309,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
}
}

WorkflowHelpers::injectServiceDevices(physicalWorkflow);
std::stable_sort(physicalWorkflow.begin(), physicalWorkflow.end(), [&rankIndex](DataProcessorSpec const& a, DataProcessorSpec const& b) {
return rankIndex[a.name] < rankIndex[b.name];
});
WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext);

// Use the hidden options as veto, all config specs matching a definition
// in the hidden options are skipped in order to avoid duplicate definitions
Expand Down
19 changes: 17 additions & 2 deletions 19 Framework/Core/test/benchmark_WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,27 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/WorkflowSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ConfigContext.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/OutputSpec.h"
#include "Framework/SimpleOptionsRetriever.h"
#include "../src/WorkflowHelpers.h"
#include <benchmark/benchmark.h>
#include <algorithm>

using namespace o2::framework;

std::unique_ptr<ConfigContext> makeEmptyConfigContext()
{
// FIXME: Ugly... We need to fix ownership and make sure the ConfigContext
// either owns or shares ownership of the registry.
static std::unique_ptr<ParamRetriever> retriever(new SimpleOptionsRetriever);
static ConfigParamRegistry registry{std::move(retriever)};
auto context = std::make_unique<ConfigContext>(registry);
return context;
}

static void BM_CreateGraphOverhead(benchmark::State& state)
{

Expand All @@ -40,7 +53,8 @@ static void BM_CreateGraphOverhead(benchmark::State& state)
std::vector<LogicalForwardInfo> availableForwardsInfo;

WorkflowHelpers::verifyWorkflow(workflow);
WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
WorkflowHelpers::constructGraph(workflow,
logicalEdges,
outputs,
Expand Down Expand Up @@ -75,7 +89,8 @@ static void BM_CreateGraphReverseOverhead(benchmark::State& state)
std::vector<LogicalForwardInfo> availableForwardsInfo;

WorkflowHelpers::verifyWorkflow(workflow);
WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
WorkflowHelpers::constructGraph(workflow, logicalEdges,
outputs,
availableForwardsInfo);
Expand Down
29 changes: 23 additions & 6 deletions 29 Framework/Core/test/test_WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,28 @@
#define BOOST_TEST_DYN_LINK

#include "test_HelperMacros.h"
#include "Framework/ConfigContext.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/SimpleOptionsRetriever.h"
#include "../src/WorkflowHelpers.h"
#include <boost/test/unit_test.hpp>
#include <boost/test/tools/detail/per_element_manip.hpp>
#include <algorithm>
#include <memory>

using namespace o2::framework;

std::unique_ptr<ConfigContext> makeEmptyConfigContext()
{
// FIXME: Ugly... We need to fix ownership and make sure the ConfigContext
// either owns or shares ownership of the registry.
static std::unique_ptr<ParamRetriever> retriever(new SimpleOptionsRetriever);
static ConfigParamRegistry registry{std::move(retriever)};
auto context = std::make_unique<ConfigContext>(registry);
return context;
}

BOOST_AUTO_TEST_CASE(TestVerifyWorkflow)
{
using namespace o2::framework;
Expand Down Expand Up @@ -206,7 +219,8 @@ BOOST_AUTO_TEST_CASE(TestSimpleConnection)
std::vector<LogicalForwardInfo> availableForwardsInfo;

WorkflowHelpers::verifyWorkflow(workflow);
WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
BOOST_CHECK_EQUAL(workflow.size(), 3);
WorkflowHelpers::constructGraph(workflow, logicalEdges,
outputs,
Expand Down Expand Up @@ -245,9 +259,9 @@ BOOST_AUTO_TEST_CASE(TestSimpleForward)
std::vector<DeviceConnectionEdge> logicalEdges;
std::vector<OutputSpec> outputs;
std::vector<LogicalForwardInfo> availableForwardsInfo;

WorkflowHelpers::verifyWorkflow(workflow);
WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
WorkflowHelpers::constructGraph(workflow, logicalEdges,
outputs,
availableForwardsInfo);
Expand Down Expand Up @@ -303,7 +317,8 @@ BOOST_AUTO_TEST_CASE(TestGraphConstruction)
std::vector<OutputSpec> outputs;

WorkflowHelpers::verifyWorkflow(workflow);
WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
WorkflowHelpers::constructGraph(workflow, logicalEdges,
outputs,
availableForwardsInfo);
Expand Down Expand Up @@ -427,7 +442,8 @@ BOOST_AUTO_TEST_CASE(TestExternalInput)

BOOST_CHECK_EQUAL(workflow.size(), 1);

WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
// The added devices are the one which should connect to
// the condition DB and the sink for the dangling outputs.
BOOST_CHECK_EQUAL(workflow.size(), 3);
Expand Down Expand Up @@ -507,7 +523,8 @@ BOOST_AUTO_TEST_CASE(TestOriginWildcard)
std::vector<LogicalForwardInfo> availableForwardsInfo;

WorkflowHelpers::verifyWorkflow(workflow);
WorkflowHelpers::injectServiceDevices(workflow);
auto context = makeEmptyConfigContext();
WorkflowHelpers::injectServiceDevices(workflow, *context);
BOOST_CHECK_EQUAL(workflow.size(), 3);
BOOST_REQUIRE(workflow.size() >= 3);
BOOST_CHECK_EQUAL(workflow[0].name, "A");
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.