Package gwibber ::
Package microblog ::
Module dispatcher
|
|
1
2
3 import multiprocessing, threading, traceback, json, time
4 import gobject, dbus, dbus.service, mx.DateTime
5 import twitter, identica, statusnet, flickr, facebook
6 import qaiku, friendfeed, digg
7
8 import urlshorter
9 import util, util.couch
10 from util import log
11 from util import resources
12 from util.couch import Monitor as CouchMonitor
13 from util.couch import RecordMonitor
14 from desktopcouch.records.server import CouchDatabase
15 from desktopcouch.records.record import Record as CouchRecord
16
17 from util.const import *
18
# The messaging-indicator bindings are optional: when they cannot be imported
# the name is bound to None and callers test `indicate` for truthiness.
try:
    import indicate
except ImportError:
    # Only a missing/unimportable module is tolerated here; a bare `except:`
    # would also hide unrelated errors and swallow KeyboardInterrupt/SystemExit.
    indicate = None

# Must run before any other thread touches gobject.
gobject.threads_init()

log.logger.name = "Gwibber Dispatcher"

# Maps the protocol name stored in an account record to the module
# implementing that protocol.
PROTOCOLS = {
    "twitter": twitter,
    "identica": identica,
    "flickr": flickr,
    "facebook": facebook,
    "friendfeed": friendfeed,
    "statusnet": statusnet,
    "digg": digg,
    "qaiku": qaiku,
}

