rest.py

changeset 92
da291d9426ea
child 93
125f3c97071e
equal deleted inserted replaced
91:60ead38a61af 92:da291d9426ea
1 import ssl
2 import socket
3 import errno
4 import select
5 import asyncore
6 import base64
7 import re
8 import json
9 import urllib
10 import irc
11 from datetime import datetime, timedelta
12 from configfile import Config
13
# Base64-encoded credentials the Authorization header is compared against;
# filled in from the 'rest' config node on first use by handle_rest_http.
g_credentials = None

# Port the REST server listens on; read from the 'rest' config node the
# first time a RESTServer is constructed.
g_portnumber = None

# Currently throttled peers, stored as [address, expiry-datetime] pairs.
g_throttle = list()

# Repositories ("owner/name") whose commit notifications are accepted.
valid_repos = [
    'Torr_Samaho/zandronum',
    'Torr_Samaho/zandronum-stable',
    'crimsondusk/zandronum-sandbox',
    'crimsondusk/zandronum-sandbox-stable',
    'crimsondusk/testrepo',
]
21
def is_throttled (address):
    """Tell whether the host of ``address`` is currently throttled.

    ``address`` is indexed as ``address[0]`` for the host. The throttle list
    is walked from the front; expired entries met on the way are removed
    (and reported on stdout). The walk stops as soon as an entry for this
    host is found, so later expired entries may remain until a later call.

    Returns True if a live entry for the host exists, False otherwise.
    """
    host = address[0]
    pos = 0

    while pos < len (g_throttle):
        entry = g_throttle[pos]

        if entry[1] > datetime.utcnow():
            # Entry is still in force.
            if entry[0][0] == host:
                return True

            pos += 1
            continue

        # Entry has run out: report it and drop it. `pos` is not advanced
        # because the next entry has just shifted into this slot.
        print ('Throttle of %s expired' % entry[0][0])
        del g_throttle[pos]

        if entry[0][0] == host:
            return False

    return False
41
def throttle (address):
    """Throttle the host of ``address`` for 30 seconds and announce it.

    ``address`` is a (host, port) tuple. If the host already has an entry in
    ``g_throttle`` its expiry is pushed forward; otherwise a new
    ``[address, expiry]`` entry is appended.
    """
    expires = datetime.utcnow() + timedelta (0, 30)

    # g_throttle holds [address, expiry] pairs, so list.index (address) can
    # never find a match. Search by host instead -- the same key that
    # is_throttled compares on -- so repeat offenders refresh their entry
    # rather than piling up duplicates.
    for entry in g_throttle:
        if entry[0][0] == address[0]:
            entry[0] = address
            entry[1] = expires
            break
    else:
        # not in list
        g_throttle.append ([address, expires])

    Irc.broadcast ('Throttling %s:%s for 30 seconds' % address)
53
def handle_rest_http (data, address):
    """Authenticate a received HTTP request and announce its commits.

    data    -- the request as a list of lines (line terminators removed).
    address -- (host, port) tuple of the peer that sent it.

    The request must carry a Basic ``Authorization`` header matching the
    configured credentials; otherwise the peer is throttled. The
    ``payload=`` line is URL-decoded and parsed as JSON, its repository is
    checked against ``valid_repos`` and each commit is broadcast. Any
    failure while handling the payload is reported, the rejected payload is
    written to ``rejected_json.txt`` on a best-effort basis, and the peer is
    throttled.
    """
    # NOTE(review): `Irc` is not bound by the visible `import irc`;
    # presumably it is provided elsewhere -- confirm.
    global g_credentials
    displayaddress = '%s:%s' % address
    # HTTP header names are case-insensitive, so accept "Authorization:" too.
    authrex = re.compile (r'^authorization: Basic (.+)$', re.IGNORECASE)
    payloadrex = re.compile (r'^payload=(.+)$')
    authenticated = False
    payload = ''

    if not g_credentials:
        g_credentials = base64.b64encode (Config.get_node ('rest').get_value ('credentials'))

    # Authenticate and find the payload
    for line in data:
        match = authrex.match (line)

        if match and match.group (1) == g_credentials:
            authenticated = True
            continue

        match = payloadrex.match (line)
        if match:
            payload = match.group (1)

    if not authenticated:
        Irc.broadcast ('%s:%s failed to authenticate' % address)
        throttle (address)
        return

    # Keep the undecoded text: it can be logged verbatim even when it is not
    # valid UTF-8 or contains non-ASCII characters.
    rawjson = urllib.unquote_plus (payload)

    try:
        # Decode inside the try so malformed UTF-8 is reported like any
        # other bad payload instead of escaping this handler.
        jsondata = json.loads (rawjson.decode ('utf-8'))
        repodata = jsondata['repository']
        repopath = '%s/%s' % (repodata['owner'], repodata['name'])

        if repopath not in valid_repos:
            raise ValueError ('unknown repository %s' % repopath)

        if 'commits' in jsondata:
            for commit in jsondata['commits']:
                Irc.broadcast ('%s: new commit %s' % (displayaddress, commit['node']))
    except Exception as e:
        Irc.broadcast ('%s provided bad JSON: %s' % (displayaddress, e))

        # Logging the rejected payload is best-effort only.
        try:
            with open ('rejected_json.txt', 'w') as fp:
                fp.write (rawjson)
            Irc.broadcast ('bad json written to rejected_json.txt')
        except Exception:
            Irc.broadcast ('failed to log json')

        throttle (address)
        return
