How to Design an Advanced Multi-Agent Reasoning System with spaCy Featuring Planning, Reflection, Memory, and Knowledge Graphs

How to Design an Advanced Multi-Agent Reasoning System with spaCy Featuring Planning, Reflection, Memory, and Knowledge Graphs

In this tutorial, we build an advanced Agentic AI system using spaCy, designed to allow multiple intelligent agents to reason, collaborate, reflect, and learn from experience. We work through the entire pipeline step by step, observing how each agent processes tasks using planning, memory, communication, and semantic reasoning. By the end, we see how the system evolves into a dynamic multi-agent architecture capable of extracting entities, interpreting context, forming reasoning chains, and constructing knowledge graphs, all while continuously improving through reflection and episodic learning. Check out the FULL CODES here.

!pip install spacy networkx matplotlib -q


import spacy
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import json
import hashlib
from datetime import datetime


class MessageType(Enum):
   REQUEST = "request"
   RESPONSE = "response"
   BROADCAST = "broadcast"
   QUERY = "query"


@dataclass
class Message:
   sender: str
   receiver: str
   msg_type: MessageType
   content: Dict[str, Any]
   timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
   priority: int = 1
   def get_id(self) -> str:
       return hashlib.md5(f"{self.sender}{self.timestamp}".encode()).hexdigest()[:8]


@dataclass
class AgentTask:
   task_id: str
   task_type: str
   data: Any
   priority: int = 1
   dependencies: List[str] = field(default_factory=list)
   metadata: Dict = field(default_factory=dict)


@dataclass
class Observation:
   state: str
   action: str
   result: Any
   confidence: float
   timestamp: float = field(default_factory=lambda: datetime.now().timestamp())


class WorkingMemory:
   def __init__(self, capacity: int = 10):
       self.capacity = capacity
       self.items = deque(maxlen=capacity)
       self.attention_scores = {}
   def add(self, key: str, value: Any, attention: float = 1.0):
       self.items.append((key, value))
       self.attention_scores[key] = attention
   def recall(self, n: int = 5) -> List[Tuple[str, Any]]:
       sorted_items = sorted(self.items, key=lambda x: self.attention_scores.get(x[0], 0), reverse=True)
       return sorted_items[:n]
   def get(self, key: str) -> Optional[Any]:
       for k, v in self.items:
           if k == key:
               return v
       return None


class EpisodicMemory:
   def __init__(self):
       self.episodes = []
       self.success_patterns = defaultdict(int)
   def store(self, observation: Observation):
       self.episodes.append(observation)
       if observation.confidence > 0.7:
           pattern = f"{observation.state}→{observation.action}"
           self.success_patterns[pattern] += 1
   def query_similar(self, state: str, top_k: int = 3) -> List[Observation]:
       scored = [(obs, self._similarity(state, obs.state)) for obs in self.episodes[-50:]]
       scored.sort(key=lambda x: x[1], reverse=True)
       return [obs for obs, _ in scored[:top_k]]
   def _similarity(self, state1: str, state2: str) -> float:
       words1, words2 = set(state1.split()), set(state2.split())
       if not words1 or not words2:
           return 0.0
       return len(words1 & words2) / len(words1 | words2)

We establish all the core structures required for our agentic system. We import key libraries, define message and task formats, and build both working and episodic memory modules. As we define these foundations, we lay the groundwork for reasoning, storage, and communication. Check out the FULL CODES here.

class ReflectionModule:
   def __init__(self):
       self.performance_log = []
   def reflect(self, task_type: str, confidence: float, result: Any) -> Dict[str, Any]:
       self.performance_log.append({'task': task_type, 'confidence': confidence, 'timestamp': datetime.now().timestamp()})
       recent = [p for p in self.performance_log if p['task'] == task_type][-5:]
       avg_conf = sum(p['confidence'] for p in recent) / len(recent) if recent else 0.5
       insights = {
           'performance_trend': 'improving' if confidence > avg_conf else 'declining',
           'avg_confidence': avg_conf,
           'recommendation': self._get_recommendation(confidence, avg_conf)
       }
       return insights
   def _get_recommendation(self, current: float, average: float) -> str:
       if current < 0.4:
           return "Request assistance from specialized agent"
       elif current < average:
           return "Review similar past cases for patterns"
       else:
           return "Continue with current approach"


class AdvancedAgent:
   def __init__(self, name: str, specialty: str, nlp):
       self.name = name
       self.specialty = specialty
       self.nlp = nlp
       self.working_memory = WorkingMemory()
       self.episodic_memory = EpisodicMemory()
       self.reflector = ReflectionModule()
       self.message_queue = deque()
       self.collaboration_graph = defaultdict(int)
   def plan(self, task: AgentTask) -> List[str]:
       similar = self.episodic_memory.query_similar(str(task.data))
       if similar and similar[0].confidence > 0.7:
           return [similar[0].action]
       return self._default_plan(task)
   def _default_plan(self, task: AgentTask) -> List[str]:
       return ['analyze', 'extract', 'validate']
   def send_message(self, receiver: str, msg_type: MessageType, content: Dict):
       msg = Message(self.name, receiver, msg_type, content)
       self.message_queue.append(msg)
       return msg
   def receive_message(self, message: Message):
       self.message_queue.append(message)
       self.collaboration_graph[message.sender] += 1
   def process(self, task: AgentTask) -> Dict[str, Any]:
       raise NotImplementedError


