In this tutorial, we build a Streaming Decision Agent that thinks and acts in an online, changing environment while continuously streaming safe, partial reasoning updates. We implement a dynamic grid world with moving obstacles and a shifting goal, then use an online A* planner in a receding-horizon loop to commit to only a few near-term moves and re-evaluate frequently. As we execute, we make intermediate decisions that can override the plan when a step becomes invalid or locally risky, allowing us to adapt mid-run rather than unthinkingly following a stale trajectory.
import random, math, time
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional, Generator, Any
from collections import deque, defaultdict
# pydantic is the only third-party dependency; fail early with an actionable message.
# Only ImportError means "not installed" -- any other error during import is a real
# problem inside pydantic and is chained (``from err``) so its traceback is preserved.
try:
    from pydantic import BaseModel, Field
except ImportError as err:
    raise RuntimeError("Please install pydantic: `!pip -q install pydantic` (then rerun).") from err
class StreamEvent(BaseModel):
    """One streamed update from the agent: a short message plus an optional structured payload.

    Instances are yielded one at a time by the agent's ``run()`` generator so a consumer
    can display partial reasoning while the agent is still executing.
    """

    # Elapsed seconds since the agent object was created (a time.time() delta),
    # not an absolute wall-clock timestamp despite the description text.
    t: float = Field(..., description="Wall-clock time (seconds since start)")
    # Kinds emitted by the agent in this file: "observe", "world", "plan", "decide", "done".
    kind: str = Field(..., description="event type, e.g., plan/update/act/observe/done")
    step: int = Field(..., description="agent step counter")
    msg: str = Field(..., description="human-readable partial reasoning summary")
    # Free-form payload; keys depend on ``kind`` (e.g. "grid", "grid_with_path", "stats").
    data: Dict[str, Any] = Field(default_factory=dict, description="structured payload")
