ハッカーへのマイルストーン

ハッカーになるために、日々学んだことを記し、マイルストーンとしていきたい。

セキュリティプログラミング Day4

3日坊主からの脱却の4日目

第3章 ネットワーク:rawソケットと盗聴

この章はネットワークスニッファーについて。有名どころだとwiresharkとかscapyで通信を監視できるけど、ネットワーク通信を観測したりでコードしたりするスニッファーを即席で作ってみようって章。

低レベルネットワーク情報にアクセスするためにrawソケットを使うけども、IPレイヤーより低レイヤーのイーサネットの情報は「詳細 イーサネット 第2版」買ってって。 O'reillyってなんでこんな詳しいんやろ。 O'reillyの本全部理解できたら最強。。。ゼロデイには対応できんかな。。。でもある程度強くなれるな。精進。

UDPを使ってホスト発見

標的ネットワークで動作中のホストを発見するUDPベースのスニッファーの作成。

ほとんどのOSでは、特定のIPアドレスでホストが稼働しているか判断するために、閉じているUDPポートにパケットが届いたときの処理方法が共通しているらしい。

著作権大丈夫かなってくらい本書そのまましか書いてないけど、引っかかったら即停止してくださいね。

通常、UDPデータグラムをホスト上の閉じたポートに送る。→当該ポートに到達できないことを示すICMPメッセージが返信される。→もしUDPデータグラムに対する応答を受信できなければ、ホストは存在しないと考えられる。→逆にICMPメッセージを受信できるならばホストが稼働していることを意味する。

UDPデータグラムってのはUDPパケットと同義でフラグメンテーションが行われていない状態のデータの転送単位を指す。 フラグメンテーションはコンピュータ上のメモリの管理上の一単位が、内部断片化や外部断片化でそのままでは有効利用できない状態になることを言う。

詳しくはwikiで調べて

より多くの稼働中のホストを調べるには、消去法的に使われていないか使われている可能性の低いUDPポートを選択することが大切だと。

UDP???

あれ、てかなんでUDPなの? って思ったそこのあなた!僕も思いましたよ。

本書に書いてあるけど理由は、サブネット全体にメッセージを送り、そのICMP レスポンスを待つことに関して、オーバーヘッドが存在しないから、だって。 オーバーヘッドは、通信で言うと送りたいもの以外についてくる制御用のデータとそれを処理、伝送するためにかかる余計な負荷、時間のこと。 サブネットってよくきくけど、これはネットワークを分割して小さなネットワークに分けた一つ一つのことを言う。 サブネットマスクってよく聞くけど、これはIPアドレスにはクラスっていう考え方があって、でも現在のIPアドレスをクラスによってわけることは無理なので、代わりにサブネットマスクを使ってIPアドレス内のネットワーク部とホスト部を分類してるってやつ。ネットワーク部は1でホスト部は0 IPアドレスだけじゃなくてネットワークアドレスやブロードキャストアドレスにも適用できるし、2進法じゃなくてプレフィックスっていう書き方もできて0を省略して書けるらしい!

とにかく無駄な処理や時間がないってことでUDPを選びましたってことですねー。

これから説明するスキャナーの拡張性としては発見したすべてのホストに対してnmapによる完全なポートスキャンを開始させる処理を追加して、ポートスキャンすることでへっけんした各ホストについてネットワーク経由の攻撃が実行可能かを判断できるようになる。

パケット盗聴(Windows, Linux

windowslinuxではrawソケットへのアクセス方法が若干異なるので、動作しているプラットフォームを判定することにする。 Windowsではネットワークインターフェースのプロミスキャスモードを有効にするために、ioctlと呼ばれるソケット入出力制御用の仕組みを通じて、いくつかのフラグを追加で設定する必要がある。

ioctl(input/output control: 入出力制御) 主にUNIXライクなオペレーティングシステム上で、アプリケーションがデバイスドライバを制御したり、デバイスドライバと通常のデータの読み書きの流れの外で通信するために用意されたシステムコールのこと(wikiより) ユーザースペースのプログラムがカーネルモードのコンポーネントとやり取りするための仕組み。

結構重要な仕組みですね。

コードを見てみよう

sniffer.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket
import os

# リッスンするホストのIPアドレス
host = "192.168.0.100"

# rawソケットを作成しパブリックなインタフェースにバインド
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

# キャプチャー結果にIPヘッダーを含めるように指定
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# Windowsの場合はioctlを使用してプロミスキャスモードを有効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 単一パケットの読み込み
print sniffer.recvfrom(65565)

# Windowsの場合はプロミスキャスモードを無効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

IPPROTO_IPはrawソケットを作成するのに必要。 sniffer.bindのところまででrawソケットを作成しパブリックなインターフェースにバインドしている。

ドキュメントのスクリプトと一緒ですね。 気を付けるべきことは、windowsでもlinuxでもプロミスキャスモードを使うときはadminもしくはroot権限がないといけない。プロミスキャスモードでは、宛先が自ホストではないパケットも含め、ネットワークカードで受信している全パケットの盗聴が可能になる。すご。

あとは書いてある通り。

IPレイヤーのデコード

IPヘッダー部分には、プロトコルタイプ(TCP,UDP,ICMP)、送信元IPアドレスや宛先IPアドレスといった情報が入っている。そのためにデコードしてみる。

sniffer_ip_header_decode.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket

import os
import struct
from ctypes import *

# リッスンするホストのIPアドレス
host   = "192.168.0.100"

# IPヘッダー
class IP(Structure):
    _fields_ = [
        ("ihl",           c_uint8, 4),
        ("version",       c_uint8, 4),
        ("tos",           c_uint8),
        ("len",           c_uint16),
        ("id",            c_uint16),
        ("offset",        c_uint16),
        ("ttl",           c_uint8),
        ("protocol_num",  c_uint8),
        ("sum",           c_uint16),
        ("src",           c_uint32),
        ("dst",           c_uint32)
    ]

まずは型の定義、ctypesを使ってCでデータ型を定義している。 , 4はビットフィールドが4ビットであることを意味している。 ビットフィールド:ブーリアン型のフラグをコンパクトなビットの並びとして格納する手法。

この時受信したバッファーの最初の20バイトを扱いやすいようにIPヘッダーにマップしている。

    def __new__(self, socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        # プロトコルの定数値を名称にマッピング
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 可読なIPアドレスの値に変換
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        # 可読なプロトコル名称に変換
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

newメソッドは、生のバッファーを受け取り、それをもとに構造体を形成している。 initメソッドは、利用されているプロトコルIPアドレスを、人間が読みやすい形式にしている。 このときすでにnewメソッドによるバッファーの処理は完了している。

# 前の例と同様の処理
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

ここはスニッファーの部分

try:

    while True:

        # パケットの読み込み
        raw_buffer = sniffer.recvfrom(65565)[0]

        # バッファーの最初の20バイトからIP構造体を作成
        ip_header = IP(raw_buffer[0:20])

        # 検出されたプロトコルとホストを出力
        print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)

# Ctrl-Cを処理
except KeyboardInterrupt:

    # Windowsの場合はプロミスキャスモードを無効化
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

ここは新たに作られたIP構造体を使い、継続的にパケットを読んで解析するロジックを組み込んでいる。

パケットの読み込み→最初の20バイトをIP構造体の初期化処理に渡す→キャプチャーされた情報を単に出力する。

で、最後はちゃんとプロミスキャスモードを無効化している。(Windowsの場合)

実行するとパケットが読み取れるはず。

ICMPのデコード

今までのスキャナーは閉じたポートに対してUDPデータグラムを送信することでICMPレスポンスを発生させていた。 じゃあ次は、ICMPレスポンスをデコードしてみる。

ICMPメッセージの内容はタイプ、コード、チェックサムという3つの共通のフィールドを持っている。 タイプとコードのフィールドは受信したICMPメッセージの種類を表していて、これでICMPメッセージを正確にデコードする方法がわかる。

スキャナーにはタイプがあるらしく、これは3らしい??? タイプ3のICMPメッセージは宛先到達不可能クラスに該当するんだって。 で、コードも3なので、こそのときはポート到達不可能エラーが発生したことを意味するらしい。

http://www.infraexpert.com/info/5.0adsl.htm

一覧表はここにある。

宛先到達不可能メッセージの場合、最初の8ビットがタイプで今回はタイプ3、次の8ビットにはICMPのコード3が格納されている。残り16ビットはヘッダーチェックサム。 このICMPメッセージがホストから返信される際、その元となったメッセージのIPヘッダーとデータ部分の最初の8バイトがICMPメッセージに含まれる。 そしてもとのデータグラムの先頭8バイトと照合することで、スキャナーにより生成されたICMPレスポンスかどうかを確認できる。

ここで、スニッファーにコードを追加する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket

import os
import struct
from ctypes import *

# リッスンするホストのIPアドレス
host   = "192.168.0.100"

# IPヘッダー
class IP(Structure):
    _fields_ = [
        ("ihl",           c_uint8, 4),
        ("version",       c_uint8, 4),
        ("tos",           c_uint8),
        ("len",           c_uint16),
        ("id",            c_uint16),
        ("offset",        c_uint16),
        ("ttl",           c_uint8),
        ("protocol_num",  c_uint8),
        ("sum",           c_uint16),
        ("src",           c_uint32),
        ("dst",           c_uint32)
    ]

    def __new__(self, socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        # プロトコルの定数値を名称にマッピング
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 可読なIPアドレスの値に変換
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        # 可読なプロトコル名称に変換
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

class ICMP(Structure):

    _fields_ = [
        ("type",         c_uint8),
        ("code",         c_uint8),
        ("checksum",     c_uint16),
        ("unused",       c_uint16),
        ("next_hop_mtu", c_uint16)
        ]

    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        pass

# 前の例と同様の処理
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)


try:

    while True:

        # パケットの読み込み
        raw_buffer = sniffer.recvfrom(65565)[0]

        # バッファーの最初の20バイトからIP構造体を作成
        ip_header = IP(raw_buffer[0:20])

        # 検出されたプロトコルとホストを出力
        print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)

        # ICMPであればそれを処理
        if ip_header.protocol == "ICMP":

            # ICMPパケットの位置を計算
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]

            # ICMP構造体を作成
            icmp_header = ICMP(buf)

            print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

