1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
use std::{path::Path, sync::Arc, time::Duration};
use ansi_term::Colour;
use io::{IoContext, IoError, IoHandler, IoService, TimerToken};
use stop_guard::StopGuard;
use blockchain::{BlockChainDB, BlockChainDBHandler};
use ethcore::{
client::{ChainNotify, Client, ClientConfig, ClientIoMessage},
error::{Error as EthcoreError, ErrorKind},
miner::Miner,
snapshot::{
service::{Service as SnapshotService, ServiceParams as SnapServiceParams},
Error as SnapshotError, RestorationStatus, SnapshotService as _SnapshotService,
},
spec::Spec,
};
use Error;
pub struct ClientService {
io_service: Arc<IoService<ClientIoMessage>>,
client: Arc<Client>,
snapshot: Arc<SnapshotService>,
database: Arc<dyn BlockChainDB>,
_stop_guard: StopGuard,
}
impl ClientService {
pub fn start(
config: ClientConfig,
spec: &Spec,
blockchain_db: Arc<dyn BlockChainDB>,
snapshot_path: &Path,
restoration_db_handler: Box<dyn BlockChainDBHandler>,
_ipc_path: &Path,
miner: Arc<Miner>,
) -> Result<ClientService, Error> {
let io_service = IoService::<ClientIoMessage>::start("Client")?;
info!(
"Configured for {} using {} engine",
Colour::White.bold().paint(spec.name.clone()),
Colour::Yellow.bold().paint(spec.engine.name())
);
let pruning = config.pruning;
let client = Client::new(
config,
&spec,
blockchain_db.clone(),
miner.clone(),
io_service.channel(),
)?;
miner.set_io_channel(io_service.channel());
miner.set_in_chain_checker(&client.clone());
let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(),
genesis_block: spec.genesis_block(),
restoration_db_handler: restoration_db_handler,
pruning: pruning,
channel: io_service.channel(),
snapshot_root: snapshot_path.into(),
client: client.clone(),
};
let snapshot = Arc::new(SnapshotService::new(snapshot_params)?);
let client_io = Arc::new(ClientIoHandler {
client: client.clone(),
snapshot: snapshot.clone(),
});
io_service.register_handler(client_io)?;
spec.engine.register_client(Arc::downgrade(&client) as _);
let stop_guard = StopGuard::new();
Ok(ClientService {
io_service: Arc::new(io_service),
client: client,
snapshot: snapshot,
database: blockchain_db,
_stop_guard: stop_guard,
})
}
pub fn register_io_handler(
&self,
handler: Arc<dyn IoHandler<ClientIoMessage> + Send>,
) -> Result<(), IoError> {
self.io_service.register_handler(handler)
}
pub fn client(&self) -> Arc<Client> {
self.client.clone()
}
pub fn snapshot_service(&self) -> Arc<SnapshotService> {
self.snapshot.clone()
}
pub fn io(&self) -> Arc<IoService<ClientIoMessage>> {
self.io_service.clone()
}
pub fn add_notify(&self, notify: Arc<dyn ChainNotify>) {
self.client.add_notify(notify);
}
pub fn db(&self) -> Arc<dyn BlockChainDB> {
self.database.clone()
}
pub fn shutdown(&self) {
trace!(target: "shutdown", "Shutting down Client Service");
self.snapshot.shutdown();
self.client.shutdown();
}
}
struct ClientIoHandler {
client: Arc<Client>,
snapshot: Arc<SnapshotService>,
}
const CLIENT_TICK_TIMER: TimerToken = 0;
const SNAPSHOT_TICK_TIMER: TimerToken = 1;
const CLIENT_TICK: Duration = Duration::from_secs(5);
const SNAPSHOT_TICK: Duration = Duration::from_secs(10);
impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK)
.expect("Error registering client timer");
io.register_timer(SNAPSHOT_TICK_TIMER, SNAPSHOT_TICK)
.expect("Error registering snapshot timer");
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
trace_time!("service::read");
match timer {
CLIENT_TICK_TIMER => {
use ethcore::snapshot::SnapshotService;
let snapshot_restoration =
if let RestorationStatus::Ongoing { .. } = self.snapshot.restoration_status() {
true
} else {
false
};
self.client.tick(snapshot_restoration)
}
SNAPSHOT_TICK_TIMER => self.snapshot.tick(),
_ => warn!("IO service triggered unregistered timer '{}'", timer),
}
}
fn message(&self, _io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) {
trace_time!("service::message");
use std::thread;
match *net_message {
ClientIoMessage::BlockVerified => {
self.client.import_verified_blocks();
}
ClientIoMessage::BeginRestoration(ref manifest) => {
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
warn!("Failed to initialize snapshot restoration: {}", e);
}
}
ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => {
self.snapshot.feed_state_chunk(*hash, chunk)
}
ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => {
self.snapshot.feed_block_chunk(*hash, chunk)
}
ClientIoMessage::TakeSnapshot(num) => {
let client = self.client.clone();
let snapshot = self.snapshot.clone();
let res = thread::Builder::new()
.name("Periodic Snapshot".into())
.spawn(move || {
if let Err(e) = snapshot.take_snapshot(&*client, num) {
match e {
EthcoreError(
ErrorKind::Snapshot(SnapshotError::SnapshotAborted),
_,
) => info!("Snapshot aborted"),
_ => warn!("Failed to take snapshot at block #{}: {}", num, e),
}
}
});
if let Err(e) = res {
debug!(target: "snapshot", "Failed to initialize periodic snapshot thread: {:?}", e);
}
}
ClientIoMessage::Execute(ref exec) => {
(*exec.0)(&self.client);
}
_ => {}
}
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, thread, time};
use tempdir::TempDir;
use super::*;
use ethcore::{client::ClientConfig, miner::Miner, spec::Spec, test_helpers};
use ethcore_db::NUM_COLUMNS;
use kvdb_rocksdb::{CompactionProfile, DatabaseConfig};
#[test]
fn it_can_be_started() {
let tempdir = TempDir::new("").unwrap();
let client_path = tempdir.path().join("client");
let snapshot_path = tempdir.path().join("snapshot");
let client_config = ClientConfig::default();
let mut client_db_config = DatabaseConfig::with_columns(NUM_COLUMNS);
client_db_config.memory_budget = client_config.db_cache_size;
client_db_config.compaction = CompactionProfile::auto(&client_path);
let client_db_handler = test_helpers::restoration_db_handler(client_db_config.clone());
let client_db = client_db_handler.open(&client_path).unwrap();
let restoration_db_handler = test_helpers::restoration_db_handler(client_db_config);
let spec = Spec::new_test();
let service = ClientService::start(
ClientConfig::default(),
&spec,
client_db,
&snapshot_path,
restoration_db_handler,
tempdir.path(),
Arc::new(Miner::new_for_tests(&spec, None)),
);
assert!(service.is_ok());
drop(service.unwrap());
thread::park_timeout(time::Duration::from_millis(100));
}
}