Skip to content

Instantly share code, notes, and snippets.

@tenuki
Last active March 1, 2018 13:33
Show Gist options
  • Save tenuki/a95883cdbbcc65e4f5a7bb5788229463 to your computer and use it in GitHub Desktop.
Save tenuki/a95883cdbbcc65e4f5a7bb5788229463 to your computer and use it in GitHub Desktop.
multi target ping based on impacket's standard ping
#!/usr/bin/env python
# Copyright (c) 2003 CORE Security Technologies
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Multiple target ICMP ping.
#
# This implementation of ping uses the ICMP echo and echo-reply packets
# to check the status of a host. If the remote host is up, it should reply
# to the echo probe with an echo-reply packet.
# Note that this isn't a definitive test: the remote host may be up
# but refuse to reply to the probes.
# Also note that the user must have special access to be able to open a raw
# socket, which this program requires.
#
# Authors:
# Gerardo Richarte <gera@coresecurity.com>
# Javier Kohen <jkohen@coresecurity.com>
#
# Reference for:
# ImpactPacket: IP, ICMP, DATA.
# ImpactDecoder.
import Queue
import select
import socket
import sys
import threading
import time
from impacket import ImpactDecoder, ImpactPacket
class Ping(object):
    """Builder of ICMP echo-request packets aimed at one destination.

    The IP header, ICMP header and payload are assembled once in
    ``__init__``; ``generate`` then re-stamps that same packet with a new
    identifier for every probe it yields.
    """

    # The ICMP identifier is a 16-bit field (RFC 792), so the counter is
    # wrapped to this mask instead of being allowed to grow without bound.
    _ID_MASK = 0xFFFF

    def __init__(self, src, dst, data="A" * 156):
        """Assemble the IP / ICMP / payload packet hierarchy.

        Args:
            src: Source address written into the IP header.
            dst: Destination address written into the IP header; also used
                as the send address yielded by ``generate``.
            data: Payload carried inside the ICMP packet. Defaults to a
                156-character string of "A".
        """
        self.src, self.dst = src, dst

        # Create a new IP packet and set its source and destination addresses.
        self.ip = ImpactPacket.IP()
        self.ip.set_ip_src(src)
        self.ip.set_ip_dst(dst)

        # Create a new ICMP packet of type ECHO.
        self.icmp = ImpactPacket.ICMP()
        self.icmp.set_icmp_type(self.icmp.ICMP_ECHO)

        # Put the payload inside the ICMP packet.
        self.icmp.contains(ImpactPacket.Data(data))

        # Have the IP packet contain the ICMP packet (along with its payload).
        self.ip.contains(self.icmp)

    def generate(self, seq_id=0):
        """Yield an endless stream of ``(packet, address)`` pairs.

        Each pair can be passed straight to ``socket.sendto``: the first
        item is the serialized IP packet, the second is ``(self.dst, 0)``.

        Args:
            seq_id: Identifier of the previous probe; the first packet
                yielded carries ``seq_id + 1``. Identifiers wrap around
                after 65535 so they always fit the 16-bit ICMP field.
        """
        # Loop-invariant, so set once before the first packet is built.
        self.icmp.auto_checksum = 1
        while True:
            # Give the ICMP packet the next ID in the sequence.
            seq_id = (seq_id + 1) & self._ID_MASK
            self.icmp.set_icmp_id(seq_id)
            # Reset the checksum for every probe; presumably impacket only
            # recomputes it when the stored value is zero -- TODO confirm.
            self.icmp.set_icmp_cksum(0)
            yield self.ip.get_packet(), (self.dst, 0)
def print_queue(q):
    """Decode queued raw packets and report every ICMP echo reply.

    Blocks on ``q.get()`` and handles items in arrival order until a
    ``None`` sentinel is dequeued, at which point the function returns.

    Args:
        q: Queue whose items are raw packets to be decoded with
            ``ImpactDecoder.IPDecoder``, terminated by ``None``.
    """
    ipdec = ImpactDecoder.IPDecoder()
    while True:
        # Fetch at the top of every pass so that a packet which fails to
        # decode is dropped instead of being retried forever.
        reply = q.get()
        if reply is None:
            break

        # Use ImpactDecoder to reconstruct the packet hierarchy.
        try:
            rip = ipdec.decode(reply)
        except Exception:
            # Deliberate best effort: skip packets the decoder rejects.
            # TODO: fix the error in decode and remove this try.
            continue

        # Extract the ICMP packet from its container (the IP packet).
        ricmp = rip.child()
        if ImpactPacket.ICMP.ICMP_ECHOREPLY == ricmp.get_icmp_type():
            # Single parenthesized argument: same output on Python 2 and 3.
            print("Ping reply for sequence #%d / %s -> %s" % (
                ricmp.get_icmp_id(), rip.get_ip_src(), rip.get_ip_dst()))
def receive_thread(q, e, s):
    """Pump packets from socket *s* into queue *q* until event *e* is set.

    The socket is polled with a one-second ``select`` timeout so that the
    stop event is re-checked at least once per second. A ``None`` sentinel
    is always queued on the way out -- including when ``select`` or
    ``recvfrom`` raises -- so the consumer of *q* is never left blocked.

    Args:
        q: Queue that receives the data part of each ``recvfrom`` result,
            followed by a final ``None``.
        e: Event-like object; the loop ends once ``e.is_set()`` is true.
        s: Socket to read from (at most 2000 bytes per read).
    """
    try:
        while not e.is_set():
            readable, _, _ = select.select([s], [], [], 1)
            if s in readable:
                q.put(s.recvfrom(2000)[0])
    finally:
        # Posted even on error; any exception still propagates afterwards.
        q.put(None)
# Script entry point: ping every destination once per second until the
# user interrupts, printing echo replies from a background thread.
if __name__ == "__main__":
    if len(sys.argv) < 3:
        # Single parenthesized argument: same output on Python 2 and 3.
        print("Use: %s <src ip> <dst ip> [ <dst ip2> .. ]" % sys.argv[0])
        sys.exit(1)

    # Open a raw socket. Special permissions are usually required.
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    # The packets built by Ping already carry their own IP header.
    s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    # One endless packet generator per destination address.
    src = sys.argv[1]
    generators = [Ping(src, dst).generate() for dst in sys.argv[2:]]

    quit_ev = threading.Event()
    queue = Queue.Queue()
    # The receiver feeds raw replies into the queue; the printer drains it.
    t = threading.Thread(target=print_queue, args=(queue,))
    recv_t = threading.Thread(target=receive_thread, args=(queue, quit_ev, s))
    t.start()
    recv_t.start()
    try:
        while 1:
            # One probe per target, then pause a second before the next round.
            for g in generators:
                s.sendto(*next(g))
            time.sleep(1)
    except KeyboardInterrupt:
        print('user interrupt. ok')
    finally:
        # Stop the receiver first; its sentinel then lets the printer exit.
        quit_ev.set()
        recv_t.join()
        t.join()
        # Close only after the receiver is done using the socket.
        s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment