In this tutorial, we build an Agentic Data and Infrastructure Strategy system using the lightweight Qwen2.5-0.5B-Instruct model for efficient execution. We begin by creating a flexible LLM agent framework and then develop specialized agents that handle different layers of data management, from ingestion and quality analysis to infrastructure optimization. We integrate these agents into an orchestrator that runs them in sequence over each data pipeline and collects their results. Through hands-on examples of e-commerce and IoT pipelines, we explore how autonomous decision-making can streamline complex data operations.
```python
# Colab/Jupyter shell command
!pip install -q transformers torch accelerate datasets huggingface_hub

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd


class LightweightLLMAgent:
    def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
        self.role = role
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Loading {model_name} for {role} agent on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Half precision on GPU, full precision on CPU
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map="auto"
        )
        self.conversation_history = []

    def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
        # The agent's role is injected through the system message; each call is independent
        messages = [
            {"role": "system", "content": f"You are a {self.role} agent in a data infrastructure system."},
            {"role": "user", "content": prompt}
        ]
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        # Place the inputs on whatever device device_map="auto" chose for the model
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            generated_ids = self.model.generate(
                **model_inputs,  # passes both input_ids and attention_mask
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                top_p=0.95
            )
        # generate() returns prompt + completion; keep only the newly generated tokens
        generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]
        response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        self.conversation_history.append({"prompt": prompt, "response": response})
        return response
```

We start by setting up the lightweight LLM agent infrastructure using the Qwen2.5-0.5B-Instruct model. We load the model and tokenizer and define a base agent class. For each prompt, the class wraps the text in a role-specific system message, generates a sampled response, and records the prompt/response pair in conversation_history. The history is only logged and is not fed back into later prompts. This class is the foundation on which our specialized agents run, and the 0.5B model is small enough to run comfortably within Colab.
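As written, every specialized agent calls from_pretrained in its own constructor, so the orchestrator ends up holding three separate copies of the same weights. The agents differ only in their system prompt, so they could share one loaded model. The sketch below assumes every agent uses the same model name and memoizes the load with functools.lru_cache. The names _load_model_and_tokenizer, get_shared_model, and SharedModelAgent are hypothetical. The loader also returns placeholder objects; in the real class it would call AutoTokenizer.from_pretrained and AutoModelForCausalLM.from_pretrained.

```python
from functools import lru_cache
from typing import Any, Tuple

# Records every real load so we can see how often it happens
LOAD_CALLS = []


def _load_model_and_tokenizer(model_name: str) -> Tuple[Any, Any]:
    # Hypothetical stand-in for the two from_pretrained calls; returns placeholders
    LOAD_CALLS.append(model_name)
    return object(), object()  # (model, tokenizer)


@lru_cache(maxsize=None)
def get_shared_model(model_name: str) -> Tuple[Any, Any]:
    """Load each model name at most once per process, then reuse it."""
    return _load_model_and_tokenizer(model_name)


class SharedModelAgent:
    # Hypothetical variant of LightweightLLMAgent.__init__ that reuses a cached model
    def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
        self.role = role
        self.model, self.tokenizer = get_shared_model(model_name)


agents = [
    SharedModelAgent(role)
    for role in ("Data Ingestion Specialist", "Data Quality Analyst", "Infrastructure Optimization Specialist")
]
print(len(LOAD_CALLS))  # 1
```

With a real loader plugged in, the model would be downloaded and placed on the device once instead of three times. lru_cache keys on the exact argument, so agents that pass different model names still get separate models.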
```python
class DataIngestionAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Data Ingestion Specialist")

    def analyze_data_source(self, source_info: Dict) -> Dict:
        prompt = f"""Analyze this data source and provide ingestion strategy:
Source Type: {source_info.get('type', 'unknown')}
Volume: {source_info.get('volume', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Provide a brief strategy focusing on: 1) Ingestion method, 2) Key considerations."""
        strategy = self.generate_response(prompt, max_tokens=100)
        return {"source": source_info, "strategy": strategy, "timestamp": datetime.now().isoformat()}


class DataQualityAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Data Quality Analyst")

    def assess_data_quality(self, data_sample: Dict) -> Dict:
        prompt = f"""Assess data quality for this sample:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Issues Found: {data_sample.get('issues', 0)}
Provide brief quality assessment and top 2 recommendations."""
        assessment = self.generate_response(prompt, max_tokens=100)
        return {"assessment": assessment, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}

    def _calculate_severity(self, data_sample: Dict) -> str:
        # Rule-based severity from the average of completeness and consistency;
        # missing scores default to 100
        completeness = data_sample.get('completeness', 100)
        consistency = data_sample.get('consistency', 100)
        avg_score = (completeness + consistency) / 2
        if avg_score >= 90:
            return "LOW"
        elif avg_score >= 70:
            return "MEDIUM"
        else:
            return "HIGH"
```

We design the Data Ingestion and Data Quality agents to handle the first two stages of the pipeline. The ingestion agent asks the model for an ingestion strategy based on the source type, volume, and frequency. The quality agent asks the model to assess completeness, consistency, and the number of issues found. It also computes a rule-based severity (LOW, MEDIUM, or HIGH) from the average of the completeness and consistency scores. Together, they establish the first two layers of autonomous data management.
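To see how the severity rule plays out, the sketch below reproduces the _calculate_severity thresholds as a standalone function. It then applies them to the quality metrics of the two sample pipelines defined later in main(). The issues count is passed to the model in the prompt but does not affect severity. An empty metrics dict scores LOW, because missing values default to 100.

```python
def calculate_severity(data_sample: dict) -> str:
    # Same thresholds as DataQualityAgent._calculate_severity
    completeness = data_sample.get("completeness", 100)
    consistency = data_sample.get("consistency", 100)
    avg_score = (completeness + consistency) / 2
    if avg_score >= 90:
        return "LOW"
    elif avg_score >= 70:
        return "MEDIUM"
    return "HIGH"


# Quality metrics copied from the two sample pipelines in main()
ecommerce = {"completeness": 87, "consistency": 92, "issues": 15}
iot = {"completeness": 95, "consistency": 88, "issues": 8}

for name, sample in [("ecommerce", ecommerce), ("iot", iot)]:
    avg = (sample["completeness"] + sample["consistency"]) / 2
    print(name, avg, calculate_severity(sample))
# ecommerce 89.5 MEDIUM
# iot 91.5 LOW
```

The e-commerce sample misses LOW by half a point despite reporting nearly twice as many issues as the IoT sample.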
```python
class InfrastructureOptimizationAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Infrastructure Optimization Specialist")

    def optimize_resources(self, metrics: Dict) -> Dict:
        prompt = f"""Analyze infrastructure metrics and suggest optimizations:
CPU Usage: {metrics.get('cpu_usage', 0)}%
Memory Usage: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Query Latency: {metrics.get('query_latency', 0)}ms
Provide 2 optimization recommendations."""
        recommendations = self.generate_response(prompt, max_tokens=100)
        return {"current_metrics": metrics, "recommendations": recommendations, "priority": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}

    def _calculate_priority(self, metrics: Dict) -> str:
        # Rule-based priority from CPU and memory usage only
        cpu = metrics.get('cpu_usage', 0)
        memory = metrics.get('memory_usage', 0)
        if cpu > 85 or memory > 85:
            return "CRITICAL"
        elif cpu > 70 or memory > 70:
            return "HIGH"
        else:
            return "NORMAL"
```

We develop the Infrastructure Optimization Agent to analyze key metrics such as CPU usage, memory usage, storage utilization, and query latency, and to ask the model for optimization recommendations. Alongside the model's suggestions, the agent assigns a rule-based priority (CRITICAL, HIGH, or NORMAL) based on CPU and memory usage alone. This gives each pipeline run a quick signal of how urgently the infrastructure needs attention.
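A quick way to check the priority rule is to run its thresholds on sample data. This sketch copies the _calculate_priority logic. It also adds a hypothetical storage_utilization helper, not part of the tutorial, that turns the raw storage figures into a percentage.

Applied to the infrastructure metrics of the two sample pipelines from main(), both come out HIGH. The e-commerce pipeline qualifies because CPU (78%) and memory (82%) both exceed 70%. The IoT pipeline qualifies only because memory (71%) is just above that threshold. Storage and query latency reach the model through the prompt but never influence the priority.

```python
def calculate_priority(metrics: dict) -> str:
    # Same thresholds as InfrastructureOptimizationAgent._calculate_priority (strict ">")
    cpu = metrics.get("cpu_usage", 0)
    memory = metrics.get("memory_usage", 0)
    if cpu > 85 or memory > 85:
        return "CRITICAL"
    elif cpu > 70 or memory > 70:
        return "HIGH"
    return "NORMAL"


def storage_utilization(metrics: dict) -> float:
    # Hypothetical helper: percent of storage in use, 0.0 when the total is unknown
    total = metrics.get("storage_total", 0)
    return round(100 * metrics.get("storage_used", 0) / total, 1) if total else 0.0


# Infrastructure metrics copied from the two sample pipelines in main()
ecommerce = {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
iot = {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}

for name, m in [("ecommerce", ecommerce), ("iot", iot)]:
    print(name, calculate_priority(m), storage_utilization(m))
# ecommerce HIGH 45.0
# iot HIGH 39.0
```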
```python
class AgenticDataOrchestrator:
    def __init__(self):
        print("\n" + "="*70)
        print("Initializing Agentic Data Infrastructure System")
        print("="*70 + "\n")
        # Each agent loads its own copy of the model in LightweightLLMAgent.__init__
        self.ingestion_agent = DataIngestionAgent()
        self.quality_agent = DataQualityAgent()
        self.optimization_agent = InfrastructureOptimizationAgent()
        self.execution_log = []

    def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
        results = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "stages": []}

        # Stage 1: ingestion strategy for the data source
        print("\n[Stage 1] Data Ingestion Analysis")
        ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("source", {}))
        print(f"Strategy: {ingestion_result['strategy'][:150]}...")
        results["stages"].append({"stage": "ingestion", "result": ingestion_result})

        # Stage 2: quality assessment plus rule-based severity
        print("\n[Stage 2] Data Quality Assessment")
        quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
        print(f"Assessment: {quality_result['assessment'][:150]}...")
        print(f"Severity: {quality_result['severity']}")
        results["stages"].append({"stage": "quality", "result": quality_result})

        # Stage 3: infrastructure recommendations plus rule-based priority
        print("\n[Stage 3] Infrastructure Optimization")
        optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
        print(f"Recommendations: {optimization_result['recommendations'][:150]}...")
        print(f"Priority: {optimization_result['priority']}")
        results["stages"].append({"stage": "optimization", "result": optimization_result})

        results["end_time"] = datetime.now().isoformat()
        results["status"] = "completed"
        self.execution_log.append(results)
        return results

    def generate_summary_report(self) -> pd.DataFrame:
        if not self.execution_log:
            return pd.DataFrame()
        summary_data = []
        for log in self.execution_log:
            summary_data.append({"Pipeline ID": log["pipeline_id"], "Start Time": log["start_time"], "Status": log["status"], "Stages Completed": len(log["stages"])})
        return pd.DataFrame(summary_data)
```

We build an Agentic Data Orchestrator to coordinate all specialized agents under a unified workflow. It manages end-to-end pipeline execution by running ingestion analysis, quality assessment, and infrastructure optimization in sequence. It also logs each run and produces a summary table of completed pipelines. In this way, we bring structure and automation to the entire multi-agent system.
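process_data_pipeline only calls the three agents in a fixed order and collects their outputs. The same control flow can therefore be written as a data-driven list of stages and exercised with stub handlers, without loading any model. This is a hypothetical refactoring sketch. The names run_pipeline and STAGES and the stub_* functions are illustrative, and the stubs return fixed values instead of LLM output.

```python
from datetime import datetime
from typing import Callable, Dict, List, Tuple


# Hypothetical stubs standing in for the three LLM-backed agent methods
def stub_ingestion(source: Dict) -> Dict:
    return {"strategy": f"batch load from {source.get('type', 'unknown')}"}


def stub_quality(metrics: Dict) -> Dict:
    return {"assessment": "ok", "issues": metrics.get("issues", 0)}


def stub_optimization(metrics: Dict) -> Dict:
    return {"recommendations": "none", "cpu": metrics.get("cpu_usage", 0)}


# (stage name, config key, handler) in the same order as process_data_pipeline
STAGES: List[Tuple[str, str, Callable[[Dict], Dict]]] = [
    ("ingestion", "source", stub_ingestion),
    ("quality", "quality_metrics", stub_quality),
    ("optimization", "infrastructure_metrics", stub_optimization),
]


def run_pipeline(config: Dict, stages=STAGES) -> Dict:
    results = {"pipeline_id": config.get("id", "unknown"),
               "start_time": datetime.now().isoformat(), "stages": []}
    # Run each stage on its slice of the config, defaulting to an empty dict
    for stage_name, config_key, handler in stages:
        results["stages"].append({"stage": stage_name, "result": handler(config.get(config_key, {}))})
    results["end_time"] = datetime.now().isoformat()
    results["status"] = "completed"
    return results


out = run_pipeline({"id": "demo_001", "source": {"type": "REST API"}})
print([s["stage"] for s in out["stages"]])     # ['ingestion', 'quality', 'optimization']
print(out["stages"][0]["result"]["strategy"])  # batch load from REST API
```

In this form, adding a stage means appending one tuple to STAGES. Swapping the stubs for the real agent methods restores the original behavior.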
```python
def main():
    orchestrator = AgenticDataOrchestrator()

    print("\n" + "="*70)
    print("EXAMPLE 1: E-commerce Data Pipeline")
    print("="*70)
    ecommerce_pipeline = {
        "id": "ecommerce_pipeline_001",
        "source": {"type": "REST API", "volume": "10GB/day", "frequency": "real-time"},
        "quality_metrics": {"completeness": 87, "consistency": 92, "issues": 15},
        "infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
    }
    result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)

    print("\n\n" + "="*70)
    print("EXAMPLE 2: IoT Sensor Data Pipeline")
    print("="*70)
    iot_pipeline = {
        "id": "iot_pipeline_002",
        "source": {"type": "Message Queue (Kafka)", "volume": "50GB/day", "frequency": "streaming"},
        "quality_metrics": {"completeness": 95, "consistency": 88, "issues": 8},
        "infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
    }
    result2 = orchestrator.process_data_pipeline(iot_pipeline)

    print("\n\n" + "="*70)
    print("EXECUTION SUMMARY REPORT")
    print("="*70 + "\n")
    summary_df = orchestrator.generate_summary_report()
    print(summary_df.to_string(index=False))

    print("\n" + "="*70)
    print("Tutorial Complete!")
    print("="*70)
    print("\nKey Concepts Demonstrated:")
    print("✓ Lightweight LLM agent architecture")
    print("✓ Specialized agents for different data tasks")
    print("✓ Multi-agent orchestration")
    print("✓ Infrastructure monitoring and optimization")
    print("✓ Autonomous decision-making in data pipelines")


if __name__ == "__main__":
    main()
```

We demonstrate the complete system on two sample pipeline configurations: an e-commerce pipeline and an IoT sensor pipeline. Each agent performs its role on the same configuration, and the orchestrator collects their outputs into a shared result. Finally, we generate a summary report listing each pipeline's ID, start time, status, and number of completed stages.
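generate_summary_report records only the pipeline ID, start time, status, and stage count, even though every run also stores an end_time. The sketch below is a hypothetical extension that adds a per-run duration computed with datetime.fromisoformat. summary_rows_with_duration is an illustrative name, and the timestamps in sample_log are made-up values, not measured results.

```python
from datetime import datetime


def summary_rows_with_duration(execution_log: list) -> list:
    # Hypothetical variant of generate_summary_report: one row per run,
    # plus elapsed seconds derived from the ISO-8601 start/end timestamps
    rows = []
    for log in execution_log:
        start = datetime.fromisoformat(log["start_time"])
        end = datetime.fromisoformat(log["end_time"])
        rows.append({
            "Pipeline ID": log["pipeline_id"],
            "Status": log["status"],
            "Stages Completed": len(log["stages"]),
            "Duration (s)": round((end - start).total_seconds(), 2),
        })
    return rows


# Made-up log entry shaped like the orchestrator's execution_log items
sample_log = [{
    "pipeline_id": "ecommerce_pipeline_001",
    "start_time": "2024-01-01T12:00:00.000000",
    "end_time": "2024-01-01T12:00:07.250000",
    "status": "completed",
    "stages": [{}, {}, {}],
}]
print(summary_rows_with_duration(sample_log))
```

The returned list of dicts can be passed straight to pd.DataFrame, just as generate_summary_report does with its own rows.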
In conclusion, we design and execute a multi-agent data infrastructure framework powered by a compact open-source model. Independent yet cooperative agents analyze data sources, assess data quality, and recommend infrastructure optimizations for sample data pipelines, while simple rules attach severity and priority levels to their findings. The setup demonstrates how lightweight LLMs can handle infrastructure analysis tasks. It also shows how agentic orchestration can turn traditional data workflows into more adaptive systems that could be extended toward larger-scale applications.
The post How to Design an Autonomous Multi-Agent Data and Infrastructure Strategy System Using Lightweight Qwen Models for Efficient Pipeline Intelligence? appeared first on MarkTechPost.