# A grid cell as (x, y): x indexes columns in [0, w), y indexes rows in [0, h).
Coord = Tuple[int, int]
We define the streaming event schema and a core coordinate type that allow us to emit structured reasoning updates. We use Pydantic to formalize the structure of each streamed decision or observation so every event is validated and consistent. We establish the foundational interface that powers incremental reasoning throughout the agent lifecycle.
@dataclass
class DynamicGridWorld:
    """Non-stationary 4-connected grid world containing an agent, a target, and obstacles.

    Cells are ``(x, y)`` with ``0 <= x < w`` and ``0 <= y < h``. The agent starts at
    ``(1, 1)`` and the target at ``(w - 2, h - 2)``. Every ``step`` may jitter the target
    by one cell and, every ``move_obstacles_every`` steps, clear and spawn obstacles.
    All randomness comes from a private ``random.Random(seed)``, so a given seed
    reproduces the same run.
    """

    w: int = 18  # grid width (columns)
    h: int = 10  # grid height (rows)
    obstacle_ratio: float = 0.18  # per-cell chance of an obstacle when the grid is generated
    seed: int = 7  # seed for the private RNG
    move_obstacles_every: int = 6  # period (world steps) of the clear/spawn phase; 0 disables it
    spawn_obstacle_prob: float = 0.25  # chance per spawn attempt (5 attempts per phase)
    clear_obstacle_prob: float = 0.15  # chance per clear attempt (4 attempts per phase)
    target_jitter_prob: float = 0.35  # per-step chance that the target tries to move to a neighbour
    rng: random.Random = field(init=False)
    obstacles: set = field(init=False, default_factory=set)
    agent: Coord = field(init=False, default=(1, 1))
    target: Coord = field(init=False, default=(15, 7))
    step_count: int = field(init=False, default=0)

    def __post_init__(self):
        self.rng = random.Random(self.seed)
        self.reset()

    def reset(self):
        """Regenerate random obstacles and place the agent and target on their start cells.

        The start cells never receive an obstacle. The RNG is not reseeded, so a second
        ``reset`` produces a different layout from the first.
        """
        self.step_count = 0
        self.obstacles = set()
        reserved = {(1, 1), (self.w - 2, self.h - 2)}
        for y in range(self.h):
            for x in range(self.w):
                # Reserved cells are skipped *before* drawing, so they consume no RNG draw.
                if (x, y) in reserved:
                    continue
                if self.rng.random() < self.obstacle_ratio:
                    self.obstacles.add((x, y))
        self.agent = (1, 1)
        self.target = (self.w - 2, self.h - 2)
        self._ensure_free(self.agent)
        self._ensure_free(self.target)

    def _ensure_free(self, c: Coord):
        """Remove the obstacle at ``c``, if any."""
        self.obstacles.discard(c)

    def in_bounds(self, c: Coord) -> bool:
        x, y = c
        return 0 <= x < self.w and 0 <= y < self.h

    def passable(self, c: Coord) -> bool:
        # Bounds are not checked here; callers combine this with in_bounds().
        return c not in self.obstacles

    def neighbors4(self, c: Coord) -> List[Coord]:
        """In-bounds, obstacle-free cells adjacent to ``c`` (right, left, down, up)."""
        x, y = c
        cand = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
        return [p for p in cand if self.in_bounds(p) and self.passable(p)]

    def manhattan(self, a: Coord, b: Coord) -> int:
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def maybe_world_changes(self) -> Dict[str, Any]:
        """Advance the world clock by one tick and apply random changes.

        Returns:
            ``{"obstacles_added": [...], "obstacles_cleared": [...], "target_moved": bool}``,
            where the lists hold only cells whose state actually changed.
        """
        changes = {"obstacles_added": [], "obstacles_cleared": [], "target_moved": False}
        self.step_count += 1
        if self.rng.random() < self.target_jitter_prob:
            tx, ty = self.target
            options = [(tx + 1, ty), (tx - 1, ty), (tx, ty + 1), (tx, ty - 1)]
            options = [c for c in options if self.in_bounds(c) and c != self.agent]
            self.rng.shuffle(options)
            # Only the first three shuffled candidates are tried; blocked cells are skipped.
            for c in options[:3]:
                if c not in self.obstacles:
                    self.target = c
                    changes["target_moved"] = True
                    break
        # A period of 0 would raise ZeroDivisionError; treat it as "never shift obstacles".
        if self.move_obstacles_every and self.step_count % self.move_obstacles_every == 0:
            for _ in range(4):
                if self.rng.random() < self.clear_obstacle_prob and self.obstacles:
                    c = self.rng.choice(tuple(self.obstacles))
                    self.obstacles.remove(c)
                    changes["obstacles_cleared"].append(c)
            for _ in range(5):
                if self.rng.random() < self.spawn_obstacle_prob:
                    c = (self.rng.randrange(self.w), self.rng.randrange(self.h))
                    # Skip agent/target cells and cells that are already blocked, so that
                    # obstacles_added reports only genuine changes.
                    if c != self.agent and c != self.target and c not in self.obstacles:
                        self.obstacles.add(c)
                        changes["obstacles_added"].append(c)
        self._ensure_free(self.agent)
        self._ensure_free(self.target)
        return changes

    def step(self, action: str) -> Dict[str, Any]:
        """Apply ``action`` ("R", "L", "D", "U" or "S"), then advance the world one tick.

        A move into an out-of-bounds or blocked cell leaves the agent in place. "S" (stay)
        never counts as a move. Raises KeyError for an unknown action string.

        Note: ``done`` is evaluated after ``maybe_world_changes``, so if the target jitters
        away on the same tick the agent lands on it, the episode is not done.
        """
        ax, ay = self.agent
        move = {"R": (ax + 1, ay), "L": (ax - 1, ay), "D": (ax, ay + 1), "U": (ax, ay - 1), "S": (ax, ay)}[action]
        moved = False
        if move != self.agent and self.in_bounds(move) and self.passable(move):
            self.agent = move
            moved = True
        changes = self.maybe_world_changes()
        done = self.agent == self.target
        return {"moved": moved, "agent": self.agent, "target": self.target, "done": done, "changes": changes}

    def render(self, path: Optional[List[Coord]] = None) -> str:
        """Return a bordered multi-line ASCII picture of the grid.

        Legend: "A" agent, "T" target, "·" cells of ``path``, "█" obstacle, " " free.
        Agent/target take precedence over the path, and the path over obstacles.
        """
        path_set = set(path or [])
        lines = []
        for y in range(self.h):
            row = []
            for x in range(self.w):
                c = (x, y)
                if c == self.agent:
                    row.append("A")
                elif c == self.target:
                    row.append("T")
                elif c in path_set:
                    row.append("·")
                elif c in self.obstacles:
                    row.append("█")
                else:
                    row.append(" ")
            lines.append("".join(row))
        border = "+" + "-" * self.w + "+"
        # Rows must be newline-separated; joining with "n" put the whole grid on one line.
        body = "\n".join("|" + ln + "|" for ln in lines)
        return f"{border}\n{body}\n{border}"