class CognitiveEntityAgent(AdvancedAgent):
   def process(self, task: AgentTask) -> Dict[str, Any]:
       doc = self.nlp(task.data)
       entities = defaultdict(list)
       entity_contexts = []
       for ent in doc.ents:
           context_start = max(0, ent.start - 5)
           context_end = min(len(doc), ent.end + 5)
           context = doc[context_start:context_end].text
           entities[ent.label_].append(ent.text)
           entity_contexts.append({'entity': ent.text, 'type': ent.label_, 'context': context, 'position': (ent.start_char, ent.end_char)})
       for ent_type, ents in entities.items():
           attention = len(ents) / len(doc.ents) if doc.ents else 0
           self.working_memory.add(f"entities_{ent_type}", ents, attention)
       confidence = min(len(entities) / 4, 1.0) if entities else 0.3
       obs = Observation(state=f"entity_extraction_{len(doc)}tokens", action="extract_with_context", result=len(entity_contexts), confidence=confidence)
       self.episodic_memory.store(obs)
       reflection = self.reflector.reflect('entity_extraction', confidence, entities)
       return {'entities': dict(entities), 'contexts': entity_contexts, 'confidence': confidence, 'reflection': reflection, 'next_actions': ['semantic_analysis', 'knowledge_graph'] if confidence > 0.5 else []}

We construct the reflection engine and the base agent class, which provides every agent with reasoning, planning, and memory capabilities. We then implement the Cognitive Entity Agent, which processes text to extract entities with context and stores meaningful observations. As we run this part, we watch the agent learn from experience while dynamically adjusting its strategy. Check out the FULL CODES here.

class SemanticReasoningAgent(AdvancedAgent):
   def process(self, task: AgentTask) -> Dict[str, Any]:
       doc = self.nlp(task.data)
       reasoning_chains = []
       for sent in doc.sents:
           chain = self._extract_reasoning_chain(sent)
           if chain:
               reasoning_chains.append(chain)
       entity_memory = self.working_memory.recall(3)
       semantic_clusters = self._cluster_by_semantics(doc)
       confidence = min(len(reasoning_chains) / 3, 1.0) if reasoning_chains else 0.4
       obs = Observation(state=f"semantic_analysis_{len(list(doc.sents))}sents", action="reason_and_cluster", result=len(reasoning_chains), confidence=confidence)
       self.episodic_memory.store(obs)
       return {'reasoning_chains': reasoning_chains, 'semantic_clusters': semantic_clusters, 'memory_context': entity_memory, 'confidence': confidence, 'next_actions': ['knowledge_integration']}
   def _extract_reasoning_chain(self, sent) -> Optional[Dict]:
       subj, verb, obj = None, None, None
       for token in sent:
           if token.dep_ == 'nsubj':
               subj = token
           elif token.pos_ == 'VERB':
               verb = token
           elif token.dep_ in ['dobj', 'attr', 'pobj']:
               obj = token
       if subj and verb and obj:
           return {'subject': subj.text, 'predicate': verb.lemma_, 'object': obj.text, 'confidence': 0.8}
       return None
   def _cluster_by_semantics(self, doc) -> List[Dict]:
       clusters = []
       nouns = [token for token in doc if token.pos_ in ['NOUN', 'PROPN']]
       visited = set()
       for noun in nouns:
           if noun.i in visited:
               continue
           cluster = [noun.text]
           visited.add(noun.i)
           for other in nouns:
               if other.i != noun.i and other.i not in visited:
                   if noun.similarity(other) > 0.5:
                       cluster.append(other.text)
                       visited.add(other.i)
           if len(cluster) > 1:
               clusters.append({'concepts': cluster, 'size': len(cluster)})
       return clusters

We design the Semantic Reasoning Agent, which analyzes sentence structures, forms reasoning chains, and groups concepts based on semantic similarity. We integrate working memory to enrich the understanding the agent builds. As we execute this, we see how the system moves from surface-level extraction to deeper inference. Check out the FULL CODES here.

class KnowledgeGraphAgent(AdvancedAgent):
   def process(self, task: AgentTask) -> Dict[str, Any]:
       doc = self.nlp(task.data)
       graph = {'nodes': set(), 'edges': []}
       for sent in doc.sents:
           entities = list(sent.ents)
           if len(entities) >= 2:
               for ent in entities:
                   graph['nodes'].add((ent.text, ent.label_))
               root = sent.root
               if root.pos_ == 'VERB':
                   for i in range(len(entities) - 1):
                       graph['edges'].append({'from': entities[i].text, 'relation': root.lemma_, 'to': entities[i+1].text, 'sentence': sent.text[:100]})
       graph['nodes'] = list(graph['nodes'])
       confidence = min(len(graph['edges']) / 5, 1.0) if graph['edges'] else 0.3
       obs = Observation(state=f"knowledge_graph_{len(graph['nodes'])}nodes", action="construct_graph", result=len(graph['edges']), confidence=confidence)
       self.episodic_memory.store(obs)
       return {'graph': graph, 'node_count': len(graph['nodes']), 'edge_count': len(graph['edges']), 'confidence': confidence, 'next_actions': []}