# GWIBBER_OPERATIONS, COUCH_* and DEFAULT_SETTINGS are not defined in this
# module; presumably they come from `from util.const import *` -- TODO confirm.
FEATURES = json.loads(GWIBBER_OPERATIONS)
# Per-protocol metadata (each module's PROTOCOL_INFO), keyed by protocol name.
SERVICES = dict([(k, v.PROTOCOL_INFO) for k, v in PROTOCOLS.items()])
SETTINGS = RecordMonitor(COUCH_DB_SETTINGS, COUCH_RECORD_SETTINGS, COUCH_TYPE_CONFIG, DEFAULT_SETTINGS)
76
79 self.accounts = CouchDatabase(COUCH_DB_ACCOUNTS, create=True)
80 self.settings = CouchDatabase(COUCH_DB_SETTINGS, create=True)
81 self.messages = CouchDatabase(COUCH_DB_MESSAGES, create=True)
82 util.couch.init_design_doc(self.messages, "messages", COUCH_VIEW_MESSAGES)
83
85 if not id: id = acct["_id"]
86 if "sinceid" in SERVICES[acct["protocol"]]["features"]:
87 view = self.messages.execute_view("maxid", "messages")
88 result = view[[id, opname]][[id, opname]].rows
89 if len(result) > 0: return {"since": result[0].value}
90 return {}
91
93 protocol = SERVICES[acct["protocol"]]
94 return acct["protocol"] in PROTOCOLS and \
95 opname in protocol["features"] and \
96 opname in FEATURES and acct[enabled]
97
99 account = self.accounts.get_record(stream["account"])
100 args = stream["parameters"]
101 opname = stream["operation"]
102 if self.validate_operation(account, opname):
103 args.update(self.handle_max_id(account, opname, stream["_id"]))
104 return (stream["account"], stream["operation"], args, stream["_id"])
105
107 for account in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True):
108 account = account.value
109 args = {"query": search["query"]}
110 if self.validate_operation(account, "search"):
111 args.update(self.handle_max_id(account, "search", search["_id"]))
112 yield (account["_id"], "search", args, search["_id"])
113
115 if isinstance(acct, basestring):
116 acct = dict(self.accounts.get_record(acct).items())
117 for opname in SERVICES[acct["protocol"]]["default_streams"]:
118 if self.validate_operation(acct, opname):
119 args = self.handle_max_id(acct, opname)
120 yield (acct["_id"], opname, args, False)
121
123 for account in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True):
124 account = account.value
125 if self.validate_operation(account, "send", "send_enabled"):
126 yield (account["_id"], "send", {"message": message}, False)
127
129 if self.settings.record_exists(id):
130 item = dict(self.settings.get_record(id).items())
131 if item["record_type"] == COUCH_TYPE_STREAM:
132 return [self.stream_to_operation(item)]
133 if item["record_type"] == COUCH_TYPE_SEARCH:
134 return list(self.search_to_operations(item))
135
137 for acct in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True):
138 acct = acct.value
139 for o in self.account_to_operations(acct):
140 yield o
141
142 for stream in self.settings.get_records(COUCH_TYPE_STREAM, True):
143 stream = stream.value
144 if self.accounts.record_exists(stream["account"]):
145 o = self.stream_to_operation(stream)
146 if o: yield o
147
148 for search in self.settings.get_records(COUCH_TYPE_SEARCH, True):
149 search = search.value
150 for o in self.search_to_operations(search):
151 yield o
152
154 __dbus_object_path__ = "/com/gwibber/Streams"
155
157 self.bus = dbus.SessionBus()
158 bus_name = dbus.service.BusName("com.Gwibber.Streams", bus=self.bus)
159 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
160
161 setting_monitor = CouchMonitor(COUCH_DB_SETTINGS)
162 setting_monitor.connect("record-updated", self.on_setting_changed)
163 setting_monitor.connect("record-deleted", self.on_setting_deleted)
164
170
172 log.logger.debug("Stream closed: %s", id)
173 self.StreamClosed(id)
174
175 @dbus.service.signal("com.Gwibber.Streams", signature="s")
177
178 @dbus.service.signal("com.Gwibber.Streams", signature="s")
180
181 @dbus.service.signal("com.Gwibber.Streams")
183
185 __dbus_object_path__ = "/com/gwibber/Accounts"
186
188 self.bus = dbus.SessionBus()
189 bus_name = dbus.service.BusName("com.Gwibber.Accounts", bus=self.bus)
190 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
191
192 account_monitor = CouchMonitor(COUCH_DB_ACCOUNTS)
193 account_monitor.connect("record-updated", self.on_account_changed)
194 account_monitor.connect("record-deleted", self.on_account_deleted)
195
197 log.logger.debug("Account changed: %s", id)
198 self.AccountChanged(id)
199
201 log.logger.debug("Account deleted: %s", id)
202 self.AccountDeleted(id)
203
204 @dbus.service.signal("com.Gwibber.Accounts", signature="s")
206
207 @dbus.service.signal("com.Gwibber.Accounts", signature="s")
209
211 __dbus_object_path__ = "/com/gwibber/Messages"
212
214 self.bus = dbus.SessionBus()
215 bus_name = dbus.service.BusName("com.Gwibber.Messages", bus=self.bus)
216 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
217
218 self.monitor = CouchMonitor(COUCH_DB_MESSAGES)
219 self.monitor.connect("record-updated", self.on_message_updated)
220
221 self.messages = CouchDatabase(COUCH_DB_MESSAGES, create=True)
222
223 self.indicator_items = {}
224 self.notified_items = []
225
226 if indicate and util.resources.get_desktop_file():
227 self.indicate = indicate.indicate_server_ref_default()
228 self.indicate.set_type("message.gwibber")
229 self.indicate.set_desktop_file(util.resources.get_desktop_file())
230 self.indicate.connect("server-display", self.on_indicator_activate)
231 self.indicate.show()
232
234 try:
235
236 message = self.messages.get_record(id)
237 self.new_message(message)
238 except:
239 log.logger.error("Message updated: %s, failed", id)
240
241
242 @dbus.service.signal("com.Gwibber.Messages", signature="s")
244
246 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
247 client_bus = dbus.SessionBus()
248 log.logger.debug("Raising gwibber client")
249 try:
250 client_obj = client_bus.get_object("com.GwibberClient",
251 "/com/GwibberClient", follow_name_owner_changes = True,
252 introspect = False)
253 gw = dbus.Interface(client_obj, "com.GwibberClient")
254 gw.focus_client(reply_handler=self.handle_focus_reply,
255 error_handler=self.handle_focus_error)
256 except dbus.DBusException:
257 print_exc()
258
259
261 log.logger.debug("Raising gwibber client, focusing replies stream")
262 client_bus = dbus.SessionBus()
263 try:
264 client_obj = client_bus.get_object("com.GwibberClient", "/com/GwibberClient")
265 gw = dbus.Interface(client_obj, "com.GwibberClient")
266 gw.show_replies(reply_handler=self.handle_focus_reply,
267 error_handler=self.handle_focus_error)
268 indicator.hide()
269 except dbus.DBusException:
270 print_exc()
271
273 log.logger.debug("Gwibber Client raised")
274
276 log.logger.error("Failed to raise client %s", args)
277
279 min_time = mx.DateTime.DateTimeFromTicks() - mx.DateTime.TimeDelta(minutes=10.0)
280 log.logger.debug("Checking message %s timestamp (%s) to see if it is newer than %s", message["id"], mx.DateTime.DateTimeFromTicks(message["time"]).localtime(), min_time)
281 if mx.DateTime.DateTimeFromTicks(message["time"]).localtime() > mx.DateTime.DateTimeFromTicks(min_time):
282 log.logger.debug("Message %s newer than %s, notifying", message["id"], min_time)
283
284 if indicate and message["stream"] == "replies":
285 if message["id"] not in self.indicator_items:
286 log.logger.debug("Message %s is a reply, adding messaging indicator", message["id"])
287 self.handle_indicator_item(message)
288 if message["id"] not in self.notified_items:
289 self.notified_items.append(message["id"])
290 self.show_notification_bubble(message)
291
293 indicator = indicate.Indicator() if hasattr(indicate, "Indicator") else indicate.IndicatorMessage()
294 indicator.connect("user-display", self.on_indicator_reply_activate)
295 indicator.set_property("subtype", "im.gwibber")
296 indicator.set_property("sender", message["sender"].get("name", ""))
297 indicator.set_property("body", message["text"])
298 indicator.set_property_time("time",
299 mx.DateTime.DateTimeFromTicks(message["time"]).localtime().ticks())
300 self.indicator_items[message["id"]] = indicator
301 indicator.show()
302 log.logger.debug("Message from %s added to indicator", message["sender"].get("name", ""))
303
305 if util.can_notify and SETTINGS["show_notifications"]:
306 if SETTINGS["notify_mentions_only"] and not message["to_me"]: return
307
308
309 image = util.resources.get_ui_asset("gwibber.svg")
310 expire_timeout = 5000
311 n = util.notify(message["sender"].get("name", ""), message["text"], image, expire_timeout)
312
def __init__(self, func, iterable, cbsuccess, cbfailure, timeout=120):
    """Store the work description and start the thread immediately.

    func       -- callable kept as ``self.func``
    iterable   -- items kept as ``self.iterable``
    cbsuccess  -- kept as ``self.callback``
    cbfailure  -- kept as ``self.failure``
    timeout    -- kept as ``self.timeout`` (default 120)

    The thread is flagged as a daemon before being started, so it does not
    keep the interpreter alive on exit.
    """
    threading.Thread.__init__(self)

    # What to run and over which items.
    self.func, self.iterable = func, iterable
    # Who to tell about the outcome.
    self.callback, self.failure = cbsuccess, cbfailure
    self.timeout = timeout

    # The daemon flag has to be set before start().
    self.daemon = True
    self.start()
