Building a Custom TCP Monitor with Python

Monitoring TCP connections on your network lets you detect failures, diagnose latency, understand traffic patterns, and ensure services are available. This guide walks through building a practical, extensible TCP monitor in Python: design goals, required tools, implementation steps, and suggestions for improvement and deployment.
Goals and scope
- Primary goal: create a TCP monitor that periodically checks reachability and basic performance (connection success, handshake time, simple latency) of specified TCP services (IP/hostname + port).
- Secondary goals: log results, alert on failures, support concurrent checks, and be easy to extend (e.g., TLS checks, banner capture, health endpoints).
- Not covered: deep packet inspection, full protocol analysis, or replacing full-featured commercial network monitoring suites.
Architecture overview
Core components:
- Target list — hosts/ports to check (static file, database, or API).
- Checker — establishes TCP connections and measures metrics.
- Scheduler — controls check frequency per target.
- Storage — writes results to local files, time-series DB, or stdout.
- Alerting — simple email/Slack/webhook on failures.
- Concurrency — use threads, asyncio, or multiprocessing to scale.
For this article we’ll implement a working reference using Python 3.11+, asyncio for concurrency, and plain TCP sockets via asyncio.open_connection. The reference will:
- Read targets from a YAML/JSON file.
- Perform periodic checks with configurable timeout.
- Measure handshake time (time to open TCP connection).
- Optionally perform a simple application-level check (send/receive, TLS optional).
- Write structured JSON lines to a log file.
- Emit basic alerts via console (you can add webhook/email later).
Required libraries and environment
- Python 3.11+ (asyncio improvements; works on 3.8+ with minor adjustments)
- pip packages:
  - pyyaml (for YAML target config)
  - aiohttp (optional, for webhook alerts)
- ssl (standard library) for TLS checks; certifi (pip) if you need an up-to-date CA bundle
Install:
```shell
python -m pip install pyyaml aiohttp
```
Target configuration format
Use a YAML file (targets.yaml). Each target includes an id, host, port, frequency (seconds), timeout (seconds), optional tls flag, and optional probe payload/expectation.
Example targets.yaml:
```yaml
targets:
  - id: web-01
    host: example.com
    port: 80
    freq: 30
    timeout: 5
    tls: false
  - id: https-01
    host: example.com
    port: 443
    freq: 30
    timeout: 5
    tls: true
  - id: db-redis
    host: 10.0.0.5
    port: 6379
    freq: 15
    timeout: 3
    tls: false
    probe:
      send: "PING\r\n"
      expect: "+PONG"
```

Note the Redis probe payload is terminated with CRLF, which the Redis inline command parser requires.
Core implementation (asyncio-based)
The following example implements the monitor with structured logging and concurrency via asyncio tasks.
```python
# tcp_monitor.py
import asyncio
import json
import ssl
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp
import yaml

LOG_FILE = "tcp_monitor.log"
ALERT_WEBHOOK = None  # set to a webhook URL to enable HTTP alerts


@dataclass
class Target:
    id: str
    host: str
    port: int
    freq: int = 30
    timeout: int = 5
    tls: bool = False
    probe_send: Optional[bytes] = None
    probe_expect: Optional[bytes] = None


async def send_alert(session, target_id, msg):
    if not ALERT_WEBHOOK:
        print(f"ALERT {target_id}: {msg}")
        return
    payload = {"target": target_id, "message": msg}
    try:
        async with session.post(
            ALERT_WEBHOOK, json=payload, timeout=aiohttp.ClientTimeout(total=5)
        ) as resp:
            await resp.text()
    except Exception as e:
        print("alert send failed:", e)


def load_targets(path="targets.yaml"):
    with open(path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    out = []
    for t in cfg.get("targets", []):
        probe = t.get("probe", {})
        out.append(Target(
            id=t["id"],
            host=t["host"],
            port=int(t["port"]),
            freq=int(t.get("freq", 30)),
            timeout=int(t.get("timeout", 5)),
            tls=bool(t.get("tls", False)),
            probe_send=probe["send"].encode() if "send" in probe else None,
            probe_expect=probe["expect"].encode() if "expect" in probe else None,
        ))
    return out


async def check_target(target: Target, session):
    result = {
        "time": time.time(),
        "target": target.id,
        "host": target.host,
        "port": target.port,
        "success": False,
        "error": None,
        "connect_time_ms": None,
        "rtt_ms": None,
        "tls": target.tls,
    }
    writer = None
    try:
        ssl_context = ssl.create_default_context() if target.tls else None
        conn_start = time.monotonic()
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(target.host, target.port, ssl=ssl_context),
            timeout=target.timeout,
        )
        result["connect_time_ms"] = int((time.monotonic() - conn_start) * 1000)

        # Optional application-level probe: send a payload, read the reply.
        if target.probe_send:
            writer.write(target.probe_send)
            await writer.drain()
            probe_start = time.monotonic()
            data = await asyncio.wait_for(reader.read(4096), timeout=target.timeout)
            result["rtt_ms"] = int((time.monotonic() - probe_start) * 1000)
            if target.probe_expect and target.probe_expect not in data:
                result["error"] = "unexpected_probe_response"

        result["success"] = result["error"] is None
    except Exception as e:
        result["error"] = str(e)
    finally:
        if writer is not None:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception:
                pass
        # Append one JSON object per check (JSON Lines format).
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(result) + "\n")
        if not result["success"]:
            await send_alert(session, target.id, result["error"])
    return result


async def monitor_loop(target: Target):
    async with aiohttp.ClientSession() as session:
        while True:
            await check_target(target, session)
            await asyncio.sleep(target.freq)


async def main():
    targets = load_targets("targets.yaml")
    tasks = [asyncio.create_task(monitor_loop(t)) for t in targets]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Stopped")
```
Deployment and scaling tips
- Concurrency model: asyncio suits many I/O-bound TCP checks. For extremely high target counts (thousands), consider batching, connection pooling, or running multiple worker processes to avoid one event loop becoming a bottleneck.
- Storage: for production, send metrics to time-series DBs (Prometheus, InfluxDB, Timescale) or log aggregator (ELK, Loki). Write directly in Prometheus exporter format or push via a gateway.
- Alerting: integrate with PagerDuty, Slack, or email via webhook/http APIs. Implement alert deduplication and recovery notices.
- TLS verification: the example uses ssl.create_default_context(), which already verifies certificates and hostnames. For services signed by an internal CA, load a custom CA bundle (e.g. via load_verify_locations); disable verification only deliberately and with care.
- Security: run monitors from trusted networks; be mindful of scanning policies and rate limits on external services. Respect robots/terms of service.
- Tests: add unit tests for parsing, and integration tests using test servers (e.g., asyncio.start_server) and tools like tcpreplay for replaying traffic.
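The multi-process suggestion above can be sketched by sharding the target list, one event loop per worker process. make_shards and start_workers are hypothetical helper names, and run_shard assumes the monitor_loop coroutine from tcp_monitor.py:

```python
import asyncio
from multiprocessing import Process

def make_shards(targets, num_workers):
    """Round-robin the target list into at most num_workers non-empty shards."""
    shards = [targets[i::num_workers] for i in range(num_workers)]
    return [s for s in shards if s]

def run_shard(targets):
    # One event loop per process; monitor_loop is the coroutine from tcp_monitor.py.
    from tcp_monitor import monitor_loop
    async def _main():
        await asyncio.gather(*(monitor_loop(t) for t in targets))
    asyncio.run(_main())

def start_workers(targets, num_workers=4):
    procs = [Process(target=run_shard, args=(s,)) for s in make_shards(targets, num_workers)]
    for p in procs:
        p.start()
    return procs
```

Round-robin sharding keeps shard sizes within one target of each other without any coordination between workers.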
Enhancements and extensions
- Prometheus exporter: collect metrics (success_rate, avg_connect_time_ms) and serve them via HTTP for Prometheus scraping.
- Active latency measurement: measure full application response times for HTTP with aiohttp or use native protocol clients (MySQL, Redis) to run meaningful health checks.
- Connection reuse: for some protocols, reusing connections or pooling reduces overhead and gives a better sense of application responsiveness.
- Geo-distributed checks: run monitors from multiple locations (cloud regions, edge runners) to detect routing or regional outages.
- UI/dashboard: small web UI to view recent failures and latency graphs using Grafana or a simple Flask/FastAPI app.
- Rate limiting and backoff: on repeated failures, back off checks to avoid hammering a failing service.
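The backoff idea can be sketched with a small helper; next_backoff is a hypothetical name, and the loop integration is shown as comments against the reference script's monitor_loop:

```python
def next_backoff(consecutive_failures: int, base: float = 30.0, cap: float = 600.0) -> float:
    """Delay before the next check: the normal interval while healthy,
    doubling per consecutive failure, capped so a long outage is still re-checked."""
    if consecutive_failures <= 0:
        return base
    return min(base * (2 ** (consecutive_failures - 1)), cap)

# Sketch of use inside a monitor loop (failures is a per-target counter):
#   failures = 0 if result["success"] else failures + 1
#   await asyncio.sleep(next_backoff(failures, base=target.freq))
```

The same counter doubles as the "N consecutive failures" gate for alerting: only call send_alert once failures crosses a threshold.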
Example: Prometheus exporter snippet
A minimal idea (using prometheus_client):
```python
from prometheus_client import start_http_server, Gauge

g_connect_ms = Gauge("tcp_connect_ms", "TCP connect time ms", ["target"])
g_success = Gauge("tcp_success", "TCP success (1/0)", ["target"])

# After each check:
#   g_connect_ms.labels(target=...).set(value)
#   g_success.labels(target=...).set(1 or 0)

start_http_server(8000)
```
Troubleshooting common issues
- DNS resolution delays: if open_connection stalls on DNS, resolve hostnames beforehand with asyncio.get_running_loop().getaddrinfo or use explicit IPs.
- False positives: transient glitches cause alerts — add brief retry logic before alerting or require N consecutive failures.
- Permissions/network policy: running in containers might need NET_ADMIN or specific network access to reach internal services.
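Pre-resolving with the event loop's resolver might look like this (resolve_host is a hypothetical helper; hand the returned IP to open_connection so per-check DNS lookups are skipped):

```python
import asyncio
import socket

async def resolve_host(host: str) -> str:
    """Resolve a hostname once and return the first address from getaddrinfo."""
    loop = asyncio.get_running_loop()
    infos = await loop.getaddrinfo(host, None, type=socket.SOCK_STREAM)
    return infos[0][4][0]  # sockaddr tuple starts with the address string
```

Cache the result per target and refresh it periodically, since long-lived monitors should still notice DNS changes.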
Conclusion
This guide presented a practical, extensible asyncio-based TCP monitor in Python, suitable for development, small ops teams, or as a base for production tooling. Start with the sample script and targets file, then add metric export, alerting integrations, and hardened deployment as your monitoring needs grow.