work_script/sso/scan_burn.py
Ching L ac1fc4421a feat(sso): add network scanning and device identification tools
- Add scan_burn.sh: fast parallel SSH port scanner for subnet 172.24.11.129-254
- Add scan_burn.py: device identification tool with MAC address mapping
- Update README.md with comprehensive documentation for new tools
- Add usage examples, configuration guide, and troubleshooting section
- Support multi-threaded concurrent scanning with configurable parameters
2026-01-07 18:14:42 +08:00

124 lines
3.7 KiB
Python

#!/usr/bin/env python3
import socket
import paramiko
import concurrent.futures
import getpass
import time
from typing import Tuple
# Define subnet to scan
# SUBNET = "172.16.3"
SUBNET = "172.24.11"
SSH_PORT = 22
MAX_WORKERS = 10
TIMEOUT = 0.5
machine_macs = {
"24:5d:fc:70:00:01": 1,
"24:5d:fc:70:00:00": 2,
"24:5d:fc:70:00:05": 3,
"24:5d:fc:70:00:06": 4,
"24:5d:fc:70:00:00": 5,
"24:5d:fc:70:01:00": 6,
"24:5d:fc:70:01:01": 7,
"24:5d:fc:70:01:02": 8,
"24:5d:fc:70:01:03": 9,
"24:5d:fc:70:01:04": 10,
"24:5d:fc:70:01:05": 11,
"24:5d:fc:70:01:06": 12,
}
def check_ssh(ip: str) -> bool:
"""Check if SSH port is open on the given IP"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(TIMEOUT)
try:
sock.connect((ip, SSH_PORT))
sock.close()
return True
except:
sock.close()
return False
def execute_ssh_command(ip: str, password: str) -> Tuple[str, str]:
"""SSH to server and execute command"""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
result = ""
error = ""
try:
client.connect(ip, port=SSH_PORT, username='ifanr', password=password, timeout=5)
stdin, stdout, stderr = client.exec_command("cat /sys/class/net/end0/address")
result = stdout.read().decode().strip()
error = stderr.read().decode().strip()
except Exception as e:
error = f"Connection error: {str(e)}"
finally:
client.close()
return (result, error)
def main():
print(f"Scanning for SSH servers on {SUBNET}.1-255...")
# Ask for password once
# password = getpass.getpass("Enter SSH password: ")
password = "ifanrcool1314"
start_time = time.time()
# First scan for open SSH ports
ip_range = [f"{SUBNET}.{i}" for i in range(1, 255)]
available_ips = []
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_ip = {executor.submit(check_ssh, ip): ip for ip in ip_range}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
if future.result():
print(f"Found SSH server: {ip}")
available_ips.append(ip)
except Exception as e:
print(f"Error scanning {ip}: {e}")
# Then connect to each available IP and run command
results = {}
if available_ips:
print(f"\nFound {len(available_ips)} SSH servers. Connecting and getting MAC addresses...")
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_ip = {executor.submit(execute_ssh_command, ip, password): ip for ip in available_ips}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
result, error = future.result()
if error:
# results[ip] = f"ERROR: {error}"
pass
else:
results[ip] = result
except Exception as e:
results[ip] = f"EXCEPTION: {str(e)}"
# Display results
elapsed_time = time.time() - start_time
print(f"\nScan completed in {elapsed_time:.2f} seconds.")
if results:
print("\nResults:")
for ip in sorted(results.keys()):
mac = results[ip].strip().lower()
print(f"[{machine_macs.get(mac, '-')}] {ip} : {results[ip]}")
else:
print("No SSH servers found.")
if __name__ == "__main__":
main()