Mi è stato consigliato dapprima di usare boost::mutex (anche perchè portabile) e poi i vari mutex forniti dalla piattaforma win32. Ma alla fine mi è stato consigliato di preferire la sezione critica di win32 ovvero: CRITICAL_SECTION
Giusto per chiarezza. boost::mutex è portabile perchè boost è concepita per funzionare su diversi sistemi operativi. Se non hai esigenze di portabilità, puoi usare le primitive di sistema come stai facendo.
Il fatto è che le usi male. Le CRITICAL_SECTION vanno inizializzate una sola volta prima di far partire i thread e cancellate alla fine e solo se i thread sono terminati.
codice:
InitializeCriticalSection(&csProducer);
// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;
// Release resources used by the critical section object.
DeleteCriticalSection(&csProducer);
Questo è un errore grave. Una volta lanciato il producer, quello viaggia per i fatti suoi, ma il main continua: cancelli la CRITICAL_SECTION e quando il Producer ci accede hai comportamento indefinito.
Ancora peggio:
codice:
while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();
// Initialize the critical section
InitializeCriticalSection(&csConsumer);
// Spawn a new Consumr Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;
// Release resources used by the critical section object.
DeleteCriticalSection(&csConsumer);
}
Qui inizializzi e cancelli le CRITICAL_SECTION in continuazione, col risultato di avere accessi spuri a tutto quanto. E come se non bastasse, non proteggi nemmeno tutte le mappe globali.
Ricorda che i mutex servono per le mappe, non per i thread, quindi non ha senso chiamare csProducer e csConsumer; meglio chiamarle users_mutex e eventi_mutex
Queste sono le correzioni da fare, i commenti sono nel codice.
Codice PHP:
unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);
void getDateTime(char * szTime);
enum buffer_status
{
BUFF_DONE = 1,
BUFF_EMPTY = 0
};
struct buffer
{
unsigned char data[1024];
int bytesRecorded;
int flag;
buffer(const unsigned char * data_, const int bytesRecorded_, const int flag_) :
bytesRecorded(bytesRecorded_), flag(flag_)
{
copy(data_, data_ + bytesRecorded_, data);
}
};
struct circular
{
circular_buffer<buffer> cb;
};
// Global maps
map<int, circular> users;
map<int, HANDLE> eventi;
// Declare Procuder && Cosumer CS
CRITICAL_SECTION users_mutex;
CRITICAL_SECTION eventi_mutex;
int main()
{
// Initialize all critical sections
InitializeCriticalSection(&users_mutex);
InitializeCriticalSection(&eventi_mutex);
// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;
// Server.
// Set up server (port: 8000, maxconn: 10)
//
SocketServer sockIn(8000, 10);
while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();
// Spawn a new Consumr Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;
}
DeleteCriticalSection(&users_mutex);
DeleteCriticalSection(&eventi_mutex);
sockIn.Close();
return EXIT_SUCCESS;
}
// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;
s->SendBytes("Hello World!" + CRLF);
int threadid = (int)GetCurrentThreadId();
// Create Event && Push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
//////////////////////////////////////////////////////////////////////////
EnterCriticalSection(&eventi_mutex); // acquisisce la sezione critica.
eventi.insert(make_pair(threadid,hevent));
LeaveCriticalSection(&eventi_mutex); // rilascia la sezione critica.
//////////////////////////////////////////////////////////////////////////
// Prepare && Add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);
for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}
//////////////////////////////////////////////////////////////////////////
EnterCriticalSection(&users_mutex); // acquisisce la sezione critica.
users.insert(make_pair(threadid, c));
LeaveCriticalSection(&users_mutex); // rilascia la sezione critica.
//////////////////////////////////////////////////////////////////////////
//
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//
Sleep(500);
while(1)
{
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);
// Qui non ci dovrebbe essere una race condition dal momento che ogni consumer
// accede a un suo buffer. Nel caso invece ci sia tutto l'if va protetto dal mutex.
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
}
// Close & remove event from event map
CloseHandle(eventi[threadid]);
//////////////////////////////////////////////////////////////////////////
// Request ownership of the critical section.
EnterCriticalSection(&eventi_mutex);
eventi.erase(threadid);
// Release ownership of the critical section.
LeaveCriticalSection(&eventi_mutex);
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
EnterCriticalSection(&users_mutex);
// Remove buffer from the map
users.erase(threadid);
LeaveCriticalSection(&users_mutex);
//////////////////////////////////////////////////////////////////////////
// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);
// Disconnect client
cout << "Closing thread..." << endl;
s->Close();
delete s;
return 0;
}
// Producer Thread
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);
// questo iteratore è meglio renderlo locale.
map<int, circular>::iterator uit;
// Tutta questa parte va protetta dai mutex perché gli iteratori delle mappe potrebbero
// essere invalidati da un accesso in scrittura del consumer.
//////////////////////////////////////////////////////////////////////////
// Attenzione!!! L'ordine di rilascio deve avvenire in senso inverso o
// si verifica un dead lock.
EnterCriticalSection(&users_mutex);
EnterCriticalSection(&eventi_mutex);
for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30, BUFF_DONE));
if(eventi[uit->first])
SetEvent(eventi[uit->first]);
cout << "Producer is writing to: " << uit->first << endl;
}
LeaveCriticalSection(&eventi_mutex);
LeaveCriticalSection(&users_mutex);
//////////////////////////////////////////////////////////////////////////
}
return 0;
}
void getDateTime(char * szTime)
{
time_t rawtime = time(NULL);
struct tm timeinfo;
gmtime_s(&timeinfo, &rawtime);
strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}
P.S.
Ho visto che frequenti Usenet. Se ti interessa, c'è il gruppo: comp.programming.threads dove si discute della programmazione multithreading senza distinzione di sistema operativo.