concurrency.rs 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. //! Concurrency tests for `InMemoryStore` primitives.
  2. //!
  3. //! The generated conformance suite drives the store through a single `&store`,
  4. //! so it never exercises two callers racing on the same rows. `reserve_postings`
  5. //! is the primitive the saga relies on to make double-spends impossible: it must
  6. //! flip each `Active` posting to `PendingInactive` for exactly one caller, even
  7. //! when many callers target the same postings at once.
  8. #![allow(missing_docs)]
  9. use std::sync::Arc;
  10. use kuatia_storage::mem_store::InMemoryStore;
  11. use kuatia_storage::store::PostingStore;
  12. use kuatia_types::*;
  13. fn posting(index: u16) -> Posting {
  14. Posting::new(
  15. PostingId {
  16. transfer: EnvelopeId([1; 32]),
  17. index,
  18. },
  19. AccountId::new(1),
  20. AssetId::new(1),
  21. Cent::from(100),
  22. )
  23. }
  24. /// Many tasks concurrently reserve the same set of postings, each with its own
  25. /// reservation id. Reservation is a claim, so each posting may be reserved by
  26. /// exactly one task: the per-task counts sum to the number of postings, and
  27. /// every posting ends `PendingInactive`.
  28. #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
  29. async fn concurrent_reserve_claims_each_posting_once() {
  30. const POSTINGS: u16 = 32;
  31. const TASKS: i64 = 8;
  32. let store = Arc::new(InMemoryStore::new());
  33. let all: Vec<Posting> = (0..POSTINGS).map(posting).collect();
  34. store.insert_postings(&all).await.unwrap();
  35. let ids: Vec<PostingId> = all.iter().map(|p| p.id).collect();
  36. let mut handles = Vec::new();
  37. for t in 0..TASKS {
  38. let store = Arc::clone(&store);
  39. let ids = ids.clone();
  40. handles.push(tokio::spawn(async move {
  41. store
  42. .reserve_postings(&ids, ReservationId::new(t + 1))
  43. .await
  44. .unwrap()
  45. }));
  46. }
  47. let mut total_reserved: u64 = 0;
  48. for h in handles {
  49. total_reserved += h.await.unwrap();
  50. }
  51. assert_eq!(
  52. total_reserved, POSTINGS as u64,
  53. "each posting is reserved by exactly one task"
  54. );
  55. let final_postings = store.get_postings(&ids).await.unwrap();
  56. assert!(
  57. final_postings
  58. .iter()
  59. .all(|p| p.status == PostingStatus::PendingInactive),
  60. "every posting ends reserved"
  61. );
  62. }