We construct a dynamic grid world that evolves with shifting obstacles and a moving target. We simulate environmental non-stationarity to test online planning under uncertainty. We implement rendering and world-transition logic so we can observe how the agent reacts to real-time changes.
@dataclass
class PlanResult:
    """Outcome of a single ``astar`` search."""

    path: List[Coord]  # cells from start to goal inclusive; just [start] when no path was found
    cost: float  # number of moves along ``path``; float("inf") when no path was found
    expanded: int  # search expansions performed (counted against astar's max_expand budget)
    reason: str  # "found_path" or "no_path_or_budget"
def astar(world: DynamicGridWorld, start: Coord, goal: Coord, max_expand: int = 5000) -> PlanResult:
    """Find a shortest 4-connected path from ``start`` to ``goal`` on the current grid.

    Uses unit step costs and the Manhattan heuristic (admissible and consistent on a
    4-connected grid), so the first time ``goal`` is popped its path is optimal.

    Args:
        world: grid providing ``neighbors4`` (passable neighbours) and ``manhattan``.
        start: cell to search from.
        goal: cell to reach.
        max_expand: maximum number of node expansions before giving up.

    Returns:
        PlanResult with reason "found_path", or reason "no_path_or_budget" with
        ``path=[start]`` and ``cost=inf`` when the goal is unreachable or the budget ran out.
    """
    import heapq

    # Heap entries are (f = g + h, g, cell); ties on f prefer smaller g, then cell order.
    frontier: List[Tuple[int, int, Coord]] = [(world.manhattan(start, goal), 0, start)]
    came_from: Dict[Coord, Optional[Coord]] = {start: None}
    gscore: Dict[Coord, float] = {start: 0}
    expanded = 0
    while frontier and expanded < max_expand:
        _, g, cur = heapq.heappop(frontier)
        if g > gscore[cur]:
            # Stale entry: ``cur`` was re-pushed with a cheaper g after this one was queued.
            # Re-expanding it cannot improve anything and would waste budget.
            continue
        expanded += 1
        if cur == goal:
            path: List[Coord] = []
            node: Optional[Coord] = cur
            while node is not None:
                path.append(node)
                node = came_from[node]
            path.reverse()
            return PlanResult(path=path, cost=gscore[cur], expanded=expanded, reason="found_path")
        for nb in world.neighbors4(cur):
            ng = g + 1
            if nb not in gscore or ng < gscore[nb]:
                gscore[nb] = ng
                came_from[nb] = cur
                heapq.heappush(frontier, (ng + world.manhattan(nb, goal), ng, nb))
    return PlanResult(path=[start], cost=float("inf"), expanded=expanded, reason="no_path_or_budget")
def path_to_actions(path: List[Coord]) -> List[str]:
    """Translate a cell path into move letters, one per consecutive pair of cells.

    "R"/"L" change x by +1/-1 and "D"/"U" change y by +1/-1. Any pair that is not a
    single 4-connected step (including a repeated cell) becomes "S".
    """
    letter_for_delta = {(1, 0): "R", (-1, 0): "L", (0, 1): "D", (0, -1): "U"}
    return [
        letter_for_delta.get((dst[0] - src[0], dst[1] - src[1]), "S")
        for src, dst in zip(path, path[1:])
    ]
def action_risk(world: DynamicGridWorld, next_pos: Coord) -> float:
    """Heuristic danger score for stepping onto ``next_pos``.

    Adds 0.25 for every 4-neighbour that is an in-bounds obstacle, plus 0.15 when the
    cell lies on the outer border of the grid. Higher means more hemmed in.
    """
    px, py = next_pos
    adjacent = ((px + 1, py), (px - 1, py), (px, py + 1), (px, py - 1))
    blocked_neighbours = sum(1 for nb in adjacent if world.in_bounds(nb) and nb in world.obstacles)
    on_border = px in (0, world.w - 1) or py in (0, world.h - 1)
    border_penalty = 1 if on_border else 0
    return 0.25 * blocked_neighbours + 0.15 * border_penalty
