Building an AI-Driven Scientific Workflow & Chatbot with Nodeology
Introduction
Nodeology is a new AI workflow building library that simplifies creating robust state machine workflows through an intuitive, accessible interface. The framework enables researchers, especially those without extensive programming experience, to quickly design and deploy full-stack AI workflows using only prompt templates and existing functions. In this post, we’ll introduce the library by building a simple interactive AI-assisted particle trajectory simulation workflow using nodeology. The final result will be a chatbot that:
- Asks for user input on physical parameters for a simulated particle.
- Calculates the particle’s trajectory under electric and magnetic fields.
- Plots the path as a 3D visualization.
- Analyzes the motion using a Vision-Language Model (VLM).
- Decides whether to continue or finish based on user feedback.
We’ll break everything down step-by-step: from understanding Nodeology’s core concepts—State, Nodes, and Workflows—to hooking them up in an interactive UI.
Understanding Nodeology’s Core Concepts
State
The State is like a shared “backpack” of data that every step (or Node) in the workflow can access and update. In this specific example, it keeps track of:
- Parameters for the simulation (like mass, charge, etc.).
- Intermediate results (e.g., a 3D plot).
- User inputs and conversation history.
Think of State as the single source of truth that travels with you through the workflow’s entire lifecycle.
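To make the “backpack” idea concrete, here is a rough plain-Python sketch. It is not Nodeology’s actual State class; the names DemoState, apply_update, and double_mass are hypothetical and exist only for illustration. Each step reads what it needs and hands back only the fields it changed, which are then merged into the shared state.

```python
from typing import List, TypedDict


class DemoState(TypedDict, total=False):
    """Hypothetical stand-in for a workflow state (not Nodeology's State)."""
    mass: float
    charge: float
    human_input: str
    messages: List[str]


def apply_update(state: DemoState, update: dict) -> DemoState:
    """Return a new state with a step's partial update merged in."""
    merged = dict(state)
    merged.update(update)
    return merged  # type: ignore[return-value]


def double_mass(state: DemoState) -> dict:
    """A step reads what it needs and returns only the fields it changes."""
    return {"mass": state["mass"] * 2}


state: DemoState = {"mass": 1.0, "charge": -1.0, "messages": []}
state = apply_update(state, double_mass(state))
print(state["mass"])    # the updated field
print(state["charge"])  # untouched fields travel along unchanged
```

Returning partial updates rather than mutating in place keeps each step easy to test on its own, which is one plausible reason to favor this pattern.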
Nodes
The Node is the basic building block of a workflow. It can be of two types:
- A prompt-based Node uses an LLM/VLM with a prompt template. It reads certain fields from the State and, upon calling the model, updates or adds new values in the State.
- A function-based Node is just a decorated Python function. It can do classical scientific or data processing tasks (numerical simulation, file I/O, plotting, etc.), then store results back into the State.
This mix of AI-driven logic and traditional Python functions is exactly how Nodeology unifies AI with domain-specific code.
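One way to picture a function-based Node is as a thin wrapper that looks up a function’s arguments in the state by name and writes the return value to a “sink” field. The sketch below is an assumption about the general pattern, not Nodeology’s implementation; as_node_sketch and kinetic_energy are hypothetical names.

```python
import inspect
from typing import Any, Callable, Dict, List, Union


def as_node_sketch(sink: Union[str, List[str]]) -> Callable:
    """Hypothetical decorator: wrap a plain function so it runs on a state dict."""
    sinks = [sink] if isinstance(sink, str) else list(sink)

    def decorator(func: Callable) -> Callable:
        arg_names = list(inspect.signature(func).parameters)

        def run_on_state(state: Dict[str, Any]) -> Dict[str, Any]:
            # Pull the function's arguments out of the state by name.
            result = func(**{name: state[name] for name in arg_names})
            new_state = dict(state)
            if len(sinks) == 1:
                new_state[sinks[0]] = result
            elif len(sinks) > 1:
                # Multiple sinks: expect one returned value per sink.
                for key, value in zip(sinks, result):
                    new_state[key] = value
            # An empty sink list means nothing is written back.
            return new_state

        return run_on_state

    return decorator


@as_node_sketch(sink="kinetic_energy")
def kinetic_energy(mass: float, speed: float) -> float:
    return 0.5 * mass * speed**2


state = kinetic_energy({"mass": 2.0, "speed": 3.0})
print(state["kinetic_energy"])  # 0.5 * 2.0 * 3.0**2
```

The point of the pattern is that the scientific function itself stays free of any workflow-specific code; only the decorator knows about the state.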
Workflow
A Workflow in Nodeology is a directed graph of Nodes:
- You define the flow: “After Node A finishes, pass control to Node B.”
- You can add conditional branches: “If some check is true, do Node X; otherwise do Node Y.”
- You can integrate user interactions: “Wait for user input and then proceed.”
Everything is done within Nodeology’s Workflow class, which simplifies typical chores like error handling, concurrency, checkpointing, and so forth.
Defining Our Simulation State
We’ll begin by specifying which data we expect to carry throughout the workflow. This includes physical parameters (e.g., mass, charge, initial_velocity, etc.), plus placeholders for the user’s analysis or plot results.
from typing import List, Dict
import numpy as np
from nodeology.state import State

class TrajectoryState(State):
    """
    State for our particle trajectory workflow.
    It holds everything from initial parameters,
    user confirmations, and final analysis.
    """
    mass: float  # Particle mass (kg)
    charge: float  # Particle charge (C)
    initial_velocity: np.ndarray  # [vx, vy, vz]
    E_field: np.ndarray  # [Ex, Ey, Ez]
    B_field: np.ndarray  # [Bx, By, Bz]
    # A boolean for user confirmation
    confirm_parameters: bool
    # For storing the raw JSON from an LLM-based node
    parameters_updater_output: str
    # The computed 3D positions of the particle
    positions: List[np.ndarray]
    # We'll store a path or figure for the plotted trajectory
    trajectory_plot: str
    trajectory_plot_path: str
    # Where the LLM’s analysis result will land
    analysis_result: Dict
    # A boolean for continuing or finishing
    continue_simulation: bool
Key Takeaway: If you want a new value to persist or be shared across nodes, define it here.
Writing Our Nodes
Displaying Parameters
We’ll start small: a Node that shows the current simulation parameters to the user.
We’ll decorate a simple Python function with @as_node(...). This way, Nodeology recognizes it as a building block in the workflow.
import chainlit as cl
from chainlit import Message, run_sync
from nodeology.node import as_node

@as_node(sink=[])
def display_parameters(
    mass: float,
    charge: float,
    initial_velocity: np.ndarray,
    E_field: np.ndarray,
    B_field: np.ndarray,
):
    """
    Display the current simulation parameters using
    Chainlit's custom UI element.
    """
    parameters = {
        "Mass (kg)": mass,
        "Charge (C)": charge,
        "Initial Velocity (m/s)": initial_velocity.tolist(),
        "Electric Field (N/C)": E_field.tolist(),
        "Magnetic Field (T)": B_field.tolist(),
    }
    run_sync(
        Message(
            content="Below are the current simulation parameters:",
            elements=[
                cl.CustomElement(
                    name="DataDisplay",
                    props={
                        "data": parameters,
                        "title": "Particle Parameters",
                        "badge": "Configured",
                    },
                )
            ],
        ).send()
    )
    # No return value needed, so sink=[] above
When the workflow runs, it calls this function, pulls the relevant fields from the State, then sends a UI message with the data.
Confirming Parameters
We want an interactive step where the user can say “Yes, I’m okay with these parameters” or “No, let me adjust.” This is also done with a function-based Node:
from chainlit import AskActionMessage

@as_node(sink="confirm_parameters")
def ask_confirm_parameters():
    """
    Prompt user with 2 actions: 'Yes' or 'No'.
    Return True if 'Yes', False otherwise.
    """
    res = run_sync(
        AskActionMessage(
            content="Are you happy with the parameters?",
            actions=[
                cl.Action(name="yes", payload={"value": "yes"}, label="Yes"),
                cl.Action(name="no", payload={"value": "no"}, label="No"),
            ],
        ).send()
    )
    # We store the boolean in 'confirm_parameters' in the State
    if res and res["payload"]["value"] == "yes":
        return True
    else:
        return False
Gathering User Edits
If the user chooses “No,” we want to ask for new parameters:
from chainlit import AskUserMessage

@as_node(sink=["human_input"])
def ask_parameters_input():
    """
    Wait for a text message from the user specifying
    how they want to update the parameters.
    """
    user_msg = run_sync(
        AskUserMessage(
            content="Please let me know how you want to change any of the parameters :)",
        ).send()
    )
    return user_msg["output"]  # We'll store this in 'human_input'
Updating Parameters
Now we let the LLM parse the user’s text and produce new numeric values. A typical instruction might be "Change the magnetic field to 1e4 1e4 0". The LLM can create a JSON object with updated mass, charge, E_field, etc.
from nodeology.node import Node
import json
import numpy as np

parameters_updater = Node(
    node_type="parameters_updater",
    prompt_template="""Update the parameters based on the user's input.

Current parameters:
mass: {mass}
charge: {charge}
initial_velocity: {initial_velocity}
E_field: {E_field}
B_field: {B_field}

User input:
{human_input}

Please return the updated parameters in JSON format.
{
    "mass": float,
    "charge": float,
    "initial_velocity": list[float],
    "E_field": list[float],
    "B_field": list[float]
}
""",
    sink="parameters_updater_output",  # We'll store the LLM response here (raw text/JSON)
    sink_format="json",
)

# We'll need a post-processing function to interpret that JSON
def parameters_updater_transform(state, client, **kwargs):
    # Convert the LLM's output from text to Python objects
    params_dict = json.loads(state["parameters_updater_output"])
    state["mass"] = params_dict["mass"]
    state["charge"] = params_dict["charge"]
    state["initial_velocity"] = np.array(params_dict["initial_velocity"])
    state["E_field"] = np.array(params_dict["E_field"])
    state["B_field"] = np.array(params_dict["B_field"])
    return state

parameters_updater.post_process = parameters_updater_transform
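LLM replies are not always clean JSON: a model may wrap the object in a Markdown code fence or return a vector with the wrong number of components. As a hedged sketch of how one might guard the post-processing step, the standalone helper below parses and validates a reply before it touches the state. It uses only the standard library; parse_parameters is a hypothetical name and is not part of Nodeology.

```python
import json
from typing import Any, Dict

VECTOR_KEYS = ("initial_velocity", "E_field", "B_field")
SCALAR_KEYS = ("mass", "charge")
FENCE = "`" * 3  # a Markdown code fence marker


def parse_parameters(raw: str) -> Dict[str, Any]:
    """Hypothetical helper: parse and validate an LLM reply holding parameters."""
    text = raw.strip()
    # Some models wrap JSON in a Markdown code fence; strip it if present.
    if text.startswith(FENCE):
        text = text.strip("`")
        if text.lower().startswith("json"):
            text = text[4:]
    params = json.loads(text)

    for key in SCALAR_KEYS:
        params[key] = float(params[key])
    for key in VECTOR_KEYS:
        vector = [float(x) for x in params[key]]
        if len(vector) != 3:
            raise ValueError(f"{key} must have 3 components, got {len(vector)}")
        params[key] = vector
    if params["mass"] <= 0:
        raise ValueError("mass must be positive")
    return params


reply = (
    FENCE + "json\n"
    '{"mass": 1, "charge": -1, "initial_velocity": [1, 0, 0], '
    '"E_field": [0, 0, 0], "B_field": [1e4, 1e4, 0]}\n' + FENCE
)
params = parse_parameters(reply)
print(params["B_field"])
```

Raising a ValueError here, instead of silently storing bad values, would let the workflow route back to the user for a corrected instruction.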
Calculating the Trajectory
Next, a pure Python function that solves the equations of motion for a charged particle via the Lorentz force. This is the “traditional” science piece:
import tempfile
from scipy.integrate import solve_ivp
from typing import List

@as_node(sink=["positions"])
def calculate_trajectory(
    mass: float,
    charge: float,
    initial_velocity: np.ndarray,
    E_field: np.ndarray,
    B_field: np.ndarray,
) -> List[np.ndarray]:
    """
    Numerically integrate the trajectory of a particle under
    electric & magnetic fields, returning the positions over time.
    """
    # Estimate cyclotron period if B is non-zero:
    B_magnitude = np.linalg.norm(B_field)
    if B_magnitude == 0 or charge == 0:
        cyclotron_period = 1e-6
    else:
        cyclotron_frequency = abs(charge) * B_magnitude / mass
        cyclotron_period = 2 * np.pi / cyclotron_frequency

    # We'll simulate 5 cycles, each with 100 steps
    num_periods = 5
    num_points_per_period = 100
    total_time = num_periods * cyclotron_period
    total_points = num_periods * num_points_per_period
    time_points = np.linspace(0, total_time, total_points)

    def lorentz_force(t, state):
        vel = state[3:]
        force = charge * (E_field + np.cross(vel, B_field))
        acc = force / mass
        return np.concatenate([vel, acc])

    initial_position = np.array([0.0, 0.0, 0.0])
    initial_state = np.concatenate([initial_position, initial_velocity])

    sol = solve_ivp(
        lorentz_force,
        (time_points[0], time_points[-1]),
        initial_state,
        t_eval=time_points,
        method="RK45",
        rtol=1e-8,
    )
    if not sol.success:
        # Return zeros if something fails
        return [np.zeros(3) for _ in range(len(time_points))]
    return [sol.y[:3, i] for i in range(len(time_points))]
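To get a feel for the scales involved, we can redo the cyclotron-period arithmetic by hand for the electron and field values used as the initial state later in this post. This is only a back-of-the-envelope check with the standard library. It ignores the electric field, so the Larmor radius below describes the circular part of the motion only.

```python
import math

mass = 9.1093837015e-31        # electron mass (kg), from the post's initial state
charge = -1.602176634e-19      # electron charge (C)
velocity = (1e6, 1e6, 1e6)     # initial velocity (m/s)
b_field = (0.0, 0.0, 50000.0)  # magnetic field (T)

b_magnitude = math.sqrt(sum(b * b for b in b_field))
cyclotron_frequency = abs(charge) * b_magnitude / mass  # rad/s
cyclotron_period = 2 * math.pi / cyclotron_frequency    # s

# Speed perpendicular to B (B points along z here, so use vx and vy).
v_perp = math.hypot(velocity[0], velocity[1])
larmor_radius = v_perp / cyclotron_frequency            # m

# Same sampling as the node above: 5 periods, 100 points per period.
num_periods, num_points_per_period = 5, 100
total_time = num_periods * cyclotron_period
total_points = num_periods * num_points_per_period
time_step = total_time / (total_points - 1)  # spacing of a 500-point linspace

print(f"cyclotron period: {cyclotron_period:.3e} s")
print(f"Larmor radius:    {larmor_radius:.3e} m")
print(f"output time step: {time_step:.3e} s")
```

With these inputs the period comes out at roughly 7.1e-16 s and the radius at about 1.6e-10 m, which explains why the simulated time window is so short.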
Plotting the Trajectory
We can create a plot (e.g., Plotly 3D scatter) and display it through Chainlit:
import plotly.graph_objects as go

@as_node(sink=["trajectory_plot", "trajectory_plot_path"])
def plot_trajectory(positions: List[np.ndarray]) -> str:
    """
    Generate a 3D plot of the positions and
    save it to a temporary file.
    """
    arr = np.array(positions)
    fig = go.Figure(
        data=[
            go.Scatter3d(
                x=arr[:, 0],
                y=arr[:, 1],
                z=arr[:, 2],
                mode="lines",
                line=dict(width=4, color="green"),
            )
        ]
    )
    fig.update_layout(
        scene=dict(xaxis_title="X (m)", yaxis_title="Y (m)", zaxis_title="Z (m)")
    )
    image_path = tempfile.mktemp(suffix=".png")
    fig.write_image(image_path)
    # Display in UI
    run_sync(
        Message(
            content="Below is the trajectory plot:",
            elements=[cl.Plotly(figure=fig)],
        ).send()
    )
    # Return figure + path
    return fig, image_path
Analyzing the Trajectory
Finally, a Node where the LLM interprets the motion qualitatively:
trajectory_analyzer = Node(
    node_type="trajectory_analyzer",
    prompt_template="""Analyze this particle trajectory plot.

Please determine:
1. The type of motion (linear, circular, helical, or chaotic)
2. Key physical features (radius, period, pitch angle if applicable)
3. Explanation of the motion
4. Anomalies in the motion

Output in JSON format:
{
    "trajectory_type": "type_name",
    "key_features": {
        "feature1": value,
        "feature2": value
    },
    "explanation": "detailed explanation",
    "anomalies": "anomaly description"
}""",
    sink="analysis_result",
    sink_format="json",
    image_keys=["trajectory_plot_path"],  # for image-based context, if the LLM can handle it
)

def display_trajectory_analyzer_result(state, client, **kwargs):
    """
    A post-process function that picks up the LLM's
    output, converts it to a dict, and displays it nicely.
    """
    import json
    state["analysis_result"] = json.loads(state["analysis_result"])
    run_sync(
        Message(
            content="Here is the trajectory analysis:",
            elements=[
                cl.CustomElement(
                    name="DataDisplay",
                    props={
                        "data": state["analysis_result"],
                        "title": "Trajectory Analysis",
                        "badge": state["analysis_result"].get("trajectory_type", "Unknown"),
                    },
                )
            ],
        ).send()
    )
    return state

trajectory_analyzer.post_process = display_trajectory_analyzer_result
Asking for More Simulation or Finish
We’ll close the loop by letting the user decide if they want to run more cycles:
@as_node(sink="continue_simulation")
def ask_continue_simulation():
    res = run_sync(
        AskActionMessage(
            content="Would you like to continue the simulation?",
            actions=[
                cl.Action(name="continue", payload={"value": "continue"}, label="Continue Simulation"),
                cl.Action(name="finish", payload={"value": "finish"}, label="Finish"),
            ],
        ).send()
    )
    return (res and res["payload"]["value"] == "continue")
Orchestrating Everything in a Workflow
Now we piece all Nodes together in a Workflow. Let’s define it:
from langgraph.graph import END
from nodeology.workflow import Workflow

class TrajectoryWorkflow(Workflow):
    def create_workflow(self):
        # 1) Register our Nodes
        self.add_node("display_parameters", display_parameters)
        self.add_node("ask_confirm_parameters", ask_confirm_parameters)
        self.add_node("ask_parameters_input", ask_parameters_input)
        self.add_node("update_parameters", parameters_updater)
        self.add_node("calculate_trajectory", calculate_trajectory)
        self.add_node("plot_trajectory", plot_trajectory)
        self.add_node("analyze_trajectory", trajectory_analyzer)
        self.add_node("ask_continue_simulation", ask_continue_simulation)
        # 2) Define the flow (edges)
        self.add_flow("display_parameters", "ask_confirm_parameters")
        self.add_conditional_flow(
            "ask_confirm_parameters",
            "confirm_parameters",  # If user is happy
            then="calculate_trajectory",
            otherwise="ask_parameters_input",  # If not
        )
        self.add_flow("ask_parameters_input", "update_parameters")
        self.add_flow("update_parameters", "display_parameters")
        self.add_flow("calculate_trajectory", "plot_trajectory")
        self.add_flow("plot_trajectory", "analyze_trajectory")
        self.add_flow("analyze_trajectory", "ask_continue_simulation")
        self.add_conditional_flow(
            "ask_continue_simulation",
            "continue_simulation",  # If user wants to keep going
            then="display_parameters",
            otherwise=END,  # If user says “finish”
        )
        # 3) Set the first Node to run
        self.set_entry("display_parameters")
        # 4) Compile
        self.compile()
Note: Once compiled, Nodeology internally organizes everything as a state machine with branching logic, enabling flexible re-runs and so on.
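To see the branching logic in isolation, the graph can be written down as a plain transition table and walked with scripted answers in place of a real user. This is a standalone sketch in plain Python that mirrors the edges registered in the workflow class; it does not use Nodeology or LangGraph, and EDGES and trace are hypothetical names.

```python
from typing import Callable, Dict, List, Union

END = "END"
DemoState = Dict[str, bool]
Edge = Union[str, Callable[[DemoState], str]]

# Edges mirror the add_flow / add_conditional_flow calls in the post.
EDGES: Dict[str, Edge] = {
    "display_parameters": "ask_confirm_parameters",
    "ask_confirm_parameters": lambda s: (
        "calculate_trajectory" if s["confirm_parameters"] else "ask_parameters_input"
    ),
    "ask_parameters_input": "update_parameters",
    "update_parameters": "display_parameters",
    "calculate_trajectory": "plot_trajectory",
    "plot_trajectory": "analyze_trajectory",
    "analyze_trajectory": "ask_continue_simulation",
    "ask_continue_simulation": lambda s: (
        "display_parameters" if s["continue_simulation"] else END
    ),
}


def trace(entry: str, confirm_answers: List[bool], continue_answers: List[bool]) -> List[str]:
    """Walk the graph with scripted user answers and record the visited nodes."""
    confirms, continues = iter(confirm_answers), iter(continue_answers)
    state: DemoState = {}
    node, visited = entry, []
    while node != END:
        visited.append(node)
        # Stand-ins for the interactive nodes: consume the next scripted answer.
        if node == "ask_confirm_parameters":
            state["confirm_parameters"] = next(confirms)
        elif node == "ask_continue_simulation":
            state["continue_simulation"] = next(continues)
        edge = EDGES[node]
        node = edge(state) if callable(edge) else edge
    return visited


# The user rejects the parameters once, accepts them, then finishes.
path = trace("display_parameters", [False, True], [False])
print(" -> ".join(path))
```

Tracing a scripted run like this is a cheap way to check that every branch leads where we expect before wiring in the LLM and the UI.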
Launching the Interactive Chat
At last, we bring it all together. We instantiate our workflow with an initial set of parameters and run it in UI mode:
# 1) Create the workflow
workflow = TrajectoryWorkflow(
    state_defs=TrajectoryState,  # Our custom state
    llm_name="gemini/gemini-2.0-flash",
    vlm_name="gemini/gemini-2.0-flash",
    debug_mode=False,
)
# 2) Provide initial data
initial_state = {
    "mass": 9.1093837015e-31,  # electron mass
    "charge": -1.602176634e-19,
    "initial_velocity": np.array([1e6, 1e6, 1e6]),
    "E_field": np.array([5e6, 1e6, 5e6]),
    "B_field": np.array([0.0, 0.0, 50000.0]),
}
# 3) Run with a user interface
result = workflow.run(init_values=initial_state, ui=True)
When you run this file, a Chainlit web server will start and print its address in your console logs, typically http://localhost:8000. Open that address in your browser and you now have a complete, AI-driven pipeline that merges human interaction, classical simulation, and LLM-based analysis. The user sees a straightforward chat interface, while under the hood, Nodeology coordinates each Node in a robust workflow. Below is a short video showing the workflow in action:
Visualizing, Exporting and Sharing
You can visualize your workflow as a mermaid graph or export it as a .yaml file:
# Visualize workflow as a mermaid graph
workflow.graph.get_graph().draw_mermaid_png(
    output_file_path="particle_trajectory_analysis.png"
)
# Export workflow to YAML file for sharing
workflow.to_yaml("particle_trajectory_analysis.yaml")
# Example of the exported YAML file
name: TrajectoryWorkflow_03_13_2025_20_06_45
state_defs:
  - current_node_type: str
  - previous_node_type: str
  - human_input: str
  - input: str
  - output: str
  - messages: List[dict]
  - mass: float
  - charge: float
  - initial_velocity: ndarray
  - E_field: ndarray
  - B_field: ndarray
  - confirm_parameters: bool
  - parameters_updater_output: str
  - positions: List[ndarray]
  - trajectory_plot: str
  - trajectory_plot_path: str
  - analysis_result: dict
  - continue_simulation: bool
nodes:
  display_parameters:
    type: display_parameters
    next: ask_confirm_parameters
  ask_confirm_parameters:
    type: ask_confirm_parameters
    sink: confirm_parameters
    next:
      condition: confirm_parameters
      then: calculate_trajectory
      otherwise: ask_parameters_input
  ask_parameters_input:
    type: ask_parameters_input
    sink: human_input
    next: update_parameters
  update_parameters:
    type: prompt
    template: 'Update the parameters based on the user''s input. Current parameters:
      mass: {mass} charge: {charge} initial_velocity: {initial_velocity} E_field:
      {E_field} B_field: {B_field} User input: {human_input} Please return the updated
      parameters in JSON format. {{ "mass": float, "charge": float, "initial_velocity":
      list[float], "E_field": list[float], "B_field": list[float] }}'
    sink: parameters_updater_output
    next: display_parameters
  calculate_trajectory:
    type: calculate_trajectory
    sink: positions
    next: plot_trajectory
  plot_trajectory:
    type: plot_trajectory
    sink: [trajectory_plot, trajectory_plot_path]
    next: analyze_trajectory
  analyze_trajectory:
    type: prompt
    template: 'Analyze this particle trajectory plot. Please determine: 1. The type
      of motion (linear, circular, helical, or chaotic) 2. Key physical features (radius,
      period, pitch angle if applicable) 3. Explanation of the motion 4. Anomalies
      in the motion Output in JSON format: {{ "trajectory_type": "type_name", "key_features":
      { "feature1": value, "feature2": value }, "explanation": "detailed explanation",
      "anomalies": "anomaly description" }}'
    sink: analysis_result
    image_keys: trajectory_plot_path
    next: ask_continue_simulation
  ask_continue_simulation:
    type: ask_continue_simulation
    sink: continue_simulation
    next:
      condition: continue_simulation
      then: display_parameters
      otherwise: END
entry_point: display_parameters
llm: gemini/gemini-2.0-flash
vlm: gemini/gemini-2.0-flash
exit_commands: [stop workflow, quit workflow, terminate workflow]
Happy Building! 🏗️ Nodeology is constantly evolving, so keep an eye on updates 👀 😊
Citation
@online{yin2025,
author = {Yin, Xiangyu},
title = {Building an {AI-Driven} {Scientific} {Workflow} \& {Chatbot}
with {Nodeology}},
date = {2025-03-13},
url = {https://xiangyu-yin.com/content/post_nodeology_example.html},
langid = {en}
}