2009年1月7日水曜日

pythonでCAS

CASサーバとクライアントアプリのサンプルを書く。

サーバーのコード(casserver.py)

# -*- encoding: cp932 -*-

"""
極めて簡易なCASサーバー実装。2.0プロトコルのつもり。

使用法:スクリプトをダブルクリックでサーバープロセスが開始する。

・https:// でなく http:// で動く(多分仕様違反だが)
・すべてのログインIDを、入力したパスワードを評価せずに認証する
・service値のチェックを省略している
・シングルサインオンには未対応(Cookieを交換しないので)
・その他、最初のログインに関わる機能以外をたくさん省略
"""
import cgi, random

def get_cgi_params(environ):
pars = cgi.parse(environ['wsgi.input'], environ=environ, keep_blank_values=True)
for k in pars: pars[k] = "".join(pars[k])
return pars

ticketstore = dict()
ticketseed = '012456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'

def app_cas_login(environ,start_response):
pars = get_cgi_params(environ)
service = pars.get("service", "")
userid = pars.get("userid", "")
if userid:
ticket = 'ST-' + ''.join([random.choice(ticketseed) for i in xrange(32)])
ticketstore[ticket] = (userid, service)
print 'ticket:', ticket, userid, service
location = "%s?ticket=%s" % (service, ticket,)
print "redirect:", location
start_response("302 Moved Temporarily", [('Location', location)])
return []
start_response("200 OK", [('Content-Type','text/html')])
return ["""<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=Shift_JIS">
<title>LOGIN</title></head><body>
<form action="login" method="post">
<input type="hidden" name="service" value="%s">
ID:<input name="userid"><br>PASS:<input name="pass"><br><input type="submit">
</form></body></html>""" % (service,), '']

def app_cas_validate(environ,start_response):
pars = get_cgi_params(environ)
service = pars.get("service", "")
ticket = pars.get("ticket", "")
try:
userid = ticketstore[ticket][0]
ticketstore.pop(ticket)
start_response("200 OK", [('Content-Type','text/xml')])
return ["""<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationSuccess>
<cas:user>%s</cas:user>
<cas:ticket>%s</cas:ticket>
</cas:Attributes>
</cas:authenticationSuccess>
</cas:serviceResponse>""" % (userid, ticket, ), '']
except KeyError:
start_response("200 OK", [('Content-Type','text/xml')])
return ["""<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationFailure code='INVALID_TICKET'>
ticket '%s' is expired or not recognized
</cas:authenticationFailure>
</cas:serviceResponse>""" % (ticket,), '']

dispatch_table = {
'login': app_cas_login,
'serviceValidate':app_cas_validate,
}

def app_dispatcher(environ, start_response):
""" URL情報から実行するWSGIを決めるミニミドルウェア """
if environ.has_key('PATH_INFO'):
a = environ['PATH_INFO'].split("/")[-1]
else:
a = environ['SCRIPT_NAME'].split("/")[-1]
a = a.split(".")[0]
app = dispatch_table.get(a)
return app(environ, start_response)

if __name__ == '__main__':
from wsgiref.simple_server import make_server
srv = make_server('localhost', 8081, app_dispatcher)
srv.serve_forever()


クライアントアプリのコード(client.py)

# -*- encoding: cp932 -*-
import urllib

# 自分自身のログインURL(service引数)
url_mylogin = "http://localhost:8082/mylogin"
# CASサーバーの認証画面
url_caslogin = "http://localhost:8081/login"
# CASサーバーのticket問い合わせ用ベースURL
url_casvalidate = "http://localhost:8081/serviceValidate"

def get_cgi_params(environ):
import cgi
pars = cgi.parse(environ['wsgi.input'], environ=environ, keep_blank_values=True)
for k in pars: pars[k] = "".join(pars[k])
return pars

def my_login_test(environ,start_response):
"""クライアントアプリ側の動作テスト"""
pars = get_cgi_params(environ)
ticket = pars.get("ticket", "")
# ticket引数がなければ、CASサーバーの認証画面にリダイレクトする
if not ticket:
url = "%s?service=%s" % (url_caslogin, urllib.quote(url_mylogin))
print "redirect:", url
start_response("302 Moved Temporarily", [('Location', url)])
return []
# 受け取ったticket引数を使って、CASのヴァリデーションをする。
import re
url = "%s?ticket=%s&service=%s" % (url_casvalidate, ticket, urllib.quote(url_mylogin))
print "validate:", url
# レスポンスを文字列に格納して、中身を調べる
r = urllib.urlopen(url).read()
# ここでは簡易に行うためにXMLのパースでなく正規表現による抽出で済ます
r1 = re.compile("<cas:user>(.+?)</cas:user>")
r2 = r1.findall(r)
if len(r2) > 0:
userid = r2[0]
start_response("200 OK", [('Content-Type','text/plain')])
return ['hello,', userid, '']
else:
start_response("200 OK", [('Content-Type','text/plain')])
return ['something wrong', '']

dispatch_table = {
'mylogin': my_login_test,
}

def app_dispatcher(environ, start_response):
""" URL情報から実行するWSGIを決めるミニミドルウェア """
if environ.has_key('PATH_INFO'):
a = environ['PATH_INFO'].split("/")[-1]
else:
a = environ['SCRIPT_NAME'].split("/")[-1]
a = a.split(".")[0]
app = dispatch_table.get(a)
return app(environ, start_response)

if __name__ == '__main__':
from wsgiref.simple_server import make_server
srv = make_server('localhost', 8082, app_dispatcher)
srv.serve_forever()


ふたつのスクリプトをそれぞれダブルクリックなんかで実行しっぱなして、
http://localhost:8082/mylogin
をブラウザから表示するとCAS側のログイン画面にリダイレクトされる。
ここで認証に成功するともとのアプリに戻ってログインIDが表示される。

CASサーバーがないのにCASのクライアントアプリを書くような羽目になったらこれで仮に開発を進めておくとか。

0 コメント: