How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

In this tutorial, we build a Streaming Decision Agent that thinks and acts in an online, changing environment while continuously streaming safe, partial reasoning updates. We implement a dynamic grid world with moving obstacles and a shifting goal, then use an online A* planner in a receding-horizon loop to commit to only a few near-term moves and re-evaluate frequently. As we execute, we make intermediate decisions that can override the plan when a step becomes invalid or locally risky, allowing us to adapt mid-run rather than unthinkingly following a stale trajectory.

import random, math, time
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional, Generator, Any
from collections import deque, defaultdict


try:
   from pydantic import BaseModel, Field
except Exception:
   raise RuntimeError("Please install pydantic: `!pip -q install pydantic` (then rerun).")


class StreamEvent(BaseModel):
   t: float = Field(..., description="Wall-clock time (seconds since start)")
   kind: str = Field(..., description="event type, e.g., plan/update/act/observe/done")
   step: int = Field(..., description="agent step counter")
   msg: str = Field(..., description="human-readable partial reasoning summary")
   data: Dict[str, Any] = Field(default_factory=dict, description="structured payload")


Coord = Tuple[int, int]

We define the streaming event schema and core type structures that allow us to emit structured reasoning updates. We use Pydantic to formalize the structure of each streamed decision or observation safely and consistently. We establish the foundational interface that powers incremental reasoning throughout the agent lifecycle.

@dataclass
class DynamicGridWorld:
   w: int = 18
   h: int = 10
   obstacle_ratio: float = 0.18
   seed: int = 7
   move_obstacles_every: int = 6
   spawn_obstacle_prob: float = 0.25
   clear_obstacle_prob: float = 0.15
   target_jitter_prob: float = 0.35
   rng: random.Random = field(init=False)
   obstacles: set = field(init=False, default_factory=set)
   agent: Coord = field(init=False, default=(1, 1))
   target: Coord = field(init=False, default=(15, 7))
   step_count: int = field(init=False, default=0)


   def __post_init__(self):
       self.rng = random.Random(self.seed)
       self.reset()


   def reset(self):
       self.step_count = 0
       self.obstacles = set()
       for y in range(self.h):
           for x in range(self.w):
               if (x, y) in [(1, 1), (self.w - 2, self.h - 2)]:
                   continue
               if self.rng.random() < self.obstacle_ratio:
                   self.obstacles.add((x, y))
       self.agent = (1, 1)
       self.target = (self.w - 2, self.h - 2)
       self._ensure_free(self.agent)
       self._ensure_free(self.target)


   def _ensure_free(self, c: Coord):
       if c in self.obstacles:
           self.obstacles.remove(c)


   def in_bounds(self, c: Coord) -> bool:
       x, y = c
       return 0 <= x < self.w and 0 <= y < self.h


   def passable(self, c: Coord) -> bool:
       return c not in self.obstacles


   def neighbors4(self, c: Coord) -> List[Coord]:
       x, y = c
       cand = [(x+1,y), (x-1,y), (x,y+1), (x,y-1)]
       return [p for p in cand if self.in_bounds(p) and self.passable(p)]


   def manhattan(self, a: Coord, b: Coord) -> int:
       return abs(a[0]-b[0]) + abs(a[1]-b[1])


   def maybe_world_changes(self) -> Dict[str, Any]:
       changes = {"obstacles_added": [], "obstacles_cleared": [], "target_moved": False}
       self.step_count += 1
       if self.rng.random() < self.target_jitter_prob:
           tx, ty = self.target
           options = [(tx+1,ty),(tx-1,ty),(tx,ty+1),(tx,ty-1)]
           options = [c for c in options if self.in_bounds(c) and c != self.agent]
           self.rng.shuffle(options)
           for c in options[:3]:
               if c not in self.obstacles:
                   self.target = c
                   changes["target_moved"] = True
                   break
       if self.step_count % self.move_obstacles_every == 0:
           for _ in range(4):
               if self.rng.random() < self.clear_obstacle_prob and self.obstacles:
                   c = self.rng.choice(tuple(self.obstacles))
                   self.obstacles.remove(c)
                   changes["obstacles_cleared"].append(c)
           for _ in range(5):
               if self.rng.random() < self.spawn_obstacle_prob:
                   c = (self.rng.randrange(self.w), self.rng.randrange(self.h))
                   if c != self.agent and c != self.target:
                       self.obstacles.add(c)
                       changes["obstacles_added"].append(c)
           self._ensure_free(self.agent)
           self._ensure_free(self.target)
       return changes


   def step(self, action: str) -> Dict[str, Any]:
       ax, ay = self.agent
       move = {"R": (ax+1, ay), "L": (ax-1, ay), "D": (ax, ay+1), "U": (ax, ay-1), "S": (ax, ay)}[action]
       moved = False
       if self.in_bounds(move) and self.passable(move):
           self.agent = move
           moved = True
       changes = self.maybe_world_changes()
       done = (self.agent == self.target)
       return {"moved": moved, "agent": self.agent, "target": self.target, "done": done, "changes": changes}


   def render(self, path: Optional[List[Coord]] = None) -> str:
       path_set = set(path or [])
       lines = []
       for y in range(self.h):
           row = []
           for x in range(self.w):
               c = (x, y)
               if c == self.agent:
                   row.append("A")
               elif c == self.target:
                   row.append("T")
               elif c in path_set:
                   row.append("·")
               elif c in self.obstacles:
                   row.append("█")
               else:
                   row.append(" ")
           lines.append("".join(row))
       border = "+" + "-" * self.w + "+"
       body = "n".join(["|" + ln + "|" for ln in lines])
       return f"{border}n{body}n{border}"