class MetaController:
   def __init__(self):
       self.nlp = spacy.load('en_core_web_sm')
       self.agents = {
           'cognitive_entity': CognitiveEntityAgent('CognitiveEntity', 'entity_analysis', self.nlp),
           'semantic_reasoning': SemanticReasoningAgent('SemanticReasoner', 'reasoning', self.nlp),
           'knowledge_graph': KnowledgeGraphAgent('KnowledgeBuilder', 'graph_construction', self.nlp)
       }
       self.task_history = []
       self.global_memory = WorkingMemory(capacity=20)
   def execute_with_planning(self, text: str) -> Dict[str, Any]:
       initial_task = AgentTask(task_id="task_001", task_type="cognitive_entity", data=text, metadata={'source': 'user_input'})
       results = {}
       task_queue = [initial_task]
       iterations = 0
       max_iterations = 10
       while task_queue and iterations < max_iterations:
           task = task_queue.pop(0)
           agent = self.agents.get(task.task_type)
           if not agent or task.task_type in results:
               continue
           result = agent.process(task)
           results[task.task_type] = result
           self.global_memory.add(task.task_type, result, result['confidence'])
           for next_action in result.get('next_actions', []):
               if next_action in self.agents and next_action not in results:
                   next_task = AgentTask(task_id=f"task_{iterations+1:03d}", task_type=next_action, data=text, dependencies=[task.task_id])
                   task_queue.append(next_task)
           iterations += 1
       self.task_history.append({'results': results, 'iterations': iterations, 'timestamp': datetime.now().isoformat()})
       return results
   def generate_insights(self, results: Dict[str, Any]) -> str:
       report = "=" * 70 + "n"
       report += "     ADVANCED AGENTIC AI SYSTEM - ANALYSIS REPORTn"
       report += "=" * 70 + "nn"
       for agent_type, result in results.items():
           agent = self.agents[agent_type]
           report += f"🤖 {agent.name}n"
           report += f"   Specialty: {agent.specialty}n"
           report += f"   Confidence: {result['confidence']:.2%}n"
           if 'reflection' in result:
               report += f"   Performance: {result['reflection'].get('performance_trend', 'N/A')}n"
           report += "   Key Findings:n"
           report += json.dumps({k: v for k, v in result.items() if k not in ['reflection', 'next_actions']}, indent=6) + "nn"
       report += "📊 System-Level Insights:n"
       report += f"   Total iterations: {len(self.task_history)}n"
       report += f"   Active agents: {len(results)}n"
       report += f"   Global memory size: {len(self.global_memory.items)}n"
       return report

We implement the Knowledge Graph Agent, enabling the system to connect entities through relations extracted from text. We then build the Meta-Controller, which coordinates all agents, manages planning, and handles multi-step execution. As we use this component, we watch the system behave like a true multi-agent pipeline with dynamic flow control. Check out the FULL CODES here.

if __name__ == "__main__":
   sample_text = """
   Artificial intelligence researchers at OpenAI and DeepMind are developing
   advanced language models. Sam Altman leads OpenAI in San Francisco, while
   Demis Hassabis heads DeepMind in London. These organizations collaborate
   with universities like MIT and Stanford. Their research focuses on machine
   learning, neural networks, and reinforcement learning. The breakthrough
   came when transformers revolutionized natural language processing in 2017.
   """
   controller = MetaController()
   results = controller.execute_with_planning(sample_text)
   print(controller.generate_insights(results))
   print("Advanced multi-agent analysis complete with reflection and learning!")

We run the entire agentic system end-to-end on a sample text. We execute planning, call each agent in sequence, and generate a comprehensive analysis report. As we reach this stage, we see the full power of the multi-agent architecture working together in real time.

In conclusion, we developed a comprehensive multi-agent reasoning framework that operates on real-world text using spaCy, integrating planning, learning, and memory into a cohesive workflow. We observe how each agent contributes a unique layer of understanding, and we see the Meta-Controller orchestrate them to generate rich, interpretable insights. Lastly, we recognize the flexibility and extensibility of this agentic design, and we feel confident that we can now adapt it to more complex tasks, larger datasets, or even integrate language models to further enhance the system’s intelligence.


Check out the FULL CODES here. Feel free to check out our GitHub Page for Tutorials, Codes and Notebooks. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

The post How to Design an Advanced Multi-Agent Reasoning System with spaCy Featuring Planning, Reflection, Memory, and Knowledge Graphs appeared first on MarkTechPost.