Package gwibber :: Package microblog :: Module dispatcher
[hide private]
[frames] | no frames]

Source Code for Module gwibber.microblog.dispatcher

  1  #!/usr/bin/env python 
  2   
  3  import multiprocessing, threading, traceback, json, time 
  4  import gobject, dbus, dbus.service, mx.DateTime 
  5  import twitter, identica, statusnet, flickr, facebook 
  6  import qaiku, friendfeed, digg 
  7   
  8  import urlshorter 
  9  import util, util.couch 
 10  from util import log 
 11  from util import resources 
 12  from util.couch import Monitor as CouchMonitor 
 13  from util.couch import RecordMonitor 
 14  from desktopcouch.records.server import CouchDatabase 
 15  from desktopcouch.records.record import Record as CouchRecord 
 16   
 17  from util.const import * 
 18   
 19  try: 
 20    import indicate 
 21  except: 
 22    indicate = None 
 23   
 24  gobject.threads_init() 
 25   
 26  log.logger.name = "Gwibber Dispatcher" 
 27   
 28  PROTOCOLS = { 
 29    "twitter": twitter, 
 30    "identica": identica, 
 31    "flickr": flickr, 
 32    "facebook": facebook, 
 33    "friendfeed": friendfeed, 
 34    "statusnet": statusnet, 
 35    "digg": digg, 
 36    "qaiku": qaiku, 
 37  } 
 38   
 39  FEATURES = json.loads(GWIBBER_OPERATIONS) 
 40  SERVICES = dict([(k, v.PROTOCOL_INFO) for k, v in PROTOCOLS.items()]) 
 41  SETTINGS = RecordMonitor(COUCH_DB_SETTINGS, COUCH_RECORD_SETTINGS, COUCH_TYPE_CONFIG, DEFAULT_SETTINGS) 
