|
1 import ssl |
|
2 import socket |
|
3 import errno |
|
4 import select |
|
5 import asyncore |
|
6 import base64 |
|
7 import re |
|
8 import json |
|
9 import urllib |
|
10 import irc |
|
11 from datetime import datetime, timedelta |
|
12 from configfile import Config |
|
13 |
|
# Base64-encoded REST credentials; filled in from the config the first time
# handle_rest_http() runs.
g_credentials = None

# Port the REST server binds to; filled in from the config the first time a
# RESTServer is constructed.
g_portnumber = None

# Active throttles, one [address, expiry] pair per entry.
g_throttle = []

# "owner/name" paths of the repositories whose payloads are accepted.
valid_repos = [
    'Torr_Samaho/zandronum',
    'Torr_Samaho/zandronum-stable',
    'crimsondusk/zandronum-sandbox',
    'crimsondusk/zandronum-sandbox-stable',
    'crimsondusk/testrepo',
]
|
21 |
|
def is_throttled(address):
    """Report whether the host of ``address`` is currently throttled.

    As a side effect every entry of ``g_throttle`` whose expiry time has
    passed is removed and reported on stdout, whether or not it belongs to
    the host being asked about.

    Args:
        address: ``(host, port)`` pair; only ``address[0]`` is compared.

    Returns:
        True if an unexpired entry for the same host remains, False otherwise.
    """
    now = datetime.utcnow()  # one timestamp for the whole pass
    active = []

    for entry in g_throttle:
        if entry[1] <= now:
            print('Throttle of %s expired' % entry[0][0])
        else:
            active.append(entry)

    # Prune in place so that every holder of the list sees the same contents.
    g_throttle[:] = active

    # Decide only after pruning: an expired entry for this host must not hide
    # a second, still-active entry for the same host.
    host = address[0]
    return any(entry[0][0] == host for entry in active)
|
41 |
|
def throttle(address, duration=30):
    """Start, or restart, a throttle on the host of ``address``.

    Entries are matched by host only, because that is all ``is_throttled``
    compares; a host therefore never gets more than one entry from here.
    The throttle is announced through ``Irc.broadcast``.

    NOTE(review): ``Irc`` is not bound by any import visible in this file
    (only ``import irc`` is) -- confirm where the name comes from.

    Args:
        address: ``(host, port)`` pair of the offending peer.
        duration: Throttle length in seconds.
    """
    expires = datetime.utcnow() + timedelta(0, duration)
    host = address[0]

    for entry in g_throttle:
        if entry[0][0] == host:
            # Already throttled: just push the expiry time back.
            entry[1] = expires
            break
    else:
        g_throttle.append([address, expires])

    Irc.broadcast('Throttling %s:%s for %s seconds' % (address[0], address[1], duration))
|
53 |
|
# Request-line patterns, compiled once instead of on every request.  The
# header is matched case-insensitively so that "Authorization:" is accepted
# as well as "authorization:".
_AUTH_RE = re.compile(r'^authorization: Basic (.+)$', re.IGNORECASE)
_PAYLOAD_RE = re.compile(r'^payload=(.+)$')


def handle_rest_http(data, address):
    """Authenticate one received request and announce the commits it carries.

    The request must contain an ``authorization: Basic <credentials>`` line
    matching the configured credentials and a ``payload=<urlencoded JSON>``
    line.  The JSON must name a repository listed in ``valid_repos``; each
    element of its optional ``commits`` list is announced by its ``node``.
    A request that fails authentication or carries an unusable payload gets
    its peer throttled; an unusable payload is also saved to
    ``rejected_json.txt``.

    Args:
        data: Iterable of request lines without line terminators.
        address: ``(host, port)`` pair of the peer.
    """
    global g_credentials
    displayaddress = '%s:%s' % (address[0], address[1])

    if not g_credentials:
        # Loaded lazily so the config is only consulted once a request arrives.
        g_credentials = base64.b64encode(Config.get_node('rest').get_value('credentials'))

    authenticated = False
    payload = ''

    # Authenticate and find the payload
    for line in data:
        match = _AUTH_RE.match(line)
        if match and match.group(1) == g_credentials:
            authenticated = True
            continue

        match = _PAYLOAD_RE.match(line)
        if match:
            payload = match.group(1)

    if not authenticated:
        Irc.broadcast('%s failed to authenticate' % displayaddress)
        throttle(address)
        return

    # NOTE(review): assumes the request lines are byte strings (Python 2 str),
    # so this is the undecoded payload -- confirm against the caller.
    rawjson = urllib.unquote_plus(payload)

    try:
        # Decoding happens inside the try so that invalid UTF-8 is rejected
        # like any other bad payload instead of escaping to the event loop.
        jsondata = json.loads(rawjson.decode('utf-8'))
        repodata = jsondata['repository']
        repopath = '%s/%s' % (repodata['owner'], repodata['name'])

        if repopath not in valid_repos:
            raise ValueError('unknown repository %s' % repopath)

        for commit in jsondata.get('commits', ()):
            Irc.broadcast('%s: new commit %s' % (displayaddress, commit['node']))
    except Exception as e:
        # Broad on purpose: whatever a hostile payload triggers is reported
        # and throttled rather than allowed to take the server down.
        Irc.broadcast('%s provided bad JSON: %s' % (displayaddress, e))

        try:
            # Written as raw bytes so non-ASCII content cannot fail to encode.
            with open('rejected_json.txt', 'wb') as fp:
                fp.write(rawjson)
        except (IOError, OSError):
            Irc.broadcast('failed to log json')
        else:
            Irc.broadcast('bad json written to rejected_json.txt')

        throttle(address)
        return
