forked from pkucoin/yfs-class
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock_server_cache.cc
149 lines (140 loc) · 4.14 KB
/
lock_server_cache.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// the caching lock server implementation
#include "lock_server_cache.h"
#include <sstream>
#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "lang/verify.h"
#include "handle.h"
#include "tprintf.h"
// Construct a caching lock server. No explicit setup is performed here; every
// data member is default-initialized.
// NOTE(review): stat() in this file reads `nacquire`, which is not initialized
// in this constructor - confirm the header gives it an initializer.
lock_server_cache::lock_server_cache() = default;
// Handle an acquire RPC: client `id` asks to become the owner of lock `lid`.
//
// Returns
//   lock_protocol::OK     - `id` is now recorded as the owner of the lock.
//   lock_protocol::RETRY  - another client owns the lock; `id` has been
//                           appended to the waiting queue.
//   lock_protocol::RPCERR - the current owner could not be bound, so no
//                           revoke was sent and `id` was NOT queued.
//
// The unnamed third parameter is the RPC reply slot; it is never written.
//
// Locking: server_mtx guards lock_cache; each server_lock has its own mtx.
// The per-lock mutex is taken before server_mtx is dropped, and it is always
// released before a revoke RPC is issued so that a slow or re-entrant client
// cannot stall other requests for the same lock.
int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id,
                               int &)
{
  std::unique_lock<std::mutex> map_lock(server_mtx);
  auto itr = lock_cache.find(lid);
  if (itr == lock_cache.end())
  {
    // First request ever seen for this lock id: create the record and grant
    // it immediately. The record is not visible to anyone else until it is
    // stored in the map, and the map mutex is still held here.
    auto fresh_lock = std::make_shared<server_lock>(id);
    fresh_lock->status = server_lock::LOCKED;
    fresh_lock->client_id = id;
    lock_cache[lid] = fresh_lock;
    return lock_protocol::OK;
  }

  std::shared_ptr<server_lock> cur_lock = itr->second;
  std::unique_lock<std::mutex> single_lock(cur_lock->mtx);
  map_lock.unlock();

  if (cur_lock->status == server_lock::FREE)
  {
    cur_lock->status = server_lock::LOCKED;
    cur_lock->client_id = id;
    if (!cur_lock->waiting_cids.empty())
    {
      // Other clients are still queued, so immediately ask the new owner to
      // give the lock back when it is done with it.
      handle cl_handle(id);
      rpcc *cl = cl_handle.safebind();
      if (cl)
      {
        int r = 0;
        // A revoke is now outstanding for the owner; recording that keeps
        // later acquirers from sending a duplicate revoke. If the bind had
        // failed the status stays LOCKED so the next acquirer tries again.
        cur_lock->status = server_lock::REVOKE_SENT;
        // Never hold the per-lock mutex across a blocking RPC.
        single_lock.unlock();
        cl->call(rlock_protocol::revoke, lid, r);
      }
    }
    return lock_protocol::OK;
  }

  if (cur_lock->status == server_lock::LOCKED)
  {
    // LOCKED means another client holds the lock and we are the first one to
    // ask for it, so tell the owner (via revoke) to give the lock back.
    handle cl_handle(cur_lock->client_id);
    rpcc *cl = cl_handle.safebind();
    if (!cl)
    {
      // rpc conn fails
      return lock_protocol::RPCERR;
    }
    int r = 0;
    cur_lock->status = server_lock::REVOKE_SENT;
    cur_lock->waiting_cids.push(id);
    single_lock.unlock();
    // The result of the revoke call is deliberately ignored (best effort);
    // the caller is told to retry either way.
    cl->call(rlock_protocol::revoke, lid, r);
    return lock_protocol::RETRY;
  }

  if (cur_lock->status == server_lock::REVOKE_SENT)
  {
    // REVOKE_SENT means another client holds the lock, other clients are
    // already waiting for it, and a revoke has already been sent: just queue.
    cur_lock->waiting_cids.push(id);
    return lock_protocol::RETRY;
  }

  // Fallback kept from the original for any status not handled above.
  return lock_protocol::OK;
}
// Handle a release RPC: client `id` gives lock `lid` back to the server.
//
// Returns
//   lock_protocol::IOERR - `lid` is unknown to the server, or the next
//                          waiting client could not be bound.
//   lock_protocol::OK    - the release was applied, or it was ignored because
//                          `id` is not the recorded owner or the lock is
//                          already FREE.
//   otherwise            - the status returned by the retry RPC sent to the
//                          next waiting client.
//
// `r` is the RPC reply slot; this handler does not write to it.
//
// On a valid release the lock becomes FREE and, if any client is queued, the
// head of the queue is removed and sent a retry RPC so that it re-issues its
// acquire. The per-lock mutex is released before that RPC is made.
int
lock_server_cache::release(lock_protocol::lockid_t lid, std::string id,
                           int &r)
{
  std::unique_lock<std::mutex> map_lock(server_mtx);
  auto itr = lock_cache.find(lid);
  if (itr == lock_cache.end())
  {
    return lock_protocol::IOERR;
  }
  auto cur_lock = itr->second;
  std::unique_lock<std::mutex> single_lock(cur_lock->mtx);
  map_lock.unlock();

  // Ignore releases from a client that is not the owner, and repeated
  // releases of a lock that is already free.
  if (cur_lock->client_id != id || cur_lock->status == server_lock::FREE)
  {
    return lock_protocol::OK;
  }

  cur_lock->status = server_lock::FREE;
  if (cur_lock->waiting_cids.empty())
  {
    return lock_protocol::OK;
  }

  // Wake exactly one waiter: the one that has been queued the longest.
  auto waiting_cid = cur_lock->waiting_cids.front();
  cur_lock->waiting_cids.pop();
  handle cl_handler(waiting_cid);
  rpcc *cl = cl_handler.safebind();
  if (!cl)
  {
    return lock_protocol::IOERR;
  }

  // Separate reply slot for the outgoing RPC so the caller's `r` is untouched.
  int retry_reply = 0;
  // Never hold the per-lock mutex across a blocking RPC; no lock state is
  // touched after this point, so there is no need to re-acquire it.
  single_lock.unlock();
  return cl->call(rlock_protocol::retry, lid, retry_reply);
}
// Handle a stat RPC: log the request and report the server's acquire counter.
//
// `lid` is accepted for protocol compatibility but not consulted; the value
// of the member `nacquire` is copied into the reply slot `r`.
// Always returns lock_protocol::OK.
lock_protocol::status
lock_server_cache::stat(lock_protocol::lockid_t lid, int &r)
{
  const lock_protocol::status outcome = lock_protocol::OK;
  tprintf("stat request\n");
  r = nacquire;
  return outcome;
}