@ -37,11 +37,9 @@ namespace audio {
using Reason = QueueUpdate : : Reason ;
RandomIterator : : RandomIterator ( )
: seed_ ( 0 ) , pos_ ( 0 ) , size_ ( 0 ) { }
RandomIterator : : RandomIterator ( ) : seed_ ( 0 ) , pos_ ( 0 ) , size_ ( 0 ) { }
RandomIterator : : RandomIterator ( size_t size )
: seed_ ( ) , pos_ ( 0 ) , size_ ( size ) {
RandomIterator : : RandomIterator ( size_t size ) : seed_ ( ) , pos_ ( 0 ) , size_ ( size ) {
esp_fill_random ( & seed_ , sizeof ( seed_ ) ) ;
}
@ -95,7 +93,9 @@ auto notifyPlayFrom(uint32_t start_from_position) -> void {
events : : Audio ( ) . Dispatch ( ev ) ;
}
TrackQueue : : TrackQueue ( tasks : : WorkerPool & bg_worker , database : : Handle db , drivers : : NvsStorage & nvs )
TrackQueue : : TrackQueue ( tasks : : WorkerPool & bg_worker ,
database : : Handle db ,
drivers : : NvsStorage & nvs )
: mutex_ ( ) ,
bg_worker_ ( bg_worker ) ,
db_ ( db ) ,
@ -103,10 +103,17 @@ TrackQueue::TrackQueue(tasks::WorkerPool& bg_worker, database::Handle db, driver
playlist_ ( " .queue.playlist " ) ,
position_ ( 0 ) ,
shuffle_ ( ) ,
repeatMode_ ( static_cast < RepeatMode > ( nvs . QueueRepeatMode ( ) ) ) { }
repeatMode_ ( static_cast < RepeatMode > ( nvs . QueueRepeatMode ( ) ) ) ,
cancel_appending_async_ ( false ) ,
appending_async_ ( false ) ,
loading_ ( false ) ,
ready_ ( true ) { }
auto TrackQueue : : current ( ) const - > TrackItem {
const std : : shared_lock < std : : shared_mutex > lock ( mutex_ ) ;
if ( ! ready_ ) {
return { } ;
}
std : : string val ;
if ( opened_playlist_ & & position_ < opened_playlist_ - > size ( ) ) {
val = opened_playlist_ - > value ( ) ;
@ -205,6 +212,7 @@ auto TrackQueue::append(Item i) -> void {
if ( ! filename . empty ( ) ) {
playlist_ . append ( filename ) ;
}
ready_ = true ;
updateShuffler ( was_queue_empty ) ;
}
notifyChanged ( current_changed , Reason : : kExplicitUpdate ) ;
@ -214,6 +222,7 @@ auto TrackQueue::append(Item i) -> void {
{
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
playlist_ . append ( std : : get < std : : string > ( i ) ) ;
ready_ = true ;
updateShuffler ( was_queue_empty ) ;
}
notifyChanged ( current_changed , Reason : : kExplicitUpdate ) ;
@ -222,42 +231,117 @@ auto TrackQueue::append(Item i) -> void {
// Iterators can be very large, and retrieving items from them often
// requires disk i/o. Handle them asynchronously so that inserting them
// doesn't block.
bg_worker_ . Dispatch < void > ( [ = , this ] ( ) {
database : : TrackIterator it = std : : get < database : : TrackIterator > ( i ) ;
appendAsync ( std : : get < database : : TrackIterator > ( i ) , was_queue_empty ) ;
}
}
size_t next_update_at = 10 ;
while ( true ) {
auto next = * it ;
if ( ! next ) {
break ;
}
// Keep this critical section small so that we're not blocking methods
// like current().
auto TrackQueue : : appendAsync ( database : : TrackIterator it , bool was_empty )
- > void {
// First, check whether or not an async append is already running. Grab the
// mutex first to avoid races where we check appending_async_ between the bg
// task looking at pending_async_iterators_ and resetting appending_async_.
{
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
if ( appending_async_ ) {
// We are already appending, so just add to the queue.
pending_async_iterators_ . push_back ( it ) ;
return ;
} else {
// We need to start a new task.
appending_async_ = true ;
cancel_appending_async_ = false ;
}
}
bg_worker_ . Dispatch < void > ( [ = , this ] ( ) mutable {
bool update_current = was_empty ;
if ( update_current ) {
ready_ = false ;
}
loading_ = true ;
size_t next_update_at = 10 ;
while ( ! cancel_appending_async_ ) {
auto next = * it ;
if ( ! next ) {
// The current iterator is out of tracks. Is there another iterator for
// us to process?
{
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
auto filename = getFilepath ( * next ) . value_or ( " " ) ;
if ( ! filename . empty ( ) ) {
playlist_ . append ( filename ) ;
if ( ! pending_async_iterators_ . empty ( ) ) {
// Yes. Grab it and continue.
it = pending_async_iterators_ . front ( ) ;
pending_async_iterators_ . pop_front ( ) ;
continue ;
} else {
// No, time to finish up.
// First make sure the shuffler has the final count.
updateShuffler ( update_current ) ;
// Now reset all our state.
loading_ = false ;
ready_ = true ;
appending_async_ = false ;
appending_async_ . notify_all ( ) ;
notifyChanged ( update_current , Reason : : kExplicitUpdate ) ;
return ;
}
}
it + + ;
// Appending very large iterators can take a while. Send out periodic
// queue updates during them so that the user has an idea what's going
// on.
if ( ! - - next_update_at ) {
next_update_at = util : : sRandom - > RangeInclusive ( 10 , 20 ) ;
notifyChanged ( false , Reason : : kBulkLoadingUpdate ) ;
}
}
// Keep this critical section small so that we're not blocking methods
// like current().
{
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
updateShuffler ( was_queue_empty ) ;
auto filename = getFilepath ( * next ) . value_or ( " " ) ;
if ( ! filename . empty ( ) ) {
playlist_ . append ( filename ) ;
}
}
notifyChanged ( current_changed , Reason : : kExplicitUpdate ) ;
} ) ;
}
it + + ;
// Appending very large iterators can take a while. Send out periodic
// queue updates during them so that the user has an idea what's going
// on.
if ( ! - - next_update_at ) {
notifyChanged ( false , Reason : : kBulkLoadingUpdate ) ;
if ( update_current ) {
if ( shuffle_ & & playlist_ . size ( ) > = 100 ) {
// Special case for shuffling a large amount of tracks. Because
// shuffling many tracks can be slow to wait for them all to load,
// we wait for 100 or so to load, then start initially with a random
// track from this first lot.
updateShuffler ( true ) ;
ready_ = true ;
notifyChanged ( true , Reason : : kExplicitUpdate ) ;
update_current = false ;
} else if ( ! shuffle_ & & playlist_ . size ( ) > 0 ) {
// If the queue was empty, then we want to start playing the first
// track without waiting for the entire queue to finish loading
ready_ = true ;
notifyChanged ( true , Reason : : kExplicitUpdate ) ;
update_current = false ;
}
} else {
// Make sure the shuffler gets updated periodically so that skipping
// tracks whilst we're still loading gives us the whole queue to play
// with.
updateShuffler ( false ) ;
}
next_update_at = util : : sRandom - > RangeInclusive ( 10 , 20 ) ;
}
}
// If we're here, then the async append must have been cancelled. Bail out
// immediately and rely on whatever cancelled us to reset our state. This
// is a little messy, but we would have to gain a lock on mutex_ to reset
// ourselves properly, and at the moment the only thing that can cancel us
// is clear().
appending_async_ = false ;
appending_async_ . notify_all ( ) ;
} ) ;
}
auto TrackQueue : : next ( ) - > void {
@ -306,7 +390,7 @@ auto TrackQueue::next(Reason r) -> void {
position_ + + ; // Next track
changed = true ;
} else if ( repeatMode_ = = RepeatMode : : REPEAT_QUEUE ) {
position_ = 0 ; // Go to beginning
position_ = 0 ; // Go to beginning
changed = true ;
}
}
@ -329,7 +413,7 @@ auto TrackQueue::previous() -> void {
if ( position_ > 0 ) {
position_ - - ;
} else if ( repeatMode_ = = RepeatMode : : REPEAT_QUEUE ) {
position_ = totalSize ( ) - 1 ; // Go to the end of the queue
position_ = totalSize ( ) - 1 ; // Go to the end of the queue
}
}
goTo ( position_ ) ;
@ -347,17 +431,24 @@ auto TrackQueue::finish() -> void {
}
auto TrackQueue : : clear ( ) - > void {
if ( appending_async_ ) {
cancel_appending_async_ = true ;
appending_async_ . wait ( true ) ;
}
{
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
position_ = 0 ;
ready_ = false ;
loading_ = false ;
pending_async_iterators_ . clear ( ) ;
playlist_ . clear ( ) ;
opened_playlist_ . reset ( ) ;
if ( shuffle_ ) {
shuffle_ - > resize ( 0 ) ;
}
notifyChanged ( false , Reason : : kTrackFinished ) ;
}
notifyChanged ( true , Reason : : kExplicitUpdate ) ;
}
auto TrackQueue : : random ( bool en ) - > void {
@ -389,6 +480,16 @@ auto TrackQueue::repeatMode() const -> RepeatMode {
return repeatMode_ ;
}
auto TrackQueue : : isLoading ( ) const - > bool {
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
return loading_ ;
}
auto TrackQueue : : isReady ( ) const - > bool {
const std : : unique_lock < std : : shared_mutex > lock ( mutex_ ) ;
return ready_ ;
}
auto TrackQueue : : serialise ( ) - > std : : string {
cppbor : : Array tracks { } ;
cppbor : : Map encoded ;