Déboguer le pipeline de signaux
Flux du pipeline
emit_signals_from_fixture
→ signal-emitter (Temporal workflow)
→ buffer-signals (signaux en lots, minuteur de vidage 5s)
→ safety_filter_activity
→ flush_signals_to_s3_activity
→ signal_with_start_grouping_v2_activity
→ team-signal-grouping-v2 (fenêtre de collecte par lots 30s)
→ read_signals_from_s3_activity
→ get_embedding_activity + generate_search_queries_activity
→ run_signal_semantic_search_activity
→ match_signal_to_report_activity
→ assign_and_emit_signal_activity
→ wait_for_signal_in_clickhouse_activity
→ (if new report) signal-report-summary
→ fetch_signals_for_report_activity
→ report_safety_judge_activity
→ select_repository_activity (lance un bac à sable Docker)
Émettre des signaux de test
# Émettre un seul signal depuis la fixture Zendesk au décalage 26
DEBUG=1 python manage.py emit_signals_from_fixture --type zendesk --team-id 1 --offset 26 --limit 1
# Nettoyer toutes les données de signal avant de réémettre (évite les correspondances obsolètes)
DEBUG=1 python manage.py cleanup_signals --team-id 1 --yes
# Vérifier l'état du pipeline
python manage.py signal_pipeline_status --team-id 1 --wait --expected-signals 1 --poll-interval 10
Toujours nettoyer avant de réémettre pour éviter que les embeddings obsolètes causent des correspondances de rapports fantômes.
Surveiller les workflows Temporal
L'interface Temporal s'exécute sur http://localhost:8081. L'API REST est utile pour l'inspection scriptée.
Lister les workflows récents
curl -s 'http://localhost:8081/api/v1/namespaces/default/workflows?query=ORDER+BY+StartTime+DESC&maximumPageSize=15' \
| python3 -c "
import sys, json
for wf in json.load(sys.stdin).get('executions', []):
info = wf['execution']
status = wf['status'].replace('WORKFLOW_EXECUTION_STATUS_', '')
print(f'{wf[\"startTime\"][:19]} {status:20s} {wf[\"type\"][\"name\"]:35s} {info[\"workflowId\"][:90]}')
"
Inspecter l'historique du workflow
WF_ID="buffer-signals-1" # ou team-signal-grouping-v2-1, signals-report:1:<uuid>
curl -s "http://localhost:8081/api/v1/namespaces/default/workflows/$WF_ID/history?maximumPageSize=200" \
| python3 -c "
import sys, json
for event in json.load(sys.stdin).get('history', {}).get('events', []):
etype = event['eventType'].replace('EVENT_TYPE_', '')
etime = event['eventTime'][:19]
details = ''
for key, attrs in event.items():
if key.endswith('Attributes') and isinstance(attrs, dict):
if 'activityType' in attrs: details = attrs['activityType'].get('name', '')
elif 'signalName' in attrs: details = f'signal: {attrs[\"signalName\"]}'
elif 'startToFireTimeout' in attrs: details = f'timer: {attrs[\"startToFireTimeout\"]}'
elif 'failure' in attrs: details = f'FAILED: {attrs[\"failure\"].get(\"message\", \"\")[:200]}'
if details: print(f' {etime} {etype:50s} {details}')
"
Inspecter une exécution précédente (continued-as-new)
Quand un workflow a utilisé continued-as-new, utilisez le paramètre de requête execution.runId :
curl -s "http://localhost:8081/api/v1/namespaces/default/workflows/$WF_ID/history?execution.runId=<run-id>&maximumPageSize=200"
Lire les journaux de l'agent sandbox
Les journaux de l'agent sont stockés dans le stockage d'objets (MinIO en local) sous forme de fichiers JSONL.
L'URL du journal se trouve sur le modèle TaskRun.
# Dans le shell Django (python manage.py shell)
from products.tasks.backend.models import TaskRun
from posthog.storage import object_storage
# Trouver la tâche la plus récente
run = TaskRun.objects.order_by("-created_at").first()
print(f"status: {run.status}, error: {run.error_message}")
print(f"log_url: {run.log_url}")
# Lire le journal
content = object_storage.read(run.log_url, missing_ok=True)
# Afficher les 3000 derniers caractères (plus utile — montre ce qui s'est passé avant l'échec)
print(content[-3000:])
Le journal est au format JSONL avec des entrées comme :
{
"type": "notification",
"timestamp": "...",
"notification": { "jsonrpc": "2.0", "method": "_posthog/console", "params": { "level": "debug", "message": "..." } }
}
Éléments clés à chercher dans la fin du journal :
- Événements réseau agentsh — les entrées
DENYmontrent les appels réseau bloqués - Événements
_posthog/progress— montrent quelle étape de configuration le sandbox a atteinte - Messages de débogage
_posthog/console— montrent l'approvisionnement du sandbox, le clonage, le démarrage de l'agent
Inspecter les conteneurs Docker sandbox
# Lister les conteneurs sandbox en cours d'exécution
docker ps --filter "name=task-sandbox" --format "table {{.Names}}\t{{.Status}}\t{{.Image}}"
# Voir les processus à l'intérieur d'un sandbox en cours d'exécution
docker exec <container-name> ps aux
# Lire le journal agent-server à l'intérieur du conteneur (pendant qu'il s'exécute)
docker exec <container-name> cat /tmp/agent-server.log
Le conteneur est nommé task-sandbox-<task-id>-<random> et utilise l'image posthog-sandbox-base.
Les conteneurs sont éphémères — ils sont supprimés après la fin de l'exécution de la tâche, donc inspectez pendant qu'elle s'exécute.
Défaillances courantes
SignalReport matching query does not exist
L'activité assign_and_emit_signal_activity a tenté d'assigner un signal à un rapport qui n'existe pas.
Généralement causé par des embeddings obsolètes dans ClickHouse après un cleanup_signals qui a échoué à les supprimer.
Cause racine : CLICKHOUSE_DATABASE non défini dans .env. La commande de nettoyage utilise sync_execute
qui se connecte à CLICKHOUSE_DATABASE (par défaut default), mais les tables d'embedding se trouvent dans la base de données posthog.
Correction : Ajouter CLICKHOUSE_DATABASE=posthog à .env et redémarrer les workers.
Nettoyage manuel des embeddings obsolètes :
curl -s 'http://localhost:8123/' --data-binary \
"ALTER TABLE posthog.sharded_posthog_document_embeddings_text_embedding_3_small_1536 DELETE WHERE product = 'signals' AND team_id = 1 SETTINGS mutations_sync = 1"
Vérifier que les embeddings sont propres :
curl -s 'http://localhost:8123/' --data-binary \
"SELECT count() FROM posthog.sharded_posthog_document_embeddings_text_embedding_3_small_1536 WHERE team_id = 1 AND product = 'signals'"
Run timed out due to inactivity sur select_repository_activity
L'agent Claude du sandbox est resté inactif plus longtemps que TASKS_INACTIVITY_TIMEOUT_SECONDS. Si non défini,
cela revient à un délai d'expiration de 2 heures — définissez TASKS_INACTIVITY_TIMEOUT_SECONDS=30 en local pour forcer les défaillances rapides.
Diagnostic : Lire le journal de l'agent depuis le stockage d'objets (voir ci-dessus). Vérifier la fin pour :
- Dénégations réseau agentsh —
DENY host.docker.internalsignifie que l'URL du serveur MCP est bloquée par la politique réseau du sandbox. La liste blanche du domaine de l'environnementSIGNALS_REPO_DISCOVERYn'inclut pashost.docker.internal. - Aucun contenu de journal du tout — le sandbox n'a pas démarré, vérifier les journaux du conteneur Docker.
- Erreurs d'API Claude — vérifier que
ANTHROPIC_API_KEYest valide.
buffer-signals en inactivité, ne reçoit jamais de signaux
L'émetteur de signaux signal-emitter s'est terminé mais buffer-signals n'a jamais reçu le submit_signal.
Cela se produit quand l'émetteur a envoyé le signal à une exécution précédente du buffer qui a ensuite utilisé continued-as-new,
et la nouvelle exécution a démarré à zéro sans le signal en attente. Réémettre le signal.
Les tables d'embedding ClickHouse « introuvables » pendant le nettoyage
Les tables existent dans la base de données posthog mais sync_execute interroge la base de données default.
# Vérifier que les tables existent
curl -s 'http://localhost:8123/' --data-binary "SHOW TABLES FROM posthog LIKE '%embed%'"
# Vérifier le paramètre CLICKHOUSE_DATABASE actuel
grep CLICKHOUSE_DATABASE .env
Commandes de gestion utiles
| Commande | Objet |
|---|---|
emit_signals_from_fixture |
Émettre des signaux de test depuis des fixtures JSON |
DEBUG=1 cleanup_signals --team-id N --yes |
Supprimer toutes les données de signal et terminer les workflows |
signal_pipeline_status --team-id N --wait |
Attendre que le pipeline finisse le traitement |
list_signal_reports --team-id N --signals --json |
Inspecter les résultats de groupement |
ingest_signals_json <file> --team-id N |
Ingérer des signaux pré-traités depuis JSON |
ingest_report_json <file> --team-id N |
Amorcer un rapport pré-analysé (ignorer le sandbox) |
Emplacements de fichiers clés
- Définitions du workflow du pipeline :
products/signals/backend/temporal/ - Workflow du buffer :
products/signals/backend/temporal/buffer.py - Workflow de groupement :
products/signals/backend/temporal/grouping_v2.py - Workflow du résumé du rapport :
products/signals/backend/temporal/summary.py - Implémentation du sandbox Docker :
products/tasks/backend/services/docker_sandbox.py - Dockerfiles du sandbox :
products/tasks/backend/sandbox/images/ - Polling des journaux de l'agent :
products/tasks/backend/services/custom_prompt_internals.py - Commande de nettoyage :
products/signals/backend/management/commands/cleanup_signals.py - Documentation des commandes de gestion :
products/signals/backend/management/CLAUDE.md