#!/usr/bin/python
# (C) 2010 by Tomasz bla Fortuna
# License: GNU GPLv3+
"""Tiny user-space Ethernet / IPv4 / UDP stack built on a raw AF_PACKET socket.

Each layer (Ethernet, IP, UDP) exposes ``send`` and ``recv``.  ``recv``
returns either a dict describing the packet, to which every layer adds its
own keys, or a plain ``str`` explaining why the packet was dropped.

Run as a script, it forwards lines read from stdin as UDP datagrams and
prints the datagrams it receives.
"""

from socket import socket, inet_aton, inet_ntoa
import binascii
import struct
import sys

# Wire layouts.  Every 16-bit field is unsigned ('H'): EtherTypes, ports and
# lengths above 0x7FFF are legal and do not fit a signed short.
_ETH_HEADER = struct.Struct('>6s6sH')        # dst MAC, src MAC, EtherType
_IP_HEADER = struct.Struct('>BBHHHBBH4s4s')  # 20-byte IPv4 header, no options
_UDP_HEADER = struct.Struct('>HHHH')         # src port, dst port, length, checksum


class Ethernet(object):
    """Sends and receives Ethernet frames through a raw packet socket."""

    ##
    # HELPERS
    ##
    @staticmethod
    def __conv_mac(s, joiner=":"):
        """Format a binary MAC address as upper-case hex pairs joined by *joiner*."""
        # bytearray yields integers on both Python 2 and Python 3.
        return joiner.join("%02X" % octet for octet in bytearray(s))

    def __parse_mac(self, s):
        """Convert a textual MAC such as 'aa:bb:cc:dd:ee:ff' to its binary form."""
        if not isinstance(s, str):
            s = s.decode("ascii")
        return binascii.unhexlify(s.replace(":", ""))

    def __get_socket_mac(self, sock):
        """Return the hardware address reported by the socket's getsockname()."""
        return sock.getsockname()[4]

    def __socket_debug(self, sock):
        """Print the fields of the socket's getsockname() tuple."""
        (i, p, pt, hwt, hwa) = sock.getsockname()
        hwa = Ethernet.__conv_mac(hwa)
        print("if:%s proto:0x%04X packet_type:%d hwtype:%d hwaddr:%s"
              % (i, p, pt, hwt, hwa))

    def __parse_frame(self, frame):
        """Split a raw frame into (dst, src, protocol, data).

        Raises struct.error when *frame* is shorter than the 14-byte header.
        """
        dst, src, protocol = _ETH_HEADER.unpack(frame[:_ETH_HEADER.size])
        return (dst, src, protocol, frame[_ETH_HEADER.size:])

    ##
    # Interface
    ##
    def __init__(self, interface='eth0', spoof_mac=None, protocol=0x55aa):
        """Open a raw socket bound to *interface* for the given EtherType.

        interface -- name of the network interface to bind to.
        spoof_mac -- source MAC to use instead of the interface's own, either
                     as 6 raw bytes or as text ('aa:bb:cc:dd:ee:ff').
        protocol  -- EtherType written into sent frames and required of
                     received ones.
        """
        # Imported here because AF_PACKET does not exist on every platform.
        from socket import AF_PACKET, SOCK_RAW
        print("Creating RAW socket and binding to the interface %s" % interface)
        self.s = socket(AF_PACKET, SOCK_RAW, protocol)
        try:
            self.s.bind((interface, protocol))
        except Exception:
            # Do not leak the descriptor when the interface cannot be bound.
            self.s.close()
            raise

        if not spoof_mac:
            self.src_mac = self.__get_socket_mac(self.s)
        elif isinstance(spoof_mac, bytes) and len(spoof_mac) == 6:
            self.src_mac = spoof_mac
        else:
            # A textual MAC must be converted; packing it as-is would put its
            # first six characters on the wire.
            self.src_mac = self.__parse_mac(spoof_mac)
        self.protocol = protocol
        self.__socket_debug(self.s)

    def send(self, dst, data):
        """Send *data* (bytes) to the textual MAC address *dst*."""
        dst_bin = self.__parse_mac(dst)
        frame = _ETH_HEADER.pack(dst_bin, self.src_mac, self.protocol) + data
        self.s.send(frame)

    def recv(self):
        """Receive one frame.

        Returns a dict with 'src_mac' (text) and 'eth_raw' (payload bytes), or
        a str describing why the frame was rejected.
        """
        # Should receive one whole packet
        frame = self.s.recv(8192)
        if len(frame) < _ETH_HEADER.size:
            return "ETH:Frame too short"
        dst, src, protocol, data = self.__parse_frame(frame)
        if dst != self.src_mac:
            return "ETH:Destination MAC doesn't match"
        if protocol != self.protocol:
            return "ETH:Protocol doesn't match"
        return dict(src_mac=self.__conv_mac(src), eth_raw=data)