323
325 try:
326 pool = multiprocessing.Pool()
327 pool.map_async(self.func, self.iterable, callback=self.callback).get(self.timeout)
328 except Exception as e:
329 self.failure(e, traceback.format_exc())
330
331 -class Dispatcher(dbus.service.Object, threading.Thread):
332 __dbus_object_path__ = "/com/gwibber/Service"
333
# Constructor: claims the "com.Gwibber.Service" bus name on the session bus,
# initialises both base classes (dbus.service.Object and threading.Thread),
# creates the OperationCollector, and stores the main loop.
#   loop        -- kept as self.mainloop
#   autorefresh -- when true, self.refresh is scheduled through
#                  gobject.timeout_add every 60 * 1000 * 5 ms (five minutes)
# NOTE(review): indentation was lost in this listing, so it cannot be told
# from here whether the self.refresh() call is part of the `if autorefresh:`
# branch or runs unconditionally -- confirm against the real file.
# NOTE(review): self.accounts is assigned only after self.refresh() has been
# called -- confirm that refresh() does not need it.
334 - def __init__(self, loop, autorefresh=True):
335 self.bus = dbus.SessionBus()
336 bus_name = dbus.service.BusName("com.Gwibber.Service", bus=self.bus)
337 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
338 threading.Thread.__init__(self)
339
340 self.collector = OperationCollector()
341
342 self.refresh_count = 0
343 self.mainloop = loop
344
345 if autorefresh:
346 gobject.timeout_add(60 * 1000 * 5, self.refresh)
347 self.refresh()
348
349 self.accounts = CouchDatabase(COUCH_DB_ACCOUNTS, create=True)
350
351 @dbus.service.signal("com.Gwibber.Service")
353
354 @dbus.service.signal("com.Gwibber.Service")
356
357 @dbus.service.method("com.Gwibber.Service")
359 """
360 Calls the Gwibber Service to trigger a refresh operation
361 """
362 self.refresh()
363
364 @dbus.service.method("com.Gwibber.Service", in_signature="s")
384
385 @dbus.service.method("com.Gwibber.Service", in_signature="s")
387 """
388 Posts a message/status update to all accounts with send_enabled = True. It
389 takes one argument, which is a message formated as a string.
390 """
391 self.send(self.collector.get_send_operations(message))
392
@dbus.service.method("com.Gwibber.Service", in_signature="s")
def Send(self, opdata):
    """Send a message described by a JSON string.

    opdata -- JSON object holding a "message" plus one of:
      * "target":   an object with an "account" key; the message is sent
                    to that account as a "send_thread" operation, with
                    the target passed along in the arguments.
      * "accounts": a list of account ids; one "send" operation is built
                    per account.
    When both keys are present, "target" wins.

    The call is best-effort: it never raises to the D-Bus caller.
    Malformed input or a failure while sending is logged and nothing
    further is done.
    """
    try:
        o = json.loads(opdata)
        if "target" in o:
            args = {"message": o["message"], "target": o["target"]}
            operations = [(o["target"]["account"], "send_thread", args, None)]
        elif "accounts" in o:
            operations = [(a, "send", {"message": o["message"]}, None) for a in o["accounts"]]
        else:
            # Without this branch `operations` would be unbound and the
            # resulting NameError would be silently swallowed.
            log.logger.error("Send: request has neither 'target' nor 'accounts': %s", opdata)
            return
        self.send(operations)
    except Exception:
        # Still best-effort, but leave a trace instead of hiding the failure.
        log.logger.error("Send failed for %s: %s", opdata, traceback.format_exc())
