Try an interactive version of this dialog: Sign up at solve.it.com, click Upload, and pass this URL.
FamilyBuddy runs a WhatsApp chatbot using the neonize library and passes messages to a solveit dialog, returning the AI outputs via WhatsApp. This means we can create a WhatsApp assistant in less than 150 lines of code. While supporting voice transcription, tool calling, and memory.
How I use it
I've set this up to work only in a group chat between me and my wife. I created a separate CRAFT file with specific tools for our context:
- Update our Google Calendar
- Update a Google Sheet (for lots of usecases, from logging todos, to favorite movies and happy memories)
- Update our shopping baskets at our grocery store
- Scraping real estate websites
Setup Requirements
Before you start, you need to buy a prepaid SIM card that you won't be sad about if it gets blocked. The WhatsApp library we use (neonize) is not officially supported by Meta—they want you to use the Business API, which is incredibly expensive. However, this works fine as long as you don't spam lots of people.
Your account gets minus points for non-human-like behavior, so make sure to:
- Add the accounts to your address book
- Show human-like behavior (marking messages as read, starting/stopping typing, etc.)
- I've implemented these behaviors in the code
Initialization
- Pip install
neonize - To initialize the bot, I made a
testbot.pyfile. You need to run it from the terminal because that's the only way the QR code renders properly. After you've paired you can continue in this Dialog - Create Dialog in subdirectory
dlgssodlgs/hist, and at the top make a## Memorysection and keep it pinned. - Create a
dlgs/CRAFT, provide it with context about how you want the assistant to be have and add custom tools
...and after that continue w this dialog.
from neonize.client import NewClient
from neonize.events import MessageEv, ConnectedEv, event
client = NewClient("familybuddy")
@client.event(ConnectedEv)
def on_connected(c, ev):
print(f"✅ Connected! Status: {ev.status}")
@client.event(MessageEv)
def on_message(c, ev):
text = ev.Message.conversation or ''
print(f"📨 Got: '{text}' from {ev.Info.MessageSource.Sender}")
if text == "ping":
c.reply_message("pong! 🏓", ev)
print("Registered handlers:", client.event.list_func)
print("Connecting...")
client.connect()
event.wait()
import time, requests, os, re, json, litellm
import datetime as dt
from zoneinfo import ZoneInfo
from threading import Thread
from fastcore.utils import *
from dialoghelper.core import _add_msg_unsafe
from lisette.core import mk_msg
from neonize.client import NewClient
from neonize.events import MessageEv, ConnectedEv, event
from neonize.utils.jid import build_jid
from neonize.utils.enum import ChatPresence, ChatPresenceMedia, ReceiptType
HIST_DNAME = 'dlgs/hist'
MEDIA_DIR = Path('dlgs/media')
def wait_for_completion(msg_id, dname=HIST_DNAME, timeout=120, poll_interval=0.5):
"Poll until prompt completes. Returns the output text."
start = time.time()
while time.time() - start < timeout:
msg = read_msg(id=msg_id, n=0, relative=True, dname=dname)
if not msg.get('run'): return msg.get('output', '')
time.sleep(poll_interval)
raise TimeoutError(f"Prompt {msg_id} did not complete within {timeout}s")
def hide_section(header_text, dname=HIST_DNAME):
"Skip all messages in a section and collapse the header."
msgs = find_msgs(header_section=header_text, dname=dname)
for m in msgs: update_msg(id=m['id'], skipped=1, dname=dname)
if msgs: update_msg(id=msgs[0]['id'], heading_collapsed=1, dname=dname)
def ensure_today_section(dname=HIST_DNAME):
"Create today's section header if needed, hide previous day's section."
today = dt.datetime.now(ZoneInfo('Europe/Amsterdam')).date().strftime('%a %d %b %Y')
if (existing:=find_msgs(re_pattern=f"^## {today}$", dname=dname)): return existing[0]['id']
for sec in find_msgs(re_pattern=r"^## [A-Z][a-z]{2} \d{2} [A-Z][a-z]{2} \d{4}$", dname=dname): hide_section(sec['content'], dname=dname)
return add_msg(f"## {today}", placement='at_end', dname=dname)
def start_typing(chat_jid): client.send_chat_presence(chat_jid, ChatPresence.CHAT_PRESENCE_COMPOSING, ChatPresenceMedia.CHAT_PRESENCE_MEDIA_TEXT)
def stop_typing(chat_jid): client.send_chat_presence(chat_jid, ChatPresence.CHAT_PRESENCE_PAUSED, ChatPresenceMedia.CHAT_PRESENCE_MEDIA_TEXT)
def mark_read(chat_jid, sender_jid, msg_ids): client.mark_read(*msg_ids, chat=chat_jid, sender=sender_jid, receipt=ReceiptType.READ)
def send_reply(chat_jid, text): client.send_message(chat_jid, text)
def jid_str(jid): return f"{jid.User}@{jid.Server}"
def save_image(img_bytes, ext='jpg'):
"Save image to media dir and return markdown reference."
fname = f"{unqid()}.{ext}"
(MEDIA_DIR / fname).write_bytes(img_bytes)
return f"  "
def format_batch(events: list, participants: dict) -> str | None:
"""Convert Neonize MessageEv list to prompt text for dialoghelper."""
lines = []
for event in events:
msg,info = event.Message,event.Info
sender_jid = jid_str(info.MessageSource.Sender)
name = participants.get(sender_jid, 'unknown')
time_str = dt.datetime.fromtimestamp(info.Timestamp / 1000, AMS).strftime('%a %Y-%m-%d %H:%M')
body = msg.conversation or ''
if msg.extendedTextMessage and msg.extendedTextMessage.text: body = msg.extendedTextMessage.text
if msg.HasField('imageMessage'):
img_bytes = client.download_any(msg)
img_ref = save_image(img_bytes)
caption = msg.imageMessage.caption or ''
body = f'{caption}\n{img_ref}'.strip() if caption else img_ref
elif msg.HasField('audioMessage'):
audio_bytes = client.download_any(msg)
text = transcribe_audio(audio_bytes)
body = f'{body} [voice: {text}]'.strip() if body else f'[voice: {text}]'
elif msg.HasField('videoMessage') or msg.HasField('documentMessage'): body = f'[media] {body}' if body else '[media]'
if not body: continue
lines.append(f'<{name} time="{time_str}">{body}</{name}>')
if not lines: return None
return '\n'.join(lines)
def format_output_for_wa(output):
"Convert tool call details to concise format for WhatsApp"
def format_tool(match): return f"🔧`{match.group(1)}`"
output = re.sub(r"<details class='tool-usage-details'>\s*<summary>(.*?)</summary>.*?</details>", format_tool, output, flags=re.DOTALL)
output = re.sub(r'\n{3,}', '\n\n', output)
return output.strip('🧠\n')
pending_events = []
@client.event(MessageEv)
def on_message(c: NewClient, event: MessageEv):
chat_jid = event.Info.MessageSource.Chat
if jid_str(chat_jid) != group_id: return
if event.Info.MessageSource.IsFromMe: return
pending_events.append(event)
msg = event.Message
body = msg.conversation or (msg.extendedTextMessage.text if msg.extendedTextMessage and msg.extendedTextMessage.text else '')
if body and body[0] == '.': return
mark_read(chat_jid, event.Info.MessageSource.Sender, [event.Info.ID])
start_typing(chat_jid)
try:
ensure_today_section()
if prompt_text:=format_batch(pending_events, participants):
msg_id = _add_msg_unsafe(prompt_text, msg_type='prompt', run=True, placement='at_end', dname=HIST_DNAME)
if reply:=wait_for_completion(msg_id): send_reply(chat_jid, format_output_for_wa(reply))
pending_events.clear()
except Exception as e: send_reply(chat_jid, f"🚨 Faby error:\n{type(e).__name__}: {e}")
finally: stop_typing(chat_jid)