|
108 |
|
class RESTConnection(asyncore.dispatcher):
    """One accepted client connection, served over TLS.

    The constructor wraps the socket with ``key.pem``/``cert.pem`` and
    completes the handshake before returning.  Incoming data is accumulated
    in ``httpdata`` and handed to ``handle_rest_http`` once no more of it is
    available or the peer closes, after which the connection is closed.

    NOTE(review): ``Irc`` is not bound by any import visible in this file
    (only ``import irc`` is) -- confirm where the name comes from.
    """

    httpdata = ''           # request text received so far, carriage returns removed
    address = None          # peer address as passed to __init__
    writebuffer = ''        # data queued by write() and not yet sent
    handshake_timeout = 10  # seconds to wait for the peer during the handshake
    _finished = False       # set once finish() has handled the request

    def __init__(self, conn, address):
        """Wrap ``conn`` in TLS and run the handshake.

        On a failed handshake the failure is broadcast, the peer is
        throttled and the connection is closed.

        Args:
            conn: The accepted, connected socket.
            address: ``(host, port)`` pair of the peer.
        """
        asyncore.dispatcher.__init__(self, conn)
        self.address = address

        try:
            self.socket = ssl.wrap_socket(conn, server_side=True, keyfile='key.pem',
                                          certfile='cert.pem', do_handshake_on_connect=False)
            self.socket.setblocking(0)
        except Exception:
            # The channel is already registered; release it before propagating.
            self.close()
            raise

        failure = self._handshake()

        if failure is not None:
            Irc.broadcast('%s:%s: SSL error: %s' % (self.address[0], self.address[1], failure))
            throttle(self.address)
            self.close()

    def _handshake(self):
        """Drive the TLS handshake on the non-blocking socket.

        Returns:
            None on success, otherwise the exception or message describing
            why the handshake was abandoned.
        """
        deadline = datetime.utcnow() + timedelta(0, self.handshake_timeout)

        while True:
            try:
                self.socket.do_handshake()
                return None
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                    readers, writers = [self.socket], []
                elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                    readers, writers = [], [self.socket]
                else:
                    return err
            except socket.error as err:
                return err

            # Bounded wait: this runs inside the event loop, so a peer that
            # connects and then stays silent must not stall everything else.
            if datetime.utcnow() >= deadline:
                return 'handshake timed out'

            ready_r, ready_w, _ = select.select(readers, writers, [], self.handshake_timeout)

            if not (ready_r or ready_w):
                return 'handshake timed out'

    def write(self, msg):
        """Queue ``msg`` to be sent when the socket is writable."""
        self.writebuffer += msg

    def readable(self):
        """Always ask the event loop for read events."""
        return True

    def writable(self):
        """Ask for write events only while queued data remains."""
        return bool(self.writebuffer)

    def handle_close(self):
        """Handle whatever was received when the channel closes."""
        self.finish()

    def handle_read(self):
        """Collect all data currently available, then handle the request."""
        while True:
            try:
                data = self.recv(4096)
            except ssl.SSLError:
                # Nothing more can be read right now; treated as the end of
                # the request.
                self.finish()
                return

            if not data:
                # An empty read means the channel is closed; stop instead of
                # reading a closed socket over and over.
                self.finish()
                return

            self.httpdata += data.replace('\r', '')

    def finish(self):
        """Hand the received lines to ``handle_rest_http`` and close.

        Runs at most once per connection, because both ``handle_read`` and
        ``handle_close`` can lead here.
        """
        if self._finished:
            return

        self._finished = True

        try:
            handle_rest_http(self.httpdata.split('\n'), self.address)
        finally:
            # Close even if the handler raises.
            self.close()

    def handle_write(self):
        """Send queued data, keeping whatever the socket did not accept."""
        sent = self.send(self.writebuffer)
        self.writebuffer = self.writebuffer[sent:]

    def handle_error(self):
        """Re-raise the exception being handled instead of logging it."""
        raise
|
169 |
|
class RESTServer(asyncore.dispatcher):
    """Listening socket that hands accepted clients to ``RESTConnection``."""

    def __init__(self):
        """Bind to the configured port on all interfaces and start listening."""
        global g_portnumber

        if g_portnumber is None:
            g_portnumber = Config.get_node('rest').get_value('port')

        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # Lets a restarted server rebind while old connections linger.
        self.set_reuse_addr()
        self.bind(('', g_portnumber))
        self.listen(5)

    def handle_accept(self):
        """Accept a pending client, dropping it at once if its host is throttled."""
        accepted = self.accept()

        if accepted is None:
            # accept() yields None when no connection is actually ready.
            return

        sock, address = accepted

        if is_throttled(address):
            # throttled
            sock.close()
            return

        Irc.broadcast('REST connection from %s:%s' % (address[0], address[1]))
        RESTConnection(sock, address)

    def handle_error(self):
        """Report the exception being handled and keep the server running."""
        import traceback

        traceback.print_exc()