git ssb

0+

Daan Patchwork / py-http-ssb



Tree: 2eae3f13696f12df75b1b2e93e8a83521ba60ca4

Files: 2eae3f13696f12df75b1b2e93e8a83521ba60ca4 / api.py

8061 bytesRaw
1#!/usr/bin/env python3
2
3from flask import Flask, request
4from flask import Flask
5
6from pprint import pprint
7from collections import defaultdict
8import sys
9from flask_restful import Resource, Api, abort, reqparse
10import threading
11import json
12import time
13
14from validation import has_correct_form, has_valid_signature, has_valid_hash
15from utils import get_db_conn
16
17
18app = Flask(__name__)
19
20FEED_LOCKS = defaultdict(threading.Lock)
21
22
23def prune_locks():
24 """
25 Safely disposes of unneeded lock objects.
26 """
27 for (key, lock) in list(FEED_LOCKS.items()):
28 success = lock.acquire(blocking=False)
29 if success and key in FEED_LOCKS:
30 del FEED_LOCKS[key]
31 lock.release()
32
33
34SELECT_ALL_MESSAGES = """
35 SELECT id, value
36 FROM message
37 ORDER BY sequence ASC;"""
38
39SELECT_FEED_MESSAGES_FMT = """
40 SELECT id, value
41 FROM message
42 WHERE feed_id=? AND sequence>? {extra_where}
43 ORDER BY sequence ASC
44 LIMIT ?;"""
45
46DB_FILE = "ssb_simple.db"
47
48SELECT_LAST_FEED_SEQS = """SELECT feed_id, MAX(sequence)
49 FROM message
50 GROUP BY feed_id;"""
51
52INSERT_MESSAGE_FMT = """INSERT INTO message
53('id', 'feed_id', 'sequence', 'type',
54 'timestamp_asserted', 'timestamp_received', 'value')
55VALUES (?, ?, ?, ?, ?, ?, ?);"""
56
57CREATE_MESSAGE_TABLE = """CREATE TABLE IF NOT EXISTS message (
58 id VARCHAR NOT NULL,
59 feed_id VARCHAR NOT NULL,
60 sequence INTEGER NOT NULL,
61 type VARCHAR,
62 timestamp_asserted INTEGER NOT NULL,
63 timestamp_received INTEGER NOT NULL,
64 value VARCHAR NOT NULL,
65 PRIMARY KEY (id),
66 FOREIGN KEY(feed_id) REFERENCES feed (id)
67);
68CREATE UNIQUE INDEX IF NOT EXISTS _feed_sequence_idx ON message (feed_id, sequence);
69CREATE INDEX IF NOT EXISTS _type_idx ON message (type);"""
70
71INSERT_FEED_FMT = """INSERT INTO feed
72('id', latest_sequence', 'latest_message_id')
73VALUES (?, ?, ?);"""
74
75CREATE_FEED_TABLE = """CREATE TABLE IF NOT EXISTS feed (
76 id VARCHAR NOT NULL,
77 latest_sequence INTEGER NOT NULL,
78 latest_message_id VARCHAR,
79 PRIMARY KEY (id)
80 FOREIGN KEY(latest_message_id) REFERENCES message (id)
81);"""
82
83SELECT_FEED_STATE = """SELECT id, sequence
84 FROM message WHERE feed_id=?
85 ORDER BY sequence DESC
86 LIMIT 1;"""
87
88
89def get_feed_state(conn, feed_id):
90 """
91 Given a feed id, returns the latest message and latest known verified number.
92 If the server has never seen meesages from this feed, returns (None, 0).
93 """
94 res = conn.execute(SELECT_FEED_STATE, [feed_id]).fetchone()
95 if res is not None:
96 return res
97 return None, 0
98
99
100def add_messages(data):
101 invalid_data = dict()
102 feed_batches = defaultdict(list)
103 for m in data:
104 feed_batches[m["value"]["author"]].append(m)
105 inserted = dict()
106 for feed_id, feed_content in feed_batches.items():
107 with FEED_LOCKS[feed_id]:
108 feed_content.sort(key=lambda m: m["value"]["sequence"])
109 insert_values = list()
110 with get_db_conn(DB_FILE) as conn:
111 latest_id, latest_seq = get_feed_state(conn, feed_id)
112 latest_seq_before = latest_seq
113
114 for entry in feed_content:
115 key = entry["key"]
116 message = entry["value"]
117 try:
118 has_correct_form(message)
119 except ValueError as e:
120 invalid_data[key] = e.args[0]
121 print("invalid form")
122 print(e)
123 pprint(message)
124 continue
125 if not has_valid_hash(message, key):
126 invalid_data[key] = "Invalid hash"
127 continue
128 if not has_valid_signature(message):
129 invalid_data[key] = "Invalid signature"
130 continue
131
132 sequence = message["sequence"]
133 if latest_seq >= sequence:
134 # FIXME: check for forks
135 # nothing to gain here otherwise
136 continue
137 if sequence > latest_seq + 1:
138 invalid_data[key] = (
139 f"Got sequence {sequence} by {feed_id} but only"
140 f" have local state up to {latest_seq}."
141 )
142 break
143
144 previous_key_ = message["previous"]
145 if previous_key_ != latest_id:
146 invalid_data[key] = (
147 f'Got bad "previous" entry.\n'
148 f"Local: {latest_seq}\n"
149 f"Received: {previous_key_}"
150 )
151 break
152 latest_seq += 1
153 latest_id = key
154
155 value = json.dumps(message)
156 msg_type = message["content"]
157 if isinstance(msg_type, str):
158 if "." in msg_type:
159 msg_type = msg_type.rsplit(".", maxsplit=1)[1]
160 else:
161 msg_type = None
162 else:
163 msg_type = msg_type["type"]
164
165 timestamp_asserted = message["timestamp"]
166 timestamp_received = int(time.time() * 1000)
167
168 insert_values.append(
169 (
170 key,
171 feed_id,
172 sequence,
173 msg_type,
174 timestamp_asserted,
175 timestamp_received,
176 value,
177 )
178 )
179
180 if insert_values:
181 with get_db_conn(DB_FILE) as conn:
182 conn.executemany(INSERT_MESSAGE_FMT, insert_values)
183 inserted[feed_id] = len(insert_values)
184
185 threading.Thread(target=prune_locks).start()
186 if invalid_data:
187 return (
188 {"invalid_messages": invalid_data},
189 422,
190 {"ContentType": "application/json"},
191 )
192 else:
193 return {
194 "success": True,
195 "inserted": sum(inserted.values()),
196 }
197
198
199class Feeds(Resource):
200 def get(self):
201 with get_db_conn(DB_FILE) as conn:
202 res = conn.execute(SELECT_LAST_FEED_SEQS)
203 return {id: count for id, count in res.fetchall()}
204
205
206class Messages(Resource):
207 def get(self, feed_id):
208 parser = reqparse.RequestParser()
209 parser.add_argument(
210 "type", type=str, action="append", help="filter by message type"
211 )
212 args = parser.parse_args()
213 wanted_types = set(args.type) if args.type else set()
214
215 gt = int(request.args.get("gt", 0))
216 limit = int(request.args.get("limit", -1))
217 if len(wanted_types) == 1:
218 extra_where = f" AND type='{wanted_types.pop()}'"
219 elif len(wanted_types) > 1:
220 # FIXME: sanitize SQL!
221 wanted_types = "', '".join(wanted_types)
222 extra_where = f" AND type in ('{wanted_types}') "
223 else:
224 extra_where = ""
225 select_query = SELECT_FEED_MESSAGES_FMT.format(extra_where=extra_where)
226 with get_db_conn(DB_FILE) as conn:
227 res = conn.execute(select_query, [feed_id, gt, limit])
228 res = list(res.fetchall())
229 return [{"key": k, "value": json.loads(v)} for (k, v) in res]
230
231
232class MessageReceiver(Resource):
233 def get(self):
234 with get_db_conn(DB_FILE) as conn:
235 res = conn.execute(SELECT_ALL_MESSAGES)
236 res = list(res.fetchall())
237 return [{"key": k, "value": json.loads(v)} for (k, v) in res]
238
239 def post(self):
240 data = request.get_json(force=True)
241 if not isinstance(data, list):
242 data = [data]
243 return add_messages(data)
244
245
246if __name__ == "__main__":
247 with get_db_conn(DB_FILE) as conn:
248 conn.executescript(CREATE_MESSAGE_TABLE)
249 conn.executescript(CREATE_FEED_TABLE)
250 conn.close()
251
252 api = Api(app)
253 api.add_resource(MessageReceiver, "/messages")
254 api.add_resource(Messages, "/<path:feed_id>/messages.json")
255 api.add_resource(Feeds, "/feeds.json")
256 app.run(debug=True)
257

Built with git-ssb-web