From 8c0499f27af7ed2ee97defbe21e3a86b57379b9f Mon Sep 17 00:00:00 2001
From: Ludovic Le Frioux <llefriou@po461-pro.paris.inria.fr>
Date: Mon, 28 Sep 2020 12:49:19 +0200
Subject: [PATCH] Update pseudocode

---
 Cache               | 12 ------------
 Collection          | 24 ++++++++++++++++--------
 Global              |  6 ++++++
 NotificationHandler |  2 ++
 ServiceHandler      | 21 +++++++++++++++++++++
 Session             | 24 ++++++++++++++++++++----
 Transaction         | 12 +++++++++---
 TransactionBody     |  2 ++
 8 files changed, 76 insertions(+), 27 deletions(-)
 delete mode 100644 Cache
 create mode 100644 NotificationHandler
 create mode 100644 ServiceHandler
 create mode 100644 TransactionBody

diff --git a/Cache b/Cache
deleted file mode 100644
index 2e8fcb4..0000000
--- a/Cache
+++ /dev/null
@@ -1,12 +0,0 @@
-class Cache {
-
-  private cobjects: Map<CObjectId, CObject>
-  
-  fun getObject(oid: CObjectId) {
-    return cobjects.get(oid)
-  }
-
-  fun addObject(oid: CObjectId, obj: CObject) {
-    this.cobjects.put(oid. obj)
-  }
-}
diff --git a/Collection b/Collection
index dca1d13..69c99fa 100644
--- a/Collection
+++ b/Collection
@@ -1,9 +1,12 @@
 class Collection {
 
+  // The collection id
   private id: CollectionId
 
+  // Is the collection open in read only mode
   private readOnly: Boolean
 
+  // Static boolean used to ensure that only one collection is openned
   private static otherOpenned: Boolean = false
 
   Collection(cid: CollectionId) {
@@ -12,21 +15,26 @@ class Collection {
     this.id = cid
   }
 
-  // c_open_read|write<T> handler?
-  // TODO: manage notifications
-  fun open<T>(oid: String, readOnly: Boolean, handler: (VersionVector, List<CObjectId>) -> Unit = {}): T {
+  // c_open_read|write<T>
+  fun open<T>(oid: String, readOnly: Boolean, handler: NotificationHandler): T {
+    if (this.readOnly AND NOT readOnly) throw Exception
     oid = CObjectId(this.id, oid, T)
     if (Global.Cache.contains(oid)) return Global.Cache.get(oid)
-    crdt = SERVICE.getObject(oid, TxnEnv.getState()) // SERVICE.subscribe(oid)
-    if (this.readOnly AND NOT readOnly) throw Exception
-    return CObject<T>(oid, crdt, readOnly)
+    crdt, _ = SERVICE.getObject(oid, TxnEnv.getState())
+    SERVICE.subscribe(oid)
+    obj = CObject<T>(oid, crdt, readOnly)
+    Global.Cache.put(oid, obj)
+    Global.Handlers.put(oid, handler)
+    return obj
   }
 
+  // c_close_collection
   fun close() {
-    for obj in Global.Cache {
-      SERVICE.unsubscribe(obj.id)
+    for oid in Global.Cache {
+      SERVICE.unsubscribe(oid)
     }
     Global.Cache.clear()
+    Global.Handlers.clear()
     Collection.otherOpenned = false
   }
 }
diff --git a/Global b/Global
index d922dc1..7d3cff3 100644
--- a/Global
+++ b/Global
@@ -1,5 +1,11 @@
+// Transaction environment should inherit from crdtlib.utils.Environment 
 TxnEnv: TransactionEnvironment
 
+// Map for dirty objects
 DirtyCObjects: Map<CObjectId, CObject>
 
+// Map for the cache
 Cache: Map<CObjectId, CObject>
+
+// Map for c-object handlers management
+Handlers: Map<CObjectId, NotificationHandler>
diff --git a/NotificationHandler b/NotificationHandler
new file mode 100644
index 0000000..3f95a44
--- /dev/null
+++ b/NotificationHandler
@@ -0,0 +1,2 @@
+// The notification handler (function) type accepted for c-objects
+type NotificationHandler = (VersionVector, CObjectId) -> Unit
diff --git a/ServiceHandler b/ServiceHandler
new file mode 100644
index 0000000..120db22
--- /dev/null
+++ b/ServiceHandler
@@ -0,0 +1,21 @@
+// Thread that will listen for server notifications
+class ServiceHandler: Thread {
+
+  fun run() {
+    this.run(work)
+  }
+
+  fun work() {
+    while (true) {
+      v, oid = readMessage()
+      if (Global.Handlers.contains(oid)) {
+        // call the corresponding handler
+        Handlers.get(oid)(v, oid)
+      }
+    }
+  }
+
+  fun stop() {
+    exit(this)
+  }
+}
diff --git a/Session b/Session
index 04a7076..57a8642 100644
--- a/Session
+++ b/Session
@@ -1,9 +1,14 @@
 class Session {
 
+  // Current consistency level
   private consistency: ConsistencyLevel
 
+  // The client id
   private cliendId: CliendId
 
+  // The attached service handler
+  private serviceHandler: ServiceHandler
+
   Session(cid: ClientId) {
     this.clientId = cid
   }
@@ -12,6 +17,8 @@ class Session {
   static fun open(dbName: String, credentials: String): Session {
     clientId = ClientId()
     if (NOT SERVICE.open(dbName, credentials, clientId)) throw Exception
+    this.serviceHandler = ServiceHandler()
+    this.serviceHandler.run()
     return Session(clientId)
   }
 
@@ -19,16 +26,24 @@ class Session {
   fun pull(type: ConsistencyLevel) {
     if (consistency != null AND consistency > type) throw Exception
     this.consistency = type
-    v = SERVICE.pull(type)
-    Global.TxnEnv.updateState(v)
+    for oid in Cache {
+      crdt, v = SERVICE.getObject(oid)
+      obj = Global.Cache.get(oid)
+      obj.crdt = crdt
+      Global.TxnEnv.updateSate(v)
+    } 
   }
 
   // c_pull_XX_view(v)
   fun pull(type: ConsistencyLevel, vv: VersionVector) {
     if (view != null AND view.type > type) throw Exception
     if (vv < global.TxnEnv.getState()) throw Exception
-    v = SERVICE.pull(type, vv)
-    Global.TxnEnv.updateState(v)
+    for oid in Cache {
+      crdt, v = SERVICE.getObject(oid, vv)
+      obj = Global.Cache.get(oid)
+      obj.crdt = crdt
+      Global.TxnEnv.updateSate(v)
+    } 
   }
   
   // c_open_collection_read|write
@@ -44,5 +59,6 @@ class Session {
   // c_end_session
   fun close() {
     SERVICE.close(clientId)
+    this.serviceHandler.stop()
   }
 }
diff --git a/Transaction b/Transaction
index 0bc5a0c..a30bacf 100644
--- a/Transaction
+++ b/Transaction
@@ -1,21 +1,27 @@
 class Transaction {
 
-  private transactionId: String
+  // Transaction id
+  private transactionId: TxnId
 
+  // Static boolean used to ensure that only one collection is openned
   private static otherOpenned: Boolean = false
 
+  // The version vector at which the transaction has started
   private begin: VersionVector
 
-  Transaction(tid: TxnId, body: () -> Unit) {
+  Transaction(tid: TxnId, body: TransactionBody) {
     if (Transaction.otherOpenned) throw Exception
     Transaction.otherOpenned = true
     this.begin = Global.TxnEnv.getState()
     this.transactionId = Global.TxnEnv.tickTransaction()
 
     try {
+      // call the body function
       body()
+      // if everything goes well commit
       this.commit()
     } catch {
+      // If there is an exception abort
       this.abort()
     }
   }
@@ -33,7 +39,7 @@ class Transaction {
   fun commit() {
     SERVICE.beginTransaction(this.transactionId)
     for obj in Global.DirtyCObjects {
-      SERVICE.updateObject(obj.crdt.getDelta(this.begin))
+      SERVICE.pushObject(obj.crdt.getDelta(this.begin))
       obj.txnCommit()
     }
     SERVICE.commitTransaction(this.transactionId)
diff --git a/TransactionBody b/TransactionBody
new file mode 100644
index 0000000..cb39263
--- /dev/null
+++ b/TransactionBody
@@ -0,0 +1,2 @@
+// The function type accepted for a transaction body
+type TransactionBody = () -> Unit
-- 
GitLab