Manage Bor
import subprocess
import re
import json
import os
def remove_peer(enode):
command = f'bor attach /var/lib/bor/bor.ipc --exec "admin.removePeer(\'{enode}\')"'
try:
subprocess.run(command, shell=True, check=True)
print(f"Peer removed: {enode}")
except subprocess.CalledProcessError as e:
print(f"Error removing peer {enode}: {e}")
def block_ip(ip_address):
block_command = f'sudo iptables -A INPUT -s {ip_address} -j DROP'
os.system(block_command)
print(f"Traffic from IP {ip_address} blocked using iptables")
get_peers_command = 'bor attach /var/lib/bor/bor.ipc --exec "admin.peers"'
def version_compare(v1, v2):
"""
Compare version strings v1 and v2.
Returns True if v1 is less than v2.
"""
return v1 < v2
try:
result = subprocess.check_output(get_peers_command, shell=True, text=True)
result = re.sub(r'([{,]\s*)([A-Za-z_][a-zA-Z_0-9]*)(\s*:)','\\1"\\2"\\3', result)
peers = json.loads(result)
for peer in peers:
caps = peer.get('caps', [])
name_version = peer.get('name', '').split('/')[1] if '/' in peer.get('name', '') else ''
ip_match = re.search(r'@(\d+\.\d+\.\d+\.\d+):30303', peer['enode'])
if "snap/1" not in caps or version_compare(name_version, 'v1'):
remove_peer(peer['enode'])
if ip_match:
ip_address = ip_match.group(1)
block_ip(ip_address)
else:
print(f"Healthy peer: {peer['enode']}")
except subprocess.CalledProcessError as e:
print(f"Error getting the list of peers: {e}")
Last updated