In this tutorial, we build a small but functional multi-agent system with the uAgents framework. We set up three agents (Directory, Seller, and Buyer) that communicate through well-defined message protocols to simulate a marketplace interaction. We design message schemas, define agent behaviors, and implement request-response cycles that demonstrate discovery, negotiation, and transaction among agents, all running asynchronously in a shared event loop. Along the way, we see how autonomous agents can find each other, trade, and coordinate purely by exchanging messages.
!pip -q install "uagents>=0.11.2"
import asyncio, random
from typing import List, Dict, Optional
from uagents import Agent, Context, Bureau, Model, Protocol
class ServiceAnnounce(Model):
    category: str
    endpoint: str

class ServiceQuery(Model):
    category: str

class ServiceList(Model):
    addresses: List[str]

class OfferRequest(Model):
    item: str
    max_price: int

class Offer(Model):
    item: str
    price: int
    qty: int

class Order(Model):
    item: str
    qty: int

class Receipt(Model):
    item: str
    qty: int
    total: int
    ok: bool
    note: Optional[str] = None
We begin by installing the uAgents library and defining the message models that underpin our communication system. We create structured data types for announcements, queries, offers, orders, and receipts, so every message an agent sends or receives is checked against a schema.
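To make the role of these schemas concrete without depending on uAgents itself, the sketch below mimics the `Offer` message with a standard-library dataclass. The helpers `encode` and `decode` are hypothetical names used only for illustration; uAgents models are Pydantic-based and take care of serialization and validation internally.

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Offer:
    item: str
    price: int
    qty: int

# Field names and types a valid Offer payload must carry
OFFER_FIELDS = {"item": str, "price": int, "qty": int}

def encode(msg: Offer) -> str:
    """Serialize a message to a JSON string (hypothetical helper)."""
    return json.dumps(asdict(msg))

def decode(payload: str) -> Offer:
    """Parse a JSON string and reject missing, extra, or mistyped fields."""
    data = json.loads(payload)
    if set(data) != set(OFFER_FIELDS):
        raise ValueError(f"unexpected fields: {sorted(data)}")
    for name, expected_type in OFFER_FIELDS.items():
        if not isinstance(data[name], expected_type):
            raise ValueError(f"field '{name}' must be {expected_type.__name__}")
    return Offer(**data)

wire = encode(Offer(item="laptop", price=640, qty=2))
restored = decode(wire)
```

A malformed payload such as `{"item": "laptop"}` fails in `decode` before any handler logic runs, which is the same guarantee the typed models give each agent.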
registry_proto = Protocol(name="registry", version="1.0")
trade_proto = Protocol(name="trade", version="1.0")
directory = Agent(name="directory", seed="dir-seed-001")
seller = Agent(name="seller", seed="seller-seed-001")
buyer = Agent(name="buyer", seed="buyer-seed-001")
# Registry protocol: the directory keeps a category -> provider-address index.
@registry_proto.on_message(model=ServiceAnnounce)
async def on_announce(ctx: Context, sender: str, msg: ServiceAnnounce):
    # ctx.storage is a synchronous key-value store persisted as JSON,
    # so we call it without await and store lists rather than sets.
    reg = ctx.storage.get("reg") or {}
    providers = reg.setdefault(msg.category, [])
    if sender not in providers:
        providers.append(sender)
    ctx.storage.set("reg", reg)
    ctx.logger.info(f"Registered {sender} under '{msg.category}'")

@registry_proto.on_message(model=ServiceQuery)
async def on_query(ctx: Context, sender: str, msg: ServiceQuery):
    reg = ctx.storage.get("reg") or {}
    addrs = sorted(reg.get(msg.category, []))
    await ctx.send(sender, ServiceList(addresses=addrs))
    ctx.logger.info(f"Returned {len(addrs)} providers for '{msg.category}'")

# Include a protocol only after its handlers are defined, so the agent picks them up.
# The buyer serves neither protocol, so it does not include them.
directory.include(registry_proto)

We set up the Directory, Seller, and Buyer agents and define the registry protocol that manages service discovery. The directory responds to announcements and queries, so agents can register and locate each other dynamically. We attach the protocol to the directory only after its handlers are defined.

CATALOG: Dict[str, Dict[str, int]] = {
    "camera": {"price": 120, "qty": 3},
    "laptop": {"price": 650, "qty": 2},
    "headphones": {"price": 60, "qty": 5},
}

@seller.on_event("startup")
async def seller_start(ctx: Context):
    await ctx.send(directory.address, ServiceAnnounce(category="electronics", endpoint=seller.address))
    ctx.logger.info("Seller announced to directory")

@trade_proto.on_message(model=OfferRequest)
async def on_offer_request(ctx: Context, sender: str, req: OfferRequest):
    item = CATALOG.get(req.item)
    if not item:
        # An offer with price 0 and qty 0 signals "no deal".
        await ctx.send(sender, Offer(item=req.item, price=0, qty=0))
        return
    # Quote between 90% and 110% of the list price.
    price = max(1, int(item["price"] * (0.9 + 0.2 * random.random())))
    if price > req.max_price or item["qty"] <= 0:
        await ctx.send(sender, Offer(item=req.item, price=0, qty=0))
        return
    # Remember the quote so the order is charged at the offered price.
    ctx.storage.set(f"quote:{sender}:{req.item}", price)
    await ctx.send(sender, Offer(item=req.item, price=price, qty=item["qty"]))
    ctx.logger.info(f"Offered {req.item} at {price} with qty {item['qty']}")

@trade_proto.on_message(model=Order)
async def on_order(ctx: Context, sender: str, order: Order):
    item = CATALOG.get(order.item)
    if not item or item["qty"] < order.qty:
        await ctx.send(sender, Receipt(item=order.item, qty=0, total=0, ok=False, note="Not enough stock"))
        return
    # Fall back to the list price if this buyer never received a quote.
    unit_price = ctx.storage.get(f"quote:{sender}:{order.item}") or item["price"]
    total = unit_price * order.qty
    item["qty"] -= order.qty
    await ctx.send(sender, Receipt(item=order.item, qty=order.qty, total=total, ok=True, note="Thanks!"))

seller.include(trade_proto)
We create the Seller agent’s catalog and implement the logic for responding to offer requests and processing orders. We simulate real-world trading with randomized pricing and stock management, showing how the seller quotes a price and completes a transaction.
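The seller's decision logic can also be read in isolation from the messaging layer. The sketch below restates it as two plain functions over a catalog dictionary; `quote` and `fulfil` are hypothetical names introduced here, and the random generator is passed in so the behavior can be made repeatable with a seed.

```python
import random

def quote(catalog, item_name, max_price, rng):
    """Return (price, qty) for an offer, or (0, 0) to decline."""
    item = catalog.get(item_name)
    if not item or item["qty"] <= 0:
        return 0, 0
    # Quote roughly 90% to 110% of the list price, never below 1.
    price = max(1, int(item["price"] * (0.9 + 0.2 * rng.random())))
    if price > max_price:
        return 0, 0
    return price, item["qty"]

def fulfil(catalog, item_name, qty):
    """Deduct stock and return True, or return False if stock is short."""
    item = catalog.get(item_name)
    if not item or item["qty"] < qty:
        return False
    item["qty"] -= qty
    return True

catalog = {"laptop": {"price": 650, "qty": 2}}
rng = random.Random(42)  # seeded for repeatable quotes

accepted = quote(catalog, "laptop", max_price=1000, rng=rng)
declined = quote(catalog, "laptop", max_price=100, rng=rng)
unknown = quote(catalog, "tablet", max_price=1000, rng=rng)
first_order_ok = fulfil(catalog, "laptop", 1)
oversized_order_ok = fulfil(catalog, "laptop", 5)
```

Keeping the pricing and stock rules in pure functions like these makes them easy to test; the message handlers then only translate between messages and function calls.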
DESIRED_ITEM = "laptop"
BUDGET = 700

@buyer.on_event("startup")
async def buyer_start(ctx: Context):
    # Storage persists between runs, so reset the per-run flags first.
    ctx.storage.set("offer_requested", False)
    ctx.storage.set("seen", 0)
    ctx.logger.info("Buyer querying directory for electronics...")
    await ctx.send(directory.address, ServiceQuery(category="electronics"))

@buyer.on_message(model=ServiceList)
async def on_service_list(ctx: Context, sender: str, msg: ServiceList):
    ctx.logger.info(f"Directory reports {len(msg.addresses)} seller(s)")
    # Ask for an offer only once, even if the directory is queried again.
    if not msg.addresses or ctx.storage.get("offer_requested"):
        return
    target = msg.addresses[0]
    ctx.storage.set("offer_requested", True)
    ctx.logger.info(f"Requesting offer for '{DESIRED_ITEM}' within budget {BUDGET} from {target}")
    await ctx.send(target, OfferRequest(item=DESIRED_ITEM, max_price=BUDGET))

@buyer.on_message(model=Offer)
async def on_offer(ctx: Context, sender: str, offer: Offer):
    # A zero price or quantity means the seller declined.
    if offer.price <= 0 or offer.qty < 1:
        ctx.logger.info(f"No usable offer for '{offer.item}'")
        return
    qty = 1
    ctx.logger.info(f"Placing order for {qty} x {offer.item} at {offer.price}")
    await ctx.send(sender, Order(item=offer.item, qty=qty))

@buyer.on_message(model=Receipt)
async def on_receipt(ctx: Context, sender: str, receipt: Receipt):
    if receipt.ok:
        ctx.logger.info(f"ORDER SUCCESS: {receipt.qty} x {receipt.item} | total={receipt.total}")
    else:
        ctx.logger.info(f"ORDER FAILED: {receipt.note}")

We program the Buyer agent to discover sellers, request offers, and place orders based on availability and budget. Each step is a message handler that reacts to the previous reply, so the buyer moves from the directory's listing to the seller's offer to the final receipt entirely through asynchronous communication.

@buyer.on_interval(period=6.0)
async def periodic_discovery(ctx: Context):
    # Re-query the directory once; the ServiceList handler above logs the reply.
    seen = ctx.storage.get("seen") or 0
    if seen >= 1:
        return
    ctx.storage.set("seen", seen + 1)
    ctx.logger.info("Periodic discovery tick -> re-query directory")
    await ctx.send(directory.address, ServiceQuery(category="electronics"))

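bureau = Bureau()
bureau.add(directory)
bureau.add(seller)
bureau.add(buyer)

async def run_demo(seconds=10):
    # Run all agents in the background, then stop them after `seconds`.
    task = asyncio.create_task(bureau.run_async())
    try:
        await asyncio.sleep(seconds)
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    print("\nDemo run complete.\n")

# In a notebook an event loop is already running, so we can await directly.
# In a plain script, replace this block with asyncio.run(run_demo(10)).
try:
    asyncio.get_running_loop()
    await run_demo(10)
except RuntimeError:
    asyncio.run(run_demo(10))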
We add periodic discovery to have the buyer recheck available sellers, then have the Bureau run all agents together. We launch the asynchronous runtime to see the full marketplace simulation unfold and complete smoothly.
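The run-then-stop pattern used for the Bureau is plain asyncio and can be tried on its own. In the sketch below, `worker` is a hypothetical stand-in for a long-running coroutine such as the Bureau's run loop, and `run_for` is an illustrative name for the time-boxed runner.

```python
import asyncio

async def worker(ticks: list) -> None:
    # Stand-in for a service loop that never returns on its own.
    while True:
        ticks.append(len(ticks))
        await asyncio.sleep(0.01)

async def run_for(seconds: float) -> list:
    ticks: list = []
    task = asyncio.create_task(worker(ticks))
    try:
        await asyncio.sleep(seconds)
    finally:
        # Request cancellation, then wait for the task to actually finish.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return ticks

ticks = asyncio.run(run_for(0.1))
```

Awaiting the task after `cancel()` matters: cancellation is only a request, and waiting for it ensures the background loop has fully unwound before the program moves on.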
In conclusion, we have seen our agents discover one another, negotiate an offer, and complete a transaction entirely through message-based interactions. We see how uAgents supports multi-agent orchestration by combining message schemas, communication, and state management within Python. Running this example shows an autonomous system in action and gives insight into how the same architecture can be extended to more complex decentralized marketplaces, AI collaborations, and intelligent service networks.
The post A Coding Guide to Build a Fully Functional Multi-Agent Marketplace Using uAgent appeared first on MarkTechPost.