We implement the online A* planner along with action extraction and local risk evaluation. Each call recomputes a shortest path from the agent's current position while respecting a node-expansion budget. We also introduce a lightweight risk model that lets us override unsafe planned moves during execution.
@dataclass
class AgentConfig:
    """Tuning knobs for the streaming decision agent."""

    horizon: int = 6  # receding-horizon length in moves; bounds the horizon_path reported with each plan
    replan_on_target_move: bool = True  # replan after a step in which the world reports the target moved
    replan_on_obstacle_change: bool = True  # replan after a step in which obstacles were added or cleared
    max_steps: int = 120  # hard cap on agent steps in one run()
    think_latency: float = 0.02  # simulated planning delay (seconds) slept before each A* call
    act_latency: float = 0.01  # simulated actuation delay (seconds) slept before each world step
    risk_gate: float = 0.85  # action_risk above this makes the agent look for a safer move
    alt_search_depth: int = 2  # NOTE(review): not read anywhere in the visible code -- confirm intent or remove
@dataclass
class StreamingDecisionAgent:
    """Receding-horizon agent that plans with A*, overrides risky moves, and streams events.

    ``run()`` is a generator of StreamEvent objects ("observe", "world", "plan",
    "decide", "done"), so a caller can display the agent's partial reasoning while it
    executes. The agent commits to at most ``cfg.horizon`` planned moves before it
    re-plans. It also re-plans immediately when the world changes (per config) or when
    the agent leaves its plan.
    """

    cfg: AgentConfig
    world: DynamicGridWorld
    start_time: float = field(init=False, default_factory=time.time)
    step_id: int = field(init=False, default=0)
    # Remaining planned cells; current_plan[0] is the agent's cell while the plan is in sync.
    current_plan: List[Coord] = field(init=False, default_factory=list)
    # Remaining planned moves; current_actions[i] leads current_plan[i] -> current_plan[i + 1].
    current_actions: List[str] = field(init=False, default_factory=list)
    last_snapshot: Dict[str, Any] = field(init=False, default_factory=dict)
    stats: Dict[str, Any] = field(init=False, default_factory=lambda: defaultdict(int))
    # Planned moves executed since the last (re)plan; enforces the cfg.horizon commitment.
    steps_since_plan: int = field(init=False, default=0)

    # Not annotated, so dataclass treats it as a plain class constant rather than a field.
    _ACTION_DELTAS = {"R": (1, 0), "L": (-1, 0), "D": (0, 1), "U": (0, -1), "S": (0, 0)}

    def _now(self) -> float:
        """Seconds elapsed since this agent was created."""
        return time.time() - self.start_time

    def _emit(self, kind: str, msg: str, data: Optional[Dict[str, Any]] = None) -> StreamEvent:
        return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data or {})

    def _need_replan(self, obs: Dict[str, Any]) -> bool:
        """Decide whether to re-run the planner before the next action.

        ``obs`` is the dict returned by ``world.step`` for the previous action.
        """
        if obs["done"]:
            return False
        if len(self.current_plan) <= 1:
            return True
        # The plan no longer starts where the agent stands, so its actions are meaningless.
        if self.current_plan[0] != self.world.agent:
            return True
        # Receding horizon: re-evaluate after committing cfg.horizon moves.
        if self.steps_since_plan >= self.cfg.horizon:
            return True
        ch = obs["changes"]
        if self.cfg.replan_on_target_move and ch.get("target_moved"):
            return True
        if self.cfg.replan_on_obstacle_change and (ch.get("obstacles_added") or ch.get("obstacles_cleared")):
            return True
        return self.current_plan[1] in self.world.obstacles

    def _plan(self) -> PlanResult:
        time.sleep(self.cfg.think_latency)
        self.stats["replans"] += 1
        return astar(self.world, self.world.agent, self.world.target)

    def _replan(self) -> Generator[StreamEvent, None, None]:
        """Run A* from the agent's cell, install the result, and yield one "plan" event."""
        pr = self._plan()
        self.current_plan = pr.path
        self.steps_since_plan = 0
        if pr.reason != "found_path":
            # No usable plan: run() falls back to greedy reactive moves until a replan succeeds.
            self.current_actions = []
            yield self._emit("plan", "Planner could not find a path within budget; switching to reactive exploration.", {"reason": pr.reason, "expanded": pr.expanded})
            return
        self.current_actions = path_to_actions(pr.path)
        horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
        yield self._emit(
            "plan",
            f"Plan updated (online A*). Commit to next {len(horizon_path)-1} moves, then re-evaluate.",
            {
                "reason": pr.reason,
                "path_len": len(pr.path),
                "expanded": pr.expanded,
                "commit_horizon": self.cfg.horizon,
                "horizon_path": horizon_path,
                "grid_with_path": self.world.render(path=horizon_path),
            },
        )

    def _reactive_action(self) -> str:
        """Greedy fallback move used when there is no plan.

        Tries the moves that reduce the distance to the target first, then the fixed order
        "S", "U", "D", "L", "R". Returns the first move whose destination is in bounds and free.
        """
        ax, ay = self.world.agent
        tx, ty = self.world.target
        options = []
        if tx > ax:
            options.append("R")
        if tx < ax:
            options.append("L")
        if ty > ay:
            options.append("D")
        if ty < ay:
            options.append("U")
        options += ["S", "U", "D", "L", "R"]
        for a in options:
            dx, dy = self._ACTION_DELTAS[a]
            p = (ax + dx, ay + dy)
            if self.world.in_bounds(p) and self.world.passable(p):
                return a
        return "S"

    def _choose_action(self, planned_action: str) -> Tuple[str, str]:
        """Validate ``planned_action`` and override it if it is invalid or too risky.

        Returns:
            (action, reason), where reason is one of "follow_plan",
            "planned_move_invalid -> wait.", "risk_avoidance_override", or "keep_plan".
            "keep_plan" means the risk exceeded the gate but no alternative scored better.
        """
        ax, ay = self.world.agent
        dx, dy = self._ACTION_DELTAS[planned_action]
        nxt = (ax + dx, ay + dy)
        if not self.world.in_bounds(nxt) or not self.world.passable(nxt):
            self.stats["overrides"] += 1
            return "S", "planned_move_invalid -> wait."
        if action_risk(self.world, nxt) <= self.cfg.risk_gate:
            return planned_action, "follow_plan"
        # Risky: score every feasible move by risk plus a small pull toward the target.
        best_action, best_score = planned_action, float("inf")
        for a in ["U", "D", "L", "R", "S"]:
            dx, dy = self._ACTION_DELTAS[a]
            p = (ax + dx, ay + dy)
            if not self.world.in_bounds(p) or not self.world.passable(p):
                continue
            score = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.target)
            if score < best_score:
                best_action, best_score = a, score
        if best_action == planned_action:
            return planned_action, "keep_plan"
        self.stats["overrides"] += 1
        return best_action, "risk_avoidance_override"

    def _advance_plan(self, planned_action: str, action: str) -> None:
        """Keep current_plan/current_actions aligned with where the agent actually is."""
        if not self.current_actions:
            return
        if action == planned_action and len(self.current_plan) > 1 and self.world.agent == self.current_plan[1]:
            # The planned move was executed: consume it from both lists in lockstep.
            self.current_actions = self.current_actions[1:]
            self.current_plan = self.current_plan[1:]
            self.steps_since_plan += 1
        elif self.current_plan and self.world.agent != self.current_plan[0]:
            # An override moved the agent off the plan; drop the plan so the next step replans.
            self.current_plan = []
            self.current_actions = []

    @staticmethod
    def _summarize_changes(ch: Dict[str, Any]) -> str:
        """Human-readable one-liner for a world ``changes`` dict."""
        surprise = []
        if ch.get("target_moved"):
            surprise.append("target_moved")
        if ch.get("obstacles_added"):
            surprise.append(f"obstacles_added={len(ch['obstacles_added'])}")
        if ch.get("obstacles_cleared"):
            surprise.append(f"obstacles_cleared={len(ch['obstacles_cleared'])}")
        return ("Surprises: " + ", ".join(surprise)) if surprise else "No major surprises."

    def _record_stats(self, obs: Dict[str, Any]) -> None:
        ch = obs["changes"]
        self.stats["steps"] += 1
        if obs["moved"]:
            self.stats["moves"] += 1
        if ch.get("target_moved"):
            self.stats["target_moves"] += 1
        if ch.get("obstacles_added") or ch.get("obstacles_cleared"):
            self.stats["world_shifts"] += 1

    def run(self) -> Generator[StreamEvent, None, None]:
        """Execute the plan/decide/act/observe loop, yielding StreamEvents as it goes.

        Ends with a "done" event, either when the agent reaches the target or after
        ``cfg.max_steps`` steps.
        """
        yield self._emit("observe", "Initialize: reading initial state.", {"agent": self.world.agent, "target": self.world.target})
        yield self._emit("world", "Initial world snapshot.", {"grid": self.world.render()})
        for step in range(1, self.cfg.max_steps + 1):
            self.step_id = step
            # last_snapshot is empty before the first step, so step 1 always plans.
            if step == 1 or self._need_replan(self.last_snapshot):
                yield from self._replan()
            planned_action = self.current_actions[0] if self.current_actions else self._reactive_action()
            action, why = self._choose_action(planned_action)
            yield self._emit("decide", f"Intermediate decision: action={action} ({why}).", {"planned_action": planned_action, "chosen_action": action, "agent": self.world.agent, "target": self.world.target})
            time.sleep(self.cfg.act_latency)
            obs = self.world.step(action)
            self.last_snapshot = obs
            self._advance_plan(planned_action, action)
            ch = obs["changes"]
            self._record_stats(obs)
            yield self._emit("observe", f"Observed outcome. {self._summarize_changes(ch)}", {"moved": obs["moved"], "agent": obs["agent"], "target": obs["target"], "done": obs["done"], "changes": ch, "grid": self.world.render(path=self.current_plan[:10])})
            if obs["done"]:
                yield self._emit("done", "Goal reached. Stopping execution.", {"final_agent": obs["agent"], "final_target": obs["target"], "stats": dict(self.stats)})
                return
        yield self._emit("done", "Max steps reached without reaching the goal.", {"final_agent": self.world.agent, "final_target": self.world.target, "stats": dict(self.stats)})