We construct a dynamic grid world that evolves with shifting obstacles and a moving target. We simulate environmental non-stationarity to test online planning under uncertainty. We implement rendering and world-transition logic to observe how the agent reacts to real-time changes.

@dataclass
class PlanResult:
   path: List[Coord]
   cost: float
   expanded: int
   reason: str


def astar(world: DynamicGridWorld, start: Coord, goal: Coord, max_expand: int = 5000) -> PlanResult:
   frontier = []
   import heapq
   heapq.heappush(frontier, (world.manhattan(start, goal), 0, start))
   came_from: Dict[Coord, Optional[Coord]] = {start: None}
   gscore: Dict[Coord, float] = {start: 0}
   expanded = 0
   while frontier and expanded < max_expand:
       f, g, cur = heapq.heappop(frontier)
       expanded += 1
       if cur == goal:
           path = []
           c = cur
           while c is not None:
               path.append(c)
               c = came_from[c]
           path.reverse()
           return PlanResult(path=path, cost=gscore[cur], expanded=expanded, reason="found_path")
       for nb in world.neighbors4(cur):
           ng = gscore[cur] + 1
           if nb not in gscore or ng < gscore[nb]:
               gscore[nb] = ng
               came_from[nb] = cur
               h = world.manhattan(nb, goal)
               heapq.heappush(frontier, (ng + h, ng, nb))
   return PlanResult(path=[start], cost=float("inf"), expanded=expanded, reason="no_path_or_budget")


def path_to_actions(path: List[Coord]) -> List[str]:
   actions = []
   for (x1,y1),(x2,y2) in zip(path, path[1:]):
       if x2 == x1+1 and y2 == y1: actions.append("R")
       elif x2 == x1-1 and y2 == y1: actions.append("L")
       elif x2 == x1 and y2 == y1+1: actions.append("D")
       elif x2 == x1 and y2 == y1-1: actions.append("U")
       else: actions.append("S")
   return actions


def action_risk(world: DynamicGridWorld, next_pos: Coord) -> float:
   x, y = next_pos
   near = 0
   for dx, dy in [(1,0),(-1,0),(0,1),(0,-1)]:
       c = (x+dx, y+dy)
       if world.in_bounds(c) and c in world.obstacles:
           near += 1
   edge = 1 if (x in [0, world.w-1] or y in [0, world.h-1]) else 0
   return 0.25 * near + 0.15 * edge

We implement the A* online planner along with action extraction and local risk evaluation. We compute shortest paths incrementally while respecting a computational budget. We also introduce a lightweight risk model to enable us to override unsafe planned moves during execution.