# Ctrl-Cを処理
except KeyboardInterrupt:

    # Windowsの場合はプロミスキャスモードを無効化
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

ICMP構造体の定義を追加。メインのパケット受信ループでは、ICMPパケットの受信を確認。 そのあと、生のパケットにおけるICMP本体のオフセットを計算。 そして、ICMP構造体を作成した後、そのタイプとコードのフィールドを出力している。

オフセットの計算は、IPヘッダーのihlフィールドに基づいて計算される。このフィールドはIPヘッダーに含まれる32ビットワードの数を示すので、4倍することでIPヘッダーのサイズを知ることができる、サイズからネットワークレイヤーの開始位置がわかる。

http://www.cresc.co.jp/tech/network/NET_TUTORIAL/Section_314.htm

ここも詳しいけどいまいちよくわかりませぬ。

ihlはIP header lengthの略で、IPヘッダ長を4で割った値(32ビットワード)が入る。通常はIPヘッダオプションを除いて5(20バイト)らしい。

最後にnetaddrライブラリを使ってホスト発見用のスキャンをサブネット全体に対して実施している。

netaddr

netaddrモジュールを用いると、サブネットとアドレスを簡単に処理することが可能になる。192.168.0.0/24(192.168/24)といったサブネットマスクも入力として受け付けている。 ネットワーク全体にパケットを送ることも可能。 https://pypi.python.org/pypi/netaddr

これを用いて作成する。

scanner.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket

import os
import struct
from ctypes import *

import threading
import time
from netaddr import IPNetwork,IPAddress

# リッスンするホストのIPアドレス
host   = "192.168.0.100"

# 標的のサブネット
subnet = "192.168.0.0/24"

# ICMPレスポンスのチェック用マジック文字列
magic_message = "PYTHONRULES!"

