Ciao a tutti, specialmente a daniele_dll che ha azzeccato in pieno il problema (non sapevo che tu stessi in questi forum).
Sto facendo esattamente un hub per direct connect, lo realizzo in java perkè ho bisogno che vada indistintamente su linux e windows.
In più avevo un amico che voleva reggere hubs con Mac, e purtroppo per mac non ci sono hub decenti (non che io ne conosca).
Prima il codice si basava su un thread per ogni utente che legge i messaggi dal socket, e quindi quando serve resta appeso sulla read().
Tutti i thread di lettura appendono i messaggi ricevuti in un unico buffer fifo (CentralBuffer), da questo buffer legge i messaggi un thread principale (unico, il Core) che li elabora uno dopo l'altro e appende ad altri buffer (uno per ogni user, CommandBuffer) i messaggi che devono essere spediti.
Contemporaneamente ci sono altri thread (uno per ogni user) che sono fermi in attesa che il proprio CommandBuffer abbia qualcosa da spedire, e poi appena c'è qualcosa fanno le write() sul socket.
Infine c'è un altro thread che cicla le accept() e inserisce un messaggio speciale nel CentralBuffer, il Core penserà poi a completare l'inizializzazione del nuovo utente connesso.
I buffer sono la parte più importante perkè permettono di bloccare il thread finchè non ci sono messaggi, e impediscono così fastidiosissimi loop.
codice:
package dcHub;
import java.util.*;
/**
* Fornisce al Core i messaggi provenienti dagli user
* @author Fedechicco
*/
public class CentralBuffer {
volatile Vector m_buf;
volatile int prioritymex = 0;
public CentralBuffer() {
m_buf = new Vector();
}
synchronized public CentralMessage getNext(boolean blocking){
if(blocking){
CentralMessage aux;
do{
while(m_buf.isEmpty()){
try{
wait();
}catch(Exception e){
}
}
aux = (CentralMessage)(m_buf.firstElement());
m_buf.removeElementAt(0);
if(prioritymex > 0){
prioritymex --;
}
}while(aux == null);
return aux;
}
else{
CentralMessage aux;
if(m_buf.size() > 0){
aux = (CentralMessage)(m_buf.firstElement());
m_buf.removeElementAt(0);
if(prioritymex > 0){
prioritymex --;
}
}
else {
aux = null;
}
return aux;
}
}
synchronized public void setNext(CentralMessage comm){
m_buf.add(comm);
notify();
return;
}
synchronized public void setNextFirst(CentralMessage comm){
m_buf.add(prioritymex, comm);
prioritymex++;
notify();
return;
}
}
Ok, questo era come l'avevo fatto prima, con 800 utenti lavora da dio, niente lag e non usa molta cpu, mi stupivo di quanto fosse performante.
Peccato che sopra i 1500 utenti più o meno mi esca spessissimo il fastidiosissimo messaggio di Exception: "java.lang.OutOfMemoryError: unable to create new native thread"
Ecco la mia triste storia.
Così ho deciso che era ora di studiare i socketUnbloking, e ho studiato i Selector e i SocketChannel, ho impostato un ciclo solo nel thread principale che usa il Selector per controllare in unblocking mode contemporaneamente tutti i socket, sia in accept (i ServerSocket)che in read, e se c'è qualcosa da scrivere controlla anke se il socket è pronto per la write e scrive.
Il Selector ha un metodo di select() che permette di restare appeso in wait finchè non c'è qualcosa da fare su uno qualunque dei socket registrati presso il Selector, e solo allora ritorna.
Il ciclo che fa la select() è infinito, sì, ma dovrebbe essere buono ugualmente!.
La cosa peggiore è che solo oltre i 150 utenti circa il programma inizia a usare il 100% di risorse di cpu, come se solo oltre una certa utenza comparisse un loop.
Ho provato anke a separare e mettere nel thread principale solo le accept e le read(), e mettere in un thread separato (un thread singolo) le write, con il loro selector. Ma nulla è cambiato, non che ci sperassi, più che altro era un tentativo disperato.
Il codice attuale del ciclo del thread Core è questo:
codice:
for(;!restart && !shutdown;){
try{
try {
selector.select();
} catch (IOException e) {
if(m_verbose || m_debug){
System.out.print(CommandMaker.completeDescription(e).toString());
}
}
Iterator it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey selKey = (SelectionKey)it.next();
it.remove();
try {
processSelectionKey(selKey);
} catch (IOException e) {
if(m_verbose || m_debug){
System.out.print(CommandMaker.completeDescription(e).toString());
}
selKey.cancel();
}
// forse non va bene qua perkè potrebbe incasinare a cazzo l'iteratore
for(now = m_buf.getNext(false); now != null; now = m_buf.getNext(false)){
try{
if(now.m_usr != null && now.m_mex != null){
if(m_debug){
m_lastmessage.delete(0, m_lastmessage.length());
m_lastmessage.append((new Date()).toString()).append(" (").append(now.m_usr.m_I.getNick()).append(" - ").append(now.m_usr.m_I.getIp().getHostAddress()).append(") => ").append(now.m_mex);
}
if(ora + (long)7200000 < (new Date()).getTime()){ // salva i files ogni 2 ore
ora = (new Date()).getTime();
saveFiles();
}
if(now.m_mex.toString().compareTo("/$/$Connected") == 0){
now.m_usr.coreThreadMustInit();
}
else{
now.m_usr.todo(now.m_mex);
}
}
}catch(Exception e){
if(m_verbose || m_debug){
System.out.print(CommandMaker.completeDescription(e).toString());
}
}
}
// fine del forse non va bene qua
}
}catch(Exception e){
if(m_verbose){
String str = e.toString() + "\nLocalizedMessage: " + e.getLocalizedMessage() + "\nMessage: " + e.getMessage() + "\n\nBackTrace:\n";
for(int i = 0; i < e.getStackTrace().length; i++){
str += ((e.getStackTrace())[i]).toString() + '\n';
}
str += "\nHave a nice day!";
System.out.print(str);
}
}
}
e la funzione processSelectionKey() è definita così:
codice:
public void processSelectionKey(SelectionKey selKey) throws IOException {
if (selKey.isValid() && selKey.isReadable()) {
// Get channel with bytes to read
SocketChannel sChannel = (SocketChannel)selKey.channel();
User now = (User)selKey.attachment();
//leggo dal socket
buf.clear();
int numBytesRead = -1;
try{
numBytesRead = sChannel.read(buf);
}catch(Exception e){
numBytesRead = -1;
}
if (numBytesRead == -1) {
// No more bytes can be read from the channel
sChannel.close();
selKey.cancel();
now.quitMe();
} else {
// To read the bytes, flip the buffer
buf.flip();
buf.get(bbuf, 0, numBytesRead);
try{
now.message_under_construction += new String(bbuf, 0, numBytesRead, "Cp1252");
}catch(Exception e){
if(m_verbose){
System.out.print("Error on decoding bytes");
}
return;
}
if(now.message_under_construction.length() > (long)(51200)){ // quello è 50*1024
now.message_under_construction = now.message_under_construction.substring(0, 256) + "***messaggio_tagliato,_superava_i_50_Kb...***|";
}
//fine gestione charset
while(now.message_under_construction.indexOf('|') != -1 && now.message_under_construction.length() > 0){
String com = now.message_under_construction.substring(0, now.message_under_construction.indexOf('|'));
//controllo com e altrimenti lo butto
for(int cc = 0; cc < m_patternCheckCmd.length; cc++){
if(m_patternCheckCmd[cc].matcher(com).matches()){
if(com.length() > 0 && com.charAt(0)=='<'){
m_buf.setNextFirst(new CentralMessage(now, new StringBuffer(com)));
}
else{
m_buf.setNext(new CentralMessage(now, new StringBuffer(com)));
}
break;
}
}
now.message_under_construction = now.message_under_construction.substring(now.message_under_construction.indexOf('|') + 1);
}
// Read the bytes from the buffer ...;
// see e159 Getting Bytes from a ByteBuffer
}
// See e174 Reading from a SocketChannel
}
if (selKey.isValid() && selKey.isAcceptable()){
ServerSocketChannel ssChannel = (ServerSocketChannel)selKey.channel();
SocketChannel sChannel = ssChannel.accept();
sChannel.configureBlocking(false);
// If serverSocketChannel is non-blocking, sChannel may be null
if (sChannel == null) {
// There were no pending connection requests; try again later.
// To be notified of connection requests,
// see e179 Using a Selector to Manage Non-Blocking Server Sockets.
} else {
// Use the socket channel to communicate with the client
boolean success = sChannel.finishConnect();
if (!success) {
// An error occurred; handle it
sChannel.close();
// Unregister the channel with this selector
selKey.cancel();
}
else{
User now = new User(this, sChannel);
sChannel.register(selector, SelectionKey.OP_READ);
sChannel.keyFor(selector).attach(now);
m_buf.setNext(new CentralMessage(now, new StringBuffer("/$/$Connected")));
}
}
}
}
Potete facilmente vedere che il codice è preso in buona parte da un sito che ho trovato con google dove si spiegava l'utilizzo dei SocketChannel.
Vedete anke qui l'utilizzo dei miei CentralBuffer, in unblocking mode.
Beh, le strade le ho provate entrambe, so che la unblocking è la migliore per questo tipo di programmi ad alta utenza, ma quell'utilizzo del 100% della cpu non ci vuole... Se qualcuno riesce ad aiutarmi gliene sono grato!
Ah, il progetto è anke su sourceForge, cerco collaboratori per chi volesse darmi una mano.
by Fedechicco