From a02b3f787f3e3fbd01774e94e8b5fbb87ed8f0ec Mon Sep 17 00:00:00 2001
From: Mohammed-Yacine Taleb <ytaleb@taleb.irisa.fr>
Date: Mon, 12 Feb 2018 15:31:01 +0100
Subject: [PATCH] Adding multiRead

---
 src/Log.cc                              | 10 ++++----
 src/MasterService.cc                    | 32 +++++++++++++++----------
 src/MultiLog.cc                         |  4 ++--
 src/MultiReadWriteGroupBatchPerfTest.cc |  2 +-
 4 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/src/Log.cc b/src/Log.cc
index 1093263ec..75b62f683 100644
--- a/src/Log.cc
+++ b/src/Log.cc
@@ -168,7 +168,7 @@ Log::sync()
     CycleCounter<uint64_t> __(&PerfStats::threadStats.logSyncCycles);
 
     Tub<SpinLock::Guard> lock;
-    lock.construct(appendLock);
+//lock.construct(appendLock);
     metrics.totalSyncCalls++;
 
     // The only time 'head' should be NULL is after construction and before the
@@ -198,9 +198,9 @@ Log::sync()
     // lock and grab the sync lock. This allows other writers to append to the
     // log while we wait. Once we grab the sync lock, take the append lock again
     // to ensure our new view of the head is consistent.
-    lock.destroy();
-    SpinLock::Guard _(syncLock);
-    lock.construct(appendLock);
+//lock.destroy();
+//SpinLock::Guard _(syncLock);
+//lock.construct(appendLock);
 
     // See if we still have work to do. It's possible that another thread
     // already did the syncing we needed for us.
@@ -212,7 +212,7 @@ Log::sync()
 
         // Drop the append lock. We don't want to block other appending threads
         // while we sync.
-        lock.destroy();
+//lock.destroy();
 
         originalHead->replicatedSegment->sync(appendedLength, &certificate);
         originalHead->syncedLength = appendedLength;
diff --git a/src/MasterService.cc b/src/MasterService.cc
index c3c538e14..49513e5e0 100644
--- a/src/MasterService.cc
+++ b/src/MasterService.cc
@@ -1607,20 +1607,26 @@ MasterService::multiReadGroup(const WireFormat::MultiOp::Request* reqHdr,
         currentResp->offset = 0;
         
         bool succeeded = false;
-        succeeded = objectManager.getMultiLog(currentReq->streamId, currentReq->streamletId)
-                                ->readSegment(
-				currentReq->segmentId, currentReq->groupId, currentReq->offset,
-				currentReq->maxObjects,
-				&currentResp->numberObjectsRead, maxResponseSize,
-				rpc->replyPayload, &currentResp->offset,
-				&currentResp->numberOfObjectEntries,
-				&currentResp->isSegmentClosed);
+        MultiLog* streamlet = objectManager.getMultiLog(currentReq->streamId, currentReq->streamletId);
+        if(streamlet)
+            succeeded = streamlet->readSegment( currentReq->segmentId, 
+                currentReq->groupId, currentReq->offset, currentReq->maxObjects,
+		&currentResp->numberObjectsRead, maxResponseSize,
+		rpc->replyPayload, &currentResp->offset, 
+                &currentResp->numberOfObjectEntries, 
+                &currentResp->isSegmentClosed);
         
-		if (!succeeded) { //some error in log.multireadSegment
-			currentResp->status = STATUS_RETRY;
-		} else {
-			currentResp->status = STATUS_OK;
-		}
+	if (!succeeded) { //some error in log.multireadSegment
+            if(!streamlet) {
+                currentResp->status = STATUS_REQUEST_FORMAT_ERROR;
+                RAMCLOUD_LOG(WARNING, "Consumer requesting non-existing streamId:  <%lu>  - streamletID: <%u>", currentReq->streamId, currentReq->streamletId);
+            }
+            else
+		currentResp->status = STATUS_RETRY;
+                
+	} else {
+		currentResp->status = STATUS_OK;
+	}
 
 //        RAMCLOUD_LOG(NOTICE, ">>> request service input: groupId=%lu  segmentId=%lu  "
 //      		" offset=%u  maxObjects=%u numberObjectsRead=%u  status=%d", currentReq->groupId, currentReq->segmentId,
diff --git a/src/MultiLog.cc b/src/MultiLog.cc
index e3f46264a..83514426b 100644
--- a/src/MultiLog.cc
+++ b/src/MultiLog.cc
@@ -149,7 +149,8 @@ bool MultiLog::appendMultipleObjects(uint64_t tableId, Buffer& objectsBuffer,
 		throw FatalError(HERE, "Guaranteed append managed to fail");
     
     *numberObjectsAppended += recordsCount;
-
+     
+    log->sync();
     return true;
     
     
@@ -264,7 +265,6 @@ LogSegment* MultiLog::allocHead(uint64_t* groupId, Log* log) {
     
     //Check the safety TODO
     log->head = groupHead; 
-    
     return groupHead;
     
 };
diff --git a/src/MultiReadWriteGroupBatchPerfTest.cc b/src/MultiReadWriteGroupBatchPerfTest.cc
index a88d780a7..22ae25011 100644
--- a/src/MultiReadWriteGroupBatchPerfTest.cc
+++ b/src/MultiReadWriteGroupBatchPerfTest.cc
@@ -642,7 +642,7 @@ TEST_F(MultiReadWriteGroupBatchPerfTest, multiwritegroupbatch_multireadbatch_pro
 //				memset(data, 0, dataLength);
 //		        objreq.get()->allocFirstChunkInternal(dataLength);
 //		        objreq.get()->allocFirstChunk(dataLength, data);
-
+                        RAMCLOUD_LOG(NOTICE, "Requesting object with stream: %lu - streamlet %u", tableId1, streamletId);
 				multiReadGroupObject.construct(tableId1, streamletId,  "object1-1", keysize, &respvalues[0], groupList);
 				MultiReadGroupObject* requests[] = {multiReadGroupObject.get()};
 
-- 
GitLab