Visualizzazione dei risultati da 1 a 7 su 7
  1. #1

    [Java] Socket & MultiThreading

    Ciao a tutti, ho un problema con un'applicazione server che sto sviluppando in java.
    Il software deve mantenere online in una sorta di chat più utenti contemporaneamente, e inizialmente avevo realizzato per ogni utente 2 thread, uno fermo bloccato sulla read() dal socket e l'altro pronto a fare una write quando richiesto.
    Ecco, 2 thread per utente...
    I problemi sono iniziati quando la mia chat ha oltrepassato i 1000 utenti, e la mia fedora ha deciso che non riusciva più a creare thread.

    Ho provato a eliminare i thread con le SocketChannel e le unblocking, ma la performance è scarsa per non dire pessima, la cpu lavora più di 5 volte tanto.
    Risulta anke a voi un utilizzo così massiccio di cpu per le socket unblocking?
    Metodi alternativi?
    Esempi di codici funzionanti su molte connessioni?
    DarkBios has you...

  2. #2
    scusa...ma cosa fa il tuo codice?

    la domanda mi sorge spontanea, xche, in php, senza usare threads, o altro, sono riuscito a far connettere 230 utenti e passa, prima che mi andasse in crash il codice per un'errore mio, e l'utilizzo era intorno al 4\5%

    :\

    non conosco molto bene java ma se hai a disposizione le socket asincrone le DEVI usare in accoppiata con select, pool o epool (laddove è supportato)

    guarda un po su google e guarda che trovi
    io ti ho dato i comandi usati in C per gestire le socket su praticamente tutti i sistemi operativi, penso che se cerchi bene nella docs del jdk trovi quello che ti serve

    cmq il select funziona che passi 3 elenchi (in php array, in C sono liste di puntatori generate con apposite funzioni, in java non so, ma credo array pure) che contengono il riferimento alla socket (in php la risorsa, in C il puntatore al file in java credo l'oggetto base)e altri 2 parametri che sono l'attesa in secondi e milli secondi, di solito la imposto su zero e 10 per far durare i controlli non + di 10 millisecondi
    dopo l'esecuzione nei 3 elenchi che ho possato mi trovo le socket sul quale si sono scatenati gli eventi
    il primo parametro contiene la lista delle socket per le quali si deve controllare se è possibile leggere, il secondo per le quali è possibile scrivere ed infine il terzo per le quali si è verificato un'eccezione.

    dovresti usare un sistema del genere che di solito per cose del genere sono i migliori oltre che in generale i + performanti dato che non ti danno l'overhead di threads\forks

    un software scritto in C++, verlihub, un server per il p2p, è in grado di gestire fino a 5000 e passa utenti contemporanei (poi laggava troppo a causa del consumo eccessivo di banda, + di 40mbit [il tutto era occupato dalle ricerche degli utenti a causa dell'orrendo procotollo])

    In java c'è l'overhead della VM (ove usi le vm che non usano tecniche come la JITC o comunque se usi quella ufficiale SUN) che non è indifferente ma credo che comunque dovresti essere in grado di gestire i tuoi tanti utenti senza particolari carichi

    sciauz
    The fastest Redis alternative ... cachegrand! https://github.com/danielealbano/cachegrand

  3. #3
    Che motivo hai di utilizzare due trhead per utente il TCP è full duplex.

  4. #4
    Originariamente inviato da Dorcan82
    Che motivo hai di utilizzare due trhead per utente il TCP è full duplex.
    in realtà non è necessario neanche un thread per questo tipo di cose

    se dovesse gestire 50.000 utenti allora li avrebbe avuto senso fare un sistema che sfruttasse a pieno il multi processing (SMP) ma...una cosa del genere non si sarebbe fatto in java

    cmq ti ripeto il mio consiglio, dai un occhio alla versione java di queste funzioni

    se ho intuito bene (quindi al 90% sbaglio) fai un while e provi a leggere a rotazione i dati? se fai cosi il soft muore anche se lo scrivevi in assembler
    The fastest Redis alternative ... cachegrand! https://github.com/danielealbano/cachegrand

  5. #5
    come non ha bisogno di un thread?penso che ci sia almeno un thread del server per accettare le connessioni e di conseguenza dopo la connessione un thread per "dialogare" con il client.
    Non capisco il tuo paragone con il php...è apache(o cmq il web server) che si occupa di gestire le connessioni ed aprire i vari thread con i client no??sbaglio?
    ciauz

  6. #6
    uso anche io questa tecnologia per un mio giocheto online per fortuna non arrivo oltre i 60 utenti.

    Se risolvi il problema ti pregherei di lasciare un msg qui per illuminarci

    Ciao
    TonyTalk
    Gioca a The Empire Multiplayer online game...ispirato a Risiko ma con scenari ed opzioni rivoluzionarie
    http://www.multiplayergames.it
    http://utenti.lycos.it/the_empire

  7. #7

    Vi illustro e vi copio anke qualche pezzo di codice

    Ciao a tutti, specialmente a daniele_dll che ha azzeccato in pieno il problema (non sapevo che tu stessi in questi forum).

    Sto facendo esattamente un hub per direct connect, lo realizzo in java perkè ho bisogno che vada indistintamente su linux e windows.
    In più avevo un amico che voleva reggere hubs con Mac, e purtroppo per mac non ci sono hub decenti (non che io ne conosca).

    Prima il codice si basava su un thread per ogni utente che legge i messaggi dal socket, e quindi quando serve resta appeso sulla read().

    Tutti i thread di lettura appendono i messaggi ricevuti in un unico buffer fifo (CentralBuffer), da questo buffer legge i messaggi un thread principale (unico, il Core) che li elabora uno dopo l'altro e appende ad altri buffer (uno per ogni user, CommandBuffer) i messaggi che devono essere spediti.

    Contemporaneamente ci sono altri thread (uno per ogni user) che sono fermi in attesa che il proprio CommandBuffer abbia qualcosa da spedire, e poi appena c'è qualcosa fanno le write() sul socket.

    Infine c'è un altro thread che cicla le accept() e inserisce un messaggio speciale nel CentralBuffer, il Core penserà poi a completare l'inizializzazione del nuovo utente connesso.

    I buffer sono la parte più importante perkè permettono di bloccare il thread finchè non ci sono messaggi, e impediscono così fastidiosissimi loop.
    codice:
    package dcHub;
    import java.util.*;
    /**
     * Fornisce al Core i messaggi provenienti dagli user
     * @author Fedechicco
     */
    public class CentralBuffer {
        volatile Vector m_buf;
        volatile int prioritymex = 0;
        public CentralBuffer() {
            m_buf = new Vector();
        }
        synchronized public CentralMessage getNext(boolean blocking){
            if(blocking){
                CentralMessage aux;
                do{
                    while(m_buf.isEmpty()){
                        try{
                            wait();
                        }catch(Exception e){
                        }
                    }
                    aux = (CentralMessage)(m_buf.firstElement());
                    m_buf.removeElementAt(0);
                    if(prioritymex > 0){
                        prioritymex --;
                    }
                }while(aux == null);
                return aux;
                
            }
            else{
                CentralMessage aux;
                if(m_buf.size() > 0){
                    aux = (CentralMessage)(m_buf.firstElement());
                    m_buf.removeElementAt(0);
                    if(prioritymex > 0){
                        prioritymex --;
                    }
                }
                else {
                    aux = null;
                }
                return aux;
            }
        }
        synchronized public void setNext(CentralMessage comm){
            m_buf.add(comm);
            notify();
            return;
        }
        synchronized public void setNextFirst(CentralMessage comm){
            m_buf.add(prioritymex, comm);
            prioritymex++;
            notify();
            return;
        }
    }
    Ok, questo era come l'avevo fatto prima, con 800 utenti lavora da dio, niente lag e non usa molta cpu, mi stupivo di quanto fosse performante.
    Peccato che sopra i 1500 utenti più o meno mi esca spessissimo il fastidiosissimo messaggio di Exception: "java.lang.OutOfMemoryError: unable to create new native thread"

    Ecco la mia triste storia.

    Così ho deciso che era ora di studiare i socketUnbloking, e ho studiato i Selector e i SocketChannel, ho impostato un ciclo solo nel thread principale che usa il Selector per controllare in unblocking mode contemporaneamente tutti i socket, sia in accept (i ServerSocket)che in read, e se c'è qualcosa da scrivere controlla anke se il socket è pronto per la write e scrive.

    Il Selector ha un metodo di select() che permette di restare appeso in wait finchè non c'è qualcosa da fare su uno qualunque dei socket registrati presso il Selector, e solo allora ritorna.
    Il ciclo che fa la select() è infinito, sì, ma dovrebbe essere buono ugualmente!.
    La cosa peggiore è che solo oltre i 150 utenti circa il programma inizia a usare il 100% di risorse di cpu, come se solo oltre una certa utenza comparisse un loop.

    Ho provato anke a separare e mettere nel thread principale solo le accept e le read(), e mettere in un thread separato (un thread singolo) le write, con il loro selector. Ma nulla è cambiato, non che ci sperassi, più che altro era un tentativo disperato.
    Il codice attuale del ciclo del thread Core è questo:
    codice:
    for(;!restart && !shutdown;){
                try{
                    try {
                        selector.select();
                    } catch (IOException e) {
                        if(m_verbose || m_debug){
                            System.out.print(CommandMaker.completeDescription(e).toString());
                        }
                    }
                    Iterator it = selector.selectedKeys().iterator();
                    while(it.hasNext()){
                        SelectionKey selKey = (SelectionKey)it.next();
                        it.remove();
                        try {
                            processSelectionKey(selKey);
                        } catch (IOException e) {
                            if(m_verbose || m_debug){
                                System.out.print(CommandMaker.completeDescription(e).toString());
                            }
                            selKey.cancel();
                        }
                        // forse non va bene qua perkè potrebbe incasinare a cazzo l'iteratore
                        for(now = m_buf.getNext(false); now != null; now = m_buf.getNext(false)){
                            try{
                                if(now.m_usr != null && now.m_mex != null){
                                    if(m_debug){
                                        m_lastmessage.delete(0, m_lastmessage.length());
                                        m_lastmessage.append((new Date()).toString()).append(" (").append(now.m_usr.m_I.getNick()).append(" - ").append(now.m_usr.m_I.getIp().getHostAddress()).append(") => ").append(now.m_mex);
                                    }
                                    if(ora + (long)7200000 < (new Date()).getTime()){ // salva i files ogni 2 ore
                                        ora = (new Date()).getTime();
                                        saveFiles();
                                    }
                                    if(now.m_mex.toString().compareTo("/$/$Connected") == 0){
                                        now.m_usr.coreThreadMustInit();
                                    }
                                    else{
                                        now.m_usr.todo(now.m_mex);
                                    }
                                }
                            }catch(Exception e){
                                if(m_verbose || m_debug){
                                    System.out.print(CommandMaker.completeDescription(e).toString());
                                }
                            }
                        }
                        // fine del forse non va bene qua
                    }
                }catch(Exception e){
                    if(m_verbose){
                        String str = e.toString() + "\nLocalizedMessage: " + e.getLocalizedMessage() + "\nMessage: " + e.getMessage() + "\n\nBackTrace:\n";
                        for(int i = 0; i < e.getStackTrace().length; i++){
                            str += ((e.getStackTrace())[i]).toString() + '\n';
                        }
                        str += "\nHave a nice day!";
                        System.out.print(str);
                    }
                }
                
            }
    e la funzione processSelectionKey() è definita così:
    codice:
    public void processSelectionKey(SelectionKey selKey) throws IOException {
            if (selKey.isValid() && selKey.isReadable()) {
                // Get channel with bytes to read
                SocketChannel sChannel = (SocketChannel)selKey.channel();
                User now = (User)selKey.attachment();
                //leggo dal socket
                buf.clear();
                int numBytesRead = -1;
                try{
                    numBytesRead = sChannel.read(buf);
                }catch(Exception e){
                    numBytesRead = -1;
                }
                if (numBytesRead == -1) {
                    // No more bytes can be read from the channel
                    sChannel.close();
                    selKey.cancel();
                    now.quitMe();
                } else {
                    // To read the bytes, flip the buffer
                    buf.flip();
                    buf.get(bbuf, 0, numBytesRead);
                    try{
                        now.message_under_construction += new String(bbuf, 0, numBytesRead, "Cp1252");
                    }catch(Exception e){
                        if(m_verbose){
                            System.out.print("Error on decoding bytes");
                        }
                        return;
                    }
                    if(now.message_under_construction.length() > (long)(51200)){ // quello è 50*1024
                        now.message_under_construction = now.message_under_construction.substring(0, 256) + "***messaggio_tagliato,_superava_i_50_Kb...***|";
                    }
                    //fine gestione charset
                    while(now.message_under_construction.indexOf('|') != -1 && now.message_under_construction.length() > 0){
                        String com = now.message_under_construction.substring(0, now.message_under_construction.indexOf('|'));
                        //controllo com e altrimenti lo butto
                        for(int cc = 0; cc < m_patternCheckCmd.length; cc++){
                            if(m_patternCheckCmd[cc].matcher(com).matches()){
                                if(com.length() > 0 && com.charAt(0)=='<'){
                                    m_buf.setNextFirst(new CentralMessage(now, new StringBuffer(com)));
                                }
                                else{
                                    m_buf.setNext(new CentralMessage(now, new StringBuffer(com)));
                                }
                                break;
                            }
                        }
                        now.message_under_construction = now.message_under_construction.substring(now.message_under_construction.indexOf('|') + 1);
                    }
                    // Read the bytes from the buffer ...;
                    // see e159 Getting Bytes from a ByteBuffer
                }
                
                // See e174 Reading from a SocketChannel
            }
            if (selKey.isValid() && selKey.isAcceptable()){
                ServerSocketChannel ssChannel = (ServerSocketChannel)selKey.channel();
                SocketChannel sChannel = ssChannel.accept();
                sChannel.configureBlocking(false);
                // If serverSocketChannel is non-blocking, sChannel may be null
                if (sChannel == null) {
                    // There were no pending connection requests; try again later.
                    // To be notified of connection requests,
                    // see e179 Using a Selector to Manage Non-Blocking Server Sockets.
                } else {
                    // Use the socket channel to communicate with the client
                    boolean success = sChannel.finishConnect();
                    if (!success) {
                        // An error occurred; handle it
                        sChannel.close();
                        // Unregister the channel with this selector
                        selKey.cancel();
                    }
                    else{
                        User now = new User(this, sChannel);
                        sChannel.register(selector, SelectionKey.OP_READ);
                        sChannel.keyFor(selector).attach(now);
                        m_buf.setNext(new CentralMessage(now, new StringBuffer("/$/$Connected")));
                    }
                }
            }
        }
    Potete facilmente vedere che il codice è preso in buona parte da un sito che ho trovato con google dove si spiegava l'utilizzo dei SocketChannel.

    Vedete anke qui l'utilizzo dei miei CentralBuffer, in unblocking mode.

    Beh, le strade le ho provate entrambe, so che la unblocking è la migliore per questo tipo di programmi ad alta utenza, ma quell'utilizzo del 100% della cpu non ci vuole... Se qualcuno riesce ad aiutarmi gliene sono grato!
    Ah, il progetto è anke su sourceForge, cerco collaboratori per chi volesse darmi una mano.

    by Fedechicco
    DarkBios has you...

Permessi di invio

  • Non puoi inserire discussioni
  • Non puoi inserire repliche
  • Non puoi inserire allegati
  • Non puoi modificare i tuoi messaggi
  •  
Powered by vBulletin® Version 4.2.1
Copyright © 2025 vBulletin Solutions, Inc. All rights reserved.