@dataclass
class AgentConfig:
   horizon: int = 6
   replan_on_target_move: bool = True
   replan_on_obstacle_change: bool = True
   max_steps: int = 120
   think_latency: float = 0.02
   act_latency: float = 0.01
   risk_gate: float = 0.85
   alt_search_depth: int = 2


@dataclass
class StreamingDecisionAgent:
   cfg: AgentConfig
   world: DynamicGridWorld
   start_time: float = field(init=False, default_factory=time.time)
   step_id: int = field(init=False, default=0)
   current_plan: List[Coord] = field(init=False, default_factory=list)
   current_actions: List[str] = field(init=False, default_factory=list)
   last_snapshot: Dict[str, Any] = field(init=False, default_factory=dict)
   stats: Dict[str, Any] = field(init=False, default_factory=lambda: defaultdict(int))


   def _now(self) -> float:
       return time.time() - self.start_time


   def _emit(self, kind: str, msg: str, data: Optional[Dict[str, Any]] = None) -> StreamEvent:
       return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data or {})


   def _need_replan(self, obs: Dict[str, Any]) -> bool:
       ch = obs["changes"]
       if obs["done"]:
           return False
       if not self.current_plan or len(self.current_plan) <= 1:
           return True
       if self.cfg.replan_on_target_move and ch.get("target_moved"):
           return True
       if self.cfg.replan_on_obstacle_change and (ch.get("obstacles_added") or ch.get("obstacles_cleared")):
           return True
       if len(self.current_plan) > 1 and self.current_plan[1] in self.world.obstacles:
           return True
       return False


   def _plan(self) -> PlanResult:
       time.sleep(self.cfg.think_latency)
       self.stats["replans"] += 1
       return astar(self.world, self.world.agent, self.world.target)


   def _choose_action(self, planned_action: str) -> Tuple[str, str]:
       ax, ay = self.world.agent
       action_to_delta = {"R": (1,0), "L": (-1,0), "D": (0,1), "U": (0,-1), "S": (0,0)}
       dx, dy = action_to_delta[planned_action]
       nxt = (ax+dx, ay+dy)
       if not self.world.in_bounds(nxt) or not self.world.passable(nxt):
           self.stats["overrides"] += 1
           return "S", "planned_move_invalid -> wait."
       r = action_risk(self.world, nxt)
       if r > self.cfg.risk_gate:
           candidates = ["U","D","L","R","S"]
           best = (planned_action, float("inf"), "keep_plan")
           for a in candidates:
               dx, dy = action_to_delta[a]
               p = (ax+dx, ay+dy)
               if not self.world.in_bounds(p) or not self.world.passable(p):
                   continue
               score = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.target)
               if score < best[1]:
                   best = (a, score, "risk_avoidance_override")
           if best[0] != planned_action:
               self.stats["overrides"] += 1
               return best[0], best[2]
       return planned_action, "follow_plan"


   def run(self) -> Generator[StreamEvent, None, None]:
       yield self._emit("observe", "Initialize: reading initial state.", {"agent": self.world.agent, "target": self.world.target})
       yield self._emit("world", "Initial world snapshot.", {"grid": self.world.render()})
       for self.step_id in range(1, self.cfg.max_steps + 1):
           if self.step_id == 1 or self._need_replan(self.last_snapshot):
               pr = self._plan()
               self.current_plan = pr.path
               self.current_actions = path_to_actions(pr.path)
               if pr.reason != "found_path":
                   yield self._emit("plan", "Planner could not find a path within budget; switching to reactive exploration.", {"reason": pr.reason, "expanded": pr.expanded})
                   self.current_actions = []
               else:
                   horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
                   yield self._emit("plan", f"Plan updated (online A*). Commit to next {len(horizon_path)-1} moves, then re-evaluate.", {"reason": pr.reason, "path_len": len(pr.path), "expanded": pr.expanded, "commit_horizon": self.cfg.horizon, "horizon_path": horizon_path, "grid_with_path": self.world.render(path=horizon_path)})
           if self.current_actions:
               planned_action = self.current_actions[0]
           else:
               ax, ay = self.world.agent
               tx, ty = self.world.target
               options = []
               if tx > ax: options.append("R")
               if tx < ax: options.append("L")
               if ty > ay: options.append("D")
               if ty < ay: options.append("U")
               options += ["S","U","D","L","R"]
               planned_action = options[0]
           action, why = self._choose_action(planned_action)
           yield self._emit("decide", f"Intermediate decision: action={action} ({why}).", {"planned_action": planned_action, "chosen_action": action, "agent": self.world.agent, "target": self.world.target})
           time.sleep(self.cfg.act_latency)
           obs = self.world.step(action)
           self.last_snapshot = obs
           if self.current_actions:
               if action == planned_action:
                   self.current_actions = self.current_actions[1:]
                   if len(self.current_plan) > 1:
                       self.current_plan = self.current_plan[1:]
           ch = obs["changes"]
           surprise = []
           if ch.get("target_moved"): surprise.append("target_moved")
           if ch.get("obstacles_added"): surprise.append(f"obstacles_added={len(ch['obstacles_added'])}")
           if ch.get("obstacles_cleared"): surprise.append(f"obstacles_cleared={len(ch['obstacles_cleared'])}")
           surprise_msg = ("Surprises: " + ", ".join(surprise)) if surprise else "No major surprises."
           self.stats["steps"] += 1
           if obs["moved"]: self.stats["moves"] += 1
           if ch.get("target_moved"): self.stats["target_moves"] += 1
           if ch.get("obstacles_added") or ch.get("obstacles_cleared"): self.stats["world_shifts"] += 1
           yield self._emit("observe", f"Observed outcome. {surprise_msg}", {"moved": obs["moved"], "agent": obs["agent"], "target": obs["target"], "done": obs["done"], "changes": ch, "grid": self.world.render(path=self.current_plan[: min(len(self.current_plan), 10)])})
           if obs["done"]:
               yield self._emit("done", "Goal reached. Stopping execution.", {"final_agent": obs["agent"], "final_target": obs["target"], "stats": dict(self.stats)})
               return
       yield self._emit("done", "Max steps reached without reaching the goal.", {"final_agent": self.world.agent, "final_target": self.world.target, "stats": dict(self.stats)})

