
Python for networking tools

If you're into automation, scripting, or software engineering, you’ve probably wondered, "Why choose this language?"

There could be a bunch of reasons. Maybe it has the right libraries for what you’re building, or it’s just what your team prefers. At the end of the day, a programming language is just a tool. Unless there’s a specific need (like using C for a distributed system instead of JavaScript), the language itself usually isn’t a big deal.

The same goes for networking and cybersecurity. But here, the community has really taken a liking to Python, and we will see why 🤓

1. Simple and Readable Syntax

  • Ease of Learning and Use: Python's clean and straightforward syntax makes it easy for both beginners and experienced programmers to write and understand code quickly.
  • Rapid Prototyping: The simplicity allows developers to prototype and implement ideas swiftly, which is crucial in the fast-paced cybersecurity environment.

2. Extensive Standard Library

  • Built-in Modules: Python comes with a rich standard library that includes numerous modules for handling various networking protocols (e.g., HTTP, FTP, SMTP, and more).
  • Ready-to-Use Functions: These built-in functionalities reduce the need to write code from scratch, speeding up development time.
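As a small illustration of how far the standard library alone can go, here is a minimal sketch that starts a throwaway HTTP server on a free local port and queries it with http.client. The HelloHandler class and the fetch_from_local_server function are hypothetical names invented for this example.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class HelloHandler(BaseHTTPRequestHandler):
    """Hypothetical handler that answers every GET with a short text body."""

    def do_GET(self):
        body = b"hello from the standard library"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Silence the default per-request logging to keep the output clean.
        pass


def fetch_from_local_server():
    # Port 0 asks the OS for any free port; server_port holds the real one.
    server = HTTPServer(("127.0.0.1", 0), HelloHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
        conn.request("GET", "/")
        response = conn.getresponse()
        result = (response.status, response.read().decode())
        conn.close()
        return result
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(fetch_from_local_server())
```

Both the server and the client side come from built-in modules, so nothing has to be installed to try it.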

3. Rich Ecosystem of Third-Party Libraries

  • Specialized Libraries: There are numerous third-party libraries and frameworks tailored for networking and cybersecurity tasks, such as:
    • Scapy: For packet manipulation and network scanning.
    • Paramiko: For SSH2 protocol implementation, allowing secure connections.
    • Twisted: An event-driven networking engine supporting numerous protocols.
    • python-nmap: A Python wrapper around the Nmap scanner for network scanning and discovery.
  • Continuous Updates: The community actively maintains and updates these libraries, ensuring they stay relevant with emerging technologies and threats.

4. Cross-Platform Compatibility

  • Uniformity Across Systems: Python runs seamlessly on various operating systems including Windows, Linux, and macOS.
  • Flexible Deployment: This allows developers to create tools that can be easily deployed and run across different environments without significant modifications.

5. High-Level and Low-Level Programming Capabilities

  • Abstraction and Control: Python allows high-level abstraction for complex tasks while also providing the ability to perform low-level network interactions when needed.
  • Socket Programming: Developers can easily implement custom network protocols and interactions using Python's socket programming capabilities.
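To make the socket point concrete, here is a minimal sketch of a TCP connect check built only on the socket module. The is_port_open helper is a name chosen for this example, and the demo opens its own local listener so that the answer is known in advance.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success and an errno value otherwise.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    # Open a throwaway listener on a free port so the check has a known answer.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))
        listener.listen(1)
        port = listener.getsockname()[1]
        print(port, is_port_open("127.0.0.1", port))
```

A full connect like this is the simplest form of port probing; dedicated scanners such as nmap offer many more techniques.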

6. Rapid Development and Deployment

  • Fast Iterations: The combination of simple syntax and extensive libraries facilitates quick development cycles.
  • Agility in Response: In cybersecurity, the ability to quickly develop and deploy tools in response to new threats is crucial, and Python excels in this aspect.

7. Strong Community Support

  • Active Community: A large and active community contributes to a wealth of resources, tutorials, and support forums.
  • Open Source Contributions: Continuous contributions from the community enhance existing tools and foster the development of new ones.

8. Integration Capabilities

  • Interoperability: Python can easily integrate with code written in other languages like C/C++ and Java, allowing developers to leverage existing tools and libraries.
  • API Development: It’s efficient for developing and consuming APIs, facilitating communication between different software components and services.

9. Versatility and Flexibility

  • Wide Range of Applications: Beyond networking and cybersecurity, Python is used in web development, data analysis, machine learning, and more, making it a versatile tool in a developer's toolkit.
  • Customization: Developers can tailor tools to specific needs and scenarios with relative ease.

10. Educational and Research Use

  • Preferred in Academia: Many educational institutions use Python for teaching networking and cybersecurity concepts, fostering early familiarity among new professionals.
  • Research and Development: Its simplicity and power make it suitable for research purposes, allowing for experimentation and innovation.

Examples of Python in Networking and Cybersecurity Tools

  • Metasploit Framework: Widely used for penetration testing. It is written mainly in Ruby but can also run modules written in Python.
  • Wireshark (with PyShark): Network protocol analyzer whose captures can be parsed from Python through PyShark, a wrapper around tshark.
  • Volatility Framework: A memory forensics framework written in Python.

Summary: why Python

Python's combination of simplicity, powerful libraries, and flexibility makes it an excellent choice for developing networking and cybersecurity tools. Its ability to facilitate quick development cycles and adapt to various platforms and requirements enables professionals to effectively address and manage complex networking and security challenges.

Build a port scanner

Let's build a mini application with a simple goal: basic port scanning in Python on a Linux target machine. Coding a mini port scanner as a Python cybersecurity project is an excellent exercise, both for learning and for practical use in real-world scenarios.

Note

In a corporate environment, IT and cybersecurity teams regularly audit their network to ensure there are no unexpected open ports that could be exploited. A custom-built port scanner can be tailored to the specific needs of the company, providing flexibility that commercial tools may not offer. In many situations the traditional security tools from Kali cannot be installed on a corporate server, but Python is often already there 😈

First, make sure Python is installed on your Linux machine.

Set up your environment

You will need the following packages. All of them except python-nmap are part of Python's standard library.

python-nmap
argparse
json
subprocess
threading
logging
queue

Do not forget to install nmap and lsof on your machine 🤓
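Before going further, it can help to verify that everything is in place. The following is a minimal sketch, assuming the external tools are expected on the PATH and that python-nmap is imported under the name nmap (as in the script below). The check_prerequisites function is a hypothetical helper, not part of any library.

```python
import importlib.util
import shutil


def check_prerequisites(commands=("nmap", "lsof"), modules=("nmap",)):
    """Report which external commands and Python modules are available."""
    report = {}
    for command in commands:
        # shutil.which returns the executable's path, or None if it is not on PATH.
        report[f"command:{command}"] = shutil.which(command) is not None
    for module in modules:
        # find_spec returns None when a top-level module cannot be found.
        report[f"module:{module}"] = importlib.util.find_spec(module) is not None
    return report


if __name__ == "__main__":
    for name, available in check_prerequisites().items():
        print(f"{name}: {'ok' if available else 'MISSING'}")
```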

Core script

For this project let's leverage the existing Linux tools nmap and lsof, then wrap them in three main functions: scan_with_nmap, get_process_details and get_process_info.

Our two main tasks will be:

  1. Port Scanning (scan_with_nmap):
    • Initializes an Nmap scanner and scans a specified range of ports on a target IP address.
    • Collects and prints information about each port's state (open or closed).
    • If a port is open and the user requests it, the script gathers details about the process using that port.
  2. Process Details (get_process_details):
    • Uses the lsof command to find which process is listening on an open port. Since lsof inspects local sockets, this step is only meaningful when the target is the machine running the script.
    • Retrieves additional details about the process, such as its working directory and the command used to start it.

Let's see the code 🤓

import argparse
import json
import os
import subprocess

import nmap  # provided by the python-nmap package

def scan_with_nmap(ip, start_port, end_port, show_process):
    nm = nmap.PortScanner()
    print(f"Scanning {ip} from port {start_port} to {end_port} with nmap...")

    # Perform the scan
    nm.scan(ip, f'{start_port}-{end_port}')
    scan_data = {"host": ip, "ports": []}

    for host in nm.all_hosts():
        print(f"\nHost: {host} ({nm[host].hostname()})")
        print(f"State: {nm[host].state()}")
        for proto in nm[host].all_protocols():
            print(f"\nProtocol: {proto}")

            lport = nm[host][proto].keys()
            for port in lport:
                port_state = nm[host][proto][port]['state']
                print(f"Port: {port}\tState: {port_state}")
                port_info = {"port": port, "state": port_state}

                if port_state == 'open' and show_process:
                    process_info = get_process_details(port)
                    port_info["process"] = process_info

                scan_data["ports"].append(port_info)

    return scan_data

def get_process_details(port):
    # Run 'lsof' to find the local process listening on the open port.
    # Passing the arguments as a list avoids going through a shell.
    lsof_command = ["sudo", "lsof", f"-iTCP:{port}", "-sTCP:LISTEN"]
    process_details = {}
    try:
        process = subprocess.run(lsof_command, capture_output=True, text=True)
    except OSError as e:
        # Raised when sudo or lsof cannot be executed at all
        print(f"  [-] Could not run lsof: {e}")
        return process_details

    if process.stdout:
        lines = process.stdout.splitlines()
        for line in lines[1:]:  # Skip the header line
            details = line.split()
            if len(details) < 2:
                continue  # Not a regular lsof row
            pid = details[1]
            process_name = details[0]
            process_info = get_process_info(pid)
            process_details = {
                "pid": pid,
                "name": process_name,
                "location": process_info.get("cwd", "Unknown"),
                "command": process_info.get("cmdline", "Unknown")
            }
            print(f"  [*] PID: {pid}, Process: {process_name}")
            print(f"  [*] Location: {process_details['location']}")
            print(f"  [*] Command: {process_details['command']}")
    else:
        print("  [-] No process information found.")

    return process_details

def get_process_info(pid):
    """Retrieve process information from /proc filesystem."""
    try:
        proc_info = {}
        # /proc/<pid>/cwd is a symlink to a directory: resolve it instead of opening it
        proc_info["cwd"] = os.readlink(f"/proc/{pid}/cwd")
        with open(f"/proc/{pid}/cmdline", errors="replace") as f:
            # Arguments are separated by NUL bytes; turn them into spaces
            proc_info["cmdline"] = f.read().replace('\x00', ' ').strip()
        return proc_info
    except OSError:
        # The process is gone or we lack the privileges to inspect it
        return {"cwd": "Unknown", "cmdline": "Unknown"}
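The text parsing done inside get_process_details can also be tried in isolation, without running lsof. The sketch below is a hypothetical parse_lsof_output helper applied to an illustrative sample; the sample row is made up for this example and only mimics lsof's usual column layout.

```python
def parse_lsof_output(output):
    """Extract the process name and PID from lsof's tabular text output."""
    processes = []
    for line in output.splitlines()[1:]:  # skip the header row
        fields = line.split()
        # A valid row has at least COMMAND and PID, and the PID is numeric.
        if len(fields) < 2 or not fields[1].isdigit():
            continue
        processes.append({"name": fields[0], "pid": fields[1]})
    return processes


# Illustrative sample only: values are invented for this sketch.
SAMPLE_LSOF_OUTPUT = (
    "COMMAND     PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME\n"
    "python3 1856829 demo    3u  IPv4 123456      0t0  TCP *:8000 (LISTEN)\n"
)

if __name__ == "__main__":
    print(parse_lsof_output(SAMPLE_LSOF_OUTPUT))
```

Keeping the parsing in a pure function like this makes it easy to test against captured output from different machines.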

Add arguments

Let's add some arguments to make the script more interactive and save the results in a JSON file like a pro 🥷🏼

def main():
    # Argument parsing
    parser = argparse.ArgumentParser(description="A simple port scanner using nmap with additional process information.")
    parser.add_argument("-t", "--target", default="127.0.0.1", help="Target IP address to scan (default: localhost).")
    parser.add_argument("-sp", "--start-port", type=int, default=1000, help="Starting port number for the scan (default: 1000).")
    parser.add_argument("-ep", "--end-port", type=int, default=8000, help="Ending port number for the scan (default: 8000).")
    parser.add_argument("-p", "--process", action="store_true", help="Show process information for open ports.")
    parser.add_argument("-o", "--output", default="scan_results.json", help="Output JSON file for scan results (default: scan_results.json).")

    args = parser.parse_args()

    # Run the scan with the provided arguments
    scan_results = scan_with_nmap(args.target, args.start_port, args.end_port, args.process)

    # Write the results to a JSON file
    with open(args.output, 'w') as json_file:
        json.dump(scan_results, json_file, indent=4)
        print(f"\nScan results have been saved to {args.output}")

if __name__ == "__main__":
    main()
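The parser above accepts any integer as a port. One possible refinement, shown here as a minimal sketch rather than as part of the original script, is to validate the values through a custom argparse type. The port_number and parse_port_range names are hypothetical.

```python
import argparse


def port_number(value):
    """argparse type: accept only integers in the valid TCP port range."""
    try:
        port = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"{value!r} is not an integer") from None
    if not 1 <= port <= 65535:
        raise argparse.ArgumentTypeError(f"{port} is outside 1-65535")
    return port


def build_parser():
    parser = argparse.ArgumentParser(description="Port range validation sketch.")
    parser.add_argument("-sp", "--start-port", type=port_number, default=1000)
    parser.add_argument("-ep", "--end-port", type=port_number, default=8000)
    return parser


def parse_port_range(argv):
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.start_port > args.end_port:
        # parser.error prints a usage message and exits with status 2.
        parser.error("--start-port must not be greater than --end-port")
    return args.start_port, args.end_port


if __name__ == "__main__":
    print(parse_port_range(["-sp", "20", "-ep", "25"]))
```

With this in place, a typo such as a port of 70000 is rejected before any scan starts.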

Let's add multithreading

As you have seen, the script is not very fast, because probing thousands of ports means a lot of waiting on the network. Let's see how we can improve this by adding multithreading 🥷🏼

Note

A thread is an entity within a process that can be scheduled for execution. It is also the smallest unit of processing that can be performed in an OS (Operating System). In simple words, a thread is a sequence of instructions within a program that can be executed independently of other code. For simplicity, you can assume that a thread is simply a subset of a process! More information on GeeksforGeeks
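To see why threads help with work that mostly waits, here is a minimal sketch in which each "probe" is simulated with time.sleep instead of real network traffic. The fake_probe and run_probes_in_threads names are invented for this illustration.

```python
import threading
import time


def fake_probe(port, results, delay=0.2):
    """Stand-in for a network probe: sleep to simulate waiting on a reply."""
    time.sleep(delay)
    results[port] = "done"


def run_probes_in_threads(ports, delay=0.2):
    """Run one thread per port and return the results with the elapsed time."""
    results = {}
    threads = [
        threading.Thread(target=fake_probe, args=(port, results, delay))
        for port in ports
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # wait for every probe to finish
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = run_probes_in_threads(range(8000, 8005))
    print(f"{len(results)} probes finished in {elapsed:.2f}s")
```

Because the threads wait at the same time, the five simulated probes finish in roughly the time of one rather than the sum of all five.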

We also add more logging in this refactored version:

import argparse
import json
import logging
import os
import subprocess
import threading
from queue import Empty, Queue

import nmap  # provided by the python-nmap package

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

def worker(queue, ip, show_process, results, thread_id):
    logging.info(f"Thread-{thread_id} started.")
    while True:
        try:
            # get_nowait avoids blocking forever if another thread
            # took the last port between an emptiness check and get()
            port = queue.get_nowait()
        except Empty:
            break
        logging.info(f"Thread-{thread_id} scanning port {port}.")
        scan_data = scan_port(ip, port, show_process)
        if scan_data:
            results.append(scan_data)
        queue.task_done()
    logging.info(f"Thread-{thread_id} finished.")

def scan_port(ip, port, show_process):
    # Each call runs its own nmap scan for a single port
    nm = nmap.PortScanner()
    nm.scan(ip, str(port))

    for host in nm.all_hosts():
        for proto in nm[host].all_protocols():
            lport = nm[host][proto].keys()
            for p in lport:
                port_state = nm[host][proto][p]['state']
                if p == port:
                    port_info = {"port": port, "state": port_state}
                    if port_state == 'open' and show_process:
                        process_info = get_process_details(port)
                        port_info["process"] = process_info
                    logging.info(f"Port {port} on {ip} is {port_state}.")
                    return port_info
    return None

def get_process_details(port):
    # Passing the arguments as a list avoids going through a shell
    lsof_command = ["sudo", "lsof", f"-iTCP:{port}", "-sTCP:LISTEN"]
    try:
        process = subprocess.run(lsof_command, capture_output=True, text=True)
    except OSError as e:
        logging.error(f"Error running lsof on port {port}: {e}")
        return {}

    process_details = {}
    if process.stdout:
        lines = process.stdout.splitlines()
        logging.info(f"lsof output for port {port}:\n{process.stdout}")

        if len(lines) > 1:
            for line in lines[1:]:  # Skip the header line
                details = line.split()
                if len(details) < 2:
                    logging.warning(f"Unexpected lsof line format: {line}")
                    continue
                pid = details[1]
                process_name = details[0]
                process_info = get_process_info(pid)
                process_details = {
                    "pid": pid,
                    "name": process_name,
                    "location": process_info.get("cwd", "Unknown"),
                    "command": process_info.get("cmdline", "Unknown")
                }
                logging.info(f"Found process {process_name} with PID {pid} on port {port}.")
        else:
            logging.info(f"No processes are listening on port {port}.")
    else:
        logging.info(f"No process information found for port {port}.")
    return process_details


def get_process_info(pid):
    """Retrieve process information from /proc filesystem."""
    try:
        proc_info = {}
        # /proc/<pid>/cwd is a symlink to a directory: resolve it instead of opening it
        proc_info["cwd"] = os.readlink(f"/proc/{pid}/cwd")
        with open(f"/proc/{pid}/cmdline", errors="replace") as f:
            # Arguments are separated by NUL bytes; turn them into spaces
            proc_info["cmdline"] = f.read().replace('\x00', ' ').strip()
        return proc_info
    except OSError as e:
        logging.error(f"Failed to retrieve process information for PID {pid}: {e}")
        return {"cwd": "Unknown", "cmdline": "Unknown"}

def main():
    # Argument parsing
    parser = argparse.ArgumentParser(description="A simple port scanner using nmap with additional process information.")
    parser.add_argument("-t", "--target", default="127.0.0.1", help="Target IP address to scan (default: localhost).")
    parser.add_argument("-sp", "--start-port", type=int, default=1000, help="Starting port number for the scan (default: 1000).")
    parser.add_argument("-ep", "--end-port", type=int, default=8000, help="Ending port number for the scan (default: 8000).")
    parser.add_argument("-p", "--process", action="store_true", help="Show process information for open ports.")
    parser.add_argument("-o", "--output", default="scan_results.json", help="Output JSON file for scan results (default: scan_results.json).")
    parser.add_argument("-th", "--threads", type=int, default=10, help="Number of threads to use for scanning (default: 10).")

    args = parser.parse_args()

    logging.info(f"Starting scan on {args.target} from port {args.start_port} to {args.end_port} with {args.threads} threads.")

    # Queue to hold the ports to scan
    port_queue = Queue()

    # Enqueue ports
    for port in range(args.start_port, args.end_port + 1):
        port_queue.put(port)

    # List to hold scan results
    results = []

    # Create worker threads
    threads = []
    for i in range(args.threads):
        thread = threading.Thread(target=worker, args=(port_queue, args.target, args.process, results, i+1))
        threads.append(thread)
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

    logging.info("All threads have finished scanning.")

    # Write the results to a JSON file
    with open(args.output, 'w') as json_file:
        json.dump({"host": args.target, "ports": results}, json_file, indent=4)
        logging.info(f"Scan results have been saved to {args.output}")

if __name__ == "__main__":
    main()

You can find more information in the official documentation.
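As a point of comparison, the same fan-out pattern can be written with concurrent.futures, which manages the worker threads and the work queue for you. This is a minimal sketch that swaps nmap for a plain TCP connect through the socket module, so it illustrates the threading structure rather than replacing the script above. The probe and scan_range names are hypothetical.

```python
import socket
from concurrent.futures import ThreadPoolExecutor


def probe(host, port, timeout=0.5):
    """Return (port, state) using a plain TCP connect attempt."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # Anything other than a successful connect is reported as "closed",
        # which also covers timeouts and filtered ports in this simple sketch.
        state = "open" if sock.connect_ex((host, port)) == 0 else "closed"
    return port, state


def scan_range(host, ports, max_workers=10):
    """Probe the ports concurrently; results come back in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(lambda p: probe(host, p), ports)
        return [{"port": port, "state": state} for port, state in results]


if __name__ == "__main__":
    for entry in scan_range("127.0.0.1", range(8000, 8003)):
        print(entry)
```

The pool removes the need for a manual queue and explicit join calls, at the cost of less control over per-thread logging.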

When you run the script, you will see output similar to this:

2024-09-02 12:00:00 [INFO] Starting scan on 192.168.1.1 from port 2000 to 3000 with 20 threads.
2024-09-02 12:00:01 [INFO] Thread-1 started.
2024-09-02 12:00:01 [INFO] Thread-1 scanning port 2000.
2024-09-02 12:00:01 [INFO] Port 2000 on 192.168.1.1 is closed.
2024-09-02 12:00:02 [INFO] Thread-2 started.
...
2024-09-02 12:00:10 [INFO] Thread-20 scanning port 3000.
2024-09-02 12:00:10 [INFO] Port 3000 on 192.168.1.1 is open.
2024-09-02 12:00:10 [INFO] Found process nginx with PID 1024 on port 3000.
2024-09-02 12:00:11 [INFO] All threads have finished scanning.
2024-09-02 12:00:11 [INFO] Scan results have been saved to scan_results.json

For the demo, let's run a simple Python server with the command below:

python3 -m http.server

You should see this kind of output:

Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

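Then run the script above with the process option against a small range that includes port 8000, and look at the saved JSON file. The location and command fields fall back to "Unknown" when the process details cannot be read: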

{
    "host": "0.0.0.0",
    "ports": [
        {
            "port": 8001,
            "state": "closed"
        },
        {
            "port": 8000,
            "state": "open",
            "process": {
                "pid": "1856829",
                "name": "python3",
                "location": "Unknown",
                "command": "Unknown"
            }
        }
    ]
}
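Once the report is saved, it can be processed like any other JSON document. Here is a minimal sketch, assuming the file layout shown above (a "ports" list whose entries carry "port" and "state" keys). The load_report and open_ports helpers are hypothetical names.

```python
import json


def load_report(path="scan_results.json"):
    """Load a saved scan report from disk."""
    with open(path) as f:
        return json.load(f)


def open_ports(report):
    """Return the sorted port numbers whose state is 'open'."""
    return sorted(
        entry["port"]
        for entry in report.get("ports", [])
        if entry.get("state") == "open"
    )


if __name__ == "__main__":
    # Same structure as the report above, inlined so the sketch runs on its own.
    example = {
        "host": "0.0.0.0",
        "ports": [
            {"port": 8001, "state": "closed"},
            {"port": 8000, "state": "open"},
        ],
    }
    print(open_ports(example))
```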

Conclusion

In summary, we developed a mini project that transfers directly to real-world scenarios in a company, from network security auditing and continuous monitoring to ethical hacking and incident response. I hope this project has strengthened your Python programming skills and that you now feel more confident with network programming.