diff --git a/Producer.cc b/Producer.cc index 418d904dab59799973cb61980d751d3e23e2dd81..0aa48900f9fc6f3c487af1d2a594cd49e5a37715 100644 --- a/Producer.cc +++ b/Producer.cc @@ -42,7 +42,7 @@ using namespace RAMCloud; //how much memory the producer can use to batch records before sending them = CACHE_SIZE * BATCH_SIZE #define CACHE_SIZE 1000 #define MAXNUMSTREAMLETS 128 -#define MAX_OUT_RPCS 1000 +#define MAX_OUT_RPCS 2000 @@ -110,20 +110,17 @@ public: } }; -struct MultiWriteGroupBatchObjectE; struct Batch; //key=streamletId, value is a list of indexes to batchesReuse; we append to the tail of indexes, the others should be closed //to be used by producers for appending next record //to be used by writers to identify "to be/closed" batches to be written std::map<uint32_t, Tub<AtomicQueue<uint32_t>>> streamletQueues; -//std::map<uint32_t, AtomicQueue<uint32_t>> streamletBatches; //contains batch objects with objects' content in buffers[i] -//access is protected by streamletActiveLocks[streamletId%MAXNUMSTREAMLETS] -//std::array<MultiWriteGroupBatchObjectE*, CACHE_SIZE> batchesReuse; //size of CACHE_SIZE -std::array<Batch*, CACHE_SIZE> batchPointers; +//std::array<Batch*, CACHE_SIZE> batchPointers; +std::vector<Batch*> batchPointers; //next is used to access batchesReuseIndexes SpinLock cacheLock("batches"); //for batches @@ -138,7 +135,7 @@ std::atomic<uint64_t> inFlightRecords; std::atomic<bool> producerStop; std::atomic<bool> writerStop; -Tub<Buffer> buffers[CACHE_SIZE]; +//Tub<Buffer> buffers[CACHE_SIZE]; struct Batch{ @@ -172,24 +169,25 @@ struct Batch{ int reapRPCs(std::vector<Tub<BatchWriteRpc>>& rpcs, uint32_t* recordsWritten) { int counter = 0; + uint32_t nbRecords = 0; for (int i = 0; i < rpcs.size(); i++) { if (rpcs[i]){ if (rpcs[i]->isReady()) { rpcs[i]->wait(); - if(recordsWritten) - (*recordsWritten) += rpcs[i]->recordsWritten; + nbRecords += rpcs[i]->recordsWritten; rpcs[i].destroy(); counter++; } } } + *recordsWritten = nbRecords; return counter; } void produce_batches(uint64_t streamId, uint32_t BATCH_SIZE, uint32_t RECORD_SIZE, uint32_t NSTREAMLETS, - uint64_t recordCount, uint64_t PRODID) + uint64_t recordCount, uint64_t PRODID, uint32_t maxBatchesAllowed) try { uint32_t streamletId = 0; //to be initialized later, >=1 @@ -231,7 +229,7 @@ try while (!producerStop) { freeBatch = -1; - if(totalRecords + inFlightRecords >= recordCount) + if(totalRecords >= recordCount) break; streamletId = (++streamletId > NSTREAMLETS) ? 1 : streamletId; //next one @@ -242,11 +240,10 @@ try if(!freeBatches.empty()) { freeBatch = freeBatches.pop(); //can be NULL, first batch for this streamlet nextBatch = batchPointers[freeBatch]; - } else { + } else { //This should not happen! always allow cache to handle as - //enough data for the experiment - - Cycles::sleep(100000); + //enough data for the experiment + streamletId--; continue; } @@ -257,18 +254,20 @@ try nextBatch->length = 0; nextBatch->numberObjectsAppended = 0; nextBatch->streamletId = streamletId; + nextBatch->recordsCount = 0; nextBatch->creationTime = Cycles::toMicroseconds(Cycles::rdtsc()); - while (totalRecords + inFlightRecords < recordCount && nextBatch->length + objectLength < BATCH_SIZE) { - nextBatch->objects->get()->append(entryHeaderBuffer.get(), 0, entryHeaderBufferLength); - Object::appendHeaderKeysAndValueToBuffer(*key.get(), value, RECORD_SIZE, nextBatch->objects->get(), false); + while (totalRecords < recordCount && nextBatch->length + RECORD_SIZE + entryHeaderBufferLength < BATCH_SIZE) { + nextBatch->objects->get()->append(entryHeaderBuffer.get(), 0, entryHeaderBufferLength); + uint32_t objLen = 0; + Object::appendHeaderKeysAndValueToBuffer(*key.get(), value, RECORD_SIZE, nextBatch->objects->get(), false, &objLen); //update WriteGroupBatchPart - nextBatch->length += (entryHeaderBufferLength + objectLength); + nextBatch->length += (entryHeaderBufferLength + objLen); nextBatch->recordsCount += 1; totalRecords++; } assert(freeBatch != -1); streamletQueues[nextBatch->streamletId]->push(freeBatch); - totalProducedBatches++; + totalProducedBatches++; /* fprintf(stdout, "produced a batch with %u records for streamlet %u, with a total size %u bytes == total produced batches %u\n", nextBatch->recordsCount, nextBatch->streamletId, @@ -289,7 +288,7 @@ try fflush (stdout); - for (uint32_t b = 0; b < CACHE_SIZE; b++) { + for (uint32_t b = 0; b < maxBatchesAllowed; b++) { Batch* nextBatch = batchPointers[b]; nextBatch->objects->get()->reset(); } @@ -305,7 +304,7 @@ catch (RAMCloud::ClientException& e) { Tub<SpinLock::Guard> takeCacheLock; takeCacheLock.construct(cacheLock); - for (uint32_t b = 0; b < CACHE_SIZE; b++) { + for (uint32_t b = 0; b < maxBatchesAllowed; b++) { Batch* nextBatch = batchPointers[b]; nextBatch->objects->get()->reset(); } @@ -322,7 +321,7 @@ catch (RAMCloud::Exception& e) { Tub<SpinLock::Guard> takeCacheLock; takeCacheLock.construct(cacheLock); - for (uint32_t b = 0; b < CACHE_SIZE; b++) { + for (uint32_t b = 0; b < maxBatchesAllowed; b++) { Batch* nextBatch = batchPointers[b]; nextBatch->objects->get()->reset(); } @@ -362,10 +361,15 @@ try while (outRPCs == MAX_OUT_RPCS) { ramcloud->poll(); uint32_t objectsWritten = 0; - outRPCs -= reapRPCs(rpcs, &objectsWritten); - records += objectsWritten; - totalRecords += objectsWritten; - inFlightRecords -= objectsWritten; + int finishedRPCs = reapRPCs(rpcs, &objectsWritten); + if (finishedRPCs > 0) { + outRPCs -= finishedRPCs; + records += objectsWritten; + //fprintf(stdout, "Written %u objects",objectsWritten); + //fflush(stdout);i + inFlightRecords -= objectsWritten; + totalRecords += objectsWritten; + } } for (int i = 0; i < rpcs.size() && inFlightRecords + totalRecords < recordCount; i++) { @@ -394,14 +398,21 @@ try } end = Cycles::rdtsc(); //Reap RPCs each 100us - if (end - start > 100) { + if (end - start > 100 ) { ramcloud->poll(); start = Cycles::rdtsc(); - uint32_t objectsWritten = 0; - outRPCs -= reapRPCs(rpcs, &objectsWritten); - records += objectsWritten; - inFlightRecords -= objectsWritten; - totalRecords += objectsWritten; + uint32_t objectsWritten = 0; + int finishedRPCs = reapRPCs(rpcs, &objectsWritten); + if (finishedRPCs > 0) { + outRPCs -= finishedRPCs; + records += objectsWritten; + //fprintf(stdout, "Written %u objects",objectsWritten); + //fflush(stdout); + inFlightRecords -= objectsWritten; + totalRecords += objectsWritten; + } + + } } fprintf(stdout, "======== done write_batches =======\n"); @@ -442,19 +453,24 @@ try { uint64_t streamId = ramcloud->createTable("table1", NNODES, NSTREAMLETS); streamId = ramcloud->getTableId("table1"); + //A simple formula to avoid client memory overflow, just allow up to 2GB + uint32_t maxBatchesAllowed = 2000000000 / (RECORD_SIZE * BATCH_SIZE); + fprintf(stdout, "%u <== Max Number of Batches\n", maxBatchesAllowed); + fflush(stdout); + Tub<Buffer>* buffers = reinterpret_cast<Tub<Buffer>*> + (malloc(sizeof(Tub<Buffer>) * maxBatchesAllowed)); //A test - ramcloud->write(streamId, "42", 2, "Hello, World!", 0); + // ramcloud->write(streamId, "42", 2, "Hello, World!", 0); fprintf(stdout, "StreamId id is %lu\n", streamId); fflush(stdout); assert(RECORD_SIZE + 30 < BATCH_SIZE); uint64_t entryId = 0; - for (uint32_t b = 0; b < CACHE_SIZE; b++) { - buffers[b].construct(); - - //preparing a batch for batchesReuse + for (uint32_t b = 0; b < maxBatchesAllowed; b++) { + buffers[b].construct(); + //preparing a batch for batchesReuse Batch* nextBatch = new Batch(&buffers[b], entryId++, -1); //keyHash==-1, to reset at reuse @@ -467,7 +483,7 @@ try { nextBatch->streamletId = 0; //invalid, should be streamletId>0 nextBatch->numberObjectsAppended = 0; - batchPointers[b] = nextBatch; + batchPointers.push_back(nextBatch); freeBatches.push(b); } @@ -487,7 +503,7 @@ try { std::thread twrite(write_batches, streamId, ramcloud.get(), NNODES, PRODID, recordCount); twrite.detach(); //this thread is the source of the stream, continuously producing batches of records - std::thread tproduce(produce_batches, streamId, BATCH_SIZE, RECORD_SIZE, NSTREAMLETS, recordCount, PRODID); + std::thread tproduce(produce_batches, streamId, BATCH_SIZE, RECORD_SIZE, NSTREAMLETS, recordCount, PRODID, maxBatchesAllowed); tproduce.detach(); //next print throughput @@ -514,9 +530,9 @@ try { Cycles::sleep(999990); //1000ms ramcloudControl->serverControlAll(WireFormat::GET_PERF_STATS, NULL, 0, &statsAfter); - fprintf(stdout, "========\n%s", PerfStats::printMinClusterStats(&statsBefore - , &statsAfter).c_str()); - fflush(stdout); + fprintf(stdout, "========\n%s", PerfStats::printMinClusterStats(&statsBefore + , &statsAfter).c_str()); + fflush(stdout); } uint64_t stopInsert = Cycles::rdtsc(); @@ -528,7 +544,7 @@ try { writerStop = true; producerStop = true; - + //delete[] buffers; while (producerStop && writerStop) { Cycles::sleep(1000000); //1000ms } diff --git a/scripts/genLevels.py b/scripts/genLevels.py index fa226673e8f8bab5077c82ef5a2529374ed72c6f..d8a7d2e95153059730980368224e9bb189964223 100755 --- a/scripts/genLevels.py +++ b/scripts/genLevels.py @@ -79,7 +79,8 @@ callees = { "TX_REQUEST_ABORT": ["BACKUP_WRITE"], "WRITE": ["BACKUP_WRITE", "INSERT_INDEX_ENTRY", "REMOVE_INDEX_ENTRY"], - "BATCH_WRITE": ["BACKUP_WRITE"] + "BATCH_WRITE": ["BACKUP_WRITE", "INSERT_INDEX_ENTRY", + "REMOVE_INDEX_ENTRY", "GET_SERVER_ID"], } # The following dictionary maps from the name of an opcode to its diff --git a/src/BackupMasterRecovery.h b/src/BackupMasterRecovery.h index 4687784d38df7612689c73fa2dc327775e66cfc5..843bbbac4e29c9241b36e738bf3957807c37606d 100755 --- a/src/BackupMasterRecovery.h +++ b/src/BackupMasterRecovery.h @@ -118,10 +118,14 @@ class BackupMasterRecovery : public Task { uint64_t getRecoveryId(); void performTask(); + uint32_t getLastChecksum(BackupStorage::FrameRef& + frame, void* buffer); + PRIVATE: void populateStartResponse(Buffer* buffer, StartResponse* response); struct Replica; + void buildRecoverySegments(Replica& replica); bool getLogDigest(Replica& replica, Buffer* digestBuffer); /** @@ -282,7 +286,7 @@ class BackupMasterRecovery : public Task { * A map that uniquely identifies segments and their corresponding * replicas. The tuple takes stream, streamlet, and segment identifiers * to map to a replica. - */ + */ std::unordered_map<uint64_t, std::unordered_map<uint32_t, std::unordered_map<uint64_t, Replica*>>> diff --git a/src/BackupService.cc b/src/BackupService.cc index f76d337d5be22ccc293072aa772234172066dab1..401022bee8e5b4b057a4a60d934ea1fa31f5d138 100755 --- a/src/BackupService.cc +++ b/src/BackupService.cc @@ -77,6 +77,7 @@ BackupService::BackupService(Context* context, config->backup.numSegmentFrames, config->backup.writeRateLimit, maxWriteBuffers, + context, config->backup.file.c_str(), O_DIRECT | O_SYNC)); } @@ -663,8 +664,12 @@ BackupService::writeSegment(const WireFormat::BackupWrite::Request* reqHdr, // Perform close, if any. if (reqHdr->close) { - LOG(DEBUG, "Closing <%s,%lu>", masterId.toString().c_str(), segmentId); + LOG(NOTICE, "Closing <%s,%lu>", masterId.toString().c_str(), segmentId); frame->close(); + auto it = frames.find(MasterSegmentIdPair(masterId, segmentId)); + if (it != frames.end()) frames.erase(it); + + //frame->free(); } } diff --git a/src/BasicTransport.h b/src/BasicTransport.h index 77fe25cb1250b1c1e3f7a8cda4d2024a05cfced7..a8228c9e25cdfe2b50432a50a40ff40e43249c60 100755 --- a/src/BasicTransport.h +++ b/src/BasicTransport.h @@ -47,7 +47,8 @@ class BasicTransport : public Transport { // any transport or driver state. return new Session(this, serviceLocator, timeoutMs); } - void registerMemory(void* base, size_t bytes) { + void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle = NULL) { driver->registerMemory(base, bytes); } diff --git a/src/Driver.h b/src/Driver.h index e6159946d6bc48e4b89043d466a2843e8b59329b..3fcbf46b4f7237feb0a5baf52b00d48622568c2a 100755 --- a/src/Driver.h +++ b/src/Driver.h @@ -21,6 +21,7 @@ #include "Common.h" #include "Buffer.h" +#include "Transport.h" #undef CURRENT_LOG_MODULE #define CURRENT_LOG_MODULE RAMCloud::TRANSPORT_MODULE @@ -325,7 +326,8 @@ class Driver { * The total size in bytes of the memory region starting at \a base * that is to be registered with the NIC. */ - virtual void registerMemory(void* base, size_t bytes) {} + virtual void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle = NULL) {}; /** * Send a single packet out over this Driver. The packet will not diff --git a/src/InMemoryStorage.cc b/src/InMemoryStorage.cc index e5a979672d2aca8c8b38b0d6ca2e4be2c8d1673c..bf10e24aea8dc327e1352eeeb2553dfd2698b610 100755 --- a/src/InMemoryStorage.cc +++ b/src/InMemoryStorage.cc @@ -36,7 +36,7 @@ InMemoryStorage::Frame::Frame(InMemoryStorage* storage, size_t frameIndex) , appendedToByCurrentProcess() , loadRequested() , metadata(new char[METADATA_SIZE]) -{ +{ memset(metadata.get(), '\0', METADATA_SIZE); } @@ -213,8 +213,7 @@ InMemoryStorage::Frame::close() isOpen = false; isClosed = true; //It will probably make sense to add this when I move experiments to machines - //with large memory - //free(); + //with large memory } // See BackupStorage.h for documentation. diff --git a/src/InfRcTransport.cc b/src/InfRcTransport.cc index d6d0fbd759438ea0eda6f5cd652212001c593363..8bd00517626090a937b1b5fb472b2f3c5d6ea879 100755 --- a/src/InfRcTransport.cc +++ b/src/InfRcTransport.cc @@ -470,6 +470,7 @@ InfRcTransport::InfRcSession::cancelRequest( uint64_t start = Cycles::rdtsc(); while (transport->freeTxBuffers.size() != MAX_TX_QUEUE_DEPTH) { transport->reapTxBuffers(); + //transport->reapTxBuffers(transport); // Must invoke the poller to process incoming requests, // in order to handle occasional situations where all of the // transmit buffers are in use for messages coming to @@ -542,6 +543,35 @@ InfRcTransport::InfRcSession::sendRequest(Buffer* request, rpc->sendOrQueue(); } + +// A send request for RDMA operations +void +InfRcTransport::InfRcSession::sendRDMARequest(Buffer* request, Buffer* response, + RpcNotifier* notifier, Transport::RDMAHandle* rdmaHandle) +{ + response->reset(); + InfRcTransport *t = transport; + if (qp == NULL) { + notifier->failed(); + return; + } + + LOG(DEBUG, "Sending RDMA %s request to %s with %u bytes", + WireFormat::opcodeSymbol(request), serviceLocator.c_str(), + request->size()); + if (request->size() > t->getMaxRpcSize()) { + throw TransportException(HERE, + format("client request exceeds maximum rpc size " + "(attempted %u bytes, maximum %u bytes)", + request->size(), + t->getMaxRpcSize())); + } + ClientRpc *rpc = transport->clientRpcPool.construct(transport, this, + request, response, + notifier, + generateRandom()); + rpc->sendOrQueue(rdmaHandle); +} /** * Constrctor for ServerPort object **/ @@ -889,12 +919,12 @@ InfRcTransport::postSrqReceiveAndKickTransmit(ibv_srq* srq, * behaviour impacts our Transport API. */ Infiniband::BufferDescriptor* -InfRcTransport::getTransmitBuffer() +InfRcTransport::getTransmitBuffer(InfRcTransport* transport = NULL) { // if we've drained our free tx buffer pool, we must wait. while (freeTxBuffers.empty()) { reapTxBuffers(); - + //reapTxBuffers(t); if (freeTxBuffers.empty()) { // We are temporarily out of buffers. Time how long it takes // before a transmit buffer becomes available again (a long @@ -910,6 +940,7 @@ InfRcTransport::getTransmitBuffer() // in use sending messages to ourself. poller.poll(); reapTxBuffers(); + //reapTxBuffers(t); uint64_t now = Cycles::rdtsc(); if (now > nextLog) { LOG(WARNING, "Transmit buffers unavailable for %.1f " @@ -951,7 +982,7 @@ InfRcTransport::getTransmitBuffer() * The number of buffers reaped. */ int -InfRcTransport::reapTxBuffers() +InfRcTransport::reapTxBuffers(InfRcTransport* t) { ibv_wc retArray[MAX_TX_QUEUE_DEPTH]; int n = infiniband->pollCompletionQueue(commonTxCq, @@ -961,7 +992,41 @@ InfRcTransport::reapTxBuffers() for (int i = 0; i < n; i++) { BufferDescriptor* bd = reinterpret_cast<BufferDescriptor*>(retArray[i].wr_id); - pendingOutputBytes -= bd->messageBytes; + /* + * Reap RDMA request buffers + */ + if ((retArray[i].opcode == IBV_WC_RDMA_WRITE) && + !t->outstandingRpcs.empty()) { + ibv_wc* response = &retArray[i]; + CycleCounter<RawMetric> receiveTicks; + if (response->status != IBV_WC_SUCCESS) { + LOG(ERROR, "wc.status(%d:%s) != IBV_WC_SUCCESS", + response->status, + t->infiniband->wcStatusToString(response->status)); + // t->postSrqReceiveAndKickTransmit(t->clientSrq, bd); + throw TransportException(HERE, response->status); + } + foreach (ClientRpc& rpc, t->outstandingRpcs) { + if (rpc.idRDMA != response->wr_id) + continue; + t->outstandingRpcs.erase(t->outstandingRpcs.iterator_to(rpc)); + // XXX- The following line is nasty and should be fixed + // If we don't do so we'll get a "too-short response" error + rpc.response->alloc(sizeof(WireFormat::BackupWrite::Response)); + rpc.state = ClientRpc::RESPONSE_RECEIVED; + metrics->transport.receive.ticks += receiveTicks.stop(); + rpc.notifier->completed(); + t->clientRpcPool.destroy(&rpc); + if (t->outstandingRpcs.empty()) + t->clientRpcsActiveTime.destroy(); + break; + + } + freeTxBuffers.push_back(bd); + continue; + } + + pendingOutputBytes = bd->messageBytes; if (retArray[i].status != IBV_WC_SUCCESS) { LOG(ERROR, "Transmit failed for buffer %lu: destination " "lid %u, status %s, opcode %s", @@ -1041,7 +1106,8 @@ InfRcTransport::getServiceLocator() */ void InfRcTransport::sendZeroCopy(uint64_t nonce, Buffer* message, QueuePair* qp, - bool response) + bool response, Transport::RDMAHandle* rdmaHandle, uint64_t* idRDMA, + InfRcTransport* transport) { const bool allowZeroCopy = true; uint32_t lastChunkIndex = message->getNumberChunks() - 1; @@ -1050,6 +1116,7 @@ InfRcTransport::sendZeroCopy(uint64_t nonce, Buffer* message, QueuePair* qp, uint32_t chunksUsed = 0; uint32_t sgesUsed = 0; BufferDescriptor* bd = getTransmitBuffer(); + //BufferDescriptor* bd = getTransmitBuffer(transport); bd->messageBytes = message->size(); bd->remoteLid = qp->getRemoteLid(); bd->response = response; @@ -1061,8 +1128,10 @@ InfRcTransport::sendZeroCopy(uint64_t nonce, Buffer* message, QueuePair* qp, char* unaddedStart = bd->buffer; char* unaddedEnd = bd->buffer; - *(reinterpret_cast<uint64_t*>(unaddedStart)) = nonce; - unaddedEnd += sizeof(nonce); + if (!rdmaHandle){ + *(reinterpret_cast<uint64_t*>(unaddedStart)) = nonce; + unaddedEnd += sizeof(nonce); + } Buffer::Iterator it(message); while (!it.isDone()) { @@ -1127,10 +1196,22 @@ InfRcTransport::sendZeroCopy(uint64_t nonce, Buffer* message, QueuePair* qp, memset(&txWorkRequest, 0, sizeof(txWorkRequest)); txWorkRequest.wr_id = reinterpret_cast<uint64_t>(bd);// stash descriptor ptr + txWorkRequest.next = NULL; txWorkRequest.sg_list = isge; txWorkRequest.num_sge = sgesUsed; - txWorkRequest.opcode = IBV_WR_SEND; + + //Add the RDMA write stuff + if (rdmaHandle){ + *idRDMA = txWorkRequest.wr_id; + txWorkRequest.opcode = IBV_WR_RDMA_WRITE; + txWorkRequest.wr.rdma.remote_addr = rdmaHandle->remote_addr + + rdmaHandle->offset; + txWorkRequest.wr.rdma.rkey = rdmaHandle->rkey; + } else { + txWorkRequest.opcode = IBV_WR_SEND; + } + txWorkRequest.send_flags = IBV_SEND_SIGNALED; // We can get a substantial latency improvement (nearly 2usec less per RTT) @@ -1230,8 +1311,9 @@ InfRcTransport::ServerRpc::sendReply() replyPayload.size(), t->getMaxRpcSize())); } - + //t->sendZeroCopy(nonce, &replyPayload, qp, true, NULL, NULL, t); t->sendZeroCopy(nonce, &replyPayload, qp, true); + //t->sendZeroCopy(nonce, &replyPayload, qp, true, NULL, NULL, t); interval.stop(); // Restart port watchdog for this server port @@ -1291,6 +1373,7 @@ InfRcTransport::ClientRpc::ClientRpc(InfRcTransport* transport, , response(response) , notifier(notifier) , nonce(nonce) + , idRDMA(0) , waitStart(0) , state(PENDING) , queueEntries() @@ -1315,7 +1398,7 @@ InfRcTransport::ClientRpc::sendOrQueue() } ++metrics->transport.transmit.messageCount; ++metrics->transport.transmit.packetCount; - + //t->sendZeroCopy(nonce, request, session->qp, false, NULL, NULL, t); t->sendZeroCopy(nonce, request, session->qp, false); t->outstandingRpcs.push_back(*this); @@ -1329,6 +1412,33 @@ InfRcTransport::ClientRpc::sendOrQueue() } } + +/** + * Send the RPC request out onto the network if there is a receive buffer + * available for its response, or queue it for transmission otherwise. + */ +void +InfRcTransport::ClientRpc::sendOrQueue(Transport::RDMAHandle* rdmaHandle) +{ + assert(state == PENDING); + InfRcTransport* const t = transport; + // send out the request + if (t->outstandingRpcs.empty()) { + t->clientRpcsActiveTime.construct( + &metrics->transport.clientRpcsActiveTicks); + } + ++metrics->transport.transmit.messageCount; + ++metrics->transport.transmit.packetCount; + + t->sendZeroCopy(nonce, request, session->qp, false, rdmaHandle, &idRDMA, + transport); + + t->outstandingRpcs.push_back(*this); + //session->sessionAlarm.rpcStarted(); + //++t->numUsedClientSrqBuffers; + state = REQUEST_SENT; +} + /** * This method is invoked by the dispatcher's inner polling loop; it * checks for incoming RPC requests and responses and processes them. @@ -1498,7 +1608,8 @@ InfRcTransport::Poller::poll() metrics->transport.receive.ticks += receiveTicks.stop(); } } - + // XXX- removing this caused a deadlock + //t->reapTxBuffers(transport); done: // Retrieve transmission buffers from the NIC once they have been // sent. It's done here in the hopes that it will happen when we @@ -1508,6 +1619,7 @@ InfRcTransport::Poller::poll() // optimization improves the throughput of "clusterperf readThroughput" // by about 5% (as of 7/2015). if (t->freeTxBuffers.size() < 3) { + //t->reapTxBuffers(transport); t->reapTxBuffers(); if (t->freeTxBuffers.size() >= 3) { foundWork = 1; diff --git a/src/InfRcTransport.h b/src/InfRcTransport.h index cc217065acb9ea5e198c47be9f157c4baeb09af1..9e2389d9152c344fcca92e0489661c9990f48d5d 100755 --- a/src/InfRcTransport.h +++ b/src/InfRcTransport.h @@ -41,6 +41,7 @@ #include "RawMetrics.h" #include "DispatchExec.h" #include "PerfCounter.h" +#include "Transaction.h" #ifndef RAMCLOUD_INFRCTRANSPORT_H #define RAMCLOUD_INFRCTRANSPORT_H @@ -89,19 +90,32 @@ class InfRcTransport : public Transport { * \param bytes * Length of the region starting at \a base to register with the HCA. */ - void registerMemory(void* base, size_t bytes) + void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle) { assert(logMemoryRegion == NULL); - logMemoryRegion = ibv_reg_mr(infiniband->pd.pd, base, bytes, + ibv_mr* mr = ibv_reg_mr(infiniband->pd.pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); - if (logMemoryRegion == NULL) { + if (mr == NULL) { LOG(ERROR, "ibv_reg_mr failed to register %Zd bytes at %p", bytes, base); - throw TransportException(HERE, "ibv_reg_mr failed"); + throw TransportException(HERE, "ibv_reg_mr failed"); + } + if (rdmaHandle) { + // We just want to register some memory with the controller and + // set the RDMA handle. + rdmaHandle->remote_addr = (uint64_t) mr->addr; + rdmaHandle->rkey = mr->rkey; + LOG(DEBUG, "ibv_reg_mr at %p rkey : %u, raddr : %lu with %zu", + base, mr->rkey, (uint64_t) base, mr->length); + } else { + // We are initializing the log memory region. + assert(logMemoryRegion == NULL); + logMemoryRegion = mr; + logMemoryBase = reinterpret_cast<uintptr_t>(base); + logMemoryBytes = bytes; + RAMCLOUD_LOG(NOTICE, "Registered %Zd bytes at %p", bytes, base); } - logMemoryBase = reinterpret_cast<uintptr_t>(base); - logMemoryBytes = bytes; - RAMCLOUD_LOG(NOTICE, "Registered %Zd bytes at %p", bytes, base); } static void setName(const char* name); @@ -130,6 +144,7 @@ class InfRcTransport : public Transport { RpcNotifier* notifier, uint64_t nonce); void sendOrQueue(); + void sendOrQueue(Transport::RDMAHandle* rdmaHandle); PRIVATE: InfRcTransport* transport; @@ -144,7 +159,7 @@ class InfRcTransport : public Transport { /// Uniquely identifies the RPC. uint64_t nonce; - + uint64_t idRDMA; /// If the RPC couldn't immediately be sent because there /// weren't enough client receive buffers available, this /// records the start of the waiting time, so we can print @@ -199,6 +214,8 @@ class InfRcTransport : public Transport { virtual string getRpcInfo(); virtual void sendRequest(Buffer* request, Buffer* response, RpcNotifier* notifier); + virtual void sendRDMARequest(Buffer* request, Buffer* response, + RpcNotifier* notifier, Transport::RDMAHandle* rdmaHandle); PRIVATE: // Transport that manages this session. @@ -293,7 +310,9 @@ class InfRcTransport : public Transport { // misc helper functions const char* getOpcodeFromBuffer(BufferDescriptor* bd); void sendZeroCopy(uint64_t nonce, Buffer* message, QueuePair* qp, - bool response); + bool response, Transport::RDMAHandle* rdmaHandle = NULL , + uint64_t* idRDMA = NULL, + InfRcTransport* transport = NULL); void setNonBlocking(int fd); // Extend Infiniband::postSrqReceive by issuing queued up transmissions @@ -301,10 +320,10 @@ class InfRcTransport : public Transport { // Grab a transmit buffer from our free list, or wait for completions if // necessary. - BufferDescriptor* getTransmitBuffer(); + BufferDescriptor* getTransmitBuffer(InfRcTransport* transport); // Pull TX buffers from completion queue and add to freeTxBuffers. - int reapTxBuffers(); + int reapTxBuffers(InfRcTransport* transport = NULL); // queue pair connection setup helpers QueuePair* clientTrySetupQueuePair(IpAddress& address); diff --git a/src/InfUdDriver.cc b/src/InfUdDriver.cc index e98a312b40310397a23f0adc1be0c1fced9439eb..470bfbf74872d5f0e34fcef6b336f36ba7726013 100755 --- a/src/InfUdDriver.cc +++ b/src/InfUdDriver.cc @@ -160,8 +160,7 @@ InfUdDriver::InfUdDriver(Context* context, const ServiceLocator *sl, throw DriverException(HERE, errno); } - qp = infiniband->createQueuePair(localMac ? IBV_QPT_RAW_ETH - : IBV_QPT_UD, + qp = infiniband->createQueuePair(IBV_QPT_UD, ibPhysicalPort, NULL, txcq, rxcq, MAX_TX_QUEUE_DEPTH, MAX_RX_QUEUE_DEPTH, @@ -295,7 +294,8 @@ InfUdDriver::reapTransmitBuffers() * See docs in the ``Driver'' class. */ void -InfUdDriver::registerMemory(void* base, size_t bytes) +InfUdDriver::registerMemory(void* base, size_t bytes, Transport::RDMAHandle* + rdmaHandle) { // We can only remember one region (the first) if (zeroCopyRegion == NULL) { diff --git a/src/InfUdDriver.h b/src/InfUdDriver.h index 16b3a003beea0f5da3019bc12953bcaf3e8ac045..e9d050d0e348e724e17e5b828b7935face0a77ac 100755 --- a/src/InfUdDriver.h +++ b/src/InfUdDriver.h @@ -51,7 +51,8 @@ class InfUdDriver : public Driver { virtual int getTransmitQueueSpace(uint64_t currentTime); virtual void receivePackets(uint32_t maxPackets, std::vector<Received>* receivedPackets); - virtual void registerMemory(void* base, size_t bytes); + virtual void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle); virtual void release(char *payload); virtual void sendPacket(const Driver::Address* addr, const void* header, uint32_t headerLen, Buffer::Iterator* payload, diff --git a/src/Infiniband.cc b/src/Infiniband.cc index 93344f75aad2eb99c431dea5cce1444238237e41..36b1a410b344f52d156cb687458950a5d152ac0d 100755 --- a/src/Infiniband.cc +++ b/src/Infiniband.cc @@ -520,7 +520,7 @@ Infiniband::QueuePair::QueuePair(Infiniband& infiniband, ibv_qp_type type, peerLid(0) { snprintf(peerName, sizeof(peerName), "?unknown?"); - if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_ETH) + if (type != IBV_QPT_RC && type != IBV_QPT_UD) //&& type != IBV_QPT_RAW_ETH) throw TransportException(HERE, "invalid queue pair type"); ibv_qp_init_attr qpia; @@ -564,8 +564,8 @@ Infiniband::QueuePair::QueuePair(Infiniband& infiniband, ibv_qp_type type, mask |= IBV_QP_QKEY; mask |= IBV_QP_PKEY_INDEX; break; - case IBV_QPT_RAW_ETH: - break; + //case IBV_QPT_RAW_ETH: + // break; default: assert(0); } @@ -683,7 +683,7 @@ void Infiniband::QueuePair::activate(const Tub<MacAddress>& localMac) { ibv_qp_attr qpa; - if (type != IBV_QPT_UD && type != IBV_QPT_RAW_ETH) + if (type != IBV_QPT_UD) //&& type != IBV_QPT_RAW_ETH) throw TransportException(HERE, "activate() called on wrong qp type"); if (getState() != IBV_QPS_INIT) { @@ -704,16 +704,16 @@ Infiniband::QueuePair::activate(const Tub<MacAddress>& localMac) // now move to RTS state qpa.qp_state = IBV_QPS_RTS; int flags = IBV_QP_STATE; - if (type != IBV_QPT_RAW_ETH) { + //if (type != IBV_QPT_RAW_ETH) { qpa.sq_psn = initialPsn; flags |= IBV_QP_SQ_PSN; - } + //} ret = ibv_modify_qp(qp, &qpa, flags); if (ret) { LOG(ERROR, "failed to transition to RTS state"); throw TransportException(HERE, ret); } - +/* if (type == IBV_QPT_RAW_ETH) { ibv_gid mgid; memset(&mgid, 0, sizeof(mgid)); @@ -723,6 +723,7 @@ Infiniband::QueuePair::activate(const Tub<MacAddress>& localMac) throw TransportException(HERE, ret); } } +*/ } /** diff --git a/src/Log.cc b/src/Log.cc index d928765f491e8de9b6f6497a07eac992b3b0bf01..ec396b0742f6d2a0de85949106ec8dff91404f37 100755 --- a/src/Log.cc +++ b/src/Log.cc @@ -219,7 +219,7 @@ lock.construct(appendLock); originalHead->syncedLength = appendedLength; //RAMCLOUD_LOG(DEBUG, "log synced"); } else { - RAMCLOUD_LOG(DEBUG, "sync not needed: already fully replicated"); + RAMCLOUD_CLOG(DEBUG, "sync not needed: already fully replicated"); } } diff --git a/src/LogEntryTypes.cc b/src/LogEntryTypes.cc index 7318bb953a04e0cb969e1e94cad5e05fc05edb1b..db151e7a8557729bae08b62e369de6e87fb774e8 100755 --- a/src/LogEntryTypes.cc +++ b/src/LogEntryTypes.cc @@ -52,6 +52,8 @@ toString(LogEntryType type) return "Transaction Participant List Record"; case LOG_ENTRY_TYPE_BATCH: return "Batch Records"; + case LOG_ENTRY_TYPE_CHECKSUM: + return "Segment Checksum Entry"; default: return "<<Unknown>>"; } diff --git a/src/LogEntryTypes.h b/src/LogEntryTypes.h index 8918b9fc97c6b2799b6c5ce38c0a9cb91377b9b8..88018287914c4e2ad00cabd2f96667090f78ce0f 100755 --- a/src/LogEntryTypes.h +++ b/src/LogEntryTypes.h @@ -63,6 +63,8 @@ enum LogEntryType { /// Describes a batch of records LOG_ENTRY_TYPE_BATCH, + + LOG_ENTRY_TYPE_CHECKSUM, /// Not a type, but rather the total number of types we have defined. /// This is currently restricted by the lower 6 bits in a uint8_t field diff --git a/src/LogMetadata.h b/src/LogMetadata.h index a69fb139e21dbf7b2870dfc19b53e690c751a3cd..3a859721868fc1775e6de410a7f91e888c664a6e 100755 --- a/src/LogMetadata.h +++ b/src/LogMetadata.h @@ -133,6 +133,7 @@ class SegmentCertificate { friend class Segment; friend class SegmentIterator; + friend class MultiFileStorage; } __attribute__((__packed__)); static_assert(sizeof(SegmentCertificate) == 8, "Unexpected padding in SegmentCertificate"); diff --git a/src/MasterService.cc b/src/MasterService.cc index 6de369c12328e07ebea2366357aef2a52cea48b5..0b15c7952ff73d3703476226b3bb9d0f3a53dffb 100755 --- a/src/MasterService.cc +++ b/src/MasterService.cc @@ -2164,7 +2164,7 @@ MasterService::writeBatch(const WireFormat::BatchWrite::Request* reqHdr, rpc->sendReply(); return; }*/ - // RAMCLOUD_CLOG(NOTICE, "Received a request with %u records", reqHdr->recordsCount); + RAMCLOUD_CLOG(DEBUG, "Received a request with %u records", reqHdr->recordsCount); uint32_t reqOffset = sizeof32(*reqHdr); uint64_t streamId = reqHdr->tableId; @@ -2188,7 +2188,7 @@ MasterService::writeBatch(const WireFormat::BatchWrite::Request* reqHdr, RAMCLOUD_LOG(DEBUG, "Not enough space to append!"); throw RetryException(HERE, 1000, 2000, "Memory capacity exceeded"); } - // RAMCLOUD_CLOG(DEBUG, ">>> Done request: length=%u appended=%u", reqHdr->length, respHdr->numberObjectsAppended); + RAMCLOUD_CLOG(DEBUG, ">>> Done request: length=%u appended=%u", reqHdr->length, respHdr->numberObjectsAppended); respHdr->common.status = STATUS_OK; PerfStats::threadStats.writeCount += reqHdr->recordsCount; //assert(currentResp->numberObjectsAppended > 0); diff --git a/src/MockTransport.h b/src/MockTransport.h index 535374732b034948c374e30c7040164c1b6aa4ee..5a8f139d483a2094396ac8c971910d991a660bb5 100755 --- a/src/MockTransport.h +++ b/src/MockTransport.h @@ -44,7 +44,8 @@ class MockTransport : public Transport { virtual Transport::SessionRef getSession(); - void registerMemory(void* base, size_t bytes) { + void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle = NULL) { RAMCLOUD_TEST_LOG("register %d bytes at %lu for %s", static_cast<int>(bytes), reinterpret_cast<uint64_t>(base), diff --git a/src/MultiFileStorage.cc b/src/MultiFileStorage.cc index bf422dfa6a9367f40f2a54248627502aff193285..d205758db678ea1f9afc47134e1be658a82618a1 100755 --- a/src/MultiFileStorage.cc +++ b/src/MultiFileStorage.cc @@ -31,6 +31,8 @@ #include "RawMetrics.h" #include "ShortMacros.h" #include "PerfStats.h" +#include "Transaction.h" +#include "SegmentIterator.h" namespace RAMCloud { @@ -363,9 +365,12 @@ MultiFileStorage::Frame::append(Buffer& source, } if (!isSynced()) { - if (sync) { + if (sync) { performWrite(lock); } else { + committedLength = appendedLength; + committedMetadataVersion = appendedMetadataVersion; + return; schedule(lock, LOW); } } @@ -406,6 +411,9 @@ MultiFileStorage::Frame::close() // at low priority. Now that it's closed, raise the priority // so it gets to secondary storage quickly and we can free its // buffer in memory. + committedLength = appendedLength; + committedMetadataVersion = appendedMetadataVersion; + return; schedule(lock, HIGH); } } @@ -859,6 +867,62 @@ MultiFileStorage::Frame::isSynced() const (appendedMetadataVersion == committedMetadataVersion); } + + + +/* + * + */ +void +MultiFileStorage::Frame::fixRDMASegment() { + + //We don't know the actual size of the segment if it was replicated using + //RDMAs. This will help us iterate over all data + + //Basically we're just adding the certificate here + const BackupReplicaMetadata* metadata = + static_cast<const BackupReplicaMetadata*>(this->getMetadata()); + SegmentCertificate certificate = metadata->certificate; + certificate.segmentLength = Segment::DEFAULT_SEGMENT_SIZE; + //This is unsafe to use, it is just a temporary hack to extract the + //latest checksum from the buffer. + SegmentIterator it(this->buffer.get(), certificate.segmentLength, + metadata->certificate); + + it.setLimit(certificate.segmentLength); + Segment* segment = it.getSegment(); + uint32_t checksum = 0; + uint32_t offset; + Crc32C currentChecksum; + + checksum = segment->fixRDMASegment(certificate.segmentLength, offset, + currentChecksum); + + LOG(NOTICE,"Size of the open segment: %u -" + " lastChecksum: 0x%08x -" , + offset, checksum); + if (checksum != 0) { + certificate.segmentLength = offset; + certificate.checksum = reinterpret_cast<Crc32C::ResultType>(checksum); + currentChecksum.update(&certificate, static_cast<unsigned> + (sizeof(certificate)-sizeof(certificate.checksum))); + certificate.checksum = currentChecksum.getResult(); + Tub<BackupReplicaMetadata> newMetadata; + newMetadata.construct(certificate, + metadata->serverId, + metadata->streamId, metadata->streamletId, metadata->groupId, + metadata->logId, metadata->segmentId, + offset, + metadata->segmentEpoch, + metadata->closed, metadata->primary); + memcpy(appendedMetadata.get(), + reinterpret_cast<const void *>(newMetadata.get()), + sizeof(newMetadata)); + } + +}; + + // --- MultiFileStorage::BufferDeleter --- /** @@ -891,6 +955,9 @@ MultiFileStorage::BufferDeleter::operator()(void* buffer) if (storage->buffers.size() >= MAX_POOLED_BUFFERS) { std::free(buffer); } else { + // LOG(WARNING,"BufferDeleter memset"); + memset(buffer, '\0', this->storage->getSegmentSize() + + METADATA_SIZE); storage->buffers.push(buffer); } } @@ -925,6 +992,7 @@ MultiFileStorage::MultiFileStorage(size_t segmentSize, size_t frameCount, size_t writeRateLimit, size_t maxWriteBuffers, + Context* context, const char* filePathsStr, int openFlags) : BackupStorage(segmentSize, Type::DISK, writeRateLimit) @@ -943,6 +1011,8 @@ MultiFileStorage::MultiFileStorage(size_t segmentSize, , maxWriteBuffers(maxWriteBuffers) , bufferDeleter(this) , buffers() + , rdmaHandles() + , sSize(segmentSize) { assert(filePathsStr); @@ -1007,10 +1077,36 @@ MultiFileStorage::MultiFileStorage(size_t segmentSize, { // Pre-fill the buffer pool. std::vector<BufferPtr> buffers; - for (int i = 0; i < INIT_POOLED_BUFFERS; ++i) - buffers.emplace_back(allocateBuffer()); + + for (int i = 0; i < INIT_POOLED_BUFFERS; ++i) { + + //For now I don't know why I have to add METADATA_SIZE otherwise + //Replications gets stuck as some point + //TODO + void* block = Memory::xmemalign(HERE, BUFFER_ALIGNMENT, + segmentSize + METADATA_SIZE); + memset(block, '\0', segmentSize + METADATA_SIZE); + buffers.emplace_back(BufferPtr{block, bufferDeleter}); + //Register the buffers for RDMA replication + Transport::RDMAHandle* rdmaHandle = new Transport::RDMAHandle(); + context->transportManager->registerMemory( + block, + segmentSize + METADATA_SIZE, + rdmaHandle); + auto pair = std::make_pair(block, *rdmaHandle); + this->rdmaHandles.insert(pair); + std::map<void*, Transport::RDMAHandle>::iterator it = + this->rdmaHandles.find(block); + if (it == this->rdmaHandles.end()){ + LOG(ERROR, "Insert in rdmaHandles failed"); + assert(0); } + } + + } + + for (size_t frame = 0; frame < frameCount; ++frame) frames.emplace_back(this, frame); @@ -1047,16 +1143,60 @@ MultiFileStorage::~MultiFileStorage() MultiFileStorage::BufferPtr MultiFileStorage::allocateBuffer() { + /* if (buffers.empty()) { void* block = Memory::xmemalign(HERE, BUFFER_ALIGNMENT, segmentSize + METADATA_SIZE); buffers.push(block); } + */ void* buffer = buffers.top(); buffers.pop(); return BufferPtr{buffer, bufferDeleter}; } +/** + * Fetches the RDMAHandle associated with the current buffer + * @param frame + * @return The RDMAHandle associated with the current buffers + * + */ +//I think we need to move it later to BackupService in order to support other +//Storage classes. +//TODO +Transport::RDMAHandle* +MultiFileStorage::getRDMAHandle(MultiFileStorage::FrameRef frame) +{ + + Frame* f = reinterpret_cast<Frame*>(frame.get()); + Transport::RDMAHandle* rdmaHandle = new Transport::RDMAHandle; + void* buffer_key = f->buffer.get(); + if (buffer_key == NULL) + LOG(ERROR, "Failed to get buffer from a frame"); + std::map<void*, Transport::RDMAHandle>::iterator it = + rdmaHandles.find(buffer_key); + if (it != rdmaHandles.end()) + *rdmaHandle = it->second; + else + // This address is unregistered; register it + LOG(ERROR, "Failed to get a RDMAHandle for a frame \n" + "RDMAHandles size : %zu ---- Buffers size : %zu " + ,rdmaHandles.size(), buffers.size()); + //assert(0); + + // This address is unregistered; register it + /* + LOG(DEBUG, "RDMAHandles size : %zu ", rdmaHandles.size()); + LOG(DEBUG, "Buffers size : %zu ", buffers.size()); + LOG(DEBUG, "@MultiFileStorage.cc:1105, return:" + "remote_addr = %lu" + "rkey = %u ", + rdmaHandle->remote_addr, + rdmaHandle->rkey); + */ + return rdmaHandle; +} + /** * Allocate a frame on storage, resetting its state to accept appends for a new * replica. Open is not synchronous itself. Even after return from open() if diff --git a/src/MultiFileStorage.h b/src/MultiFileStorage.h index 9591b6b5c61967cd22599cb8b80f95aaaa557f63..228330af3a55f89e55c2ed8153b86386ddde7ac1 100755 --- a/src/MultiFileStorage.h +++ b/src/MultiFileStorage.h @@ -21,6 +21,7 @@ #include "Common.h" #include "BackupStorage.h" #include "PriorityTaskQueue.h" +#include "Transport.h" namespace RAMCloud { @@ -92,6 +93,22 @@ class MultiFileStorage : public BackupStorage { void performWrite(Lock& lock); bool isSynced() const; + + void fixRDMASegment(); + void reclaimBuffer() { + committedLength = appendedLength; + committedMetadataVersion = appendedMetadataVersion; + isClosed = true; + // Release the in-memory copy if it won't be used again. + if (buffer) { + assert(isSynced()); + buffer.reset(); + --storage->writeBuffersInUse; + isWriteBuffer = false; + RAMCLOUD_LOG(NOTICE, "Released a buffer"); + } + storage->freeMap[frameIndex] = 1; + }; /// Storage where this frame resides. MultiFileStorage* storage; @@ -227,6 +244,7 @@ class MultiFileStorage : public BackupStorage { size_t frameCount, size_t writeRateLimit, size_t maxNonVolatileBuffers, + Context* context, const char* filePaths, int openFlags = 0); ~MultiFileStorage(); @@ -242,13 +260,16 @@ class MultiFileStorage : public BackupStorage { void quiesce(); void fry(); - BufferPtr allocateBuffer(); - + BufferPtr allocateBuffer(); /** * Internal use only; block size of storage. Needed to deal * with alignment constraints for O_DIRECT. */ enum { BLOCK_SIZE = 512 }; + + + + size_t getSegmentSize() { return this->sSize;}; static_assert(sizeof(Superblock) < BLOCK_SIZE, "Superblock doesn't fit in a single disk block"); /** @@ -358,7 +379,17 @@ class MultiFileStorage : public BackupStorage { * managed by allocateBuffer() and #bufferDeleter. */ std::stack<void*, std::vector<void*>> buffers; - + + + std::map<void*, Transport::RDMAHandle> rdmaHandles; + + Transport::RDMAHandle* getRDMAHandle(MultiFileStorage::FrameRef frame); + + /* The size (maximum) of a segment that was given when constucting this + * object + */ + size_t sSize; + DISALLOW_COPY_AND_ASSIGN(MultiFileStorage); }; diff --git a/src/MultiLog.cc b/src/MultiLog.cc index a7e2a3d68234c3184f6f552ff6de0f6031c42093..7dabbdda2c4581cdf9c06afd7639bfacfacfac1d 100755 --- a/src/MultiLog.cc +++ b/src/MultiLog.cc @@ -179,7 +179,7 @@ bool MultiLog::appendMultipleObjects(uint64_t streamId, Buffer& objectsBuffer, } - *numberObjectsAppended += recordsCount; + *numberObjectsAppended = recordsCount; lock.destroy(); log->sync(); diff --git a/src/ReplicatedSegment.cc b/src/ReplicatedSegment.cc index 4090f269eaf4f0eda7d3ed799745e77d0d7c7d1f..6addda29a8106b6f984e156a7e4e0d65b3984445 100755 --- a/src/ReplicatedSegment.cc +++ b/src/ReplicatedSegment.cc @@ -792,15 +792,15 @@ ReplicatedSegment::performWrite(Replica& replica) // handleBackupFailure to reset the replica and break this // loop. replica.sent = replica.acked; - LOG(WARNING, "Couldn't write to backup %s; server is down", + RAMCLOUD_CLOG(WARNING, "Couldn't write to backup %s; server is down", replica.backupId.toString().c_str()); } catch (const BackupOpenRejectedException& e) { // The open request was rejected; typically happens when // backups get overloaded because write traffic exceeds // bandwidth of secondary storage. Try assigning a // different backup for this replica. - LOG(WARNING, "BackupOpenRejectedException"); - replica.reset(replica.replacesLostReplica); + RAMCLOUD_CLOG(WARNING, "BackupOpenRejectedException"); + replica.reset(replica.replacesLostReplica); } catch (const CallerNotInClusterException& e) { // The backup seems to think we have crashed (or never existed). // Check with the coordinator to be sure, then retry. See @@ -871,7 +871,7 @@ ReplicatedSegment::performWrite(Replica& replica) length = 0; } - RAMCLOUD_LOG(NOTICE, "Sending open to backup %s -- %s -- segment %lu -- replica %lu", + RAMCLOUD_CLOG(NOTICE, "Sending open to backup %s -- %s -- segment %lu -- replica %lu", replica.backupId.toString().c_str(), replica.isActive ? "Active replica" : "Inactive replica!", segmentId, &replica - &replicas[0]); replica.writeRpc.construct(context, replica.backupId, masterId, streamId, streamletId, groupId, diff --git a/src/Seglet.h b/src/Seglet.h index 6e499473abf56fe1056c855c9e4619e8b14029a1..d89570038e1409d9726fcc8b5ae7abc7b61f9805 100755 --- a/src/Seglet.h +++ b/src/Seglet.h @@ -41,7 +41,7 @@ class Seglet { /// The default seglet size is a reasonable trade-off between reducing /// fragmentation and keeping the number of seglets per segment relatively /// small. - enum { DEFAULT_SEGLET_SIZE = 8 * 1024 * 1024 }; + enum { DEFAULT_SEGLET_SIZE = 16 * 1024 * 1024 }; Seglet(SegletAllocator& segletAllocator, void* buffer, uint32_t length); void free(); diff --git a/src/Segment.cc b/src/Segment.cc index 251ccb6cc9ce29655993c7bce49296d96e475df7..9d2098eb3fe157296f7e33113eab1212f0afec2b 100755 --- a/src/Segment.cc +++ b/src/Segment.cc @@ -1017,6 +1017,117 @@ Segment::checkStreamSegmentIntegrity(const SegmentCertificate& certificate) return true; } + +uint32_t +Segment::fixRDMASegment(uint32_t segmentLength, + uint32_t &lastOffset, + Crc32C ¤tChecksum) +{ + uint32_t offset = 0; + uint32_t lastCRC = 0; + uint32_t lastCRCOffset = 0; + LOG(NOTICE,"Fixing the last (RDMA) segment found on this backup"); + const void* unused = NULL; + uint64_t realSize = 0; + while (offset < segmentLength && peek(offset, &unused) > 0) { + EntryHeader header = getEntryHeader(offset); + + if (header.getType() == LOG_ENTRY_TYPE_INVALID) { + + LOG(WARNING, "Found invalid entry at offset: %u", offset); + bool end = true; + char zeros[4]; + copyOut(offset + sizeof32(header), + zeros, 4); + //TODO: What's the probability that 32 bits are equal 0 ? + for (int i=0; i<4; i++) if (zeros[i] != '\0') end = false; + + if (end) { + LOG(WARNING, "End of an RDMA segment at: %u", offset); + lastOffset = + lastCRCOffset + sizeof32(uint32_t) + + sizeof32(EntryHeader); + return lastCRC; + } + + } + //Fist update the calculated checksum to compared it with the stored one + currentChecksum.update(&header, sizeof(header)); + if (header.getType() == LOG_ENTRY_TYPE_CHECKSUM) { + //tempCRC stores the checksum located at the current segment offset + uint32_t tempCRC = 0; + copyOut(offset + sizeof32(header), + &tempCRC, sizeof32(uint32_t)); + if (currentChecksum.getResult() != tempCRC) { + /* We check if the checksum's value is actually zero. If it is + * not, then we reached the end of the segment and we shall + * rollback to the last valid checksum + */ + LOG(WARNING, "Checksums not equal found: %u and actual one: %u", + tempCRC, currentChecksum.getResult()); + if (currentChecksum.getResult() == 0 && tempCRC == 1) { + lastCRC = 0UL; + lastCRCOffset = offset; + } + else { + lastOffset = lastCRCOffset + sizeof32(uint32_t) + + sizeof32(EntryHeader); + LOG(WARNING, "End of an RDMA segment at: %u", offset); + return lastCRC; + } + + } else { + lastCRC = tempCRC; + lastCRCOffset = offset; + } + + + } + // LOG(NOTICE, "CRC is 0x%08x", currentChecksum.getResult()); + + //certificate.checksum = currentChecksum.getResult(); + //this->checksum = currentChecksum; + + uint32_t length = 0; + copyOut(offset + sizeof32(header), &length, header.getLengthBytes()); + currentChecksum.update(&length, header.getLengthBytes()); + if (header.getType() == LOG_ENTRY_TYPE_CHECKSUM) + length = sizeof32(uint32_t); + + offset += (sizeof32(header) + header.getLengthBytes() + length); + lastOffset = offset; + size_t segmentSize = segletBlocks.size() * segletSize; + realSize += sizeof32(header) + length + header.getLengthBytes(); + // LOG(DEBUG, "Entry is: %s -- length is: %u -- RealSize is %lu --", + // LogEntryTypeHelpers::toString(header.getType()), length, + // realSize); + if (offset > segmentSize) { + LOG(WARNING, "segment corrupt: entries run off past " + "allocated segment size (segment size %lu, next entry would " + "have started at %u)", + segmentSize, offset); + return 0; + } + } + if (offset > segmentLength) { + LOG(WARNING, "segment corrupt: entries run off past expected " + "length (expected %u, next entry would have started at %u)", + segmentLength, offset); + return 0; + } +/* + currentChecksum.update(&certificate, static_cast<unsigned> + (sizeof(certificate)-sizeof(certificate.checksum))); + + if (certificate.checksum != currentChecksum.getResult()) { + LOG(WARNING, "segment corrupt: bad checksum (expected 0x%08x, " + "was 0x%08x)", certificate.checksum, currentChecksum.getResult()); + return false; + } +*/ + return lastCRC; +} + /** * Copy data out of the segment and into a contiguous output buffer. * diff --git a/src/Segment.h b/src/Segment.h index 3fada212e8a66ab26dde5dc98a0b68f0b88d42ca..b7037ee5412c41dd42835bfb54530268558fcf35 100755 --- a/src/Segment.h +++ b/src/Segment.h @@ -135,7 +135,7 @@ class Segment { // can't use more than 1M, see http://bugs.kde.org/show_bug.cgi?id=203877 enum { DEFAULT_SEGMENT_SIZE = 1 * 1024 * 1024 }; #else - enum { DEFAULT_SEGMENT_SIZE = 8 * 1024 * 1024 }; + enum { DEFAULT_SEGMENT_SIZE = 16 * 1024 * 1024 }; #endif // PRIVATE: @@ -373,6 +373,8 @@ class Segment { bool checkMetadataIntegrity(const SegmentCertificate& certificate, uint32_t* offset = NULL); bool checkStreamSegmentIntegrity(const SegmentCertificate& certificate); + uint32_t fixRDMASegment (uint32_t segmentLength, uint32_t &lastOffset, + Crc32C ¤tChecksum); uint32_t copyOut(uint32_t offset, void* buffer, uint32_t length) const; Reference getReference(uint32_t offset); diff --git a/src/SegmentIterator.cc b/src/SegmentIterator.cc index a1938dc31b6a79783b7d2d8ebbcc8594fcc93e2c..e65a6eedd38f96f229457df6cbceb678bd3a8411 100755 --- a/src/SegmentIterator.cc +++ b/src/SegmentIterator.cc @@ -244,9 +244,14 @@ SegmentIterator::getLength() if (!currentLength) { uint32_t length = 0; + if (currentHeader.getType() == LOG_ENTRY_TYPE_CHECKSUM) { + length = sizeof32(uint32_t); + // printf("Checksum getLength; returned %u", length); + } else segment->copyOut(currentOffset + sizeof32(currentHeader), &length, currentHeader.getLengthBytes()); + currentLength.construct(length); } return *currentLength; diff --git a/src/SegmentIterator.h b/src/SegmentIterator.h index 134cafe0b4d1d7d944eba334286fb130a977e4ea..c030c256ef42392901af112479970c62e2fbd99b 100755 --- a/src/SegmentIterator.h +++ b/src/SegmentIterator.h @@ -71,6 +71,8 @@ class SegmentIterator { void checkMetadataIntegrity(uint32_t* offset = NULL); bool peekIntoBatch(); + //XXX + Segment* getSegment() {return this->segment;}; /** * Test if the SegmentIterator has exhausted all entries. More concretely, if * the current entry is valid, this will return false. After next() has been diff --git a/src/ServerMain.cc b/src/ServerMain.cc index 395d09755a3691a523f377de7360c709aea6eae3..f0d3e1c0359e6cb0b3127c3703d4dfb64a506275 100755 --- a/src/ServerMain.cc +++ b/src/ServerMain.cc @@ -180,7 +180,7 @@ main(int argc, char *argv[]) "The server should run the master service only (no backup)") ("maxCores", ProgramOptions::value<uint32_t>( - &config.maxCores)->default_value(4), + &config.maxCores)->default_value(8), "Limit on number of cores to use for the dispatch and worker " "threads. This value should not exceed the number of cores " "available on the machine. RAMCloud will try to keep its usage " @@ -189,7 +189,7 @@ main(int argc, char *argv[]) "include cleaner threads and some other miscellaneous functions.") ("maxNonVolatileBuffers", ProgramOptions::value<uint32_t>( - &config.backup.maxNonVolatileBuffers)->default_value(10), + &config.backup.maxNonVolatileBuffers)->default_value(0), "Maximum number of segments the backup will buffer in memory. The " "value 0 is special: it tells the server to set the " "limit equal to the \"segmentFrames\" value, effectively making " diff --git a/src/TcpTransport.h b/src/TcpTransport.h index e958a4ff32297824b1ef8d5a55917a20e685e7f0..002570e4b9dce144bbcd7b22c6afe785ffb7bb28 100755 --- a/src/TcpTransport.h +++ b/src/TcpTransport.h @@ -49,7 +49,8 @@ class TcpTransport : public Transport { string getServiceLocator() { return locatorString; } - void registerMemory(void* base, size_t bytes) {} + void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle = NULL) {} class TcpServerRpc; PRIVATE: diff --git a/src/Transport.h b/src/Transport.h index 35c111b20513cc54e7fb0e21a6414ce4974385f0..9544353c8e7490d053e203431e24c0a8c607390d 100755 --- a/src/Transport.h +++ b/src/Transport.h @@ -60,6 +60,13 @@ class Transport { /// be large enough to hold an 8MB segment plus header information. static const uint32_t MAX_RPC_LEN = ((1 << 23) + 200); + + typedef struct RDMAHandle { + uint64_t remote_addr; + uint32_t rkey; + uint64_t offset; + uint32_t length; + } RDMAHandle; /** * An RPC request that has been received and is either being serviced or * waiting for service. @@ -207,7 +214,8 @@ class Transport { */ virtual void sendRequest(Buffer* request, Buffer* response, RpcNotifier* notifier) {} - + virtual void sendRDMARequest(Buffer* request, Buffer* response, + RpcNotifier* notifier, RDMAHandle* rdmaHandle) {}; /** * Cancel an RPC request that was sent previously. * \param notifier @@ -330,7 +338,9 @@ class Transport { * \bug A real solution requires some means of locking the region (or * a subset thereof) for updates so long as a Transport is using it. */ - virtual void registerMemory(void* base, size_t bytes) {}; + //virtual void registerMemory(void* base, size_t bytes) {}; + virtual void registerMemory(void* base, size_t bytes, + RDMAHandle* rdmaHandle) {}; /// Dump out performance and debugging statistics. virtual void dumpStats() {} diff --git a/src/TransportManager.cc b/src/TransportManager.cc index e4dae2493585d6c01d466e753be044387609cef0..3a5f6f623324cbf84e6b55f938a9e9bb8c91bd16 100755 --- a/src/TransportManager.cc +++ b/src/TransportManager.cc @@ -26,6 +26,7 @@ #include "FailSession.h" #include "WorkerManager.h" #include "WorkerSession.h" +#include "Transaction.h" #ifdef INFINIBAND #include "InfRcTransport.h" @@ -215,7 +216,7 @@ TransportManager::initialize(const char* localServiceLocator) &locator); for (uint32_t j = 0; j < registeredBases.size(); j++) { transport->registerMemory(registeredBases[j], - registeredSizes[j]); + registeredSizes[j], NULL); } if (transports[i] == NULL) { transports[i] = transport; @@ -398,7 +399,7 @@ TransportManager::openSessionInternal(const string& serviceLocator) transports[i] = factory->createTransport(context, NULL); for (uint32_t j = 0; j < registeredBases.size(); j++) { transports[i]->registerMemory(registeredBases[j], - registeredSizes[j]); + registeredSizes[j], NULL); } } catch (TransportException &e) { continue; @@ -442,12 +443,13 @@ TransportManager::openSessionInternal(const string& serviceLocator) * See #Transport::registerMemory. */ void -TransportManager::registerMemory(void* base, size_t bytes) +TransportManager::registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle) { Dispatch::Lock lock(context->dispatch); foreach (auto transport, transports) { if (transport != NULL) - transport->registerMemory(base, bytes); + transport->registerMemory(base, bytes, NULL); } registeredBases.push_back(base); registeredSizes.push_back(bytes); diff --git a/src/TransportManager.h b/src/TransportManager.h index b7a69b52fb87510bc2f68be0cb5e0aab34c9adce..d2281dbdcb0990aded292e6b996405bdb15e7858 100755 --- a/src/TransportManager.h +++ b/src/TransportManager.h @@ -51,7 +51,8 @@ class TransportManager { Transport::SessionRef getSession(const string& serviceLocator); string getListeningLocatorsString(); Transport::SessionRef openSession(const string& serviceLocator); - void registerMemory(void* base, size_t bytes); + void registerMemory(void* base, size_t bytes, + Transport::RDMAHandle* rdmaHandle = NULL); void dumpStats(); void dumpTransportFactories(); void setSessionTimeout(uint32_t timeoutMs);