Fork Flow
Fork Flow is the planned user-configurable pipeline system in FluxMQ.
Flows are built from sources, triggers, filters, mappers, routers, and sinks.
source or trigger
-> filter, router, or mapper
-> sink or projectionExamples
Record and inspect messages:
MQTT connection resource
-> MQTT trigger
-> Topic filter
-> Payload inspector
-> UI projectionRecord selected messages:
MQTT connection resource
-> MQTT trigger
-> Topic filter
-> Recording sinkReplay selected traffic:
Replay source
-> Topic filter
-> MQTT publish sinkReact to connection state:
Connection state trigger
-> State router
-> Notification sinkBranch live traffic:
MQTT connection resource
-> MQTT trigger
-> Condition router
-> Matching branch / non-matching branchObserve traffic:
MQTT connection resource
-> MQTT trigger
-> Metrics sink
-> UI projectionDesign Goal
The same flow application definition should eventually be editable through configuration and through a drag-and-drop interface.
FluxMQ now uses the .NET configuration system as the first loading path for flow application definitions. A JSON file can provide FluxMq:FlowApplication, and future hosts can layer command-line values, environment values, saved settings, or UI-produced definitions through the same configuration model.
Application Definition Shape
A Fork Flow application definition describes shared resources, workflows, nodes, and links. It is the configuration package the future runtime will load regardless of whether FluxMQ is hosted by the desktop app, a console runner, or another tool process.
Flow application definition
-> resources
-> workflows
-> nodes
-> receiving port linksEach workflow is an object. Each node is a property inside that workflow object.
{
"resources": {
"broker": {
"type": "mqtt.connection",
"configuration": {
"profile": {
"name": "local-broker",
"host": "localhost",
"port": 1883
}
}
}
},
"workflows": {
"observeTraffic": {
"trigger": {
"type": "mqtt.trigger",
"configuration": {
"connection": "broker",
"subscriptions": [
"factory/#",
{ "topicFilter": "telemetry/#", "qos": 1 }
]
}
},
"metrics": {
"type": "mqtt.metrics-sink",
"Input": "trigger.Output"
}
}
}
}Links are declared on receiving ports. A port can accept one link, many links, or link objects with a condition.
{
"Input": [
"source.Output",
{
"From": "replay.Output",
"When": "topic.startsWith('factory/')"
}
]
}A component-level When can provide the default condition for links that do not specify one.
Validation catches broken node references, malformed links, empty ports, and duplicate links before the flow runs.
The first runtime builder can create registered node types and link compatible typed ports from the definition. If a node type or port is missing, or two ports carry incompatible value types, the build returns errors instead of starting a partial flow.
The first registered runtime component types are:
mqtt.connection: shared resource that owns an MQTT session and publishesFlowErroronErrors.mqtt.trigger: workflow node that references a connection resource, subscribes to topic filters, emitsMqttEnvelopeonOutput, and publishesFlowErroronErrors.mqtt.payload-inspector:InputreceivesMqttEnvelope,OutputpublishesInspectedMqttMessage, andErrorspublishesFlowError.mqtt.metrics-sink:InputreceivesMqttEnvelope,SnapshotspublishesMqttMetricsSnapshot, andErrorspublishesFlowError.
Connection configuration supports:
profileobject (namerequired; host/port/clientId/useTls/username/password/keepAliveSeconds/cleanStart optional)
Trigger configuration supports:
connectionresource namesubscriptionsas string, array of strings, or array of objectsqosper subscription as0|1|2orAtMostOnce|AtLeastOnce|ExactlyOnce- optional
boundedCapacity
The mapper and metrics nodes were chosen first because they have stable constructors and do not need external services or expression configuration.
The first host boundary can build and control a configured flow application from this section:
{
"FluxMq": {
"FlowApplication": {
"workflows": {
"observe": {
"metrics": {
"type": "mqtt.metrics-sink"
}
}
}
}
}
}The first CLI command validates this shape from a JSON file:
dotnet run --project src/FluxMq.Cli -- validate --config samples/flow-applications/metrics-only.jsonFor automation, add --output json to receive structured validation results on standard output.
The same file can be started through the command-line host lifecycle:
dotnet run --project src/FluxMq.Cli -- run --config samples/flow-applications/metrics-only.json --duration-ms 1000Runtime factories can now tell whether they are building a shared resource or a workflow node. Startable resources are started before workflow nodes, and workflow nodes are stopped and disposed before shared resources.
Reloading will be owned by the runtime layer. The UI can edit and save definitions, but the runtime is responsible for validating the next definition, keeping unaffected resources alive, patching workflow graphs, and reporting reload failures.
The first desktop alpha includes a Blazor.Diagrams canvas that projects the current definition into nodes and links, a JSON editor for the same definition, local file save/load, and run controls that call the same host boundary as the command-line host.
Current Building Blocks
- MQTT connection: owns the shared broker session.
- MQTT trigger: subscribes through a connection and emits matching live messages.
- Replay source: emits messages from a stored recording.
- Topic filter: forwards only matching messages.
- Condition router: sends each message to a true or false branch.
- Payload inspector: converts raw payloads into readable inspection results.
- MQTT publish sink: publishes messages through an active session.
- Recording sink: stores messages for a recording session.
- Metrics sink: tracks counters and broadcasts metric snapshots.