We design the Streaming Decision Agent that integrates planning, monitoring, and reactive overrides. We implement receding-horizon control, committing only to near-term steps and replanning when surprises occur. We stream structured events at every stage (planning, deciding, and observing the outcome of each action) to demonstrate incremental reasoning in action.
def run_and_print(agent: StreamingDecisionAgent, throttle: float = 0.0):
    """Consume ``agent.run()`` and print every event, plus any ASCII grid it carries.

    Args:
        agent: the agent whose event stream is printed.
        throttle: optional pause in seconds after each event, to watch the run live.
    """
    for ev in agent.run():
        header = f"[t={ev.t:6.2f}s | step={ev.step:03d} | {ev.kind.upper():7}]"
        print(header, ev.msg)
        if ev.kind in {"plan", "observe", "world"}:
            # Plan events carry the grid with the committed path; others carry a plain grid.
            if "grid_with_path" in ev.data:
                print(ev.data["grid_with_path"])
            elif "grid" in ev.data:
                print(ev.data["grid"])
        if throttle > 0:
            time.sleep(throttle)
# Demo run: a seeded 18x10 world whose obstacles shift every 6 steps, driven by an
# agent that commits to 6-move plans and streams every event to stdout.
world = DynamicGridWorld(
    w=18,
    h=10,
    obstacle_ratio=0.18,
    seed=10,
    move_obstacles_every=6,
)
cfg = AgentConfig(
    horizon=6,
    replan_on_target_move=True,
    replan_on_obstacle_change=True,
    max_steps=120,
    think_latency=0.01,
    act_latency=0.01,
    risk_gate=0.85,
    alt_search_depth=2,
)
agent = StreamingDecisionAgent(cfg=cfg, world=world)
run_and_print(agent, throttle=0.0)
We build the streaming runner and execute the full agent loop inside the dynamic environment. We print structured reasoning updates as they are produced to simulate a live decision stream. Finally, we observe the agent adapting continuously, replanning when needed, and working toward the goal under changing conditions until it either reaches the target or exhausts its step budget.
In conclusion, we have an agent that demonstrates incremental reasoning, online planning, and reactive behavior, and it is easy to run and inspect in Colab. We saw how streaming structured events makes the agent’s decision process observable while still keeping reasoning summaries concise and user-safe. Also, we showed how continuous monitoring and replanning turn a static planner into a responsive system that can handle surprises, moving targets, changing obstacles, and step-level risk without stopping execution.
Check out the full code here. Also, feel free to follow us on Twitter and don’t forget to join our 120k+ ML SubReddit and subscribe to our Newsletter. Wait! Are you on Telegram? Now you can join us on Telegram as well.
The post How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments appeared first on MarkTechPost.
