
How to Design an Advanced Multi-Agent Reasoning System with spaCy Featuring Planning, Reflection, Memory, and Knowledge Graphs

by Tech Wavo
November 15, 2025


In this tutorial, we build a multi-agent text-analysis system on top of spaCy in which several agents plan, exchange messages, keep working and episodic memory, and reflect on their own confidence scores. We work through the pipeline step by step: one agent extracts entities with their surrounding context, a second forms subject-verb-object reasoning chains and groups related nouns, and a third assembles a knowledge graph, while a meta-controller routes tasks between them. By the end, we have a pipeline that records every run as an observation and uses those records for planning and reflection.

!pip install spacy networkx matplotlib -q
!python -m spacy download en_core_web_sm


import spacy
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import json
import hashlib
from datetime import datetime


class MessageType(Enum):
   REQUEST = "request"
   RESPONSE = "response"
   BROADCAST = "broadcast"
   QUERY = "query"


@dataclass
class Message:
   sender: str
   receiver: str
   msg_type: MessageType
   content: Dict[str, Any]
   timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
   priority: int = 1
   def get_id(self) -> str:
       return hashlib.md5(f"{self.sender}{self.timestamp}".encode()).hexdigest()[:8]


@dataclass
class AgentTask:
   task_id: str
   task_type: str
   data: Any
   priority: int = 1
   dependencies: List[str] = field(default_factory=list)
   metadata: Dict = field(default_factory=dict)


@dataclass
class Observation:
   state: str
   action: str
   result: Any
   confidence: float
   timestamp: float = field(default_factory=lambda: datetime.now().timestamp())


class WorkingMemory:
   def __init__(self, capacity: int = 10):
       self.capacity = capacity
       self.items = deque(maxlen=capacity)
       self.attention_scores = {}
   def add(self, key: str, value: Any, attention: float = 1.0):
       self.items.append((key, value))
       self.attention_scores[key] = attention
   def recall(self, n: int = 5) -> List[Tuple[str, Any]]:
       sorted_items = sorted(self.items, key=lambda x: self.attention_scores.get(x[0], 0), reverse=True)
       return sorted_items[:n]
   def get(self, key: str) -> Optional[Any]:
       for k, v in self.items:
           if k == key:
               return v
       return None


class EpisodicMemory:
   def __init__(self):
       self.episodes = []
       self.success_patterns = defaultdict(int)
   def store(self, observation: Observation):
       self.episodes.append(observation)
       if observation.confidence > 0.7:
           pattern = f"{observation.state}→{observation.action}"
           self.success_patterns[pattern] += 1
   def query_similar(self, state: str, top_k: int = 3) -> List[Observation]:
       scored = [(obs, self._similarity(state, obs.state)) for obs in self.episodes[-50:]]
       scored.sort(key=lambda x: x[1], reverse=True)
       return [obs for obs, _ in scored[:top_k]]
   def _similarity(self, state1: str, state2: str) -> float:
       words1, words2 = set(state1.split()), set(state2.split())
       if not words1 or not words2:
           return 0.0
       return len(words1 & words2) / len(words1 | words2)

We establish the core structures for the system: message and task dataclasses, an observation record, a bounded working memory ranked by attention score, and an episodic memory that retrieves past observations by word overlap. These foundations support the reasoning, storage, and communication of the agents that follow.
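The two memory behaviors described above, bounded storage with attention-ranked recall and word-overlap retrieval, can be checked in isolation. The following is a minimal sketch, not the tutorial's own classes: `TinyWorkingMemory` and `jaccard` are hypothetical stand-ins that mirror `WorkingMemory` and `EpisodicMemory._similarity`, and the keys and scores are made-up sample data.

```python
from collections import deque
from typing import Any, Dict, List, Tuple


def jaccard(a: str, b: str) -> float:
    """Word-level Jaccard overlap, the same measure EpisodicMemory._similarity computes."""
    words_a, words_b = set(a.split()), set(b.split())
    if not words_a or not words_b:
        return 0.0
    return len(words_a & words_b) / len(words_a | words_b)


class TinyWorkingMemory:
    """Hypothetical stand-in for WorkingMemory: a bounded deque plus attention scores."""

    def __init__(self, capacity: int = 3):
        self.items: deque = deque(maxlen=capacity)  # oldest entry is dropped when full
        self.attention: Dict[str, float] = {}

    def add(self, key: str, value: Any, attention: float = 1.0) -> None:
        self.items.append((key, value))
        self.attention[key] = attention

    def recall(self, n: int = 2) -> List[Tuple[str, Any]]:
        # Rank the surviving items by attention score, highest first.
        ranked = sorted(self.items, key=lambda kv: self.attention.get(kv[0], 0), reverse=True)
        return ranked[:n]


memory = TinyWorkingMemory(capacity=3)
for key, score in [("a", 0.9), ("b", 0.2), ("c", 0.5), ("d", 0.7)]:
    memory.add(key, key.upper(), score)

# "a" had the highest score but was evicted because it was the oldest entry.
recalled_keys = [key for key, _ in memory.recall(2)]
overlap = jaccard("entity extraction 40 tokens", "entity extraction 55 tokens")
print(recalled_keys, overlap)
```

Eviction depends on insertion order alone, so a high attention score does not protect an old item. Note also that the overlap is computed on whitespace-separated words: the state strings the agents store later, such as `entity_extraction_{n}tokens`, contain no spaces, so two of them score either 1.0 (identical) or 0.0.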

class ReflectionModule:
   def __init__(self):
       self.performance_log = []
   def reflect(self, task_type: str, confidence: float, result: Any) -> Dict[str, Any]:
       # Average over earlier runs only, so the current score is compared against history
       recent = [p for p in self.performance_log if p['task'] == task_type][-5:]
       avg_conf = sum(p['confidence'] for p in recent) / len(recent) if recent else 0.5
       self.performance_log.append({'task': task_type, 'confidence': confidence, 'timestamp': datetime.now().timestamp()})
       insights = {
           'performance_trend': 'improving' if confidence > avg_conf else 'declining',
           'avg_confidence': avg_conf,
           'recommendation': self._get_recommendation(confidence, avg_conf)
       }
       return insights
   def _get_recommendation(self, current: float, average: float) -> str:
       if current < 0.4:
           return "Request assistance from specialized agent"
       elif current < average:
           return "Review similar past cases for patterns"
       else:
           return "Continue with current approach"


class AdvancedAgent:
   def __init__(self, name: str, specialty: str, nlp):
       self.name = name
       self.specialty = specialty
       self.nlp = nlp
       self.working_memory = WorkingMemory()
       self.episodic_memory = EpisodicMemory()
       self.reflector = ReflectionModule()
       self.message_queue = deque()
       self.collaboration_graph = defaultdict(int)
   def plan(self, task: AgentTask) -> List[str]:
       similar = self.episodic_memory.query_similar(str(task.data))
       if similar and similar[0].confidence > 0.7:
           return [similar[0].action]
       return self._default_plan(task)
   def _default_plan(self, task: AgentTask) -> List[str]:
       return ['analyze', 'extract', 'validate']
   def send_message(self, receiver: str, msg_type: MessageType, content: Dict):
       msg = Message(self.name, receiver, msg_type, content)
       self.message_queue.append(msg)
       return msg
   def receive_message(self, message: Message):
       self.message_queue.append(message)
       self.collaboration_graph[message.sender] += 1
   def process(self, task: AgentTask) -> Dict[str, Any]:
       raise NotImplementedError


class CognitiveEntityAgent(AdvancedAgent):
   def process(self, task: AgentTask) -> Dict[str, Any]:
       doc = self.nlp(task.data)
       entities = defaultdict(list)
       entity_contexts = []
       for ent in doc.ents:
           context_start = max(0, ent.start - 5)
           context_end = min(len(doc), ent.end + 5)
           context = doc[context_start:context_end].text
           entities[ent.label_].append(ent.text)
           entity_contexts.append({'entity': ent.text, 'type': ent.label_, 'context': context, 'position': (ent.start_char, ent.end_char)})
       for ent_type, ents in entities.items():
           attention = len(ents) / len(doc.ents) if doc.ents else 0
           self.working_memory.add(f"entities_{ent_type}", ents, attention)
       confidence = min(len(entities) / 4, 1.0) if entities else 0.3
       obs = Observation(state=f"entity_extraction_{len(doc)}tokens", action="extract_with_context", result=len(entity_contexts), confidence=confidence)
       self.episodic_memory.store(obs)
       reflection = self.reflector.reflect('entity_extraction', confidence, entities)
       return {'entities': dict(entities), 'contexts': entity_contexts, 'confidence': confidence, 'reflection': reflection, 'next_actions': ['semantic_reasoning', 'knowledge_graph'] if confidence > 0.5 else []}

We construct the reflection module and the base agent class, which gives every agent planning, messaging, and memory. We then implement the Cognitive Entity Agent, which extracts each entity together with a window of five tokens on either side, stores per-type entity lists in working memory, and records an observation whose confidence grows with the number of distinct entity types found (a quarter per type, capped at 1.0). The reflection output reports whether the current confidence sits above or below the recent average for that task type and recommends a next step accordingly.
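The context-window and confidence logic of the Cognitive Entity Agent can be tried without loading a spaCy model. The following is a minimal sketch, assuming a whitespace-tokenized sentence and hand-written entity spans; `context_window`, `type_confidence`, and the sample spans are illustrative names and data, not part of the tutorial code.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def context_window(tokens: List[str], start: int, end: int, width: int = 5) -> str:
    """Return the tokens from `width` before the span to `width` after it, clamped to the text."""
    lo = max(0, start - width)
    hi = min(len(tokens), end + width)
    return " ".join(tokens[lo:hi])


def type_confidence(entities: Dict[str, List[str]]) -> float:
    """Same heuristic as the agent: one quarter per distinct entity type, capped at 1.0."""
    return min(len(entities) / 4, 1.0) if entities else 0.3


tokens = "Sam Altman leads OpenAI in San Francisco".split()
# Hand-written (start, end, label) token spans; a real run would take these from doc.ents.
spans: List[Tuple[int, int, str]] = [(0, 2, "PERSON"), (3, 4, "ORG"), (5, 7, "GPE")]

entities: Dict[str, List[str]] = defaultdict(list)
for start, end, label in spans:
    entities[label].append(" ".join(tokens[start:end]))

org_context = context_window(tokens, 3, 4, width=1)
confidence = type_confidence(entities)
print(org_context, confidence)
```

The heuristic counts distinct labels rather than entity mentions, so a text that names many people but no other entity type scores 0.25 and stays below the 0.5 threshold for scheduling follow-up actions.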

class SemanticReasoningAgent(AdvancedAgent):
   def process(self, task: AgentTask) -> Dict[str, Any]:
       doc = self.nlp(task.data)
       reasoning_chains = []
       for sent in doc.sents:
           chain = self._extract_reasoning_chain(sent)
           if chain:
               reasoning_chains.append(chain)
       entity_memory = self.working_memory.recall(3)
       semantic_clusters = self._cluster_by_semantics(doc)
       confidence = min(len(reasoning_chains) / 3, 1.0) if reasoning_chains else 0.4
       obs = Observation(state=f"semantic_analysis_{len(list(doc.sents))}sents", action="reason_and_cluster", result=len(reasoning_chains), confidence=confidence)
       self.episodic_memory.store(obs)
       return {'reasoning_chains': reasoning_chains, 'semantic_clusters': semantic_clusters, 'memory_context': entity_memory, 'confidence': confidence, 'next_actions': ['knowledge_integration']}
   def _extract_reasoning_chain(self, sent) -> Optional[Dict]:
       subj, verb, obj = None, None, None
       for token in sent:
           if token.dep_ == 'nsubj':
               subj = token
           elif token.pos_ == 'VERB':
               verb = token
           elif token.dep_ in ['dobj', 'attr', 'pobj']:
               obj = token
       if subj and verb and obj:
           return {'subject': subj.text, 'predicate': verb.lemma_, 'object': obj.text, 'confidence': 0.8}
       return None
   def _cluster_by_semantics(self, doc) -> List[Dict]:
       clusters = []
       nouns = [token for token in doc if token.pos_ in ['NOUN', 'PROPN']]
       visited = set()
       for noun in nouns:
           if noun.i in visited:
               continue
           cluster = [noun.text]
           visited.add(noun.i)
           for other in nouns:
               if other.i != noun.i and other.i not in visited:
                   if noun.similarity(other) > 0.5:
                       cluster.append(other.text)
                       visited.add(other.i)
           if len(cluster) > 1:
               clusters.append({'concepts': cluster, 'size': len(cluster)})
       return clusters

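We design the Semantic Reasoning Agent, which scans each sentence for a subject, a verb, and an object to form a reasoning chain, and groups nouns whose pairwise similarity exceeds 0.5. Two caveats apply to the code as written. First, en_core_web_sm ships without static word vectors, so Token.similarity falls back to context tensors and spaCy warns that the scores may be unreliable; a model with vectors, such as en_core_web_md, gives more meaningful clusters. Second, the agent recalls from its own working memory, which nothing writes to, so memory_context comes back empty unless the agents are given a shared memory.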
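The grouping step in `_cluster_by_semantics` is a greedy, seed-based clustering that does not depend on spaCy itself. The following is a minimal sketch of the same procedure with a pluggable similarity function; `greedy_clusters`, `cosine`, and the two-dimensional `toy_vectors` are hypothetical and chosen by hand purely for illustration.

```python
import math
from typing import Callable, List, Sequence


def cosine(u: Sequence[float], v: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either has zero norm)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def greedy_clusters(items: List[str],
                    similarity: Callable[[str, str], float],
                    threshold: float = 0.5) -> List[List[str]]:
    """Each unvisited item seeds a cluster and absorbs every unvisited item similar to the seed."""
    clusters: List[List[str]] = []
    visited = set()
    for i, seed in enumerate(items):
        if i in visited:
            continue
        visited.add(i)
        cluster = [seed]
        for j, other in enumerate(items):
            if j not in visited and similarity(seed, other) > threshold:
                cluster.append(other)
                visited.add(j)
        if len(cluster) > 1:  # singletons are dropped, as in the agent
            clusters.append(cluster)
    return clusters


# Made-up 2-D vectors standing in for real word embeddings.
toy_vectors = {
    "London": (1.0, 0.1),
    "Paris": (0.9, 0.2),
    "network": (0.1, 1.0),
    "model": (0.2, 0.9),
    "2017": (-1.0, -1.0),
}
clusters = greedy_clusters(list(toy_vectors),
                           lambda a, b: cosine(toy_vectors[a], toy_vectors[b]))
print(clusters)
```

Members are compared only with the seed, never with each other, so the result depends on item order: an item close to two seeds always joins the earlier one.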

class KnowledgeGraphAgent(AdvancedAgent):
   def process(self, task: AgentTask) -> Dict[str, Any]:
       doc = self.nlp(task.data)
       graph = {'nodes': set(), 'edges': []}
       for sent in doc.sents:
           entities = list(sent.ents)
           if len(entities) >= 2:
               for ent in entities:
                   graph['nodes'].add((ent.text, ent.label_))
               root = sent.root
               if root.pos_ == 'VERB':
                   for i in range(len(entities) - 1):
                       graph['edges'].append({'from': entities[i].text, 'relation': root.lemma_, 'to': entities[i+1].text, 'sentence': sent.text[:100]})
       graph['nodes'] = list(graph['nodes'])
       confidence = min(len(graph['edges']) / 5, 1.0) if graph['edges'] else 0.3
       obs = Observation(state=f"knowledge_graph_{len(graph['nodes'])}nodes", action="construct_graph", result=len(graph['edges']), confidence=confidence)
       self.episodic_memory.store(obs)
       return {'graph': graph, 'node_count': len(graph['nodes']), 'edge_count': len(graph['edges']), 'confidence': confidence, 'next_actions': []}


class MetaController:
   def __init__(self):
       self.nlp = spacy.load('en_core_web_sm')
       self.agents = {
           'cognitive_entity': CognitiveEntityAgent('CognitiveEntity', 'entity_analysis', self.nlp),
           'semantic_reasoning': SemanticReasoningAgent('SemanticReasoner', 'reasoning', self.nlp),
           'knowledge_graph': KnowledgeGraphAgent('KnowledgeBuilder', 'graph_construction', self.nlp)
       }
       self.task_history = []
       self.global_memory = WorkingMemory(capacity=20)
   def execute_with_planning(self, text: str) -> Dict[str, Any]:
       initial_task = AgentTask(task_id="task_001", task_type="cognitive_entity", data=text, metadata={'source': 'user_input'})
       results = {}
       task_queue = [initial_task]
       iterations = 0
       max_iterations = 10
       while task_queue and iterations < max_iterations:
           task = task_queue.pop(0)
           agent = self.agents.get(task.task_type)
           if not agent or task.task_type in results:
               continue
           result = agent.process(task)
           results[task.task_type] = result
           self.global_memory.add(task.task_type, result, result['confidence'])
           for next_action in result.get('next_actions', []):
               if next_action in self.agents and next_action not in results:
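                   # Number follow-up tasks after everything already run or queued, so IDs do not collide with task_001
                   next_task = AgentTask(task_id=f"task_{len(results) + len(task_queue) + 1:03d}", task_type=next_action, data=text, dependencies=[task.task_id])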
                   task_queue.append(next_task)
           iterations += 1
       self.task_history.append({'results': results, 'iterations': iterations, 'timestamp': datetime.now().isoformat()})
       return results
   def generate_insights(self, results: Dict[str, Any]) -> str:
       report = "=" * 70 + "\n"
       report += "     ADVANCED AGENTIC AI SYSTEM - ANALYSIS REPORT\n"
       report += "=" * 70 + "\n\n"
       for agent_type, result in results.items():
           agent = self.agents[agent_type]
           report += f"🤖 {agent.name}\n"
           report += f"   Specialty: {agent.specialty}\n"
           report += f"   Confidence: {result['confidence']:.2%}\n"
           if 'reflection' in result:
               report += f"   Performance: {result['reflection'].get('performance_trend', 'N/A')}\n"
           report += "   Key Findings:\n"
           report += json.dumps({k: v for k, v in result.items() if k not in ['reflection', 'next_actions']}, indent=6) + "\n\n"
       report += "📊 System-Level Insights:\n"
       report += f"   Total iterations: {self.task_history[-1]['iterations'] if self.task_history else 0}\n"
       report += f"   Active agents: {len(results)}\n"
       report += f"   Global memory size: {len(self.global_memory.items)}\n"
       return report

We implement the Knowledge Graph Agent, which links consecutive entities within a sentence using the lemma of the sentence's root verb as the relation. We then build the Meta-Controller, which loads the spaCy model once, shares it across all agents, and runs a task queue: each entry in an agent's next_actions is queued as a new task when it matches a registered agent key and has not already produced a result, for at most ten iterations.
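The controller's scheduling loop can be exercised on its own with stub agents. The following is a minimal sketch under stated assumptions: `run_pipeline` and the three stub agents are hypothetical, each stub returns a fixed dictionary instead of calling spaCy, and the agent names are placeholders rather than the tutorial's registry keys.

```python
from collections import deque
from typing import Callable, Dict


def run_pipeline(agents: Dict[str, Callable[[], dict]],
                 start: str,
                 max_iterations: int = 10) -> Dict[str, dict]:
    """Run agents in queue order, following each result's 'next_actions' list."""
    results: Dict[str, dict] = {}
    queue = deque([start])
    iterations = 0
    while queue and iterations < max_iterations:
        name = queue.popleft()
        agent = agents.get(name)
        if agent is None or name in results:
            continue  # unknown agent or already processed
        result = agent()
        results[name] = result
        for nxt in result.get("next_actions", []):
            # Only names that match a registered agent are scheduled.
            if nxt in agents and nxt not in results:
                queue.append(nxt)
        iterations += 1
    return results


# Stub agents returning canned results (made-up confidences).
stub_agents = {
    "extract": lambda: {"confidence": 0.9, "next_actions": ["reason", "graph"]},
    "reason": lambda: {"confidence": 0.6, "next_actions": ["graph", "unregistered_step"]},
    "graph": lambda: {"confidence": 0.4, "next_actions": []},
}
order = list(run_pipeline(stub_agents, "extract"))
print(order)
```

An action name that is not a key in the registry is dropped without any error, so the strings an agent puts in `next_actions` must match the controller's keys exactly for the downstream agent to run. A name queued twice is processed only once, because the second copy is skipped when it is popped.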

if __name__ == "__main__":
   sample_text = """
   Artificial intelligence researchers at OpenAI and DeepMind are developing
   advanced language models. Sam Altman leads OpenAI in San Francisco, while
   Demis Hassabis heads DeepMind in London. These organizations collaborate
   with universities like MIT and Stanford. Their research focuses on machine
   learning, neural networks, and reinforcement learning. The breakthrough
   came when transformers revolutionized natural language processing in 2017.
   """
   controller = MetaController()
   results = controller.execute_with_planning(sample_text)
   print(controller.generate_insights(results))
   print("Advanced multi-agent analysis complete with reflection and learning!")

We run the system end-to-end on a sample text. The controller starts with the entity agent, follows each agent's next_actions to schedule the remaining agents, and prints a report listing every agent's confidence and findings along with system-level counts.
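Once the run finishes, the `graph['edges']` list returned by the Knowledge Graph Agent can be reshaped for lookups. The following is a minimal sketch, assuming edges in the same `from` / `relation` / `to` / `sentence` dictionary format the agent emits; `to_adjacency` is a hypothetical helper and `sample_edges` is hand-written illustrative data, not actual output of the pipeline.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def to_adjacency(edges: List[dict]) -> Dict[str, List[Tuple[str, str]]]:
    """Group edges by source node as (relation, target) pairs, keeping edge order."""
    adjacency: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    for edge in edges:
        adjacency[edge["from"]].append((edge["relation"], edge["to"]))
    return dict(adjacency)


# Hand-written edges in the agent's format (illustrative only).
sample_edges = [
    {"from": "Sam Altman", "relation": "lead", "to": "OpenAI", "sentence": "sample sentence"},
    {"from": "OpenAI", "relation": "lead", "to": "San Francisco", "sentence": "sample sentence"},
]
adjacency = to_adjacency(sample_edges)
print(adjacency)
```

The second sample edge shows a property of the agent's pairing rule: every pair of consecutive entities in a sentence receives the root verb as its relation, whether or not the verb actually connects them, so the edges are best read as co-occurrence hints rather than verified facts.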

In conclusion, we developed a multi-agent reasoning framework that operates on plain text using spaCy and combines planning, memory, and reflection in a single workflow. Each agent contributes its own layer of analysis (entities, reasoning chains, and graph structure), and the Meta-Controller orchestrates them and compiles their results into a readable report. The design is straightforward to extend: we can register new agents with the controller, apply the pipeline to larger datasets, or integrate language models to strengthen the system’s reasoning.




Asif Razzaq is the CEO of Marktechpost Media Inc. As an entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which covers machine learning and deep learning news in a way that is both technically sound and understandable to a wide audience. The platform has over 2 million monthly views.
