端口扫描工具 - ThreatHunter

端口扫描工具
发布于 6 天前 阅读权限 无需登录 作者 bing 493 次浏览 来自 开源项目

使用协程并发任务

使用200协程扫描端口,大约需要35秒扫完65535
#!/user/bin python
# -*- coding:utf-8 -*- 
# Author:Bing
# Contact:[email protected]
# DateTime: 2017-01-17 19:06:06
# Description:  coding 


import sys
sys.path.append("..")

import gevent
from gevent import monkey
from gevent.pool import Pool
monkey.patch_all()
import socket,os,time

#判断是否为域名
def is_domain(domain):
    domain_regex = re.compile(
        r'(?:[A-Z0-9_](?:[A-Z0-9-_]{0,247}[A-Z0-9])?\.)+(?:[A-Z]{2,6}|[A-Z0-9-]{2,}(?<!-))\Z', re.IGNORECASE)
    return True if domain_regex.match(domain) else False

#判断是否为ip
def is_host(host):
    ip_regex = re.compile(r'(^(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])$)', re.IGNORECASE)
    return True if ip_regex.match(host) else False
        
def nessus_target_check(test):
        if is_domain(test) or is_host(test) :
                return str(test)
        else :
                return False


class Work(object):
    def __init__(self, scan_id = "", scan_target = "", scan_type = "" ,scan_args = "", back_fn = None):
        self.pool = Pool(200)
        self.timeout = 0.1

        self.scan_id = scan_id
        self.target = scan_target
        self.scan_type = scan_type
        self.args = scan_args
        self.back_fn = back_fn
        self.result = []        

    def get_port_service(self,text):
        service_path = "nmap-services.txt"            #这个地方需要修改为你自己nmap指纹的地址
        port_server = str(text)+"/tcp"
        with open(service_path,"r") as server:
            for finger in server.readlines():
                port = finger.strip().split(";")[1]
                if port == port_server:
                    fingers = str(finger.strip().split(";")[0])
                    return (port_server,fingers)
            return (port_server,"unknown")


    def port_scan(self,port):
        target = nessus_target_check(self.target)
        if target == False :
            return { "status" : 2 , "data" : "NMAP >>>> :格式错误" }

        try:
            sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sd.settimeout(self.timeout)
            try:
                sd.connect((target,int(port)))
                self.result.append(self.get_port_service(port))
            except socket.error:
                pass    
            sd.close()
        except:
            pass

    def run(self):
        res = []
        for port in range(65535):
            res.append(port)
        self.pool.map(self.port_scan,res)
        data = []
        for line in self.result:
            data.append({ "bug_name" : str(line[0]) ,"bug_summary" : str(line[1]) }) 
        result = { "status" : 1 , "data" : data , "scan_id": self.scan_id , "scan_type": "nmap" }
        self.back_fn(result)
                
def save(nmap_result):
    print nmap_result,"----------------"

t = Work(scan_target = "127.0.0.1",back_fn = save)
t.run()

使用多进程

使用200进程扫描端口,大约需要160多秒扫完65535
#!/user/bin python
# -*- coding:utf-8 -*- 
# Author:Bing
# Contact:[email protected]
# DateTime: 2017-01-17 19:06:06
# Description:  coding 

import socket
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool


remote_server_ip = "www.baidu.com"
ports = []
 
socket.setdefaulttimeout(0.5)
 
def scan_port(port):
    try:
        s = socket.socket(2,1)
        res = s.connect_ex((remote_server_ip,port))
        if res == 0: # 如果端口开启 发送 hello 获取banner
            print 'Port {}: OPEN'.format(port)
        s.close()
    except Exception,e:
        print str(e.message)
 
 
 
for i in range(1,65535):
    ports.append(i)
 
# Check what time the scan started
t1 = datetime.now()
 
 
pool = ThreadPool(processes = 200)
results = pool.map(scan_port,ports)
pool.close()
pool.join()
 
