1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
use jfs;
use std::path;
use sda_protocol::Id;
use sda_protocol::{AgentId, ClerkingJob, ClerkingJobId, ClerkingResult, SnapshotId};
use stores::{BaseStore, ClerkingJobsStore};
use jfs_stores::JfsStoreExt;
use SdaServerResult;

/// `ClerkingJobsStore` implementation backed by `jfs` stores.
///
/// The wrapped path is the root under which one `jfs::Store` per
/// `<category>/<id>` pair is addressed; the categories used in this file are
/// `"queue"`, `"results"` and `"done"`.
pub struct JfsClerkingJobsStore(path::PathBuf);

impl JfsClerkingJobsStore {
    /// Builds a store rooted at `prefix`.
    ///
    /// Only copies the path into the new value; this constructor itself never
    /// returns an error.
    pub fn new<P: AsRef<path::Path>>(prefix: P) -> SdaServerResult<JfsClerkingJobsStore> {
        let root = prefix.as_ref().to_path_buf();
        Ok(JfsClerkingJobsStore(root))
    }

    /// Builds the `jfs::Store` for `<root>/<prefix>/<id>`.
    ///
    /// # Errors
    ///
    /// Fails with "pathbuf to string" when the joined path is not valid
    /// UTF-8, or with whatever error `jfs::Store::new` reports.
    fn store<I: Id>(&self, prefix: &str, id: &I) -> SdaServerResult<jfs::Store> {
        let location = self.0.join(prefix).join(id.to_string());
        // `jfs::Store::new` is handed a string slice, hence the UTF-8 requirement.
        let location_str = location.to_str().ok_or("pathbuf to string")?;
        let opened = jfs::Store::new(location_str)?;
        Ok(opened)
    }
}

impl BaseStore for JfsClerkingJobsStore {
    /// Always reports success; no check is performed.
    fn ping(&self) -> SdaServerResult<()> {
        Ok(())
    }
}

impl ClerkingJobsStore for JfsClerkingJobsStore {
    /// Creates `job` in the `"queue"` store of the clerk it is assigned to.
    fn enqueue_clerking_job(&self, job: &ClerkingJob) -> SdaServerResult<()> {
        let queue = self.store("queue", &job.clerk)?;
        queue.create(job)
    }

    /// Returns the first job yielded when listing the `"queue"` store of
    /// `clerk`, or `None` when that listing is empty.
    fn poll_clerking_job(&self, clerk: &AgentId) -> SdaServerResult<Option<ClerkingJob>> {
        let queued = self.store("queue", clerk)?.all::<ClerkingJob>()?;
        // Entries are (key, job) pairs; only the job is handed back.
        let first = queued.into_iter().next().map(|(_, pending)| pending);
        Ok(first)
    }

    /// Looks up `job` in the `"queue"` store of `clerk`.
    fn get_clerking_job(
        &self,
        clerk: &AgentId,
        job: &ClerkingJobId,
    ) -> SdaServerResult<Option<ClerkingJob>> {
        let queue = self.store("queue", clerk)?;
        queue.get_option(job)
    }

    /// Records `result` and retires the job it answers.
    ///
    /// Steps, in order: fetch the job from the clerk's `"queue"` store, upsert
    /// the result into the `"results"` store of the job's snapshot, upsert the
    /// job into the clerk's `"done"` store, then delete it from `"queue"`.
    ///
    /// # Errors
    ///
    /// Fails with "Job not found" when the clerk's queue has no entry for
    /// `result.job`; store errors are propagated.
    fn create_clerking_result(&self, result: &ClerkingResult) -> SdaServerResult<()> {
        let queued: Option<ClerkingJob> = self.store("queue", &result.clerk)?
            .get_option(&result.job)?;
        let job = queued.ok_or("Job not found")?;

        let results = self.store("results", &job.snapshot)?;
        results.upsert_with_id(result, &result.job)?;

        let done = self.store("done", &result.clerk)?;
        done.upsert_with_id(&job, &result.job)?;

        // Removal from the queue comes last, after both upserts succeeded.
        let queue = self.store("queue", &result.clerk)?;
        let job_key = result.job.to_string();
        queue.delete(job_key.as_str())?;

        Ok(())
    }

    /// Lists the job ids of every result held in the `"results"` store of
    /// `snapshot`.
    fn list_results(&self, snapshot: &SnapshotId) -> SdaServerResult<Vec<ClerkingJobId>> {
        let stored = self.store("results", snapshot)?.all::<ClerkingResult>()?;
        let job_ids: Vec<ClerkingJobId> = stored.iter().map(|entry| entry.1.job).collect();
        Ok(job_ids)
    }

    /// Looks up the result for `job` in the `"results"` store of `snapshot`.
    fn get_result(
        &self,
        snapshot: &SnapshotId,
        job: &ClerkingJobId,
    ) -> SdaServerResult<Option<ClerkingResult>> {
        let results = self.store("results", snapshot)?;
        results.get_option(job)
    }
}