# UDPデータグラムをサブネット全体に送信
def udp_sender(subnet,magic_message):
    time.sleep(5)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    for ip in IPNetwork(subnet):

        try:
            sender.sendto(magic_message,("%s" % ip,65212))
        except:
            pass

PYTHONRULES!は受信したレスポンスがこのツールが送信したUDPパケットに対するものかどうかをチェックするマジック文字列(シグネチャ

udp_sender関数はサブネットのすべてのIPアドレスを列挙して、各IPアドレスUDPデータグラムを送出する。

# IPヘッダー
class IP(Structure):
    _fields_ = [
        ("ihl",           c_uint8, 4),
        ("version",       c_uint8, 4),
        ("tos",           c_uint8),
        ("len",           c_uint16),
        ("id",            c_uint16),
        ("offset",        c_uint16),
        ("ttl",           c_uint8),
        ("protocol_num",  c_uint8),
        ("sum",           c_uint16),
        ("src",           c_uint32),
        ("dst",           c_uint32)
    ]

    def __new__(self, socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        # プロトコルの定数値を名称にマッピング
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 可読なIPアドレスの値に変換
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        # 可読なプロトコル名称に変換
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

class ICMP(Structure):

    _fields_ = [
        ("type",         c_uint8),
        ("code",         c_uint8),
        ("checksum",     c_uint16),
        ("unused",       c_uint16),
        ("next_hop_mtu", c_uint16)
        ]

    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        pass

# 前の例と同様の処理
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

ここまでは同じ

# パケットの送信開始
t = threading.Thread(target=udp_sender,args=(subnet,magic_message))
t.start()

try:

    while True:

        # パケットの読み込み
        raw_buffer = sniffer.recvfrom(65565)[0]

        # バッファーの最初の20バイトからIP構造体を作成
        ip_header = IP(raw_buffer[0:20])

        # 検出されたプロトコルとホストを出力
        #print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)

        # ICMPであればそれを処理
        if ip_header.protocol == "ICMP":

            # ICMPパケットの位置を計算
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]

            # ICMP構造体を作成
            icmp_header = ICMP(buf)

            #print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

            # コードとタイプが3であるかチェック
            if icmp_header.code == 3 and icmp_header.type == 3:

                # 標的サブネットのホストかを確認
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):

                    # マジック文字列を含むか確認
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                        print "Host Up: %s" % ip_header.src_address

# Ctrl-Cを処理
except KeyboardInterrupt:

    # Windowsの場合はプロミスキャスモードを無効化
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

udp_senderはメインの中で別のスレッドとして起動されている。これはICMPレスポンスの受信の妨げにならないように。

このあとのループはパケットでコード用。

期待するICMPメッセージを発見すると、ICMPレスポンスが標的サブネットから来たものかを確認。そして最終チェックとして、ICMPレスポンスにマジック文字列が含まれているかを確かめる。

チェックをすべてクリアすれば、ICMPメッセージの送信元IPアドレスを出力する。

んー単に最初から出力するだけじゃなくて、正確に求めるために文字列用意したんですねー。

でもなんでコードとタイプは3なんだろう。たまたま、該当するものを探しただけか。unreachableなものを探すってよくわかんない。 あ、destination unreachableは宛先がわからなくて届かないだけで、一応ルーティングテーブルまでは載ってるらしいですねー。 port unreachableはサーバーまでたどり着いたけど、ポートが解放されてないらしい。 つまりどっちもICMPレスポンス自体は受信できてるので、どっから来たのかは判別できるってことか。 そりゃコードもタイプもわかるんだから当たり前か。てへぺろ

最後に

だんだん難しくなってきて、よくわかんないことが多いので、ひとまず休憩にしてpythonを学びなおします。 一応、進めるけど深く理解するには毎日は厳しいので、代わりに他のを挙げる。 競プロ始めて(やったことない)、C++がいいのかなー、でもpythonで。 あとはネットワークの勉強ですねー。各自調べて実装のほうがいいのかも。 今は実装から調べてるのでどうにも頭に入らない。 とりあえずマスタリングTCP/IPマスターしようぜ。 APとろうぜ。ってことでばいちゃ。