print 'Multiprocess Scanning Completed in  ', datetime.now() - t1


使用多线程

使用200线程扫描端口,大约需要2多秒扫完65535
import sys
sys.path.append("..")

import threading, socket, sys, cmd, os, Queue
from core.settings import *

#线程锁
lock = threading.Lock()

#制作扫描端口队列
def GetQueue(host):
    PortQueue = Queue.Queue()
    for port in range(1,65535):
        PortQueue.put((host,port))
    return PortQueue

class ScanThread(threading.Thread):
    def __init__(self,SingleQueue,outip):
        threading.Thread.__init__(self)
        self.setDaemon(True)            #设置后台运行,让join结束
        self.SingleQueue = SingleQueue
        self.outip = outip

    def get_port_service(self,text):
        service_path = "nmap-services.txt"            #这个地方需要修改为你自己nmap指纹的地址
        port_server = str(text)+"/tcp"
        with open(service_path,"r") as server:
            for finger in server.readlines():
                port = finger.strip().split(";")[1]
                if port == port_server:
                    fingers = str(finger.strip().split(";")[0])
                    return (port_server,fingers)
            return (port_server,"unknown")

    def Ping(self,scanIP, Port):
        global OpenPort, lock
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(0.1)
        address = (scanIP, Port)
        try:
            sock.connect(address)
        except:
            sock.close()
            return False
        sock.close()
        if lock.acquire():
            #print "IP:%s  Port:%d" % (scanIP, Port)
            self.outip.put(self.get_port_service(Port))
            lock.release()
        return True

    def run(self):
        while not self.SingleQueue.empty():
                #获取扫描队列,并扫描
            host,port = self.SingleQueue.get()
            self.Ping(host,port)


class Work(object):
        def __init__(self, scan_id = "", scan_target = "", scan_type = "" ,scan_args = "", back_fn = None):
            self.scan_id = scan_id
            self.target = scan_target
            self.scan_type = scan_type
            self.args = scan_args
            self.back_fn = back_fn
            self.result = []        

        def run(self):
            ThreadList = []
            #扫描队列
            SingleQueue = GetQueue(self.target)
            #存储结果队列
            resultQueue = Queue.Queue()
            #启动200线程并发
            for i in range(0, 200):
                t = ScanThread(SingleQueue,resultQueue)
                ThreadList.append(t)
            for t in ThreadList:
                t.start()
            for t in ThreadList:
                #需要设置线程为后台,然后没法结束;join等待结束后台线程
                t.join(0.1)

            data = []
            while not resultQueue.empty():
                line = resultQueue.get() 
                data.append({ "bug_name" : str(line[0]) ,"bug_summary" : str(line[1]) }) 
            result = { "status" : 1 , "data" : data , "scan_id": self.scan_id , "scan_type": "nmap" }
            self.back_fn(result)

def save(nmap_result):
    print nmap_result,"----------------"

t = Work(scan_target = "127.0.0.1",back_fn = save)
t.run()

多线程的用着好爽!

分享是一种美德

社区有专门的开源项目的账号,大家觉得不错的话可以 @admin 添加到社区的开源项目里面。

这个在内网里面用怕是要被发现哦

为何多线程比协程还快?并发还是golang比较快。

我多线程扫一个C段,几分钟后网就爆炸了。

感谢楼主分享

代码乍一看还行,仔细一看问题还是很大啊, 你把任务timeout 设置成了0.1 什么鬼, 协程 你用socket句柄设置timeout 意义不大,正确应该用gevent的timeout, 而且pool.map 不是协程。应该用pool.spawn


为您推荐了相关的技术文章:

  1. 漏洞检测的那些事儿 - 从理论到实战
  2. 没有钱的安全部之资产安全
  3. pythonran/Pcap_tools
  4. NSA如何定位泄密女临时工?E安全为您还原追踪全貌! | E安全
  5. 使用python-nmap不出https踩到的坑 - 屌丝归档笔记

原文链接: threathunter.org