class IP(object):
    """Sends and receives option-less IPv4 packets on top of Ethernet."""

    ##
    # HELPERS
    ##
    @staticmethod
    def checksum(data):
        """Return the 16-bit ones' complement checksum of *data* as 2 bytes.

        Odd-length input is padded on the right with one zero byte before
        summing.
        """
        buf = bytearray(data)
        if len(buf) % 2:
            buf.append(0)
        total = 0
        for i in range(0, len(buf), 2):
            total += (buf[i] << 8) | buf[i + 1]
        # Fold the carries back into the low 16 bits (end-around carry).
        while total >> 16:
            total = (total & 0xFFFF) + (total >> 16)
        return struct.pack('>H', ~total & 0xFFFF)

    def __decode_ip(self, ip):
        """Convert a dotted-quad string to 4 packed bytes."""
        return inet_aton(ip)

    def __encode_ip(self, ip):
        """Convert 4 packed bytes to a dotted-quad string."""
        return inet_ntoa(ip)

    def __init__(self, address="127.0.0.1", interface='eth0', protocol=17):
        """Create the layer.

        address   -- local IPv4 address: source of sent packets and the only
                     destination accepted by recv().
        interface -- network interface handed to the Ethernet layer.
        protocol  -- IP protocol number carried (17 = UDP).
        """
        self.src_ip = self.__decode_ip(address)
        self.protocol = protocol
        # 0x0800 is the EtherType of IPv4.
        self.net = Ethernet(interface=interface, protocol=0x0800)

    def send(self, dst_ip, dst_mac, data):
        """Wrap *data* in an IPv4 header and send it to *dst_ip* via *dst_mac*."""
        length = len(data) + _IP_HEADER.size  # Size of packet (20 is the header)
        dst = self.__decode_ip(dst_ip)
        header = _IP_HEADER.pack(
            0x45,           # Version + header size
            0x00,           # Differentiated services
            length,         # Length of datagram
            0x0000,         # Fragment identification
            0x4000,         # Flags + fragment offset (don't fragment, offset 0)
            0x40,           # Time To Live
            self.protocol,  # Protocol (default: UDP)
            0x0000,         # Header checksum, zero while it is being computed
            self.src_ip,
            dst,
        )
        checksum = self.checksum(header)
        # The checksum field occupies bytes 10-11 of the header.
        ready = header[:10] + checksum + header[12:] + data
        self.net.send(dst_mac, ready)

    def recv(self):
        """Receive one IPv4 packet.

        Returns the Ethernet layer's dict extended with 'ttl', 'ip_checksum',
        'ip_length', 'src_ip', 'dst_ip' and 'ip_raw', or a str describing why
        the packet was rejected.  The header checksum is reported but not
        verified.
        """
        frame = self.net.recv()
        if isinstance(frame, str):
            return frame

        packet = frame['eth_raw']
        if len(packet) < _IP_HEADER.size:
            return "IP:Header too short"
        (intro, _tos, length, _ident, _flags, ttl, protocol, checksum,
         src_ip, dst_ip) = _IP_HEADER.unpack(packet[:_IP_HEADER.size])

        if intro != 0x45:
            return "IP:Packet version/header size not implemented"
        if length < 20:
            return "IP:Length invalid"
        if protocol != self.protocol:
            return "IP:Encapsulated data is not UDP, dropping"
        if dst_ip != self.src_ip:
            return ("IP:Destination IP doesn't match, dropping (%s)"
                    % self.__encode_ip(dst_ip))

        frame['ttl'] = ttl
        frame['ip_checksum'] = checksum
        frame['ip_length'] = length
        frame['src_ip'] = self.__encode_ip(src_ip)
        frame['dst_ip'] = self.__encode_ip(dst_ip)
        # Cut at the declared total length so that padding appended to short
        # Ethernet frames is not handed up as payload.
        frame['ip_raw'] = packet[_IP_HEADER.size:length]
        return frame