404
405 @dbus.service.method("com.Gwibber.Service", out_signature="s")
407 """
408 Returns a list of services available as json string
409 """
410 return json.dumps(SERVICES)
411
412 @dbus.service.method("com.Gwibber.Service", out_signature="s")
414
415 @dbus.service.method("com.Gwibber.Service", out_signature="s")
417 """
418 Returns a list of accounts as json string
419 """
420 all_accounts = []
421 for account in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True):
422 all_accounts.append(account.value)
423 return json.dumps(all_accounts)
424
425 @dbus.service.method("com.Gwibber.Service")
427 log.logger.info("Gwibber Service is being shutdown")
428 self.mainloop.quit()
429
431 self.refresh_count += 1
432 self.LoadingComplete()
433 log.logger.info("Loading complete: %s - %s", self.refresh_count, [o[0] for o in output])
434
436 log.logger.error("Loading failed: %s - %s", exception, tb)
437
438 - def send(self, operations):
444
452
454 __dbus_object_path__ = "/com/gwibber/Connection"
455
457 self.bus = dbus.SessionBus()
458 bus_name = dbus.service.BusName("com.Gwibber.Connection", bus=self.bus)
459 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
460
461 self.sysbus = dbus.SystemBus()
462 try:
463 self.nm = self.sysbus.get_object(NM_DBUS_SERVICE, NM_DBUS_OBJECT_PATH)
464 self.nm.connect_to_signal("StateChanged", self.on_connection_changed)
465 except:
466 pass
467
469 if state == NM_STATE_CONNECTED:
470 log.logger.info("Network state changed to Online")
471 self.ConnectionOnline(state)
472 if state == NM_STATE_DISCONNECTED:
473 log.logger.info("Network state changed to Offline")
474 self.ConnectionOffline(state)
475
476 @dbus.service.signal("com.Gwibber.Connetion", signature="u")
478
479 @dbus.service.signal("com.Gwibber.Connection", signature="u")
481
482 @dbus.service.method("com.Gwibber.Connection")
484 try:
485 if self.nm.state() == NM_STATE_CONNECTED:
486 return True
487 return False
488 except:
489 return True
490
492 __dbus_object_path__ = "/com/gwibber/URLShorten"
493
495 self.bus = dbus.SessionBus()
496 bus_name = dbus.service.BusName("com.Gwibber.URLShorten", bus=self.bus)
497 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
498
499 @dbus.service.method("com.Gwibber.URLShorten", in_signature="ss", out_signature="s")
501 if self.IsShort(url): return url
502 try:
503 s = urlshorter.PROTOCOLS[service].URLShorter()
504 return s.short(url)
505 except: return url
506
507 @dbus.service.method("com.Gwibber.URLShorten", in_signature="s", out_signature="b")
509 for us in urlshorter.PROTOCOLS.values():
510 if url.startswith(us.PROTOCOL_INFO["fqdn"]):
511 return True
512 return False
513