Thrown when custom inputs for dffml.dataflow.run do not list an input with string as its primitive as the first input.
Thrown when outputs for a custom dffml.dataflow.run do not match those of its subflow.
By default, all properties of a config object are immutable. If you would like to mutate immutable properties, you must explicitly call this method using it as a context manager.
Examples
>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
Starts a subflow self.config.dataflow and adds inputs in it.
inputs (dict) – The inputs to add to the subflow. This should be a mapping from each context string to the list of inputs which should be seeded for that context.
Maps each context string in inputs to its outputs after running through the dataflow.
Examples
The following example shows how to use run_dataflow in its default behavior.
>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> subflow = DataFlow.auto(GetSingle)
>>> subflow.definitions[URL.name] = URL
>>> subflow.seed.append(
...     Input(
...         value=[URL.name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[run_dataflow.op.outputs["results"].name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(dataflow, {
...         "run_subflow": [
...             Input(
...                 value={
...                     "dffml": [
...                         {
...                             "value": "https://github.com/intel/dffml",
...                             "definition": URL.name
...                         }
...                     ]
...                 },
...                 definition=run_dataflow.op.inputs["inputs"]
...             )
...         ]
...     }):
...         print(results)
>>>
>>> asyncio.run(main())
{'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
The following example shows how to use run_dataflow with custom inputs and outputs. This allows you to run a subflow as if it were an operation.
>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> @op(
...     inputs={"url": URL},
...     outputs={"last": Definition("last_element_in_path", primitive="string")},
... )
... def last_path(url):
...     return {"last": url.split("/")[-1]}
>>>
>>> subflow = DataFlow.auto(last_path, GetSingle)
>>> subflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.operations[run_dataflow.op.name] = run_dataflow.op._replace(
...     inputs={"URL": URL},
...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
...     expand=[],
... )
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>> dataflow.update(auto_flow=True)
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(
...         dataflow,
...         {
...             "run_subflow": [
...                 Input(value="https://github.com/intel/dffml", definition=URL)
...             ]
...         },
...     ):
...         print(results)
>>>
>>> asyncio.run(main())
{'last_element_in_path': 'dffml'}
alias of DffmlDataflowRunImplementation