42 43 -def perform_operation((acctid, opname, args, transient)):
44 try: 45 stream = FEATURES[opname]["stream"] or opname 46 accounts = CouchDatabase(COUCH_DB_ACCOUNTS, create=True) 47 messages = CouchDatabase(COUCH_DB_MESSAGES, create=True) 48 account = dict(accounts.get_record(acctid).items()) 49 50 logtext = "<%s:%s>" % (account["protocol"], opname) 51 log.logger.debug("%s Performing operation", logtext) 52 53 args = dict((str(k), v) for k, v in args.items()) 54 message_data = PROTOCOLS[account["protocol"]].Client(account)(opname, **args) 55 new_messages = [] 56 57 for m in message_data: 58 key = (m["id"], m["account"], opname, transient) 59 key = "-".join(x for x in key if x) 60 if not messages.record_exists(key): 61 m["operation"] = opname 62 m["stream"] = stream 63 m["transient"] = transient 64 65 log.logger.debug("%s Adding record", logtext) 66 new_messages.append(m) 67 messages.put_record(CouchRecord(m, COUCH_TYPE_MESSAGE, key)) 68 69 log.logger.debug("%s Finished operation", logtext) 70 return ("Success", new_messages) 71 except Exception as e: 72 if not "logtext" in locals(): logtext = "<UNKNOWN>" 73 log.logger.error("%s Operation failed", logtext) 74 log.logger.debug("Traceback:\n%s", traceback.format_exc()) 75 return ("Failure", traceback.format_exc())
76
77 -class OperationCollector:
78 - def __init__(self):
79 self.accounts = CouchDatabase(COUCH_DB_ACCOUNTS, create=True) 80 self.settings = CouchDatabase(COUCH_DB_SETTINGS, create=True) 81 self.messages = CouchDatabase(COUCH_DB_MESSAGES, create=True) 82 util.couch.init_design_doc(self.messages, "messages", COUCH_VIEW_MESSAGES)
83
84 - def handle_max_id(self, acct, opname, id=None):
85 if not id: id = acct["_id"] 86 if "sinceid" in SERVICES[acct["protocol"]]["features"]: 87 view = self.messages.execute_view("maxid", "messages") 88 result = view[[id, opname]][[id, opname]].rows 89 if len(result) > 0: return {"since": result[0].value} 90 return {}
91
92 - def validate_operation(self, acct, opname, enabled="receive_enabled"):
93 protocol = SERVICES[acct["protocol"]] 94 return acct["protocol"] in PROTOCOLS and \ 95 opname in protocol["features"] and \ 96 opname in FEATURES and acct[enabled]
97
98 - def stream_to_operation(self, stream):
99 account = self.accounts.get_record(stream["account"]) 100 args = stream["parameters"] 101 opname = stream["operation"] 102 if self.validate_operation(account, opname): 103 args.update(self.handle_max_id(account, opname, stream["_id"])) 104 return (stream["account"], stream["operation"], args, stream["_id"])
105
106 - def search_to_operations(self, search):
107 for account in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True): 108 account = account.value 109 args = {"query": search["query"]} 110 if self.validate_operation(account, "search"): 111 args.update(self.handle_max_id(account, "search", search["_id"])) 112 yield (account["_id"], "search", args, search["_id"])
113
114 - def account_to_operations(self, acct):
115 if isinstance(acct, basestring): 116 acct = dict(self.accounts.get_record(acct).items()) 117 for opname in SERVICES[acct["protocol"]]["default_streams"]: 118 if self.validate_operation(acct, opname): 119 args = self.handle_max_id(acct, opname) 120 yield (acct["_id"], opname, args, False)
121
122 - def get_send_operations(self, message):
123 for account in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True): 124 account = account.value 125 if self.validate_operation(account, "send", "send_enabled"): 126 yield (account["_id"], "send", {"message": message}, False)
127
128 - def get_operation_by_id(self, id):
129 if self.settings.record_exists(id): 130 item = dict(self.settings.get_record(id).items()) 131 if item["record_type"] == COUCH_TYPE_STREAM: 132 return [self.stream_to_operation(item)] 133 if item["record_type"] == COUCH_TYPE_SEARCH: 134 return list(self.search_to_operations(item))
135
136 - def get_operations(self):
137 for acct in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True): 138 acct = acct.value 139 for o in self.account_to_operations(acct): 140 yield o 141 142 for stream in self.settings.get_records(COUCH_TYPE_STREAM, True): 143 stream = stream.value 144 if self.accounts.record_exists(stream["account"]): 145 o = self.stream_to_operation(stream) 146 if o: yield o 147 148 for search in self.settings.get_records(COUCH_TYPE_SEARCH, True): 149 search = search.value 150 for o in self.search_to_operations(search): 151 yield o
152
153 -class StreamMonitor(dbus.service.Object):
154 __dbus_object_path__ = "/com/gwibber/Streams" 155
156 - def __init__(self):
157 self.bus = dbus.SessionBus() 158 bus_name = dbus.service.BusName("com.Gwibber.Streams", bus=self.bus) 159 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__) 160 161 setting_monitor = CouchMonitor(COUCH_DB_SETTINGS) 162 setting_monitor.connect("record-updated", self.on_setting_changed) 163 setting_monitor.connect("record-deleted", self.on_setting_deleted)
164
165 - def on_setting_changed(self, monitor, id):
166 if id == "settings": self.SettingChanged() 167 else: 168 log.logger.debug("Stream changed: %s", id) 169 self.StreamChanged(id)
170
171 - def on_setting_deleted(self, monitor, id):
172 log.logger.debug("Stream closed: %s", id) 173 self.StreamClosed(id)
174 175 @dbus.service.signal("com.Gwibber.Streams", signature="s")
176 - def StreamChanged(self, id): pass
177 178 @dbus.service.signal("com.Gwibber.Streams", signature="s")
179 - def StreamClosed(self, id): pass
180 181 @dbus.service.signal("com.Gwibber.Streams")
182 - def SettingChanged(self): pass
183
184 -class AccountMonitor(dbus.service.Object):
185 __dbus_object_path__ = "/com/gwibber/Accounts" 186
187 - def __init__(self):
188 self.bus = dbus.SessionBus() 189 bus_name = dbus.service.BusName("com.Gwibber.Accounts", bus=self.bus) 190 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__) 191 192 account_monitor = CouchMonitor(COUCH_DB_ACCOUNTS) 193 account_monitor.connect("record-updated", self.on_account_changed) 194 account_monitor.connect("record-deleted", self.on_account_deleted)
195
196 - def on_account_changed(self, monitor, id):
197 log.logger.debug("Account changed: %s", id) 198 self.AccountChanged(id)
199
200 - def on_account_deleted(self, monitor, id):
201 log.logger.debug("Account deleted: %s", id) 202 self.AccountDeleted(id)
203 204 @dbus.service.signal("com.Gwibber.Accounts", signature="s")
205 - def AccountChanged(self, id): pass
206 207 @dbus.service.signal("com.Gwibber.Accounts", signature="s")
208 - def AccountDeleted(self, id): pass
209
210 -class MessagesMonitor(dbus.service.Object):
211 __dbus_object_path__ = "/com/gwibber/Messages" 212
213 - def __init__(self):
214 self.bus = dbus.SessionBus() 215 bus_name = dbus.service.BusName("com.Gwibber.Messages", bus=self.bus) 216 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__) 217 218 self.monitor = CouchMonitor(COUCH_DB_MESSAGES) 219 self.monitor.connect("record-updated", self.on_message_updated) 220 221 self.messages = CouchDatabase(COUCH_DB_MESSAGES, create=True) 222 223 self.indicator_items = {} 224 self.notified_items = [] 225 226 if indicate and util.resources.get_desktop_file(): 227 self.indicate = indicate.indicate_server_ref_default() 228 self.indicate.set_type("message.gwibber") 229 self.indicate.set_desktop_file(util.resources.get_desktop_file()) 230 self.indicate.connect("server-display", self.on_indicator_activate) 231 self.indicate.show()
232
233 - def on_message_updated(self, monitor, id):
234 try: 235 #log.logger.debug("Message updated: %s", id) 236 message = self.messages.get_record(id) 237 self.new_message(message) 238 except: 239 log.logger.error("Message updated: %s, failed", id)
240 241 242 @dbus.service.signal("com.Gwibber.Messages", signature="s")
243 - def MessageUpdated(self, id): pass
244
245 - def on_indicator_activate(self, indicator, timestamp):
246 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 247 client_bus = dbus.SessionBus() 248 log.logger.debug("Raising gwibber client") 249 try: 250 client_obj = client_bus.get_object("com.GwibberClient", 251 "/com/GwibberClient", follow_name_owner_changes = True, 252 introspect = False) 253 gw = dbus.Interface(client_obj, "com.GwibberClient") 254 gw.focus_client(reply_handler=self.handle_focus_reply, 255 error_handler=self.handle_focus_error) 256 except dbus.DBusException: 257 print_exc()
258 259
260 - def on_indicator_reply_activate(self, indicator, timestamp):
261 log.logger.debug("Raising gwibber client, focusing replies stream") 262 client_bus = dbus.SessionBus() 263 try: 264 client_obj = client_bus.get_object("com.GwibberClient", "/com/GwibberClient") 265 gw = dbus.Interface(client_obj, "com.GwibberClient") 266 gw.show_replies(reply_handler=self.handle_focus_reply, 267 error_handler=self.handle_focus_error) 268 indicator.hide() 269 except dbus.DBusException: 270 print_exc()
271
272 - def handle_focus_reply(self, *args):
273 log.logger.debug("Gwibber Client raised")
274
275 - def handle_focus_error(self, *args):
276 log.logger.error("Failed to raise client %s", args)
277
278 - def new_message(self, message):
279 min_time = mx.DateTime.DateTimeFromTicks() - mx.DateTime.TimeDelta(minutes=10.0) 280 log.logger.debug("Checking message %s timestamp (%s) to see if it is newer than %s", message["id"], mx.DateTime.DateTimeFromTicks(message["time"]).localtime(), min_time) 281 if mx.DateTime.DateTimeFromTicks(message["time"]).localtime() > mx.DateTime.DateTimeFromTicks(min_time): 282 log.logger.debug("Message %s newer than %s, notifying", message["id"], min_time) 283 #FIXME: confirm we can use to_me here too 284 if indicate and message["stream"] == "replies": 285 if message["id"] not in self.indicator_items: 286 log.logger.debug("Message %s is a reply, adding messaging indicator", message["id"]) 287 self.handle_indicator_item(message) 288 if message["id"] not in self.notified_items: 289 self.notified_items.append(message["id"]) 290 self.show_notification_bubble(message)
291
292 - def handle_indicator_item(self, message):
293 indicator = indicate.Indicator() if hasattr(indicate, "Indicator") else indicate.IndicatorMessage() 294 indicator.connect("user-display", self.on_indicator_reply_activate) 295 indicator.set_property("subtype", "im.gwibber") 296 indicator.set_property("sender", message["sender"].get("name", "")) 297 indicator.set_property("body", message["text"]) 298 indicator.set_property_time("time", 299 mx.DateTime.DateTimeFromTicks(message["time"]).localtime().ticks()) 300 self.indicator_items[message["id"]] = indicator 301 indicator.show() 302 log.logger.debug("Message from %s added to indicator", message["sender"].get("name", ""))
303
304 - def show_notification_bubble(self, message):
305 if util.can_notify and SETTINGS["show_notifications"]: 306 if SETTINGS["notify_mentions_only"] and not message["to_me"]: return 307 #until image caching is working again, we will post the gwibber icon 308 #image = hasattr(message, "image_path") and message["image_path"] or '' 309 image = util.resources.get_ui_asset("gwibber.svg") 310 expire_timeout = 5000 311 n = util.notify(message["sender"].get("name", ""), message["text"], image, expire_timeout)
312
313 -class MapAsync(threading.Thread):
314 - def __init__(self, func, iterable, cbsuccess, cbfailure, timeout=120):
315 threading.Thread.__init__(self) 316 self.iterable = iterable 317 self.callback = cbsuccess 318 self.failure = cbfailure 319 self.timeout = timeout 320 self.daemon = True 321 self.func = func 322 self.start()
323
324 - def run(self):
325 try: 326 pool = multiprocessing.Pool() 327 pool.map_async(self.func, self.iterable, callback=self.callback).get(self.timeout) 328 except Exception as e: 329 self.failure(e, traceback.format_exc())
330
331 -class Dispatcher(dbus.service.Object, threading.Thread):
332 __dbus_object_path__ = "/com/gwibber/Service" 333
334 - def __init__(self, loop, autorefresh=True):
335 self.bus = dbus.SessionBus() 336 bus_name = dbus.service.BusName("com.Gwibber.Service", bus=self.bus) 337 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__) 338 threading.Thread.__init__(self) 339 340 self.collector = OperationCollector() 341 342 self.refresh_count = 0 343 self.mainloop = loop 344 345 if autorefresh: 346 gobject.timeout_add(60 * 1000 * 5, self.refresh) 347 self.refresh() 348 349 self.accounts = CouchDatabase(COUCH_DB_ACCOUNTS, create=True)
350 351 @dbus.service.signal("com.Gwibber.Service")
352 - def LoadingComplete(self): pass
353 354 @dbus.service.signal("com.Gwibber.Service")
355 - def LoadingStarted(self): pass
356 357 @dbus.service.method("com.Gwibber.Service")
358 - def Refresh(self):
359 """ 360 Calls the Gwibber Service to trigger a refresh operation 361 """ 362 self.refresh()
363 364 @dbus.service.method("com.Gwibber.Service", in_signature="s")
365 - def PerformOp(self, opdata):
366 try: o = json.loads(opdata) 367 except: return 368 369 log.logger.debug("** Starting Single Operation **") 370 self.LoadingStarted() 371 372 params = ["account", "operation", "args", "transient"] 373 operation = None 374 375 if "id" in o: 376 operation = self.collector.get_operation_by_id(o["id"]) 377 elif all(i in o for i in params): 378 operation = [tuple(o[i] for i in params)] 379 elif "account" in o and self.accounts.record_exists(o["account"]): 380 operation = self.collector.account_to_operations(o["account"]) 381 382 if operation: 383 MapAsync(perform_operation, operation, self.loading_complete, self.loading_failed)
384 385 @dbus.service.method("com.Gwibber.Service", in_signature="s")
386 - def SendMessage(self, message):
387 """ 388 Posts a message/status update to all accounts with send_enabled = True. It 389 takes one argument, which is a message formated as a string. 390 """ 391 self.send(self.collector.get_send_operations(message))
392 393 @dbus.service.method("com.Gwibber.Service", in_signature="s")
394 - def Send(self, opdata):
395 try: 396 o = json.loads(opdata) 397 if "target" in o: 398 args = {"message": o["message"], "target": o["target"]} 399 operations = [(o["target"]["account"], "send_thread", args, None)] 400 elif "accounts" in o: 401 operations = [(a, "send", {"message": o["message"]}, None) for a in o["accounts"]] 402 self.send(operations) 403 except: pass
404 405 @dbus.service.method("com.Gwibber.Service", out_signature="s")
406 - def GetServices(self):
407 """ 408 Returns a list of services available as json string 409 """ 410 return json.dumps(SERVICES)
411 412 @dbus.service.method("com.Gwibber.Service", out_signature="s")
413 - def GetFeatures(self): return json.dumps(FEATURES)
414 415 @dbus.service.method("com.Gwibber.Service", out_signature="s")
416 - def GetAccounts(self):
417 """ 418 Returns a list of accounts as json string 419 """ 420 all_accounts = [] 421 for account in self.accounts.get_records(COUCH_TYPE_ACCOUNT, True): 422 all_accounts.append(account.value) 423 return json.dumps(all_accounts)
424 425 @dbus.service.method("com.Gwibber.Service")
426 - def Quit(self):
427 log.logger.info("Gwibber Service is being shutdown") 428 self.mainloop.quit() 429
430 - def loading_complete(self, output):
431 self.refresh_count += 1 432 self.LoadingComplete() 433 log.logger.info("Loading complete: %s - %s", self.refresh_count, [o[0] for o in output])
434
435 - def loading_failed(self, exception, tb):
436 log.logger.error("Loading failed: %s - %s", exception, tb)
437
438 - def send(self, operations):
439 operations = util.compact(operations) 440 if operations: 441 self.LoadingStarted() 442 log.logger.debug("*** Sending Message ***") 443 MapAsync(perform_operation, operations, self.loading_complete, self.loading_failed)
444
445 - def refresh(self):
446 operations = list(self.collector.get_operations()) 447 if operations: 448 log.logger.debug("** Starting Refresh **") 449 self.LoadingStarted() 450 MapAsync(perform_operation, operations, self.loading_complete, self.loading_failed) 451 return True
452
453 -class ConnectionMonitor(dbus.service.Object):
454 __dbus_object_path__ = "/com/gwibber/Connection" 455
456 - def __init__(self):
457 self.bus = dbus.SessionBus() 458 bus_name = dbus.service.BusName("com.Gwibber.Connection", bus=self.bus) 459 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__) 460 461 self.sysbus = dbus.SystemBus() 462 try: 463 self.nm = self.sysbus.get_object(NM_DBUS_SERVICE, NM_DBUS_OBJECT_PATH) 464 self.nm.connect_to_signal("StateChanged", self.on_connection_changed) 465 except: 466 pass
467
468 - def on_connection_changed(self, state):
469 if state == NM_STATE_CONNECTED: 470 log.logger.info("Network state changed to Online") 471 self.ConnectionOnline(state) 472 if state == NM_STATE_DISCONNECTED: 473 log.logger.info("Network state changed to Offline") 474 self.ConnectionOffline(state)
475 476 @dbus.service.signal("com.Gwibber.Connetion", signature="u")
477 - def ConnectionOnline(self, state): pass
478 479 @dbus.service.signal("com.Gwibber.Connection", signature="u")
480 - def ConnectionOffline(self, state): pass
481 482 @dbus.service.method("com.Gwibber.Connection")
483 - def isConnected(self):
484 try: 485 if self.nm.state() == NM_STATE_CONNECTED: 486 return True 487 return False 488 except: 489 return True
490
491 -class URLShorten(dbus.service.Object):
492 __dbus_object_path__ = "/com/gwibber/URLShorten" 493
494 - def __init__(self):
495 self.bus = dbus.SessionBus() 496 bus_name = dbus.service.BusName("com.Gwibber.URLShorten", bus=self.bus) 497 dbus.service.Object.__init__(self, bus_name, self.__dbus_object_path__)
498 499 @dbus.service.method("com.Gwibber.URLShorten", in_signature="ss", out_signature="s")
500 - def Shorten(self, url, service):
501 if self.IsShort(url): return url 502 try: 503 s = urlshorter.PROTOCOLS[service].URLShorter() 504 return s.short(url) 505 except: return url
506 507 @dbus.service.method("com.Gwibber.URLShorten", in_signature="s", out_signature="b")
508 - def IsShort(self, url):
509 for us in urlshorter.PROTOCOLS.values(): 510 if url.startswith(us.PROTOCOL_INFO["fqdn"]): 511 return True 512 return False
513