108
class RESTConnection (asyncore.dispatcher):
    """One accepted client connection, wrapped in server-side SSL.

    Incoming data is accumulated in ``httpdata`` (carriage returns removed)
    and handed to handle_rest_http, split into lines, once reading stops.
    """
    httpdata = ''       # request text received so far
    address = None      # (host, port) of the peer
    writebuffer = ''    # data queued by write() and not yet sent
    finished = False    # set once finish() has run, so it runs only once

    def __init__ (self, conn, address):
        """Wrap ``conn`` in SSL and complete the handshake.

        The handshake is driven here with select() until it completes. On
        any other SSL error the peer is reported, throttled and the
        connection is closed.
        """
        asyncore.dispatcher.__init__ (self, conn)
        self.socket = ssl.wrap_socket (conn, server_side=True, keyfile='key.pem',
            certfile='cert.pem', do_handshake_on_connect=False)
        self.socket.setblocking (0)
        self.address = address

        while True:
            try:
                self.socket.do_handshake()
                break
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                    select.select ([self.socket], [], [])
                elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                    select.select ([], [self.socket], [])
                else:
                    Irc.broadcast ('%s:%s: SSL error: %s' % (self.address[0], self.address[1], err))
                    throttle (self.address)
                    self.close()
                    return

    def write (self, msg):
        """Queue ``msg`` to be sent when the socket becomes writable."""
        self.writebuffer += msg

    def readable (self):
        """Always interested in incoming data."""
        return True

    def writable (self):
        """Interested in writing only while there is queued output."""
        return bool (self.writebuffer)

    def handle_close (self):
        """Process whatever was received when the peer closes."""
        self.finish()

    def handle_read (self):
        """Read everything currently available, then process the request."""
        while not self.finished:
            try:
                data = self.recv (4096)
            except ssl.SSLError:
                # Treated as the end of the request.
                self.finish()
                return

            if not data:
                # Nothing more will arrive. Stop here instead of calling
                # recv() again on a finished connection; finish() is a no-op
                # if the close handler has already run it.
                self.finish()
                return

            self.httpdata += data.replace ('\r', '')

    def finish (self):
        """Hand the request to handle_rest_http once and close the socket."""
        if self.finished:
            return

        self.finished = True

        try:
            handle_rest_http (self.httpdata.split ('\n'), self.address)
        finally:
            # Release the socket even if the handler raised.
            self.close()

    def handle_write (self):
        """Send queued output, keeping whatever the socket did not accept."""
        sent = self.send (self.writebuffer)
        self.writebuffer = self.writebuffer[sent:]

    def handle_error (self):
        """Let exceptions propagate instead of being swallowed."""
        raise
169
class RESTServer (asyncore.dispatcher):
    """Listening socket that accepts REST connections."""

    def __init__ (self):
        """Bind to the configured port on all interfaces and listen.

        The port number is read from the 'rest' config node the first time
        a server is created and cached in ``g_portnumber``.
        """
        global g_portnumber

        if g_portnumber is None:
            g_portnumber = Config.get_node ('rest').get_value ('port')

        asyncore.dispatcher.__init__ (self)
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.bind (('', g_portnumber))
        self.listen (5)

    def handle_accept (self):
        """Accept a pending connection unless its host is throttled."""
        accepted = self.accept()

        if accepted is None:
            # accept() may return None when no connection is actually
            # pending; there is nothing to do then.
            return

        sock, address = accepted

        if is_throttled (address):
            # throttled
            sock.close()
            return

        Irc.broadcast ('REST connection from %s:%s' % address)
        RESTConnection (sock, address)

    def handle_error (self):
        """Ignore errors raised while handling events on the listener."""
        return

mercurial