1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use crate::engine::KvsEngine;
use crate::error::Result;
use crate::tcp::protocol::DBCommands;
use std::io::Write;
use std::net::{TcpListener, TcpStream};
pub struct KvsServer<S: KvsEngine> {
addr: String,
store: S,
}
impl<S: KvsEngine> KvsServer<S> {
pub fn new(addr: String, store: S) -> Result<Self> {
let obj = KvsServer { addr, store };
log::info!("Version -- {}", env!("CARGO_PKG_VERSION"));
log::info!("Created KVSStore successful");
Ok(obj)
}
pub fn listen(&mut self) {
let listener = TcpListener::bind(&self.addr).unwrap();
log::info!("Running Server on {}", &self.addr);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
if let Err(e) = self.handle_connection(stream) {
log::error!("Error serving command: {}", e);
};
}
Err(e) => {
log::error!("Stream listener error: {}", e)
}
}
}
}
fn handle_connection(&mut self, mut stream: TcpStream) -> Result<()> {
let cmd = DBCommands::from_stream(&mut stream)?;
log::debug!("Command - {:?}", cmd);
let resp = cmd.invoke_cmd(&mut self.store);
log::debug!("Result - {:?}", resp);
let resp_bytes = resp.to_packet()?;
stream.write_all(&resp_bytes)?;
stream.flush()?;
Ok(())
}
}