We design the Streaming Decision Agent that integrates planning, monitoring, and reactive overrides. We implement receding-horizon control, committing only to near-term steps and replanning when surprises occur. We stream structured events at every stage, planning, deciding, acting, and observing, to demonstrate incremental reasoning in action.

def run_and_print(agent: StreamingDecisionAgent, throttle: float = 0.0):
   last_kind = None
   for ev in agent.run():
       header = f"[t={ev.t:6.2f}s | step={ev.step:03d} | {ev.kind.upper():7}]"
       print(header, ev.msg)
       if ev.kind in {"plan", "observe", "world"}:
           if "grid_with_path" in ev.data:
               print(ev.data["grid_with_path"])
           elif "grid" in ev.data:
               print(ev.data["grid"])
       if throttle > 0:
           time.sleep(throttle)


world = DynamicGridWorld(w=18, h=10, obstacle_ratio=0.18, seed=10, move_obstacles_every=6)
cfg = AgentConfig(horizon=6, replan_on_target_move=True, replan_on_obstacle_change=True, max_steps=120, think_latency=0.01, act_latency=0.01, risk_gate=0.85, alt_search_depth=2)
agent = StreamingDecisionAgent(cfg=cfg, world=world)
run_and_print(agent, throttle=0.0)

We build the streaming runner and execute the full agent loop inside the dynamic environment. We print structured reasoning updates in real time to simulate a live decision stream. We finally observe the agent adapting continuously, replanning when needed, and completing the task under changing conditions.

In conclusion, we have an agent that demonstrates incremental reasoning, online planning, and reactive behavior, and it is easy to run and inspect in Colab. We saw how streaming structured events makes the agent’s decision process observable while still keeping reasoning summaries concise and user-safe. Also, we showed how continuous monitoring and replanning turn a static planner into a responsive system that can handle surprises, moving targets, changing obstacles, and step-level risk without stopping execution.


Check out Full Codes hereAlso, feel free to follow us on Twitter and don’t forget to join our 120k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

The post How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments appeared first on MarkTechPost.