class UDP(object):
    """Sends and receives UDP datagrams on top of IP."""

    def __init__(self, src_ip, src_port, interface='eth0'):
        """Create the layer for local address *src_ip* and port *src_port*."""
        self.net = IP(src_ip, interface=interface)
        self.src_port = src_port

    def send(self, dst_ip, dst_mac, dst_port, data):
        """Send *data* (bytes) to *dst_ip*:*dst_port* via the MAC *dst_mac*."""
        length = len(data) + _UDP_HEADER.size
        chk = 0x0000  # Checksumming disabled
        frame = _UDP_HEADER.pack(self.src_port, dst_port, length, chk) + data
        self.net.send(dst_ip, dst_mac, frame)

    def recv(self):
        """Receive one datagram.

        Returns the IP layer's dict extended with 'dst_port', 'src_port',
        'data_length', 'udp_checksum' and 'data', or a str describing why the
        datagram was rejected.
        """
        frame = self.net.recv()
        if isinstance(frame, str):
            return frame

        datagram = frame['ip_raw']
        if len(datagram) < _UDP_HEADER.size:
            return "UDP:Header too short"
        src_port, dst_port, length, checksum = _UDP_HEADER.unpack(
            datagram[:_UDP_HEADER.size])
        if length < _UDP_HEADER.size:
            # A smaller length would turn the payload slice below negative.
            return "UDP:Length invalid"
        if dst_port != self.src_port:
            return "UDP:Destination port doesn't match, dropping"

        payload_length = length - _UDP_HEADER.size
        frame['dst_port'] = dst_port
        frame['src_port'] = src_port
        frame['data_length'] = payload_length
        frame['udp_checksum'] = checksum
        frame['data'] = datagram[_UDP_HEADER.size:][:payload_length]
        return frame


def show_packet(pkt, debug=True):
    """Print a received packet, or the drop reason when *pkt* is a str.

    With *debug* set, drop reasons, a summary line and a hex dump are printed
    as well; the raw payload is written to stdout either way.
    """
    # NOTE(review): the nesting of the original statements was lost; this
    # assumes the summary and hex dump are debug-only and the payload is
    # always written -- confirm.
    if isinstance(pkt, str):
        if debug:
            print("Error: %s" % pkt)
        return

    if debug:
        print("From: %(src_mac)s(%(src_ip)s) Checksum:%(ip_checksum)04X TTL:%(ttl)d LENGTH:%(ip_length)d/%(data_length)d" % pkt)
        print(binascii.hexlify(pkt['data']).decode('ascii'))

    # The payload is bytes: write it unmodified and without a trailing
    # newline, through the binary layer of stdout where there is one.
    sys.stdout.flush()
    raw_out = getattr(sys.stdout, 'buffer', sys.stdout)
    raw_out.write(pkt['data'])
    raw_out.flush()


def main():
    """Relay stdin lines as UDP datagrams and print whatever arrives."""
    import select

    if len(sys.argv) != 8:
        print("Usage: ./con.py <src_ip> <src_port> <dst_ip> <dst_port> <interface> <gateway_mac> <debug>")
        print("debug can be 0 or 1")
        print("Ex: ./con.py 192.168.0.2 80 10.10.10.4 999 eth0 00:0a:5e:02:70:a8 1")
        sys.exit(0)

    src_ip = sys.argv[1]
    src_port = int(sys.argv[2])
    dst_ip = sys.argv[3]
    dst_port = int(sys.argv[4])
    interface = sys.argv[5]
    gateway_mac = sys.argv[6]
    debug = sys.argv[7] == "1"

    udp = UDP(src_ip, src_port, interface)
    sock = udp.net.net.s
    stdin = sys.stdin
    watched = [sock, stdin]

    while True:
        (rlist, wlist, xlist) = select.select(watched, [], [])

        if sock in rlist:
            show_packet(udp.recv(), debug=debug)

        if stdin in rlist:
            data = stdin.readline()
            if not data:
                # EOF: stdin stays readable forever, so stop watching it
                # instead of sending an endless stream of empty datagrams.
                watched.remove(stdin)
                continue
            if not isinstance(data, bytes):
                data = data.encode(stdin.encoding or "utf-8")
            udp.send(dst_ip, gateway_mac, dst_port, data)


if __name__ == "__main__":
    main()

# vim:expandtab:ts=4